diff --git a/adapter/outbound/vmess.go b/adapter/outbound/vmess.go index d92a02bdc7..6985a37e59 100644 --- a/adapter/outbound/vmess.go +++ b/adapter/outbound/vmess.go @@ -17,6 +17,7 @@ import ( C "github.com/metacubex/mihomo/constant" "github.com/metacubex/mihomo/ntp" "github.com/metacubex/mihomo/transport/gun" + "github.com/metacubex/mihomo/transport/mkcp" mihomoVMess "github.com/metacubex/mihomo/transport/vmess" "github.com/metacubex/http" @@ -36,6 +37,9 @@ type Vmess struct { // for gun mux gunClient *gun.Client + // for mkcp transport + mkcpConfig *mkcp.Config + realityConfig *tlsC.RealityConfig echConfig *ech.Config } @@ -63,6 +67,7 @@ type VmessOption struct { HTTP2Opts HTTP2Options `proxy:"h2-opts,omitempty"` GrpcOpts GrpcOptions `proxy:"grpc-opts,omitempty"` WSOpts WSOptions `proxy:"ws-opts,omitempty"` + KCPOpts KCPOptions `proxy:"kcp-opts,omitempty"` PacketAddr bool `proxy:"packet-addr,omitempty"` XUDP bool `proxy:"xudp,omitempty"` PacketEncoding string `proxy:"packet-encoding,omitempty"` @@ -91,6 +96,40 @@ type GrpcOptions struct { MaxStreams int `proxy:"max-streams,omitempty"` } +// KCPOptions configures the mKCP transport (Xray-compatible). Field names and +// semantics mirror Xray's kcpSettings. Buffer sizes are in MB like Xray's JSON. +type KCPOptions struct { + MTU int `proxy:"mtu,omitempty"` + TTI int `proxy:"tti,omitempty"` + UplinkCapacity int `proxy:"uplink-capacity,omitempty"` + DownlinkCapacity int `proxy:"downlink-capacity,omitempty"` + Congestion bool `proxy:"congestion,omitempty"` + ReadBufferSize int `proxy:"read-buffer-size,omitempty"` + WriteBufferSize int `proxy:"write-buffer-size,omitempty"` + Header string `proxy:"header,omitempty"` + Seed string `proxy:"seed,omitempty"` +} + +func (o KCPOptions) Build() *mkcp.Config { + cfg := &mkcp.Config{ + Mtu: uint32(o.MTU), + Tti: uint32(o.TTI), + UplinkCapacity: uint32(o.UplinkCapacity), + DownlinkCapacity: uint32(o.DownlinkCapacity), + Congestion: o.Congestion, + Header: o.Header, + Seed: o.Seed, + } + // Xray expresses these buffers in MB; convert to bytes (0 keeps the default). + if o.ReadBufferSize > 0 { + cfg.ReadBufferSize = uint32(o.ReadBufferSize) * 1024 * 1024 + } + if o.WriteBufferSize > 0 { + cfg.WriteBufferSize = uint32(o.WriteBufferSize) * 1024 * 1024 + } + return cfg +} + type WSOptions struct { Path string `proxy:"path,omitempty"` Headers map[string]string `proxy:"headers,omitempty"` @@ -278,11 +317,35 @@ func (v *Vmess) dialContext(ctx context.Context) (c net.Conn, err error) { switch v.option.Network { case "grpc": // gun transport return v.gunClient.Dial() + case "kcp": // mkcp transport over udp + pc, addr, err := v.listenPacketContext(ctx) + if err != nil { + return nil, err + } + c, err := mkcp.Dial(pc, addr, v.mkcpConfig) + if err != nil { + _ = pc.Close() + return nil, err + } + return c, nil default: } return v.dialer.DialContext(ctx, "tcp", v.addr) } +func (v *Vmess) listenPacketContext(ctx context.Context) (net.PacketConn, net.Addr, error) { + addr, err := resolveUDPAddr(ctx, "udp", v.addr, v.prefer) + if err != nil { + return nil, nil, err + } + + pc, err := v.dialer.ListenPacket(ctx, "udp", "", addr.AddrPort()) + if err != nil { + return nil, nil, err + } + return pc, addr, nil +} + // DialContext implements C.ProxyAdapter func (v *Vmess) DialContext(ctx context.Context, metadata *C.Metadata) (_ C.Conn, err error) { c, err := v.dialContext(ctx) @@ -407,6 +470,8 @@ func NewVmess(option VmessOption) (*Vmess, error) { if len(option.HTTP2Opts.Host) == 0 { option.HTTP2Opts.Host = append(option.HTTP2Opts.Host, "www.example.com") } + case "kcp": + v.mkcpConfig = option.KCPOpts.Build() case "grpc": dialFn := func(ctx context.Context, network, addr string) (net.Conn, error) { c, err := v.dialer.DialContext(ctx, "tcp", v.addr) diff --git a/component/sniffer/dispatcher.go b/component/sniffer/dispatcher.go index 1dafcf1f27..9b0fb4c5ca 100644 --- a/component/sniffer/dispatcher.go +++ b/component/sniffer/dispatcher.go @@ -91,6 +91,17 @@ func (sd *Dispatcher) UDPSniff(packet C.PacketAdapter, packetSender C.PacketSend continue } + // Protocol detected but no domain extracted (e.g. STUN) + if host == "" { + metadata.SniffProtocol = current.Protocol() + log.Debugln("[Sniffer] Sniff %s [%s]-->[%s] protocol [%s] detected (no domain)", + metadata.NetWork, + metadata.SourceDetail(), + metadata.RemoteAddress(), + current.Protocol()) + return packetSender + } + replaceDomain(metadata, host) return packetSender } @@ -285,6 +296,17 @@ func NewDispatcher(snifferConfig *Config) (*Dispatcher, error) { dispatcher.sniffers[s] = config } + // Auto-register STUN sniffer when sniffer is enabled but STUN was not explicitly configured. + if snifferConfig.Enable { + if _, exists := snifferConfig.Sniffers[sniffer.STUN]; !exists { + s, err := NewSTUNSniffer(SnifferConfig{}) + if err == nil { + dispatcher.sniffers[s] = SnifferConfig{} + log.Infoln("[Sniffer] STUN sniffer auto-enabled") + } + } + } + return &dispatcher, nil } @@ -296,6 +318,8 @@ func NewSniffer(name sniffer.Type, snifferConfig SnifferConfig) (sniffer.Sniffer return NewHTTPSniffer(snifferConfig) case sniffer.QUIC: return NewQuicSniffer(snifferConfig) + case sniffer.STUN: + return NewSTUNSniffer(snifferConfig) default: return nil, ErrorUnsupportedSniffer } diff --git a/component/sniffer/sniff_test.go b/component/sniffer/sniff_test.go index f911b209e5..21239fa660 100644 --- a/component/sniffer/sniff_test.go +++ b/component/sniffer/sniff_test.go @@ -194,6 +194,119 @@ func TestQuicHeaders(t *testing.T) { } } +func TestSTUNSniffer(t *testing.T) { + sniffer, err := NewSTUNSniffer(SnifferConfig{}) + assert.NoError(t, err) + assert.Equal(t, "stun", sniffer.Protocol()) + assert.Equal(t, constant.UDP, sniffer.SupportNetwork()) + + cases := []struct { + name string + input []byte + isSTUN bool + }{ + { + name: "valid STUN Binding Request", + input: []byte{ + 0x00, 0x01, + 0x00, 0x00, + 0x21, 0x12, 0xA4, 0x42, + 0x01, 0x02, 0x03, 0x04, + 0x05, 0x06, 0x07, 0x08, + 0x09, 0x0a, 0x0b, 0x0c, + }, + isSTUN: true, + }, + { + name: "valid STUN Binding Response with attributes", + input: []byte{ + 0x01, 0x01, + 0x00, 0x0c, + 0x21, 0x12, 0xA4, 0x42, + 0xaa, 0xbb, 0xcc, 0xdd, + 0xee, 0xff, 0x00, 0x11, + 0x22, 0x33, 0x44, 0x55, + 0x00, 0x20, + 0x00, 0x08, + 0x00, 0x01, + 0x63, 0x46, + 0x73, 0x92, 0xa5, 0x46, + }, + isSTUN: true, + }, + { + name: "too short packet", + input: []byte{0x00, 0x01, 0x00, 0x00}, + isSTUN: false, + }, + { + name: "wrong magic cookie", + input: []byte{ + 0x00, 0x01, + 0x00, 0x00, + 0xDE, 0xAD, 0xBE, 0xEF, + 0x01, 0x02, 0x03, 0x04, + 0x05, 0x06, 0x07, 0x08, + 0x09, 0x0a, 0x0b, 0x0c, + }, + isSTUN: false, + }, + { + name: "first 2 bits not zero (DTLS or other)", + input: []byte{ + 0x80, 0x01, + 0x00, 0x00, + 0x21, 0x12, 0xA4, 0x42, + 0x01, 0x02, 0x03, 0x04, + 0x05, 0x06, 0x07, 0x08, + 0x09, 0x0a, 0x0b, 0x0c, + }, + isSTUN: false, + }, + { + name: "message length not multiple of 4", + input: []byte{ + 0x00, 0x01, + 0x00, 0x03, + 0x21, 0x12, 0xA4, 0x42, + 0x01, 0x02, 0x03, 0x04, + 0x05, 0x06, 0x07, 0x08, + 0x09, 0x0a, 0x0b, 0x0c, + }, + isSTUN: false, + }, + { + name: "TLS handshake packet (not STUN)", + input: []byte{ + 0x16, 0x03, 0x01, 0x00, 0xc8, 0x01, 0x00, 0x00, + 0xc4, 0x03, 0x03, 0x1a, 0xac, 0xb2, 0xa8, 0xfe, + 0xb4, 0x96, 0x04, 0x5b, + }, + isSTUN: false, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + host, err := sniffer.SniffData(tc.input) + if tc.isSTUN { + assert.NoError(t, err) + assert.Equal(t, "", host) + } else { + assert.Error(t, err) + } + }) + } + + // Default: no port restriction (all ports match) + assert.True(t, sniffer.SupportPort(3478)) + assert.True(t, sniffer.SupportPort(5349)) + assert.True(t, sniffer.SupportPort(19302)) + assert.True(t, sniffer.SupportPort(443)) + assert.True(t, sniffer.SupportPort(80)) + assert.True(t, sniffer.SupportPort(12345)) +} + func TestTLSHeaders(t *testing.T) { cases := []struct { input []byte diff --git a/component/sniffer/stun_sniffer.go b/component/sniffer/stun_sniffer.go new file mode 100644 index 0000000000..8c0d65f2d4 --- /dev/null +++ b/component/sniffer/stun_sniffer.go @@ -0,0 +1,68 @@ +package sniffer + +import ( + "encoding/binary" + "errors" + + C "github.com/metacubex/mihomo/constant" + "github.com/metacubex/mihomo/constant/sniffer" +) + +// https://datatracker.ietf.org/doc/html/rfc8489 +const ( + stunHeaderSize = 20 + stunMagicCookie = 0x2112A442 +) + +var ( + errNotSTUN = errors.New("not STUN message") +) + +var _ sniffer.Sniffer = (*STUNSniffer)(nil) + +type STUNSniffer struct { + *BaseSniffer +} + +func NewSTUNSniffer(snifferConfig SnifferConfig) (*STUNSniffer, error) { + return &STUNSniffer{ + BaseSniffer: NewBaseSniffer(snifferConfig.Ports, C.UDP), + }, nil +} + +func (s *STUNSniffer) Protocol() string { + return "stun" +} + +func (s *STUNSniffer) SupportNetwork() C.NetWork { + return C.UDP +} + +func (s *STUNSniffer) SniffData(b []byte) (string, error) { + if err := detectSTUN(b); err != nil { + return "", err + } + + return "", nil +} + +func detectSTUN(b []byte) error { + if len(b) < stunHeaderSize { + return errNotSTUN + } + + if b[0]&0xC0 != 0x00 { + return errNotSTUN + } + + if binary.BigEndian.Uint32(b[4:8]) != stunMagicCookie { + return errNotSTUN + } + + msgLen := binary.BigEndian.Uint16(b[2:4]) + if msgLen%4 != 0 { + return errNotSTUN + } + + return nil +} diff --git a/constant/metadata.go b/constant/metadata.go index 043285eec8..a021a26d7f 100644 --- a/constant/metadata.go +++ b/constant/metadata.go @@ -205,7 +205,8 @@ type Metadata struct { RawSrcAddr net.Addr `json:"-"` RawDstAddr net.Addr `json:"-"` // Only domain rule - SniffHost string `json:"sniffHost"` + SniffHost string `json:"sniffHost"` + SniffProtocol string `json:"sniffProtocol,omitempty"` } func (m *Metadata) RemoteAddress() string { diff --git a/constant/rule.go b/constant/rule.go index 256ef0fc4a..c46125a3fb 100644 --- a/constant/rule.go +++ b/constant/rule.go @@ -33,6 +33,7 @@ const ( ProcessPathWildcard RuleSet Network + SniffProtocol Uid SubRules MATCH @@ -103,6 +104,8 @@ func (rt RuleType) String() string { return "RuleSet" case Network: return "Network" + case SniffProtocol: + return "SniffProtocol" case DSCP: return "DSCP" case Uid: diff --git a/constant/sniffer/sniffer.go b/constant/sniffer/sniffer.go index 7418b67361..e2d7bfb8c7 100644 --- a/constant/sniffer/sniffer.go +++ b/constant/sniffer/sniffer.go @@ -20,10 +20,11 @@ const ( TLS Type = iota HTTP QUIC + STUN ) var ( - List = []Type{TLS, HTTP, QUIC} + List = []Type{TLS, HTTP, QUIC, STUN} ) type Type int @@ -36,6 +37,8 @@ func (rt Type) String() string { return "HTTP" case QUIC: return "QUIC" + case STUN: + return "STUN" default: return "Unknown" } diff --git a/rules/common/sniff_protocol.go b/rules/common/sniff_protocol.go new file mode 100644 index 0000000000..79f9dec0e2 --- /dev/null +++ b/rules/common/sniff_protocol.go @@ -0,0 +1,39 @@ +package common + +import ( + "strings" + + C "github.com/metacubex/mihomo/constant" +) + +type SniffProtocolRule struct { + Base + protocol string + adapter string +} + +func (s *SniffProtocolRule) RuleType() C.RuleType { + return C.SniffProtocol +} + +func (s *SniffProtocolRule) Match(metadata *C.Metadata, helper C.RuleMatchHelper) (bool, string) { + return strings.EqualFold(metadata.SniffProtocol, s.protocol), s.adapter +} + +func (s *SniffProtocolRule) Adapter() string { + return s.adapter +} + +func (s *SniffProtocolRule) Payload() string { + return s.protocol +} + +func NewSniffProtocol(protocol, adapter string) (*SniffProtocolRule, error) { + return &SniffProtocolRule{ + Base: Base{}, + protocol: strings.ToLower(protocol), + adapter: adapter, + }, nil +} + +var _ C.Rule = (*SniffProtocolRule)(nil) diff --git a/rules/parser.go b/rules/parser.go index b2bf96424f..bcd2a9a463 100644 --- a/rules/parser.go +++ b/rules/parser.go @@ -69,6 +69,8 @@ func ParseRule(tp, payload, target string, params []string, subRules map[string] parsed, parseErr = RC.NewProcess(payload, target, C.ProcessPathWildcard) case "NETWORK": parsed, parseErr = RC.NewNetworkType(payload, target) + case "SNIFF-PROTOCOL": + parsed, parseErr = RC.NewSniffProtocol(payload, target) case "UID": parsed, parseErr = RC.NewUid(payload, target) case "IN-TYPE": diff --git a/test/go.mod b/test/go.mod index f364c2c89d..ed681b4f8e 100644 --- a/test/go.mod +++ b/test/go.mod @@ -6,9 +6,9 @@ require ( github.com/docker/docker v20.10.21+incompatible github.com/docker/go-connections v0.4.0 github.com/metacubex/mihomo v0.0.0 - github.com/miekg/dns v1.1.57 - github.com/stretchr/testify v1.8.4 - golang.org/x/net v0.18.0 + github.com/miekg/dns v1.1.63 + github.com/stretchr/testify v1.11.1 + golang.org/x/net v0.35.0 ) replace github.com/metacubex/mihomo => ../ @@ -16,39 +16,39 @@ replace github.com/metacubex/mihomo => ../ require ( github.com/3andne/restls-client-go v0.1.6 // indirect github.com/Microsoft/go-winio v0.6.0 // indirect - github.com/RyuaNerin/go-krypto v1.0.2 // indirect + github.com/RyuaNerin/go-krypto v1.3.0 // indirect github.com/Yawning/aez v0.0.0-20211027044916-e49e68abd344 // indirect github.com/aead/chacha20 v0.0.0-20180709150244-8b13a72661da // indirect - github.com/andybalholm/brotli v1.0.5 // indirect + github.com/andybalholm/brotli v1.0.6 // indirect github.com/bahlo/generic-list-go v0.2.0 // indirect github.com/buger/jsonparser v1.1.1 // indirect - github.com/coreos/go-iptables v0.7.0 // indirect + github.com/coreos/go-iptables v0.8.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/distribution/reference v0.5.0 // indirect - github.com/dlclark/regexp2 v1.10.0 // indirect + github.com/dlclark/regexp2 v1.12.0 // indirect github.com/docker/distribution v2.8.3+incompatible // indirect github.com/docker/go-units v0.4.0 // indirect - github.com/ericlagergren/aegis v0.0.0-20230312195928-b4ce538b56f9 // indirect + github.com/ericlagergren/aegis v0.0.0-20250325060835-cd0defd64358 // indirect github.com/ericlagergren/polyval v0.0.0-20220411101811-e25bc10ba391 // indirect github.com/ericlagergren/siv v0.0.0-20220507050439-0b757b3aa5f1 // indirect github.com/ericlagergren/subtle v0.0.0-20220507045147-890d697da010 // indirect - github.com/fsnotify/fsnotify v1.7.0 // indirect + github.com/fsnotify/fsnotify v1.9.0 // indirect github.com/gaukas/godicttls v0.0.4 // indirect github.com/go-ole/go-ole v1.3.0 // indirect github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect github.com/gobwas/httphead v0.1.0 // indirect github.com/gobwas/pool v0.2.1 // indirect - github.com/gobwas/ws v1.3.1 // indirect - github.com/gofrs/uuid/v5 v5.0.0 // indirect + github.com/gobwas/ws v1.4.0 // indirect + github.com/gofrs/uuid/v5 v5.4.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect - github.com/google/btree v1.1.2 // indirect + github.com/google/btree v1.1.3 // indirect github.com/google/go-cmp v0.6.0 // indirect - github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 // indirect + github.com/google/pprof v0.0.0-20240727154555-813a5fbdbec8 // indirect github.com/hashicorp/yamux v0.1.1 // indirect - github.com/insomniacslk/dhcp v0.0.0-20231016090811-6a2c8fbdcc1c // indirect + github.com/insomniacslk/dhcp v0.0.0-20250109001534-8abf58130905 // indirect github.com/josharian/native v1.1.0 // indirect github.com/jpillora/backoff v1.0.0 // indirect - github.com/klauspost/compress v1.16.7 // indirect + github.com/klauspost/compress v1.17.9 // indirect github.com/klauspost/cpuid/v2 v2.2.6 // indirect github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect github.com/lunixbochs/struc v0.0.0-20200707160740-784aaebc1d40 // indirect @@ -56,14 +56,14 @@ require ( github.com/mdlayher/netlink v1.7.2 // indirect github.com/mdlayher/socket v0.4.1 // indirect github.com/metacubex/gopacket v1.1.20-0.20230608035415-7e2f98a3e759 // indirect - github.com/metacubex/gvisor v0.0.0-20231001104248-0f672c3fb8d8 // indirect - github.com/metacubex/quic-go v0.40.1-0.20231130135418-0c1b47cf9394 // indirect - github.com/metacubex/sing-quic v0.0.0-20231130141855-0022295e524b // indirect - github.com/metacubex/sing-shadowsocks v0.2.5 // indirect - github.com/metacubex/sing-shadowsocks2 v0.1.4 // indirect - github.com/metacubex/sing-tun v0.1.15-0.20231103033938-170591e8d5bd // indirect - github.com/metacubex/sing-vmess v0.1.9-0.20230921005247-a0488d7dac74 // indirect - github.com/metacubex/sing-wireguard v0.0.0-20231001110902-321836559170 // indirect + github.com/metacubex/gvisor v0.0.0-20251227095601-261ec1326fe8 // indirect + github.com/metacubex/quic-go v0.59.1-0.20260413153657-53bb22f2c306 // indirect + github.com/metacubex/sing-quic v0.0.0-20260414034501-3ea3410d197a // indirect + github.com/metacubex/sing-shadowsocks v0.2.12 // indirect + github.com/metacubex/sing-shadowsocks2 v0.2.7 // indirect + github.com/metacubex/sing-tun v0.4.17 // indirect + github.com/metacubex/sing-vmess v0.2.5 // indirect + github.com/metacubex/sing-wireguard v0.0.0-20250503063753-2dc62acc626f // indirect github.com/moby/term v0.5.0 // indirect github.com/morikuni/aec v1.0.0 // indirect github.com/mroth/weightedrand/v2 v2.1.0 // indirect @@ -82,7 +82,7 @@ require ( github.com/quic-go/qtls-go1-20 v0.4.1 // indirect github.com/sagernet/bbolt v0.0.0-20231014093535-ea5cb2fe9f0a // indirect github.com/sagernet/go-tun2socks v1.16.12-0.20220818015926-16cb67876a61 // indirect - github.com/sagernet/netlink v0.0.0-20220905062125-8043b4a9aa97 // indirect + github.com/sagernet/netlink v0.0.0-20240612041022-b9a21c07ac6a // indirect github.com/sagernet/sing v0.2.18-0.20231108041402-4fbbd193203c // indirect github.com/sagernet/sing-mux v0.1.5-0.20231109075101-6b086ed6bb07 // indirect github.com/sagernet/sing-shadowtls v0.1.4 // indirect @@ -90,33 +90,33 @@ require ( github.com/sagernet/tfo-go v0.0.0-20230816093905-5a5c285d44a6 // indirect github.com/sagernet/utls v0.0.0-20230309024959-6732c2ab36f2 // indirect github.com/sagernet/wireguard-go v0.0.0-20230807125731-5d4a7ef2dc5f // indirect - github.com/samber/lo v1.38.1 // indirect + github.com/samber/lo v1.53.0 // indirect github.com/scjalliance/comshim v0.0.0-20230315213746-5e51f40bd3b9 // indirect github.com/shirou/gopsutil/v3 v3.23.10 // indirect github.com/shoenig/go-m1cpu v0.1.6 // indirect github.com/sina-ghaderi/poly1305 v0.0.0-20220724002748-c5926b03988b // indirect github.com/sina-ghaderi/rabaead v0.0.0-20220730151906-ab6e06b96e8c // indirect github.com/sina-ghaderi/rabbitio v0.0.0-20220730151941-9ce26f4f872e // indirect - github.com/sirupsen/logrus v1.9.3 // indirect + github.com/sirupsen/logrus v1.9.4 // indirect github.com/tklauser/go-sysconf v0.3.12 // indirect github.com/tklauser/numcpus v0.6.1 // indirect github.com/u-root/uio v0.0.0-20230220225925-ffce2a382923 // indirect - github.com/vishvananda/netns v0.0.0-20211101163701-50045581ed74 // indirect + github.com/vishvananda/netns v0.0.4 // indirect github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect github.com/yusufpapurcu/wmi v1.2.3 // indirect github.com/zhangyunhao116/fastrand v0.3.0 // indirect gitlab.com/yawning/bsaes.git v0.0.0-20190805113838-0a714cd429ec // indirect - go.uber.org/mock v0.3.0 // indirect - go4.org/netipx v0.0.0-20230824141953-6213f710f925 // indirect - golang.org/x/crypto v0.16.0 // indirect - golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa // indirect - golang.org/x/mod v0.14.0 // indirect - golang.org/x/sync v0.5.0 // indirect - golang.org/x/sys v0.15.0 // indirect - golang.org/x/text v0.14.0 // indirect - golang.org/x/time v0.3.0 // indirect - golang.org/x/tools v0.15.0 // indirect - google.golang.org/protobuf v1.31.0 // indirect + go.uber.org/mock v0.4.0 // indirect + go4.org/netipx v0.0.0-20231129151722-fdeea329fbba // indirect + golang.org/x/crypto v0.33.0 // indirect + golang.org/x/exp v0.0.0-20240904232852-e7e105dedf7e // indirect + golang.org/x/mod v0.20.0 // indirect + golang.org/x/sync v0.11.0 // indirect + golang.org/x/sys v0.30.0 // indirect + golang.org/x/text v0.22.0 // indirect + golang.org/x/time v0.10.0 // indirect + golang.org/x/tools v0.24.0 // indirect + google.golang.org/protobuf v1.34.2 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect lukechampine.com/blake3 v1.2.1 // indirect ) diff --git a/test/go.sum b/test/go.sum index 05a23ce937..cdf4f9a9a4 100644 --- a/test/go.sum +++ b/test/go.sum @@ -5,12 +5,14 @@ github.com/Microsoft/go-winio v0.6.0 h1:slsWYD/zyx7lCXoZVlvQrj0hPTM1HI4+v1sIda2y github.com/Microsoft/go-winio v0.6.0/go.mod h1:cTAf44im0RAYeL23bpB+fzCyDH2MJiz2BO69KH/soAE= github.com/RyuaNerin/go-krypto v1.0.2 h1:9KiZrrBs+tDrQ66dNy4nrX6SzntKtSKdm0wKHhdB4WM= github.com/RyuaNerin/go-krypto v1.0.2/go.mod h1:17LzMeJCgzGTkPH3TmfzRnEJ/yA7ErhTPp9sxIqONtA= +github.com/RyuaNerin/go-krypto v1.3.0/go.mod h1:9R9TU936laAIqAmjcHo/LsaXYOZlymudOAxjaBf62UM= github.com/Yawning/aez v0.0.0-20211027044916-e49e68abd344 h1:cDVUiFo+npB0ZASqnw4q90ylaVAbnYyx0JYqK4YcGok= github.com/Yawning/aez v0.0.0-20211027044916-e49e68abd344/go.mod h1:9pIqrY6SXNL8vjRQE5Hd/OL5GyK/9MrGUWs87z/eFfk= github.com/aead/chacha20 v0.0.0-20180709150244-8b13a72661da h1:KjTM2ks9d14ZYCvmHS9iAKVt9AyzRSqNU1qabPih5BY= github.com/aead/chacha20 v0.0.0-20180709150244-8b13a72661da/go.mod h1:eHEWzANqSiWQsof+nXEI9bUVUyV6F53Fp89EuCh2EAA= github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs= github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= +github.com/andybalholm/brotli v1.0.6/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/bahlo/generic-list-go v0.2.0 h1:5sz/EEAK+ls5wF+NeqDpk5+iNdMDXrh3z3nPnH1Wvgk= github.com/bahlo/generic-list-go v0.2.0/go.mod h1:2KvAjgMlE5NNynlg/5iLrrCCZ2+5xWbdbCW3pNTGyYg= github.com/blang/semver v3.5.1+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk= @@ -21,6 +23,7 @@ github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5P github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= github.com/coreos/go-iptables v0.7.0 h1:XWM3V+MPRr5/q51NuWSgU0fqMad64Zyxs8ZUoMsamr8= github.com/coreos/go-iptables v0.7.0/go.mod h1:Qe8Bv2Xik5FyTXwgIbLAnv2sWSBmvWdFETJConOQ//Q= +github.com/coreos/go-iptables v0.8.0/go.mod h1:Qe8Bv2Xik5FyTXwgIbLAnv2sWSBmvWdFETJConOQ//Q= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -28,6 +31,7 @@ github.com/distribution/reference v0.5.0 h1:/FUIFXtfc/x2gpa5/VGfiGLuOIdYa1t65IKK github.com/distribution/reference v0.5.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E= github.com/dlclark/regexp2 v1.10.0 h1:+/GIL799phkJqYW+3YbOd8LCcbHzT0Pbo8zl70MHsq0= github.com/dlclark/regexp2 v1.10.0/go.mod h1:DHkYz0B9wPfa6wondMfaivmHpzrQ3v9q8cnmRbL6yW8= +github.com/dlclark/regexp2 v1.12.0/go.mod h1:DHkYz0B9wPfa6wondMfaivmHpzrQ3v9q8cnmRbL6yW8= github.com/docker/distribution v2.8.3+incompatible h1:AtKxIZ36LoNK51+Z6RpzLpddBirtxJnzDrHLEKxTAYk= github.com/docker/distribution v2.8.3+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w= github.com/docker/docker v20.10.21+incompatible h1:UTLdBmHk3bEY+w8qeO5KttOhy6OmXWsl/FEet9Uswog= @@ -38,6 +42,7 @@ github.com/docker/go-units v0.4.0 h1:3uh0PgVws3nIA0Q+MwDC8yjEPf9zjRfZZWXZYDct3Tw github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/ericlagergren/aegis v0.0.0-20230312195928-b4ce538b56f9 h1:/5RkVc9Rc81XmMyVqawCiDyrBHZbLAZgTTCqou4mwj8= github.com/ericlagergren/aegis v0.0.0-20230312195928-b4ce538b56f9/go.mod h1:hkIFzoiIPZYxdFOOLyDho59b7SrDfo+w3h+yWdlg45I= +github.com/ericlagergren/aegis v0.0.0-20250325060835-cd0defd64358/go.mod h1:hkIFzoiIPZYxdFOOLyDho59b7SrDfo+w3h+yWdlg45I= github.com/ericlagergren/polyval v0.0.0-20220411101811-e25bc10ba391 h1:8j2RH289RJplhA6WfdaPqzg1MjH2K8wX5e0uhAxrw2g= github.com/ericlagergren/polyval v0.0.0-20220411101811-e25bc10ba391/go.mod h1:K2R7GhgxrlJzHw2qiPWsCZXf/kXEJN9PLnQK73Ll0po= github.com/ericlagergren/saferand v0.0.0-20220206064634-960a4dd2bc5c h1:RUzBDdZ+e/HEe2Nh8lYsduiPAZygUfVXJn0Ncj5sHMg= @@ -48,6 +53,7 @@ github.com/ericlagergren/subtle v0.0.0-20220507045147-890d697da010/go.mod h1:JtB github.com/frankban/quicktest v1.14.5 h1:dfYrrRyLtiqT9GyKXgdh+k4inNeTvmGbuSgZ3lx3GhA= github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= +github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0= github.com/gaukas/godicttls v0.0.4 h1:NlRaXb3J6hAnTmWdsEKb9bcSBD6BvcIjdGdeb0zfXbk= github.com/gaukas/godicttls v0.0.4/go.mod h1:l6EenT4TLWgTdwslVb4sEMOCf7Bv0JAK67deKr9/NCI= github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ= @@ -62,8 +68,10 @@ github.com/gobwas/pool v0.2.1 h1:xfeeEhW7pwmX8nuLVlqbzVc7udMDrwetjEv+TZIz1og= github.com/gobwas/pool v0.2.1/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw= github.com/gobwas/ws v1.3.1 h1:Qi34dfLMWJbiKaNbDVzM9x27nZBjmkaW6i4+Ku+pGVU= github.com/gobwas/ws v1.3.1/go.mod h1:hRKAFb8wOxFROYNsT1bqfWnhX+b5MFeJM9r2ZSwg/KY= +github.com/gobwas/ws v1.4.0/go.mod h1:G3gNqMNtPppf5XUz7O4shetPpcZ1VJ7zt18dlUeakrc= github.com/gofrs/uuid/v5 v5.0.0 h1:p544++a97kEL+svbcFbCQVM9KFu0Yo25UoISXGNNH9M= github.com/gofrs/uuid/v5 v5.0.0/go.mod h1:CDOjlDMVAtN56jqyRUZh58JT31Tiw7/oQyEXZV+9bD8= +github.com/gofrs/uuid/v5 v5.4.0/go.mod h1:CDOjlDMVAtN56jqyRUZh58JT31Tiw7/oQyEXZV+9bD8= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -71,6 +79,7 @@ github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaS github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= github.com/google/btree v1.1.2 h1:xf4v41cLI2Z6FxbKm+8Bu+m8ifhj15JuZ9sa0jZCMUU= github.com/google/btree v1.1.2/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4= +github.com/google/btree v1.1.3/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= @@ -78,12 +87,14 @@ github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 h1:yAJXTCF9TqKcTiHJAE8dj7HMvPfh66eeA2JYW7eFpSE= github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= +github.com/google/pprof v0.0.0-20240727154555-813a5fbdbec8/go.mod h1:K1liHPHnj73Fdn/EKuT8nrFqBihUSKXoLYU0BuatOYo= github.com/google/tink/go v1.6.1 h1:t7JHqO8Ath2w2ig5vjwQYJzhGEZymedQc90lQXUBa4I= github.com/hashicorp/yamux v0.1.1 h1:yrQxtgseBDrq9Y652vSRDvsKCJKOUD+GzTS4Y0Y8pvE= github.com/hashicorp/yamux v0.1.1/go.mod h1:CtWFDAQgb7dxtzFs4tWbplKIe2jSi3+5vKbgIO0SLnQ= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/insomniacslk/dhcp v0.0.0-20231016090811-6a2c8fbdcc1c h1:PgxFEySCI41sH0mB7/2XswdXbUykQsRUGod8Rn+NubM= github.com/insomniacslk/dhcp v0.0.0-20231016090811-6a2c8fbdcc1c/go.mod h1:3A9PQ1cunSDF/1rbTq99Ts4pVnycWg+vlPkfeD2NLFI= +github.com/insomniacslk/dhcp v0.0.0-20250109001534-8abf58130905/go.mod h1:VvGYjkZoJyKqlmT1yzakUs4mfKMNB0XdODP0+rdml6k= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/josharian/native v1.0.1-0.20221213033349-c1e37c09b531/go.mod h1:7X/raswPFr05uY3HiLlYeyQntB6OO7E/d2Cu7qoaN2w= github.com/josharian/native v1.1.0 h1:uuaP0hAbW7Y4l0ZRQ6C9zfb7Mg1mbFKry/xzDAfmtLA= @@ -94,6 +105,7 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/klauspost/cpuid/v2 v2.2.6 h1:ndNyv040zDGIDh8thGkXYjnFtiN02M1PVVF+JE/48xc= github.com/klauspost/cpuid/v2 v2.2.6/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= @@ -112,22 +124,31 @@ github.com/metacubex/gopacket v1.1.20-0.20230608035415-7e2f98a3e759 h1:cjd4biTvO github.com/metacubex/gopacket v1.1.20-0.20230608035415-7e2f98a3e759/go.mod h1:UHOv2xu+RIgLwpXca7TLrXleEd4oR3sPatW6IF8wU88= github.com/metacubex/gvisor v0.0.0-20231001104248-0f672c3fb8d8 h1:npBvaPAT145UY8682AzpUMWpdIxJti/WPLjy7gCiYYs= github.com/metacubex/gvisor v0.0.0-20231001104248-0f672c3fb8d8/go.mod h1:ZR6Gas7P1GcADCVBc1uOrA0bLQqDDyp70+63fD/BE2c= +github.com/metacubex/gvisor v0.0.0-20251227095601-261ec1326fe8/go.mod h1:8LpS0IJW1VmWzUm3ylb0e2SK5QDm5lO/2qwWLZgRpBU= github.com/metacubex/quic-go v0.40.1-0.20231130135418-0c1b47cf9394 h1:dIT+KB2hknBCrwVAXPeY9tpzzkOZP5m40yqUteRT6/Y= github.com/metacubex/quic-go v0.40.1-0.20231130135418-0c1b47cf9394/go.mod h1:F/t8VnA47xoia8ABlNA4InkZjssvFJ5p6E6jKdbkgAs= +github.com/metacubex/quic-go v0.59.1-0.20260413153657-53bb22f2c306/go.mod h1:oNzMrmylS897M3zSMuapIdwSwfq6F2qW01Z3NhVRJhk= github.com/metacubex/sing-quic v0.0.0-20231130141855-0022295e524b h1:7XXoEePvxfkQN9b2wB8UXU3uzb9uL8syEFF7A9VAKKQ= github.com/metacubex/sing-quic v0.0.0-20231130141855-0022295e524b/go.mod h1:Gu5/zqZDd5G1AUtoV2yjAPWOEy7zwbU2DBUjdxJh0Kw= +github.com/metacubex/sing-quic v0.0.0-20260414034501-3ea3410d197a/go.mod h1:6ayFGfzzBE85csgQkM3gf4neFq6s0losHlPRSxY+nuk= github.com/metacubex/sing-shadowsocks v0.2.5 h1:O2RRSHlKGEpAVG/OHJQxyHqDy8uvvdCW/oW2TDBOIhc= github.com/metacubex/sing-shadowsocks v0.2.5/go.mod h1:Xz2uW9BEYGEoA8B4XEpoxt7ERHClFCwsMAvWaruoyMo= +github.com/metacubex/sing-shadowsocks v0.2.12/go.mod h1:2e5EIaw0rxKrm1YTRmiMnDulwbGxH9hAFlrwQLQMQkU= github.com/metacubex/sing-shadowsocks2 v0.1.4 h1:OOCf8lgsVcpTOJUeaFAMzyKVebaQOBnKirDdUdBoKIE= github.com/metacubex/sing-shadowsocks2 v0.1.4/go.mod h1:Qz028sLfdY3qxGRm9FDI+IM2Ae3ty2wR7HIzD/56h/k= +github.com/metacubex/sing-shadowsocks2 v0.2.7/go.mod h1:vOEbfKC60txi0ca+yUlqEwOGc3Obl6cnSgx9Gf45KjE= github.com/metacubex/sing-tun v0.1.15-0.20231103033938-170591e8d5bd h1:k0+92eARqyTAovGhg2AxdsMWHjUsdiGCnR5NuXF3CQY= github.com/metacubex/sing-tun v0.1.15-0.20231103033938-170591e8d5bd/go.mod h1:Q7zmpJ+qOvMMXyUoYlxGQuWkqALUpXzFSSqO+KLPyzA= +github.com/metacubex/sing-tun v0.4.17/go.mod h1:L/TjQY5JEGy8nvsuYmy/XgMFMCPiF0+AWSFCYfS6r9w= github.com/metacubex/sing-vmess v0.1.9-0.20230921005247-a0488d7dac74 h1:FtupiyFkaVjFvRa7B/uDtRWg5BNsoyPC9MTev3sDasY= github.com/metacubex/sing-vmess v0.1.9-0.20230921005247-a0488d7dac74/go.mod h1:8EWBZpc+qNvf5gmvjAtMHK1/DpcWqzfcBL842K00BsM= +github.com/metacubex/sing-vmess v0.2.5/go.mod h1:AwtlzUgf8COe9tRYAKqWZ+leDH7p5U98a0ZUpYehl8Q= github.com/metacubex/sing-wireguard v0.0.0-20231001110902-321836559170 h1:DBGA0hmrP4pVIwLiXUONdphjcppED+plmVaKf1oqkwk= github.com/metacubex/sing-wireguard v0.0.0-20231001110902-321836559170/go.mod h1:/VbJfbdLnANE+SKXyMk/96sTRrD4GdFLh5mkegqqFcY= +github.com/metacubex/sing-wireguard v0.0.0-20250503063753-2dc62acc626f/go.mod h1:jpAkVLPnCpGSfNyVmj6Cq4YbuZsFepm/Dc+9BAOcR80= github.com/miekg/dns v1.1.57 h1:Jzi7ApEIzwEPLHWRcafCN9LZSBbqQpxjt/wpgvg7wcM= github.com/miekg/dns v1.1.57/go.mod h1:uqRjCRUuEAA6qsOiJvDd+CFo/vW+y5WR6SNmHE55hZk= +github.com/miekg/dns v1.1.63/go.mod h1:6NGHfjhpmr5lt3XPLuyfDJi5AXbNIPM9PY6H6sF1Nfs= github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0= github.com/moby/term v0.5.0/go.mod h1:8FzsFHVUBGZdbDsJw/ot+X+d5HLUbvklYLJ9uGfcI3Y= github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A= @@ -171,6 +192,7 @@ github.com/sagernet/go-tun2socks v1.16.12-0.20220818015926-16cb67876a61 h1:5+m7c github.com/sagernet/go-tun2socks v1.16.12-0.20220818015926-16cb67876a61/go.mod h1:QUQ4RRHD6hGGHdFMEtR8T2P6GS6R3D/CXKdaYHKKXms= github.com/sagernet/netlink v0.0.0-20220905062125-8043b4a9aa97 h1:iL5gZI3uFp0X6EslacyapiRz7LLSJyr4RajF/BhMVyE= github.com/sagernet/netlink v0.0.0-20220905062125-8043b4a9aa97/go.mod h1:xLnfdiJbSp8rNqYEdIW/6eDO4mVoogml14Bh2hSiFpM= +github.com/sagernet/netlink v0.0.0-20240612041022-b9a21c07ac6a/go.mod h1:xLnfdiJbSp8rNqYEdIW/6eDO4mVoogml14Bh2hSiFpM= github.com/sagernet/sing v0.0.0-20220817130738-ce854cda8522/go.mod h1:QVsS5L/ZA2Q5UhQwLrn0Trw+msNd/NPGEhBKR/ioWiY= github.com/sagernet/sing v0.1.8/go.mod h1:jt1w2u7lJQFFSGLiRrRIs5YWmx4kAPfWuOejuDW9qMk= github.com/sagernet/sing v0.2.18-0.20231108041402-4fbbd193203c h1:uask61Pxc3nGqsOSjqnBKrwfODWRoEa80lXm04LNk0E= @@ -189,6 +211,7 @@ github.com/sagernet/wireguard-go v0.0.0-20230807125731-5d4a7ef2dc5f h1:Kvo8w8Y9l github.com/sagernet/wireguard-go v0.0.0-20230807125731-5d4a7ef2dc5f/go.mod h1:mySs0abhpc/gLlvhoq7HP1RzOaRmIXVeZGCh++zoApk= github.com/samber/lo v1.38.1 h1:j2XEAqXKb09Am4ebOg31SpvzUTTs6EN3VfgeLUhPdXM= github.com/samber/lo v1.38.1/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA= +github.com/samber/lo v1.53.0/go.mod h1:4+MXEGsJzbKGaUEQFKBq2xtfuznW9oz/WrgyzMzRoM0= github.com/scjalliance/comshim v0.0.0-20230315213746-5e51f40bd3b9 h1:rc/CcqLH3lh8n+csdOuDfP+NuykE0U6AeYSJJHKDgSg= github.com/scjalliance/comshim v0.0.0-20230315213746-5e51f40bd3b9/go.mod h1:a/83NAfUXvEuLpmxDssAXxgUgrEy12MId3Wd7OTs76s= github.com/shirou/gopsutil/v3 v3.23.10 h1:/N42opWlYzegYaVkWejXWJpbzKv2JDy3mrgGzKsh9hM= @@ -205,6 +228,7 @@ github.com/sina-ghaderi/rabbitio v0.0.0-20220730151941-9ce26f4f872e h1:ur8uMsPIF github.com/sina-ghaderi/rabbitio v0.0.0-20220730151941-9ce26f4f872e/go.mod h1:+e5fBW3bpPyo+3uLo513gIUblc03egGjMM0+5GKbzK8= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/sirupsen/logrus v1.9.4/go.mod h1:ftWc9WdOfJ0a92nsE2jF5u5ZwH8Bv2zdeOC42RjbV2g= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= @@ -215,6 +239,7 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFAEVmqU= github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI= github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk= @@ -226,6 +251,7 @@ github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17 github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0= github.com/vishvananda/netns v0.0.0-20211101163701-50045581ed74 h1:gga7acRE695APm9hlsSMoOoE65U4/TcqNj90mc69Rlg= github.com/vishvananda/netns v0.0.0-20211101163701-50045581ed74/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0= +github.com/vishvananda/netns v0.0.4/go.mod h1:SpkAiCQRtJ6TvvxPnOSyH3BMl6unz3xZlaprSwhNNJM= github.com/wk8/go-ordered-map/v2 v2.1.8 h1:5h/BUHu93oj4gIdvHHHGsScSTMijfx5PeYkE/fJgbpc= github.com/wk8/go-ordered-map/v2 v2.1.8/go.mod h1:5nJHM5DyteebpVlHnWMV0rPz6Zp7+xBAnxjb1X5vnTw= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -238,32 +264,39 @@ gitlab.com/yawning/bsaes.git v0.0.0-20190805113838-0a714cd429ec h1:FpfFs4EhNehiV gitlab.com/yawning/bsaes.git v0.0.0-20190805113838-0a714cd429ec/go.mod h1:BZ1RAoRPbCxum9Grlv5aeksu2H8BiKehBYooU2LFiOQ= go.uber.org/mock v0.3.0 h1:3mUxI1No2/60yUYax92Pt8eNOEecx2D3lcXZh2NEZJo= go.uber.org/mock v0.3.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc= +go.uber.org/mock v0.4.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc= go4.org/netipx v0.0.0-20230824141953-6213f710f925 h1:eeQDDVKFkx0g4Hyy8pHgmZaK0EqB4SD6rvKbUdN3ziQ= go4.org/netipx v0.0.0-20230824141953-6213f710f925/go.mod h1:PLyyIXexvUFg3Owu6p/WfdlivPbZJsZdgWZlrGope/Y= +go4.org/netipx v0.0.0-20231129151722-fdeea329fbba/go.mod h1:PLyyIXexvUFg3Owu6p/WfdlivPbZJsZdgWZlrGope/Y= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.16.0 h1:mMMrFzRSCF0GvB7Ne27XVtVAaXLrPmgPC7/v0tkwHaY= golang.org/x/crypto v0.16.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= +golang.org/x/crypto v0.33.0/go.mod h1:bVdXmD7IV/4GdElGPozy6U7lWdRXA4qyRVGJV57uQ5M= golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa h1:FRnLl4eNAQl8hwxVVC17teOw8kdjVDVAiFMtgUdTSRQ= golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa/go.mod h1:zk2irFbV9DP96SEBUUAy67IdHUaZuSnrz1n472HUCLE= +golang.org/x/exp v0.0.0-20240904232852-e7e105dedf7e/go.mod h1:akd2r19cwCdwSwWeIdzYQGa/EZZyqcOdwWiwj5L5eKQ= golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.14.0 h1:dGoOF9QVLYng8IHTm7BAyWqCqSheQ5pYWGhzW00YJr0= golang.org/x/mod v0.14.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= +golang.org/x/mod v0.20.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.18.0 h1:mIYleuAkSbHh0tCv7RvjL3F6ZVbLjq4+R7zbOn3Kokg= golang.org/x/net v0.18.0/go.mod h1:/czyP5RqHAH4odGYxBJ1qz0+CE5WZ+2j1YgoEo8F2jQ= +golang.org/x/net v0.35.0/go.mod h1:EglIi67kWsHKlRzzVMUD93VMSWGFOMSZgxFjparz1Qk= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE= golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190606203320-7fc4e5ec1444/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -284,12 +317,15 @@ golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY= golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/time v0.10.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= @@ -297,6 +333,7 @@ golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roY golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.15.0 h1:zdAyfUGbYmuVokhzVmghFl2ZJh5QhcfebBgmVPFYA+8= golang.org/x/tools v0.15.0/go.mod h1:hpksKq4dtpQWS1uQ61JkdqWM3LscIS6Slf+VVkm+wQk= +golang.org/x/tools v0.24.0/go.mod h1:YhNqVBIfWHdzvTLs0d8LCuMhkKUgSUKldakyV7W/WDQ= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -304,6 +341,7 @@ golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8T google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/transport/mkcp/buffer.go b/transport/mkcp/buffer.go new file mode 100644 index 0000000000..e97a386db5 --- /dev/null +++ b/transport/mkcp/buffer.go @@ -0,0 +1,207 @@ +package mkcp + +import ( + "errors" + "io" + "sync" +) + +// bufSize is the size of a regular buffer, matching xray-core's buf.Size so that +// a single buffer always fits one UDP datagram worth of mKCP payload. +const bufSize = 8192 + +var errBufferFull = errors.New("buffer is full") + +var pool = sync.Pool{New: func() any { return make([]byte, bufSize) }} + +// Buffer is a recyclable fixed-capacity byte buffer, a trimmed port of +// xray-core's common/buf.Buffer holding only the methods the mKCP core needs. +type Buffer struct { + v []byte + start int32 + end int32 +} + +// New creates a Buffer with 0 length and bufSize capacity. +func New() *Buffer { + return &Buffer{v: pool.Get().([]byte)} +} + +// StackNew creates a Buffer value (intended to be released in the same function). +func StackNew() Buffer { + return Buffer{v: pool.Get().([]byte)} +} + +// Release recycles the underlying byte array into the pool. +func (b *Buffer) Release() { + if b == nil || b.v == nil { + return + } + p := b.v + b.v = nil + b.Clear() + if cap(p) >= bufSize { + pool.Put(p[:bufSize]) + } +} + +// Clear resets the buffer to empty without releasing the array. +func (b *Buffer) Clear() { + b.start = 0 + b.end = 0 +} + +// Bytes returns the content of the buffer. +func (b *Buffer) Bytes() []byte { + return b.v[b.start:b.end] +} + +// Extend grows the buffer by n bytes and returns the (zeroed) extended part. +func (b *Buffer) Extend(n int32) []byte { + end := b.end + n + if end > int32(len(b.v)) { + panic("mkcp: extending out of bound") + } + ext := b.v[b.end:end] + b.end = end + for i := range ext { + ext[i] = 0 + } + return ext +} + +// BytesFrom returns the content of the buffer starting at the given position. +func (b *Buffer) BytesFrom(from int32) []byte { + if from < 0 { + from += b.Len() + } + return b.v[b.start+from : b.end] +} + +// Len returns the length of the content. +func (b *Buffer) Len() int32 { + if b == nil { + return 0 + } + return b.end - b.start +} + +// IsEmpty returns true if the buffer has no content. +func (b *Buffer) IsEmpty() bool { + return b.Len() == 0 +} + +// IsFull returns true if the buffer cannot grow further. +func (b *Buffer) IsFull() bool { + return b != nil && b.end == int32(len(b.v)) +} + +// Write appends data into the buffer. +func (b *Buffer) Write(data []byte) (int, error) { + n := copy(b.v[b.end:], data) + b.end += int32(n) + if n < len(data) { + return n, errBufferFull + } + return n, nil +} + +// Read implements io.Reader, draining from the front of the buffer. +func (b *Buffer) Read(data []byte) (int, error) { + if b.Len() == 0 { + return 0, io.EOF + } + n := copy(data, b.v[b.start:b.end]) + if int32(n) == b.Len() { + b.Clear() + } else { + b.start += int32(n) + } + return n, nil +} + +// ReadFrom reads once from reader into the free space of the buffer. +func (b *Buffer) ReadFrom(reader io.Reader) (int64, error) { + n, err := reader.Read(b.v[b.end:]) + b.end += int32(n) + return int64(n), err +} + +// ReadFullFrom reads exactly size bytes from reader, or until an error occurs. +func (b *Buffer) ReadFullFrom(reader io.Reader, size int32) (int64, error) { + end := b.end + size + if end > int32(len(b.v)) { + return 0, errors.New("mkcp: read out of bound") + } + n, err := io.ReadFull(reader, b.v[b.end:end]) + b.end += int32(n) + return int64(n), err +} + +// MultiBuffer is an ordered list of Buffers. +type MultiBuffer []*Buffer + +// ReleaseMulti releases all buffers and returns an emptied slice. +func ReleaseMulti(mb MultiBuffer) MultiBuffer { + for i := range mb { + mb[i].Release() + mb[i] = nil + } + return mb[:0] +} + +// IsEmpty returns true if the MultiBuffer holds no content. +func (mb MultiBuffer) IsEmpty() bool { + for _, b := range mb { + if !b.IsEmpty() { + return false + } + } + return true +} + +// SplitBytes copies bytes from the front of the MultiBuffer into b, releasing +// drained buffers, and returns the leftover MultiBuffer and bytes written. +func SplitBytes(mb MultiBuffer, b []byte) (MultiBuffer, int) { + totalBytes := 0 + endIndex := -1 + for i := range mb { + pBuffer := mb[i] + nBytes, _ := pBuffer.Read(b) + totalBytes += nBytes + b = b[nBytes:] + if !pBuffer.IsEmpty() { + endIndex = i + break + } + pBuffer.Release() + mb[i] = nil + } + if endIndex == -1 { + mb = mb[:0] + } else { + mb = mb[endIndex:] + } + return mb, totalBytes +} + +// MultiBufferContainer wraps a MultiBuffer as an io.ReadCloser. +type MultiBufferContainer struct { + MultiBuffer +} + +// Read implements io.Reader. +func (c *MultiBufferContainer) Read(b []byte) (int, error) { + if c.MultiBuffer.IsEmpty() { + return 0, io.EOF + } + mb, nBytes := SplitBytes(c.MultiBuffer, b) + c.MultiBuffer = mb + return nBytes, nil +} + +// Close implements io.Closer. +func (c *MultiBufferContainer) Close() error { + c.MultiBuffer = ReleaseMulti(c.MultiBuffer) + return nil +} diff --git a/transport/mkcp/client.go b/transport/mkcp/client.go new file mode 100644 index 0000000000..82d3d7947c --- /dev/null +++ b/transport/mkcp/client.go @@ -0,0 +1,104 @@ +package mkcp + +import ( + "io" + "net" + "sync/atomic" + + "github.com/metacubex/randv2" +) + +var globalConv = randv2.Uint32() + +// packetConnWrapper turns an unconnected net.PacketConn into a stream-like +// io.ReadWriteCloser bound to a single remote address, which is what the mKCP +// Connection expects as its underlying transport. +type packetConnWrapper struct { + net.PacketConn + remote net.Addr +} + +func (c *packetConnWrapper) Read(b []byte) (int, error) { + n, _, err := c.PacketConn.ReadFrom(b) + return n, err +} + +func (c *packetConnWrapper) Write(b []byte) (int, error) { + return c.PacketConn.WriteTo(b, c.remote) +} + +func (c *packetConnWrapper) RemoteAddr() net.Addr { + return c.remote +} + +func fetchInput(input io.Reader, reader PacketReader, conn *Connection) { + cache := make(chan *Buffer, 1024) + go func() { + for { + payload := New() + if _, err := payload.ReadFrom(input); err != nil { + payload.Release() + close(cache) + return + } + select { + case cache <- payload: + default: + payload.Release() + } + } + }() + + for payload := range cache { + segments := reader.Read(payload.Bytes()) + payload.Release() + if len(segments) > 0 { + conn.Input(segments) + } + } +} + +// Dial wraps an established UDP net.PacketConn into an mKCP *Connection talking +// to remote. The returned Connection implements net.Conn. It takes ownership of +// pc and closes it when the connection terminates. +func Dial(pc net.PacketConn, remote net.Addr, config *Config) (*Connection, error) { + conv := uint16(atomic.AddUint32(&globalConv, 1)) + return dialConv(pc, remote, config, conv) +} + +// dialConv is Dial with an explicit conversation id, used by Dial and tests. +func dialConv(pc net.PacketConn, remote net.Addr, config *Config, conv uint16) (*Connection, error) { + if config == nil { + config = &Config{} + } + + header, err := config.GetPackerHeader() + if err != nil { + return nil, err + } + security, err := config.GetSecurity() + if err != nil { + return nil, err + } + + rawConn := &packetConnWrapper{PacketConn: pc, remote: remote} + reader := &KCPPacketReader{ + Header: header, + Security: security, + } + writer := &KCPPacketWriter{ + Header: header, + Security: security, + Writer: rawConn, + } + + session := NewConnection(ConnMetadata{ + LocalAddr: pc.LocalAddr(), + RemoteAddr: remote, + Conversation: conv, + }, writer, rawConn, config) + + go fetchInput(rawConn, reader, session) + + return session, nil +} diff --git a/transport/mkcp/config.go b/transport/mkcp/config.go new file mode 100644 index 0000000000..5c397a66af --- /dev/null +++ b/transport/mkcp/config.go @@ -0,0 +1,100 @@ +package mkcp + +import ( + "crypto/cipher" +) + +// Config holds the mKCP tuning and security parameters. Field semantics and +// defaults match Xray's kcpSettings so a mihomo client interoperates with an +// Xray mKCP server. +type Config struct { + Mtu uint32 // maximum transmission unit, default 1350 + Tti uint32 // transmission time interval in ms, default 50 + UplinkCapacity uint32 // uplink bandwidth in MB/s, default 5 + DownlinkCapacity uint32 // downlink bandwidth in MB/s, default 20 + Congestion bool // enable congestion control, default false + WriteBufferSize uint32 // write buffer in bytes, default 2MB + ReadBufferSize uint32 // read buffer in bytes, default 2MB + Seed string // security seed; empty uses the default authenticator + Header string // header masquerade type (none/srtp/utp/wechat-video/wireguard) +} + +func (c *Config) GetMTUValue() uint32 { + if c == nil || c.Mtu == 0 { + return 1350 + } + return c.Mtu +} + +func (c *Config) GetTTIValue() uint32 { + if c == nil || c.Tti == 0 { + return 50 + } + return c.Tti +} + +func (c *Config) GetUplinkCapacityValue() uint32 { + if c == nil || c.UplinkCapacity == 0 { + return 5 + } + return c.UplinkCapacity +} + +func (c *Config) GetDownlinkCapacityValue() uint32 { + if c == nil || c.DownlinkCapacity == 0 { + return 20 + } + return c.DownlinkCapacity +} + +func (c *Config) GetWriteBufferSize() uint32 { + if c == nil || c.WriteBufferSize == 0 { + return 2 * 1024 * 1024 + } + return c.WriteBufferSize +} + +func (c *Config) GetReadBufferSize() uint32 { + if c == nil || c.ReadBufferSize == 0 { + return 2 * 1024 * 1024 + } + return c.ReadBufferSize +} + +// GetSecurity returns the AEAD used to authenticate/encrypt packets. A non-empty +// Seed selects AES-128-GCM; otherwise the legacy SimpleAuthenticator is used. +func (c *Config) GetSecurity() (cipher.AEAD, error) { + if c.Seed != "" { + return NewAEADAESGCMBasedOnSeed(c.Seed), nil + } + return NewSimpleAuthenticator(), nil +} + +// GetPackerHeader returns the configured header masquerade, or nil for none. +func (c *Config) GetPackerHeader() (PacketHeader, error) { + return NewPacketHeader(c.Header) +} + +func (c *Config) GetSendingInFlightSize() uint32 { + size := c.GetUplinkCapacityValue() * 1024 * 1024 / c.GetMTUValue() / (1000 / c.GetTTIValue()) + if size < 8 { + size = 8 + } + return size +} + +func (c *Config) GetSendingBufferSize() uint32 { + return c.GetWriteBufferSize() / c.GetMTUValue() +} + +func (c *Config) GetReceivingInFlightSize() uint32 { + size := c.GetDownlinkCapacityValue() * 1024 * 1024 / c.GetMTUValue() / (1000 / c.GetTTIValue()) + if size < 8 { + size = 8 + } + return size +} + +func (c *Config) GetReceivingBufferSize() uint32 { + return c.GetReadBufferSize() / c.GetMTUValue() +} diff --git a/transport/mkcp/connection.go b/transport/mkcp/connection.go new file mode 100644 index 0000000000..9677114e4d --- /dev/null +++ b/transport/mkcp/connection.go @@ -0,0 +1,658 @@ +package mkcp + +import ( + "bytes" + "errors" + "io" + "net" + "runtime" + "sync" + "sync/atomic" + "time" + + "github.com/metacubex/mihomo/log" +) + +var ( + ErrIOTimeout = errors.New("read/write timeout") + ErrClosedConnection = errors.New("connection closed") +) + +// State of the connection +type State int32 + +// Is returns true if current State is one of the candidates. +func (s State) Is(states ...State) bool { + for _, state := range states { + if s == state { + return true + } + } + return false +} + +const ( + StateActive State = 0 // Connection is active + StateReadyToClose State = 1 // Connection is closed locally + StatePeerClosed State = 2 // Connection is closed on remote + StateTerminating State = 3 // Connection is ready to be destroyed locally + StatePeerTerminating State = 4 // Connection is ready to be destroyed on remote + StateTerminated State = 5 // Connection is destroyed. +) + +func nowMillisec() int64 { + now := time.Now() + return now.Unix()*1000 + int64(now.Nanosecond()/1000000) +} + +type RoundTripInfo struct { + sync.RWMutex + variation uint32 + srtt uint32 + rto uint32 + minRtt uint32 + updatedTimestamp uint32 +} + +func (info *RoundTripInfo) UpdatePeerRTO(rto uint32, current uint32) { + info.Lock() + defer info.Unlock() + + if current-info.updatedTimestamp < 3000 { + return + } + + info.updatedTimestamp = current + info.rto = rto +} + +func (info *RoundTripInfo) Update(rtt uint32, current uint32) { + if rtt > 0x7FFFFFFF { + return + } + info.Lock() + defer info.Unlock() + + // https://tools.ietf.org/html/rfc6298 + if info.srtt == 0 { + info.srtt = rtt + info.variation = rtt / 2 + } else { + delta := rtt - info.srtt + if info.srtt > rtt { + delta = info.srtt - rtt + } + info.variation = (3*info.variation + delta) / 4 + info.srtt = (7*info.srtt + rtt) / 8 + if info.srtt < info.minRtt { + info.srtt = info.minRtt + } + } + var rto uint32 + if info.minRtt < 4*info.variation { + rto = info.srtt + 4*info.variation + } else { + rto = info.srtt + info.variation + } + + if rto > 10000 { + rto = 10000 + } + info.rto = rto * 5 / 4 + info.updatedTimestamp = current +} + +func (info *RoundTripInfo) Timeout() uint32 { + info.RLock() + defer info.RUnlock() + + return info.rto +} + +func (info *RoundTripInfo) SmoothedTime() uint32 { + info.RLock() + defer info.RUnlock() + + return info.srtt +} + +type Updater struct { + interval int64 + shouldContinue func() bool + shouldTerminate func() bool + updateFunc func() + notifier *semaphore +} + +func NewUpdater(interval uint32, shouldContinue func() bool, shouldTerminate func() bool, updateFunc func()) *Updater { + u := &Updater{ + interval: int64(time.Duration(interval) * time.Millisecond), + shouldContinue: shouldContinue, + shouldTerminate: shouldTerminate, + updateFunc: updateFunc, + notifier: newSemaphore(1), + } + return u +} + +func (u *Updater) WakeUp() { + select { + case <-u.notifier.Wait(): + go u.run() + default: + } +} + +func (u *Updater) run() { + defer u.notifier.Signal() + + if u.shouldTerminate() { + return + } + ticker := time.NewTicker(u.Interval()) + for u.shouldContinue() { + u.updateFunc() + <-ticker.C + } + ticker.Stop() +} + +func (u *Updater) Interval() time.Duration { + return time.Duration(atomic.LoadInt64(&u.interval)) +} + +func (u *Updater) SetInterval(d time.Duration) { + atomic.StoreInt64(&u.interval, int64(d)) +} + +type ConnMetadata struct { + LocalAddr net.Addr + RemoteAddr net.Addr + Conversation uint16 +} + +// Connection is a KCP connection over UDP. +type Connection struct { + meta ConnMetadata + closer io.Closer + rd time.Time + wd time.Time // write deadline + since int64 + dataInput *notifier + dataOutput *notifier + Config *Config + + state State + stateBeginTime uint32 + lastIncomingTime uint32 + lastPingTime uint32 + + mss uint32 + roundTrip *RoundTripInfo + + receivingWorker *ReceivingWorker + sendingWorker *SendingWorker + + output SegmentWriter + + dataUpdater *Updater + pingUpdater *Updater +} + +// NewConnection create a new KCP connection between local and remote. +func NewConnection(meta ConnMetadata, writer PacketWriter, closer io.Closer, config *Config) *Connection { + log.Debugln("[mkcp] #%d creating connection to %s", meta.Conversation, meta.RemoteAddr) + + conn := &Connection{ + meta: meta, + closer: closer, + since: nowMillisec(), + dataInput: newNotifier(), + dataOutput: newNotifier(), + Config: config, + output: NewRetryableWriter(NewSegmentWriter(writer)), + mss: config.GetMTUValue() - uint32(writer.Overhead()) - DataSegmentOverhead, + roundTrip: &RoundTripInfo{ + rto: 100, + minRtt: config.GetTTIValue(), + }, + } + + conn.receivingWorker = NewReceivingWorker(conn) + conn.sendingWorker = NewSendingWorker(conn) + + isTerminating := func() bool { + return conn.State().Is(StateTerminating, StateTerminated) + } + isTerminated := func() bool { + return conn.State() == StateTerminated + } + conn.dataUpdater = NewUpdater( + config.GetTTIValue(), + func() bool { + return !isTerminating() && (conn.sendingWorker.UpdateNecessary() || conn.receivingWorker.UpdateNecessary()) + }, + isTerminating, + conn.updateTask) + conn.pingUpdater = NewUpdater( + 5000, // 5 seconds + func() bool { return !isTerminated() }, + isTerminated, + conn.updateTask) + conn.pingUpdater.WakeUp() + + return conn +} + +func (c *Connection) Elapsed() uint32 { + return uint32(nowMillisec() - c.since) +} + +// ReadMultiBuffer reads queued data as a MultiBuffer. +func (c *Connection) ReadMultiBuffer() (MultiBuffer, error) { + if c == nil { + return nil, io.EOF + } + + for { + if c.State().Is(StateReadyToClose, StateTerminating, StateTerminated) { + return nil, io.EOF + } + mb := c.receivingWorker.ReadMultiBuffer() + if !mb.IsEmpty() { + c.dataUpdater.WakeUp() + return mb, nil + } + + if c.State() == StatePeerTerminating { + return nil, io.EOF + } + + if err := c.waitForDataInput(); err != nil { + return nil, err + } + } +} + +func (c *Connection) waitForDataInput() error { + for i := 0; i < 16; i++ { + select { + case <-c.dataInput.Wait(): + return nil + default: + runtime.Gosched() + } + } + + duration := time.Second * 16 + if !c.rd.IsZero() { + duration = time.Until(c.rd) + if duration < 0 { + return ErrIOTimeout + } + } + + timeout := time.NewTimer(duration) + defer timeout.Stop() + + select { + case <-c.dataInput.Wait(): + case <-timeout.C: + if !c.rd.IsZero() && c.rd.Before(time.Now()) { + return ErrIOTimeout + } + } + + return nil +} + +// Read implements the Conn Read method. +func (c *Connection) Read(b []byte) (int, error) { + if c == nil { + return 0, io.EOF + } + + for { + if c.State().Is(StateReadyToClose, StateTerminating, StateTerminated) { + return 0, io.EOF + } + nBytes := c.receivingWorker.Read(b) + if nBytes > 0 { + c.dataUpdater.WakeUp() + return nBytes, nil + } + + if err := c.waitForDataInput(); err != nil { + return 0, err + } + } +} + +func (c *Connection) waitForDataOutput() error { + for i := 0; i < 16; i++ { + select { + case <-c.dataOutput.Wait(): + return nil + default: + runtime.Gosched() + } + } + + duration := time.Second * 16 + if !c.wd.IsZero() { + duration = time.Until(c.wd) + if duration < 0 { + return ErrIOTimeout + } + } + + timeout := time.NewTimer(duration) + defer timeout.Stop() + + select { + case <-c.dataOutput.Wait(): + case <-timeout.C: + if !c.wd.IsZero() && c.wd.Before(time.Now()) { + return ErrIOTimeout + } + } + + return nil +} + +// Write implements io.Writer. +func (c *Connection) Write(b []byte) (int, error) { + reader := bytes.NewReader(b) + if err := c.writeMultiBufferInternal(reader); err != nil { + return 0, err + } + return len(b), nil +} + +// WriteMultiBuffer writes a MultiBuffer to the connection. +func (c *Connection) WriteMultiBuffer(mb MultiBuffer) error { + reader := &MultiBufferContainer{ + MultiBuffer: mb, + } + defer reader.Close() + + return c.writeMultiBufferInternal(reader) +} + +func (c *Connection) writeMultiBufferInternal(reader io.Reader) error { + updatePending := false + defer func() { + if updatePending { + c.dataUpdater.WakeUp() + } + }() + + var b *Buffer + defer b.Release() + + for { + for { + if c == nil || c.State() != StateActive { + return io.ErrClosedPipe + } + + if b == nil { + b = New() + _, err := b.ReadFrom(io.LimitReader(reader, int64(c.mss))) + if err != nil { + return nil + } + } + + if !c.sendingWorker.Push(b) { + break + } + updatePending = true + b = nil + } + + if updatePending { + c.dataUpdater.WakeUp() + updatePending = false + } + + if err := c.waitForDataOutput(); err != nil { + return err + } + } +} + +func (c *Connection) SetState(state State) { + current := c.Elapsed() + atomic.StoreInt32((*int32)(&c.state), int32(state)) + atomic.StoreUint32(&c.stateBeginTime, current) + log.Debugln("[mkcp] #%d entering state %d at %d", c.meta.Conversation, state, current) + + switch state { + case StateReadyToClose: + c.receivingWorker.CloseRead() + case StatePeerClosed: + c.sendingWorker.CloseWrite() + case StateTerminating: + c.receivingWorker.CloseRead() + c.sendingWorker.CloseWrite() + c.pingUpdater.SetInterval(time.Second) + case StatePeerTerminating: + c.sendingWorker.CloseWrite() + c.pingUpdater.SetInterval(time.Second) + case StateTerminated: + c.receivingWorker.CloseRead() + c.sendingWorker.CloseWrite() + c.pingUpdater.SetInterval(time.Second) + c.dataUpdater.WakeUp() + c.pingUpdater.WakeUp() + go c.Terminate() + } +} + +// Close closes the connection. +func (c *Connection) Close() error { + if c == nil { + return ErrClosedConnection + } + + c.dataInput.Signal() + c.dataOutput.Signal() + + switch c.State() { + case StateReadyToClose, StateTerminating, StateTerminated: + return ErrClosedConnection + case StateActive: + c.SetState(StateReadyToClose) + case StatePeerClosed: + c.SetState(StateTerminating) + case StatePeerTerminating: + c.SetState(StateTerminated) + } + + log.Debugln("[mkcp] #%d closing connection to %s", c.meta.Conversation, c.meta.RemoteAddr) + + return nil +} + +// LocalAddr returns the local network address. +func (c *Connection) LocalAddr() net.Addr { + if c == nil { + return nil + } + return c.meta.LocalAddr +} + +// RemoteAddr returns the remote network address. +func (c *Connection) RemoteAddr() net.Addr { + if c == nil { + return nil + } + return c.meta.RemoteAddr +} + +// SetDeadline sets the read and write deadlines. A zero value disables them. +func (c *Connection) SetDeadline(t time.Time) error { + if err := c.SetReadDeadline(t); err != nil { + return err + } + return c.SetWriteDeadline(t) +} + +// SetReadDeadline implements the Conn SetReadDeadline method. +func (c *Connection) SetReadDeadline(t time.Time) error { + if c == nil || c.State() != StateActive { + return ErrClosedConnection + } + c.rd = t + return nil +} + +// SetWriteDeadline implements the Conn SetWriteDeadline method. +func (c *Connection) SetWriteDeadline(t time.Time) error { + if c == nil || c.State() != StateActive { + return ErrClosedConnection + } + c.wd = t + return nil +} + +// kcp update, input loop +func (c *Connection) updateTask() { + c.flush() +} + +func (c *Connection) Terminate() { + if c == nil { + return + } + log.Debugln("[mkcp] #%d terminating connection to %s", c.meta.Conversation, c.RemoteAddr()) + + c.dataInput.Signal() + c.dataOutput.Signal() + + c.closer.Close() + c.sendingWorker.Release() + c.receivingWorker.Release() +} + +func (c *Connection) HandleOption(opt SegmentOption) { + if (opt & SegmentOptionClose) == SegmentOptionClose { + c.OnPeerClosed() + } +} + +func (c *Connection) OnPeerClosed() { + switch c.State() { + case StateReadyToClose: + c.SetState(StateTerminating) + case StateActive: + c.SetState(StatePeerClosed) + } +} + +// Input when you received a low level packet (eg. UDP packet), call it +func (c *Connection) Input(segments []Segment) { + current := c.Elapsed() + atomic.StoreUint32(&c.lastIncomingTime, current) + + for _, seg := range segments { + if seg.Conversation() != c.meta.Conversation { + break + } + + switch seg := seg.(type) { + case *DataSegment: + c.HandleOption(seg.Option) + c.receivingWorker.ProcessSegment(seg) + if c.receivingWorker.IsDataAvailable() { + c.dataInput.Signal() + } + c.dataUpdater.WakeUp() + case *AckSegment: + c.HandleOption(seg.Option) + c.sendingWorker.ProcessSegment(current, seg, c.roundTrip.Timeout()) + c.dataOutput.Signal() + c.dataUpdater.WakeUp() + case *CmdOnlySegment: + c.HandleOption(seg.Option) + if seg.Command() == CommandTerminate { + switch c.State() { + case StateActive, StatePeerClosed: + c.SetState(StatePeerTerminating) + case StateReadyToClose: + c.SetState(StateTerminating) + case StateTerminating: + c.SetState(StateTerminated) + } + } + if seg.Option == SegmentOptionClose || seg.Command() == CommandTerminate { + c.dataInput.Signal() + c.dataOutput.Signal() + } + c.sendingWorker.ProcessReceivingNext(seg.ReceivingNext) + c.receivingWorker.ProcessSendingNext(seg.SendingNext) + c.roundTrip.UpdatePeerRTO(seg.PeerRTO, current) + seg.Release() + default: + } + } +} + +func (c *Connection) flush() { + current := c.Elapsed() + + if c.State() == StateTerminated { + return + } + if c.State() == StateActive && current-atomic.LoadUint32(&c.lastIncomingTime) >= 30000 { + c.Close() + } + if c.State() == StateReadyToClose && c.sendingWorker.IsEmpty() { + c.SetState(StateTerminating) + } + + if c.State() == StateTerminating { + log.Debugln("[mkcp] #%d sending terminating cmd", c.meta.Conversation) + c.Ping(current, CommandTerminate) + + if current-atomic.LoadUint32(&c.stateBeginTime) > 8000 { + c.SetState(StateTerminated) + } + return + } + if c.State() == StatePeerTerminating && current-atomic.LoadUint32(&c.stateBeginTime) > 4000 { + c.SetState(StateTerminating) + } + + if c.State() == StateReadyToClose && current-atomic.LoadUint32(&c.stateBeginTime) > 15000 { + c.SetState(StateTerminating) + } + + // flush acknowledges + c.receivingWorker.Flush(current) + c.sendingWorker.Flush(current) + + if current-atomic.LoadUint32(&c.lastPingTime) >= 3000 { + c.Ping(current, CommandPing) + } +} + +func (c *Connection) State() State { + return State(atomic.LoadInt32((*int32)(&c.state))) +} + +func (c *Connection) Ping(current uint32, cmd Command) { + seg := NewCmdOnlySegment() + seg.Conv = c.meta.Conversation + seg.Cmd = cmd + seg.ReceivingNext = c.receivingWorker.NextNumber() + seg.SendingNext = c.sendingWorker.FirstUnacknowledged() + seg.PeerRTO = c.roundTrip.Timeout() + if c.State() == StateReadyToClose { + seg.Option = SegmentOptionClose + } + c.output.Write(seg) + atomic.StoreUint32(&c.lastPingTime, current) + seg.Release() +} diff --git a/transport/mkcp/crypt.go b/transport/mkcp/crypt.go new file mode 100644 index 0000000000..35f2b2d421 --- /dev/null +++ b/transport/mkcp/crypt.go @@ -0,0 +1,77 @@ +package mkcp + +import ( + "crypto/cipher" + "encoding/binary" + "errors" + "hash/fnv" +) + +// SimpleAuthenticator is the default (no-seed) mKCP authenticator. It is a +// legacy AEAD used for KCP packet integrity, ported verbatim from xray-core so +// that the wire format stays byte-compatible. +type SimpleAuthenticator struct{} + +// NewSimpleAuthenticator creates a new SimpleAuthenticator. +func NewSimpleAuthenticator() cipher.AEAD { + return &SimpleAuthenticator{} +} + +// NonceSize implements cipher.AEAD. +func (*SimpleAuthenticator) NonceSize() int { + return 0 +} + +// Overhead implements cipher.AEAD. +func (*SimpleAuthenticator) Overhead() int { + return 6 +} + +// Seal implements cipher.AEAD. +func (a *SimpleAuthenticator) Seal(dst, nonce, plain, extra []byte) []byte { + dst = append(dst, 0, 0, 0, 0, 0, 0) // 4 bytes for hash, and then 2 bytes for length + binary.BigEndian.PutUint16(dst[4:], uint16(len(plain))) + dst = append(dst, plain...) + + fnvHash := fnv.New32a() + _, _ = fnvHash.Write(dst[4:]) + fnvHash.Sum(dst[:0]) + + dstLen := len(dst) + xtra := 4 - dstLen%4 + if xtra != 4 { + dst = append(dst, make([]byte, xtra)...) + } + xorfwd(dst) + if xtra != 4 { + dst = dst[:dstLen] + } + return dst +} + +// Open implements cipher.AEAD. +func (a *SimpleAuthenticator) Open(dst, nonce, cipherText, extra []byte) ([]byte, error) { + dst = append(dst, cipherText...) + dstLen := len(dst) + xtra := 4 - dstLen%4 + if xtra != 4 { + dst = append(dst, make([]byte, xtra)...) + } + xorbkd(dst) + if xtra != 4 { + dst = dst[:dstLen] + } + + fnvHash := fnv.New32a() + _, _ = fnvHash.Write(dst[4:]) + if binary.BigEndian.Uint32(dst[:4]) != fnvHash.Sum32() { + return nil, errors.New("invalid auth") + } + + length := binary.BigEndian.Uint16(dst[4:6]) + if len(dst)-6 != int(length) { + return nil, errors.New("invalid auth") + } + + return dst[6:], nil +} diff --git a/transport/mkcp/cryptreal.go b/transport/mkcp/cryptreal.go new file mode 100644 index 0000000000..48935d2d07 --- /dev/null +++ b/transport/mkcp/cryptreal.go @@ -0,0 +1,24 @@ +package mkcp + +import ( + "crypto/aes" + "crypto/cipher" + "crypto/sha256" +) + +// NewAEADAESGCMBasedOnSeed derives an AES-128-GCM AEAD from a seed string, +// using the first 16 bytes of SHA-256(seed) as the key. This matches Xray's +// mKCP "seed" security exactly so encrypted packets interoperate. +func NewAEADAESGCMBasedOnSeed(seed string) cipher.AEAD { + hashedSeed := sha256.Sum256([]byte(seed)) + block, err := aes.NewCipher(hashedSeed[:16]) + if err != nil { + // aes.NewCipher only fails on invalid key sizes; 16 bytes is always valid. + panic(err) + } + aead, err := cipher.NewGCM(block) + if err != nil { + panic(err) + } + return aead +} diff --git a/transport/mkcp/header.go b/transport/mkcp/header.go new file mode 100644 index 0000000000..30f94efbba --- /dev/null +++ b/transport/mkcp/header.go @@ -0,0 +1,103 @@ +package mkcp + +import ( + "encoding/binary" + "fmt" + + "github.com/metacubex/randv2" +) + +// PacketHeader masquerades each UDP datagram with a fake protocol header. +// Only Size and Serialize are needed on the client side; the reader merely +// strips Size() bytes from incoming packets without validating them, matching +// xray-core's internet.PacketHeader. +type PacketHeader interface { + Size() int32 + Serialize([]byte) +} + +func rollUint16() uint16 { + return uint16(randv2.Uint32()) +} + +// NewPacketHeader builds a header masquerade by its mKCP header.type name. +// An empty name or "none" yields a nil header (no masquerade), which is +// equivalent on the wire to a zero-size header. +func NewPacketHeader(name string) (PacketHeader, error) { + switch name { + case "", "none", "noop": + return nil, nil + case "srtp": + return &srtpHeader{header: 0xB5E8, number: rollUint16()}, nil + case "utp": + return &utpHeader{header: 1, extension: 0, connectionID: rollUint16()}, nil + case "wechat-video", "wechat": + return &wechatVideoHeader{sn: uint32(rollUint16())}, nil + case "wireguard": + return &wireguardHeader{}, nil + default: + return nil, fmt.Errorf("unknown mkcp header type: %s", name) + } +} + +// srtpHeader masquerades as SRTP traffic. (4 bytes) +type srtpHeader struct { + header uint16 + number uint16 +} + +func (*srtpHeader) Size() int32 { return 4 } + +func (h *srtpHeader) Serialize(b []byte) { + h.number++ + binary.BigEndian.PutUint16(b, h.header) + binary.BigEndian.PutUint16(b[2:], h.number) +} + +// utpHeader masquerades as uTP (BitTorrent) traffic. (4 bytes) +type utpHeader struct { + header byte + extension byte + connectionID uint16 +} + +func (*utpHeader) Size() int32 { return 4 } + +func (h *utpHeader) Serialize(b []byte) { + binary.BigEndian.PutUint16(b, h.connectionID) + b[2] = h.header + b[3] = h.extension +} + +// wechatVideoHeader masquerades as WeChat video call traffic. (13 bytes) +type wechatVideoHeader struct { + sn uint32 +} + +func (*wechatVideoHeader) Size() int32 { return 13 } + +func (h *wechatVideoHeader) Serialize(b []byte) { + h.sn++ + b[0] = 0xa1 + b[1] = 0x08 + binary.BigEndian.PutUint32(b[2:], h.sn) // b[2:6] + b[6] = 0x00 + b[7] = 0x10 + b[8] = 0x11 + b[9] = 0x18 + b[10] = 0x30 + b[11] = 0x22 + b[12] = 0x30 +} + +// wireguardHeader masquerades as WireGuard traffic. (4 bytes) +type wireguardHeader struct{} + +func (wireguardHeader) Size() int32 { return 4 } + +func (wireguardHeader) Serialize(b []byte) { + b[0] = 0x04 + b[1] = 0x00 + b[2] = 0x00 + b[3] = 0x00 +} diff --git a/transport/mkcp/io.go b/transport/mkcp/io.go new file mode 100644 index 0000000000..30c0a2d536 --- /dev/null +++ b/transport/mkcp/io.go @@ -0,0 +1,97 @@ +package mkcp + +import ( + "crypto/cipher" + "crypto/rand" + "io" +) + +type PacketReader interface { + Read([]byte) []Segment +} + +type PacketWriter interface { + Overhead() int + io.Writer +} + +// KCPPacketReader strips the (optional) header masquerade and decrypts each +// incoming UDP datagram, then parses out the KCP segments it contains. +type KCPPacketReader struct { + Security cipher.AEAD + Header PacketHeader +} + +func (r *KCPPacketReader) Read(b []byte) []Segment { + if r.Header != nil { + if int32(len(b)) <= r.Header.Size() { + return nil + } + b = b[r.Header.Size():] + } + if r.Security != nil { + nonceSize := r.Security.NonceSize() + overhead := r.Security.Overhead() + if len(b) <= nonceSize+overhead { + return nil + } + out, err := r.Security.Open(b[nonceSize:nonceSize], b[:nonceSize], b[nonceSize:], nil) + if err != nil { + return nil + } + b = out + } + var result []Segment + for len(b) > 0 { + seg, x := ReadSegment(b) + if seg == nil { + break + } + result = append(result, seg) + b = x + } + return result +} + +// KCPPacketWriter serializes the header masquerade, a random nonce, and the +// AEAD-sealed KCP segments into a single UDP datagram. +type KCPPacketWriter struct { + Header PacketHeader + Security cipher.AEAD + Writer io.Writer +} + +func (w *KCPPacketWriter) Overhead() int { + overhead := 0 + if w.Header != nil { + overhead += int(w.Header.Size()) + } + if w.Security != nil { + overhead += w.Security.Overhead() + } + return overhead +} + +func (w *KCPPacketWriter) Write(b []byte) (int, error) { + bb := StackNew() + defer bb.Release() + + if w.Header != nil { + w.Header.Serialize(bb.Extend(w.Header.Size())) + } + if w.Security != nil { + nonceSize := w.Security.NonceSize() + if _, err := bb.ReadFullFrom(rand.Reader, int32(nonceSize)); err != nil { + return 0, err + } + nonce := bb.BytesFrom(int32(-nonceSize)) + + encrypted := bb.Extend(int32(w.Security.Overhead() + len(b))) + w.Security.Seal(encrypted[:0], nonce, b, nil) + } else { + bb.Write(b) + } + + _, err := w.Writer.Write(bb.Bytes()) + return len(b), err +} diff --git a/transport/mkcp/mkcp_test.go b/transport/mkcp/mkcp_test.go new file mode 100644 index 0000000000..dadcc5d6b3 --- /dev/null +++ b/transport/mkcp/mkcp_test.go @@ -0,0 +1,125 @@ +package mkcp + +import ( + "bytes" + "crypto/rand" + "io" + "net" + "testing" + "time" +) + +// TestSimpleAuthenticatorRoundTrip verifies the no-seed authenticator seals and +// opens to the original plaintext. +func TestSimpleAuthenticatorRoundTrip(t *testing.T) { + a := NewSimpleAuthenticator() + for _, n := range []int{0, 1, 7, 18, 100, 1291} { + plain := make([]byte, n) + _, _ = rand.Read(plain) + sealed := a.Seal(nil, nil, plain, nil) + opened, err := a.Open(nil, nil, sealed, nil) + if err != nil { + t.Fatalf("open failed for len %d: %v", n, err) + } + if !bytes.Equal(opened, plain) { + t.Fatalf("round trip mismatch for len %d", n) + } + } +} + +// TestAEADSeedRoundTrip verifies the seed-based AES-128-GCM authenticator. +func TestAEADSeedRoundTrip(t *testing.T) { + a := NewAEADAESGCMBasedOnSeed("a-shared-seed") + nonce := make([]byte, a.NonceSize()) + _, _ = rand.Read(nonce) + plain := []byte("the quick brown fox jumps over the lazy dog") + sealed := a.Seal(nil, nonce, plain, nil) + opened, err := a.Open(nil, nonce, sealed, nil) + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(opened, plain) { + t.Fatal("aead round trip mismatch") + } +} + +func dialPair(t *testing.T, cfg *Config) (*Connection, *Connection, func()) { + t.Helper() + pcA, err := net.ListenPacket("udp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + pcB, err := net.ListenPacket("udp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + + const conv = 0x4242 + connA, err := dialConv(pcA, pcB.LocalAddr(), cfg, conv) + if err != nil { + t.Fatal(err) + } + connB, err := dialConv(pcB, pcA.LocalAddr(), cfg, conv) + if err != nil { + t.Fatal(err) + } + cleanup := func() { + connA.Close() + connB.Close() + } + return connA, connB, cleanup +} + +// TestMKCPEndToEnd runs two mKCP connections over real UDP loopback (sharing a +// conversation id, as a client and server would) and exchanges data in both +// directions, exercising the full segment/window/crypto/header path. +func TestMKCPEndToEnd(t *testing.T) { + cases := []struct { + name string + cfg *Config + }{ + {"none-noseed", &Config{}}, + {"wechat-seed", &Config{Header: "wechat-video", Seed: "test-seed"}}, + {"wireguard-seed", &Config{Header: "wireguard", Seed: "another-seed"}}, + {"utp-noseed", &Config{Header: "utp"}}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + connA, connB, cleanup := dialPair(t, tc.cfg) + defer cleanup() + + payload := make([]byte, 64*1024) + _, _ = rand.Read(payload) + + errc := make(chan error, 1) + go func() { + _, err := connA.Write(payload) + errc <- err + }() + + connB.SetReadDeadline(time.Now().Add(10 * time.Second)) + got := make([]byte, len(payload)) + if _, err := io.ReadFull(connB, got); err != nil { + t.Fatalf("read on B: %v", err) + } + if err := <-errc; err != nil { + t.Fatalf("write on A: %v", err) + } + if !bytes.Equal(got, payload) { + t.Fatal("payload mismatch A->B") + } + + // reverse direction + reply := []byte("pong from B") + go connB.Write(reply) + connA.SetReadDeadline(time.Now().Add(10 * time.Second)) + rgot := make([]byte, len(reply)) + if _, err := io.ReadFull(connA, rgot); err != nil { + t.Fatalf("read on A: %v", err) + } + if !bytes.Equal(rgot, reply) { + t.Fatal("payload mismatch B->A") + } + }) + } +} diff --git a/transport/mkcp/output.go b/transport/mkcp/output.go new file mode 100644 index 0000000000..8bfb7dfafa --- /dev/null +++ b/transport/mkcp/output.go @@ -0,0 +1,56 @@ +package mkcp + +import ( + "io" + "sync" + "time" +) + +type SegmentWriter interface { + Write(seg Segment) error +} + +type SimpleSegmentWriter struct { + sync.Mutex + buffer *Buffer + writer io.Writer +} + +func NewSegmentWriter(writer io.Writer) SegmentWriter { + return &SimpleSegmentWriter{ + writer: writer, + buffer: New(), + } +} + +func (w *SimpleSegmentWriter) Write(seg Segment) error { + w.Lock() + defer w.Unlock() + + w.buffer.Clear() + rawBytes := w.buffer.Extend(seg.ByteSize()) + seg.Serialize(rawBytes) + _, err := w.writer.Write(w.buffer.Bytes()) + return err +} + +// RetryableWriter retries a failed segment write a few times before giving up, +// mirroring xray-core's retry.Timed(5, 100) wrapping of the UDP writer. +type RetryableWriter struct { + writer SegmentWriter +} + +func NewRetryableWriter(writer SegmentWriter) SegmentWriter { + return &RetryableWriter{writer: writer} +} + +func (w *RetryableWriter) Write(seg Segment) error { + var err error + for i := 0; i < 5; i++ { + if err = w.writer.Write(seg); err == nil { + return nil + } + time.Sleep(100 * time.Millisecond) + } + return err +} diff --git a/transport/mkcp/receiving.go b/transport/mkcp/receiving.go new file mode 100644 index 0000000000..c5000260ed --- /dev/null +++ b/transport/mkcp/receiving.go @@ -0,0 +1,256 @@ +package mkcp + +import ( + "sync" +) + +type ReceivingWindow struct { + cache map[uint32]*DataSegment +} + +func NewReceivingWindow() *ReceivingWindow { + return &ReceivingWindow{ + cache: make(map[uint32]*DataSegment), + } +} + +func (w *ReceivingWindow) Set(id uint32, value *DataSegment) bool { + _, f := w.cache[id] + if f { + return false + } + w.cache[id] = value + return true +} + +func (w *ReceivingWindow) Has(id uint32) bool { + _, f := w.cache[id] + return f +} + +func (w *ReceivingWindow) Remove(id uint32) *DataSegment { + v, f := w.cache[id] + if !f { + return nil + } + delete(w.cache, id) + return v +} + +type AckList struct { + writer SegmentWriter + timestamps []uint32 + numbers []uint32 + nextFlush []uint32 + + flushCandidates []uint32 + dirty bool +} + +func NewAckList(writer SegmentWriter) *AckList { + return &AckList{ + writer: writer, + timestamps: make([]uint32, 0, 128), + numbers: make([]uint32, 0, 128), + nextFlush: make([]uint32, 0, 128), + flushCandidates: make([]uint32, 0, 128), + } +} + +func (l *AckList) Add(number uint32, timestamp uint32) { + l.timestamps = append(l.timestamps, timestamp) + l.numbers = append(l.numbers, number) + l.nextFlush = append(l.nextFlush, 0) + l.dirty = true +} + +func (l *AckList) Clear(una uint32) { + count := 0 + for i := 0; i < len(l.numbers); i++ { + if l.numbers[i] < una { + continue + } + if i != count { + l.numbers[count] = l.numbers[i] + l.timestamps[count] = l.timestamps[i] + l.nextFlush[count] = l.nextFlush[i] + } + count++ + } + if count < len(l.numbers) { + l.numbers = l.numbers[:count] + l.timestamps = l.timestamps[:count] + l.nextFlush = l.nextFlush[:count] + l.dirty = true + } +} + +func (l *AckList) Flush(current uint32, rto uint32) { + l.flushCandidates = l.flushCandidates[:0] + + seg := NewAckSegment() + for i := 0; i < len(l.numbers); i++ { + if l.nextFlush[i] > current { + if len(l.flushCandidates) < cap(l.flushCandidates) { + l.flushCandidates = append(l.flushCandidates, l.numbers[i]) + } + continue + } + seg.PutNumber(l.numbers[i]) + seg.PutTimestamp(l.timestamps[i]) + timeout := rto / 2 + if timeout < 20 { + timeout = 20 + } + l.nextFlush[i] = current + timeout + + if seg.IsFull() { + l.writer.Write(seg) + seg.Release() + seg = NewAckSegment() + l.dirty = false + } + } + + if l.dirty || !seg.IsEmpty() { + for _, number := range l.flushCandidates { + if seg.IsFull() { + break + } + seg.PutNumber(number) + } + l.writer.Write(seg) + l.dirty = false + } + + seg.Release() +} + +type ReceivingWorker struct { + sync.RWMutex + conn *Connection + leftOver MultiBuffer + window *ReceivingWindow + acklist *AckList + nextNumber uint32 + windowSize uint32 +} + +func NewReceivingWorker(kcp *Connection) *ReceivingWorker { + worker := &ReceivingWorker{ + conn: kcp, + window: NewReceivingWindow(), + windowSize: kcp.Config.GetReceivingInFlightSize(), + } + worker.acklist = NewAckList(worker) + return worker +} + +func (w *ReceivingWorker) Release() { + w.Lock() + ReleaseMulti(w.leftOver) + w.leftOver = nil + w.Unlock() +} + +func (w *ReceivingWorker) ProcessSendingNext(number uint32) { + w.Lock() + defer w.Unlock() + + w.acklist.Clear(number) +} + +func (w *ReceivingWorker) ProcessSegment(seg *DataSegment) { + w.Lock() + defer w.Unlock() + + number := seg.Number + idx := number - w.nextNumber + if idx >= w.windowSize { + return + } + w.acklist.Clear(seg.SendingNext) + w.acklist.Add(number, seg.Timestamp) + + if !w.window.Set(seg.Number, seg) { + seg.Release() + } +} + +func (w *ReceivingWorker) ReadMultiBuffer() MultiBuffer { + if w.leftOver != nil { + mb := w.leftOver + w.leftOver = nil + return mb + } + + mb := make(MultiBuffer, 0, 32) + + w.Lock() + defer w.Unlock() + for { + seg := w.window.Remove(w.nextNumber) + if seg == nil { + break + } + w.nextNumber++ + mb = append(mb, seg.Detach()) + seg.Release() + } + + return mb +} + +func (w *ReceivingWorker) Read(b []byte) int { + mb := w.ReadMultiBuffer() + if mb.IsEmpty() { + return 0 + } + mb, nBytes := SplitBytes(mb, b) + if !mb.IsEmpty() { + w.leftOver = mb + } + return nBytes +} + +func (w *ReceivingWorker) IsDataAvailable() bool { + w.RLock() + defer w.RUnlock() + return w.window.Has(w.nextNumber) +} + +func (w *ReceivingWorker) NextNumber() uint32 { + w.RLock() + defer w.RUnlock() + + return w.nextNumber +} + +func (w *ReceivingWorker) Flush(current uint32) { + w.Lock() + defer w.Unlock() + + w.acklist.Flush(current, w.conn.roundTrip.Timeout()) +} + +func (w *ReceivingWorker) Write(seg Segment) error { + ackSeg := seg.(*AckSegment) + ackSeg.Conv = w.conn.meta.Conversation + ackSeg.ReceivingNext = w.nextNumber + ackSeg.ReceivingWindow = w.nextNumber + w.windowSize + ackSeg.Option = 0 + if w.conn.State() == StateReadyToClose { + ackSeg.Option = SegmentOptionClose + } + return w.conn.output.Write(ackSeg) +} + +func (*ReceivingWorker) CloseRead() { +} + +func (w *ReceivingWorker) UpdateNecessary() bool { + w.RLock() + defer w.RUnlock() + + return len(w.acklist.numbers) > 0 +} diff --git a/transport/mkcp/segment.go b/transport/mkcp/segment.go new file mode 100644 index 0000000000..87d5158262 --- /dev/null +++ b/transport/mkcp/segment.go @@ -0,0 +1,301 @@ +package mkcp + +import ( + "encoding/binary" +) + +// Command is a KCP command that indicates the purpose of a Segment. +type Command byte + +const ( + // CommandACK indicates an AckSegment. + CommandACK Command = 0 + // CommandData indicates a DataSegment. + CommandData Command = 1 + // CommandTerminate indicates that peer terminates the connection. + CommandTerminate Command = 2 + // CommandPing indicates a ping. + CommandPing Command = 3 +) + +type SegmentOption byte + +const ( + SegmentOptionClose SegmentOption = 1 +) + +type Segment interface { + Release() + Conversation() uint16 + Command() Command + ByteSize() int32 + Serialize([]byte) + parse(conv uint16, cmd Command, opt SegmentOption, buf []byte) (bool, []byte) +} + +const ( + DataSegmentOverhead = 18 +) + +type DataSegment struct { + Conv uint16 + Option SegmentOption + Timestamp uint32 + Number uint32 + SendingNext uint32 + + payload *Buffer + timeout uint32 + transmit uint32 +} + +func NewDataSegment() *DataSegment { + return new(DataSegment) +} + +func (s *DataSegment) parse(conv uint16, cmd Command, opt SegmentOption, buf []byte) (bool, []byte) { + s.Conv = conv + s.Option = opt + if len(buf) < 15 { + return false, nil + } + s.Timestamp = binary.BigEndian.Uint32(buf) + buf = buf[4:] + + s.Number = binary.BigEndian.Uint32(buf) + buf = buf[4:] + + s.SendingNext = binary.BigEndian.Uint32(buf) + buf = buf[4:] + + dataLen := int(binary.BigEndian.Uint16(buf)) + buf = buf[2:] + + if len(buf) < dataLen { + return false, nil + } + s.Data().Clear() + s.Data().Write(buf[:dataLen]) + buf = buf[dataLen:] + + return true, buf +} + +func (s *DataSegment) Conversation() uint16 { + return s.Conv +} + +func (*DataSegment) Command() Command { + return CommandData +} + +func (s *DataSegment) Detach() *Buffer { + r := s.payload + s.payload = nil + return r +} + +func (s *DataSegment) Data() *Buffer { + if s.payload == nil { + s.payload = New() + } + return s.payload +} + +func (s *DataSegment) Serialize(b []byte) { + binary.BigEndian.PutUint16(b, s.Conv) + b[2] = byte(CommandData) + b[3] = byte(s.Option) + binary.BigEndian.PutUint32(b[4:], s.Timestamp) + binary.BigEndian.PutUint32(b[8:], s.Number) + binary.BigEndian.PutUint32(b[12:], s.SendingNext) + binary.BigEndian.PutUint16(b[16:], uint16(s.payload.Len())) + copy(b[18:], s.payload.Bytes()) +} + +func (s *DataSegment) ByteSize() int32 { + return 2 + 1 + 1 + 4 + 4 + 4 + 2 + s.payload.Len() +} + +func (s *DataSegment) Release() { + s.payload.Release() + s.payload = nil +} + +type AckSegment struct { + Conv uint16 + Option SegmentOption + ReceivingWindow uint32 + ReceivingNext uint32 + Timestamp uint32 + NumberList []uint32 +} + +const ackNumberLimit = 128 + +func NewAckSegment() *AckSegment { + return new(AckSegment) +} + +func (s *AckSegment) parse(conv uint16, cmd Command, opt SegmentOption, buf []byte) (bool, []byte) { + s.Conv = conv + s.Option = opt + if len(buf) < 13 { + return false, nil + } + + s.ReceivingWindow = binary.BigEndian.Uint32(buf) + buf = buf[4:] + + s.ReceivingNext = binary.BigEndian.Uint32(buf) + buf = buf[4:] + + s.Timestamp = binary.BigEndian.Uint32(buf) + buf = buf[4:] + + count := int(buf[0]) + buf = buf[1:] + + if len(buf) < count*4 { + return false, nil + } + for i := 0; i < count; i++ { + s.PutNumber(binary.BigEndian.Uint32(buf)) + buf = buf[4:] + } + + return true, buf +} + +func (s *AckSegment) Conversation() uint16 { + return s.Conv +} + +func (*AckSegment) Command() Command { + return CommandACK +} + +func (s *AckSegment) PutTimestamp(timestamp uint32) { + if timestamp-s.Timestamp < 0x7FFFFFFF { + s.Timestamp = timestamp + } +} + +func (s *AckSegment) PutNumber(number uint32) { + s.NumberList = append(s.NumberList, number) +} + +func (s *AckSegment) IsFull() bool { + return len(s.NumberList) == ackNumberLimit +} + +func (s *AckSegment) IsEmpty() bool { + return len(s.NumberList) == 0 +} + +func (s *AckSegment) ByteSize() int32 { + return 2 + 1 + 1 + 4 + 4 + 4 + 1 + int32(len(s.NumberList)*4) +} + +func (s *AckSegment) Serialize(b []byte) { + binary.BigEndian.PutUint16(b, s.Conv) + b[2] = byte(CommandACK) + b[3] = byte(s.Option) + binary.BigEndian.PutUint32(b[4:], s.ReceivingWindow) + binary.BigEndian.PutUint32(b[8:], s.ReceivingNext) + binary.BigEndian.PutUint32(b[12:], s.Timestamp) + b[16] = byte(len(s.NumberList)) + n := 17 + for _, number := range s.NumberList { + binary.BigEndian.PutUint32(b[n:], number) + n += 4 + } +} + +func (s *AckSegment) Release() {} + +type CmdOnlySegment struct { + Conv uint16 + Cmd Command + Option SegmentOption + SendingNext uint32 + ReceivingNext uint32 + PeerRTO uint32 +} + +func NewCmdOnlySegment() *CmdOnlySegment { + return new(CmdOnlySegment) +} + +func (s *CmdOnlySegment) parse(conv uint16, cmd Command, opt SegmentOption, buf []byte) (bool, []byte) { + s.Conv = conv + s.Cmd = cmd + s.Option = opt + + if len(buf) < 12 { + return false, nil + } + + s.SendingNext = binary.BigEndian.Uint32(buf) + buf = buf[4:] + + s.ReceivingNext = binary.BigEndian.Uint32(buf) + buf = buf[4:] + + s.PeerRTO = binary.BigEndian.Uint32(buf) + buf = buf[4:] + + return true, buf +} + +func (s *CmdOnlySegment) Conversation() uint16 { + return s.Conv +} + +func (s *CmdOnlySegment) Command() Command { + return s.Cmd +} + +func (*CmdOnlySegment) ByteSize() int32 { + return 2 + 1 + 1 + 4 + 4 + 4 +} + +func (s *CmdOnlySegment) Serialize(b []byte) { + binary.BigEndian.PutUint16(b, s.Conv) + b[2] = byte(s.Cmd) + b[3] = byte(s.Option) + binary.BigEndian.PutUint32(b[4:], s.SendingNext) + binary.BigEndian.PutUint32(b[8:], s.ReceivingNext) + binary.BigEndian.PutUint32(b[12:], s.PeerRTO) +} + +func (*CmdOnlySegment) Release() {} + +func ReadSegment(buf []byte) (Segment, []byte) { + if len(buf) < 4 { + return nil, nil + } + + conv := binary.BigEndian.Uint16(buf) + buf = buf[2:] + + cmd := Command(buf[0]) + opt := SegmentOption(buf[1]) + buf = buf[2:] + + var seg Segment + switch cmd { + case CommandData: + seg = NewDataSegment() + case CommandACK: + seg = NewAckSegment() + default: + seg = NewCmdOnlySegment() + } + + valid, extra := seg.parse(conv, cmd, opt, buf) + if !valid { + return nil, nil + } + return seg, extra +} diff --git a/transport/mkcp/sending.go b/transport/mkcp/sending.go new file mode 100644 index 0000000000..4fe6903768 --- /dev/null +++ b/transport/mkcp/sending.go @@ -0,0 +1,362 @@ +package mkcp + +import ( + "container/list" + "sync" +) + +type SendingWindow struct { + cache *list.List + totalInFlightSize uint32 + writer SegmentWriter + onPacketLoss func(uint32) +} + +func NewSendingWindow(writer SegmentWriter, onPacketLoss func(uint32)) *SendingWindow { + window := &SendingWindow{ + cache: list.New(), + writer: writer, + onPacketLoss: onPacketLoss, + } + return window +} + +func (sw *SendingWindow) Release() { + if sw == nil { + return + } + for sw.cache.Len() > 0 { + seg := sw.cache.Front().Value.(*DataSegment) + seg.Release() + sw.cache.Remove(sw.cache.Front()) + } +} + +func (sw *SendingWindow) Len() uint32 { + return uint32(sw.cache.Len()) +} + +func (sw *SendingWindow) IsEmpty() bool { + return sw.cache.Len() == 0 +} + +func (sw *SendingWindow) Push(number uint32, b *Buffer) { + seg := NewDataSegment() + seg.Number = number + seg.payload = b + + sw.cache.PushBack(seg) +} + +func (sw *SendingWindow) FirstNumber() uint32 { + return sw.cache.Front().Value.(*DataSegment).Number +} + +func (sw *SendingWindow) Clear(una uint32) { + for !sw.IsEmpty() { + seg := sw.cache.Front().Value.(*DataSegment) + if seg.Number >= una { + break + } + seg.Release() + sw.cache.Remove(sw.cache.Front()) + } +} + +func (sw *SendingWindow) HandleFastAck(number uint32, rto uint32) { + if sw.IsEmpty() { + return + } + + sw.Visit(func(seg *DataSegment) bool { + if number == seg.Number || number-seg.Number > 0x7FFFFFFF { + return false + } + + if seg.transmit > 0 && seg.timeout > rto/3 { + seg.timeout -= rto / 3 + } + return true + }) +} + +func (sw *SendingWindow) Visit(visitor func(seg *DataSegment) bool) { + if sw.IsEmpty() { + return + } + + for e := sw.cache.Front(); e != nil; e = e.Next() { + seg := e.Value.(*DataSegment) + if !visitor(seg) { + break + } + } +} + +func (sw *SendingWindow) Flush(current uint32, rto uint32, maxInFlightSize uint32) { + if sw.IsEmpty() { + return + } + + var lost uint32 + var inFlightSize uint32 + + sw.Visit(func(segment *DataSegment) bool { + if current-segment.timeout >= 0x7FFFFFFF { + return true + } + if segment.transmit == 0 { + // First time + sw.totalInFlightSize++ + } else { + lost++ + } + segment.timeout = current + rto + + segment.Timestamp = current + segment.transmit++ + sw.writer.Write(segment) + inFlightSize++ + return inFlightSize < maxInFlightSize + }) + + if sw.onPacketLoss != nil && inFlightSize > 0 && sw.totalInFlightSize != 0 { + rate := lost * 100 / sw.totalInFlightSize + sw.onPacketLoss(rate) + } +} + +func (sw *SendingWindow) Remove(number uint32) bool { + if sw.IsEmpty() { + return false + } + + for e := sw.cache.Front(); e != nil; e = e.Next() { + seg := e.Value.(*DataSegment) + if seg.Number > number { + return false + } else if seg.Number == number { + if sw.totalInFlightSize > 0 { + sw.totalInFlightSize-- + } + seg.Release() + sw.cache.Remove(e) + return true + } + } + + return false +} + +type SendingWorker struct { + sync.RWMutex + conn *Connection + window *SendingWindow + firstUnacknowledged uint32 + nextNumber uint32 + remoteNextNumber uint32 + controlWindow uint32 + fastResend uint32 + windowSize uint32 + firstUnacknowledgedUpdated bool + closed bool +} + +func NewSendingWorker(kcp *Connection) *SendingWorker { + worker := &SendingWorker{ + conn: kcp, + fastResend: 2, + remoteNextNumber: 32, + controlWindow: kcp.Config.GetSendingInFlightSize(), + windowSize: kcp.Config.GetSendingBufferSize(), + } + worker.window = NewSendingWindow(worker, worker.OnPacketLoss) + return worker +} + +func (w *SendingWorker) Release() { + w.Lock() + w.window.Release() + w.closed = true + w.Unlock() +} + +func (w *SendingWorker) ProcessReceivingNext(nextNumber uint32) { + w.Lock() + defer w.Unlock() + + w.ProcessReceivingNextWithoutLock(nextNumber) +} + +func (w *SendingWorker) ProcessReceivingNextWithoutLock(nextNumber uint32) { + w.window.Clear(nextNumber) + w.FindFirstUnacknowledged() +} + +func (w *SendingWorker) FindFirstUnacknowledged() { + first := w.firstUnacknowledged + if !w.window.IsEmpty() { + w.firstUnacknowledged = w.window.FirstNumber() + } else { + w.firstUnacknowledged = w.nextNumber + } + if first != w.firstUnacknowledged { + w.firstUnacknowledgedUpdated = true + } +} + +func (w *SendingWorker) processAck(number uint32) bool { + // number < v.firstUnacknowledged || number >= v.nextNumber + if number-w.firstUnacknowledged > 0x7FFFFFFF || number-w.nextNumber < 0x7FFFFFFF { + return false + } + + removed := w.window.Remove(number) + if removed { + w.FindFirstUnacknowledged() + } + return removed +} + +func (w *SendingWorker) ProcessSegment(current uint32, seg *AckSegment, rto uint32) { + defer seg.Release() + + w.Lock() + defer w.Unlock() + + if w.closed { + return + } + + if w.remoteNextNumber < seg.ReceivingWindow { + w.remoteNextNumber = seg.ReceivingWindow + } + w.ProcessReceivingNextWithoutLock(seg.ReceivingNext) + + if seg.IsEmpty() { + return + } + + var maxack uint32 + var maxackRemoved bool + for _, number := range seg.NumberList { + removed := w.processAck(number) + if maxack < number { + maxack = number + maxackRemoved = removed + } + } + + if maxackRemoved { + w.window.HandleFastAck(maxack, rto) + if current-seg.Timestamp < 10000 { + w.conn.roundTrip.Update(current-seg.Timestamp, current) + } + } +} + +func (w *SendingWorker) Push(b *Buffer) bool { + w.Lock() + defer w.Unlock() + + if w.closed { + return false + } + + if w.window.Len() > w.windowSize { + return false + } + + w.window.Push(w.nextNumber, b) + w.nextNumber++ + return true +} + +func (w *SendingWorker) Write(seg Segment) error { + dataSeg := seg.(*DataSegment) + + dataSeg.Conv = w.conn.meta.Conversation + dataSeg.SendingNext = w.firstUnacknowledged + dataSeg.Option = 0 + if w.conn.State() == StateReadyToClose { + dataSeg.Option = SegmentOptionClose + } + + return w.conn.output.Write(dataSeg) +} + +func (w *SendingWorker) OnPacketLoss(lossRate uint32) { + if !w.conn.Config.Congestion || w.conn.roundTrip.Timeout() == 0 { + return + } + + if lossRate >= 15 { + w.controlWindow = 3 * w.controlWindow / 4 + } else if lossRate <= 5 { + w.controlWindow += w.controlWindow / 4 + } + if w.controlWindow < 16 { + w.controlWindow = 16 + } + if w.controlWindow > 2*w.conn.Config.GetSendingInFlightSize() { + w.controlWindow = 2 * w.conn.Config.GetSendingInFlightSize() + } +} + +func (w *SendingWorker) Flush(current uint32) { + w.Lock() + + if w.closed { + w.Unlock() + return + } + + cwnd := w.conn.Config.GetSendingInFlightSize() + if cwnd > w.remoteNextNumber-w.firstUnacknowledged { + cwnd = w.remoteNextNumber - w.firstUnacknowledged + } + if w.conn.Config.Congestion && cwnd > w.controlWindow { + cwnd = w.controlWindow + } + + cwnd *= 20 // magic + + if !w.window.IsEmpty() { + w.window.Flush(current, w.conn.roundTrip.Timeout(), cwnd) + w.firstUnacknowledgedUpdated = false + } + + updated := w.firstUnacknowledgedUpdated + w.firstUnacknowledgedUpdated = false + + w.Unlock() + + if updated { + w.conn.Ping(current, CommandPing) + } +} + +func (w *SendingWorker) CloseWrite() { + w.Lock() + defer w.Unlock() + + w.window.Clear(0xFFFFFFFF) +} + +func (w *SendingWorker) IsEmpty() bool { + w.RLock() + defer w.RUnlock() + + return w.window.IsEmpty() +} + +func (w *SendingWorker) UpdateNecessary() bool { + return !w.IsEmpty() +} + +func (w *SendingWorker) FirstUnacknowledged() uint32 { + w.RLock() + defer w.RUnlock() + + return w.firstUnacknowledged +} diff --git a/transport/mkcp/signal.go b/transport/mkcp/signal.go new file mode 100644 index 0000000000..55ea1273f9 --- /dev/null +++ b/transport/mkcp/signal.go @@ -0,0 +1,49 @@ +package mkcp + +// notifier is a utility for notifying changes. The producer may signal multiple +// times; the consumer gets notified asynchronously. Ported from xray-core +// common/signal.Notifier. +type notifier struct { + c chan struct{} +} + +func newNotifier() *notifier { + return ¬ifier{c: make(chan struct{}, 1)} +} + +// Signal signals a change. Never blocks. +func (n *notifier) Signal() { + select { + case n.c <- struct{}{}: + default: + } +} + +// Wait returns a channel that receives on each signal. Never closed. +func (n *notifier) Wait() <-chan struct{} { + return n.c +} + +// semaphore is a counting semaphore, ported from xray-core +// common/signal/semaphore.Instance. +type semaphore struct { + token chan struct{} +} + +func newSemaphore(n int) *semaphore { + s := &semaphore{token: make(chan struct{}, n)} + for i := 0; i < n; i++ { + s.token <- struct{}{} + } + return s +} + +// Wait returns a channel for acquiring a permit. +func (s *semaphore) Wait() <-chan struct{} { + return s.token +} + +// Signal releases a permit back into the semaphore. +func (s *semaphore) Signal() { + s.token <- struct{}{} +} diff --git a/transport/mkcp/xor.go b/transport/mkcp/xor.go new file mode 100644 index 0000000000..4c930c465d --- /dev/null +++ b/transport/mkcp/xor.go @@ -0,0 +1,15 @@ +package mkcp + +// xorfwd performs XOR forwards in words, x[i] ^= x[i-4], i from 0 to len. +func xorfwd(x []byte) { + for i := 4; i < len(x); i++ { + x[i] ^= x[i-4] + } +} + +// xorbkd performs XOR backwards in words, x[i] ^= x[i-4], i from len to 0. +func xorbkd(x []byte) { + for i := len(x) - 1; i >= 4; i-- { + x[i] ^= x[i-4] + } +}