diff --git a/adapter/inbound/addition.go b/adapter/inbound/addition.go index 47307ed7..327d00e9 100644 --- a/adapter/inbound/addition.go +++ b/adapter/inbound/addition.go @@ -1,6 +1,8 @@ package inbound import ( + "net" + C "github.com/Dreamacro/clash/constant" ) @@ -33,3 +35,12 @@ func WithSpecialProxy(specialProxy string) Addition { metadata.SpecialProxy = specialProxy } } + +func WithSrcAddr(addr net.Addr) Addition { + return func(metadata *C.Metadata) { + if ip, port, err := parseAddr(addr); err == nil { + metadata.SrcIP = ip + metadata.SrcPort = port + } + } +} diff --git a/constant/listener.go b/constant/listener.go index 6f9f169b..f69b4a9b 100644 --- a/constant/listener.go +++ b/constant/listener.go @@ -16,7 +16,7 @@ type MultiAddrListener interface { type InboundListener interface { Name() string - Listen(tcpIn chan<- ConnContext, udpIn chan<- PacketAdapter, natTable NatTable) error + Listen(tunnel Tunnel) error Close() error Address() string RawAddress() string diff --git a/constant/tunnel.go b/constant/tunnel.go new file mode 100644 index 00000000..39f8936a --- /dev/null +++ b/constant/tunnel.go @@ -0,0 +1,10 @@ +package constant + +type Tunnel interface { + // HandleTCPConn will handle a tcp connection blocking + HandleTCPConn(connCtx ConnContext) + // HandleUDPPacket will handle a udp packet nonblocking + HandleUDPPacket(packet PacketAdapter) + // NatTable return nat table + NatTable() NatTable +} diff --git a/hub/executor/executor.go b/hub/executor/executor.go index 1831584f..a50d3539 100644 --- a/hub/executor/executor.go +++ b/hub/executor/executor.go @@ -118,7 +118,7 @@ func ApplyConfig(cfg *config.Config, force bool) { } func initInnerTcp() { - inner.New(tunnel.TCPIn()) + inner.New(tunnel.Tunnel) } func GetGeneral() *config.General { @@ -157,11 +157,7 @@ func GetGeneral() *config.General { } func updateListeners(general *config.General, listeners map[string]C.InboundListener, force bool) { - tcpIn := tunnel.TCPIn() - udpIn := tunnel.UDPIn() - natTable := tunnel.NatTable() - - listener.PatchInboundListeners(listeners, tcpIn, udpIn, natTable, true) + listener.PatchInboundListeners(listeners, tunnel.Tunnel, true) if !force { return } @@ -171,15 +167,15 @@ func updateListeners(general *config.General, listeners map[string]C.InboundList bindAddress := general.BindAddress listener.SetBindAddress(bindAddress) - listener.ReCreateHTTP(general.Port, tcpIn) - listener.ReCreateSocks(general.SocksPort, tcpIn, udpIn) - listener.ReCreateRedir(general.RedirPort, tcpIn, udpIn, natTable) - listener.ReCreateAutoRedir(general.EBpf.AutoRedir, tcpIn, udpIn) - listener.ReCreateTProxy(general.TProxyPort, tcpIn, udpIn, natTable) - listener.ReCreateMixed(general.MixedPort, tcpIn, udpIn) - listener.ReCreateShadowSocks(general.ShadowSocksConfig, tcpIn, udpIn) - listener.ReCreateVmess(general.VmessConfig, tcpIn, udpIn) - listener.ReCreateTuic(LC.TuicServer(general.TuicServer), tcpIn, udpIn) + listener.ReCreateHTTP(general.Port, tunnel.Tunnel) + listener.ReCreateSocks(general.SocksPort, tunnel.Tunnel) + listener.ReCreateRedir(general.RedirPort, tunnel.Tunnel) + listener.ReCreateAutoRedir(general.EBpf.AutoRedir, tunnel.Tunnel) + listener.ReCreateTProxy(general.TProxyPort, tunnel.Tunnel) + listener.ReCreateMixed(general.MixedPort, tunnel.Tunnel) + listener.ReCreateShadowSocks(general.ShadowSocksConfig, tunnel.Tunnel) + listener.ReCreateVmess(general.VmessConfig, tunnel.Tunnel) + listener.ReCreateTuic(general.TuicServer, tunnel.Tunnel) } func updateExperimental(c *config.Config) { @@ -339,7 +335,7 @@ func updateTun(general *config.General) { if general == nil { return } - listener.ReCreateTun(LC.Tun(general.Tun), tunnel.TCPIn(), tunnel.UDPIn()) + listener.ReCreateTun(general.Tun, tunnel.Tunnel) listener.ReCreateRedirToTun(general.Tun.RedirectToTun) } @@ -367,7 +363,7 @@ func updateSniffer(sniffer *config.Sniffer) { } func updateTunnels(tunnels []LC.Tunnel) { - listener.PatchTunnel(tunnels, tunnel.TCPIn(), tunnel.UDPIn()) + listener.PatchTunnel(tunnels, tunnel.Tunnel) } func updateGeneral(general *config.General) { diff --git a/hub/route/configs.go b/hub/route/configs.go index cb7c93f6..1f29de0c 100644 --- a/hub/route/configs.go +++ b/hub/route/configs.go @@ -249,19 +249,15 @@ func patchConfigs(w http.ResponseWriter, r *http.Request) { ports := P.GetPorts() - tcpIn := tunnel.TCPIn() - udpIn := tunnel.UDPIn() - natTable := tunnel.NatTable() - - P.ReCreateHTTP(pointerOrDefault(general.Port, ports.Port), tcpIn) - P.ReCreateSocks(pointerOrDefault(general.SocksPort, ports.SocksPort), tcpIn, udpIn) - P.ReCreateRedir(pointerOrDefault(general.RedirPort, ports.RedirPort), tcpIn, udpIn, natTable) - P.ReCreateTProxy(pointerOrDefault(general.TProxyPort, ports.TProxyPort), tcpIn, udpIn, natTable) - P.ReCreateMixed(pointerOrDefault(general.MixedPort, ports.MixedPort), tcpIn, udpIn) - P.ReCreateTun(pointerOrDefaultTun(general.Tun, P.LastTunConf), tcpIn, udpIn) - P.ReCreateShadowSocks(pointerOrDefaultString(general.ShadowSocksConfig, ports.ShadowSocksConfig), tcpIn, udpIn) - P.ReCreateVmess(pointerOrDefaultString(general.VmessConfig, ports.VmessConfig), tcpIn, udpIn) - P.ReCreateTuic(pointerOrDefaultTuicServer(general.TuicServer, P.LastTuicConf), tcpIn, udpIn) + P.ReCreateHTTP(pointerOrDefault(general.Port, ports.Port), tunnel.Tunnel) + P.ReCreateSocks(pointerOrDefault(general.SocksPort, ports.SocksPort), tunnel.Tunnel) + P.ReCreateRedir(pointerOrDefault(general.RedirPort, ports.RedirPort), tunnel.Tunnel) + P.ReCreateTProxy(pointerOrDefault(general.TProxyPort, ports.TProxyPort), tunnel.Tunnel) + P.ReCreateMixed(pointerOrDefault(general.MixedPort, ports.MixedPort), tunnel.Tunnel) + P.ReCreateTun(pointerOrDefaultTun(general.Tun, P.LastTunConf), tunnel.Tunnel) + P.ReCreateShadowSocks(pointerOrDefaultString(general.ShadowSocksConfig, ports.ShadowSocksConfig), tunnel.Tunnel) + P.ReCreateVmess(pointerOrDefaultString(general.VmessConfig, ports.VmessConfig), tunnel.Tunnel) + P.ReCreateTuic(pointerOrDefaultTuicServer(general.TuicServer, P.LastTuicConf), tunnel.Tunnel) if general.Mode != nil { tunnel.SetMode(*general.Mode) diff --git a/listener/autoredir/tcp.go b/listener/autoredir/tcp.go index c390d89a..57df45f3 100644 --- a/listener/autoredir/tcp.go +++ b/listener/autoredir/tcp.go @@ -43,7 +43,7 @@ func (l *Listener) SetLookupFunc(lookupFunc func(netip.AddrPort) (socks5.Addr, e l.lookupFunc = lookupFunc } -func (l *Listener) handleRedir(conn net.Conn, in chan<- C.ConnContext) { +func (l *Listener) handleRedir(conn net.Conn, tunnel C.Tunnel) { if l.lookupFunc == nil { log.Errorln("[Auto Redirect] lookup function is nil") return @@ -58,10 +58,10 @@ func (l *Listener) handleRedir(conn net.Conn, in chan<- C.ConnContext) { N.TCPKeepAlive(conn) - in <- inbound.NewSocket(target, conn, C.REDIR, l.additions...) + tunnel.HandleTCPConn(inbound.NewSocket(target, conn, C.REDIR, l.additions...)) } -func New(addr string, in chan<- C.ConnContext, additions ...inbound.Addition) (*Listener, error) { +func New(addr string, tunnel C.Tunnel, additions ...inbound.Addition) (*Listener, error) { if len(additions) == 0 { additions = []inbound.Addition{ inbound.WithInName("DEFAULT-REDIR"), @@ -87,7 +87,7 @@ func New(addr string, in chan<- C.ConnContext, additions ...inbound.Addition) (* } continue } - go rl.handleRedir(c, in) + go rl.handleRedir(c, tunnel) } }() diff --git a/listener/http/client.go b/listener/http/client.go index 15c21f91..76c7c8eb 100644 --- a/listener/http/client.go +++ b/listener/http/client.go @@ -12,7 +12,7 @@ import ( "github.com/Dreamacro/clash/transport/socks5" ) -func newClient(source net.Addr, in chan<- C.ConnContext, additions ...inbound.Addition) *http.Client { +func newClient(source net.Addr, tunnel C.Tunnel, additions ...inbound.Addition) *http.Client { return &http.Client{ Transport: &http.Transport{ // from http.DefaultTransport @@ -32,7 +32,7 @@ func newClient(source net.Addr, in chan<- C.ConnContext, additions ...inbound.Ad left, right := net.Pipe() - in <- inbound.NewHTTP(dstAddr, source, right, additions...) + go tunnel.HandleTCPConn(inbound.NewHTTP(dstAddr, source, right, additions...)) return left, nil }, diff --git a/listener/http/proxy.go b/listener/http/proxy.go index a95f7195..a267fbad 100644 --- a/listener/http/proxy.go +++ b/listener/http/proxy.go @@ -14,8 +14,8 @@ import ( "github.com/Dreamacro/clash/log" ) -func HandleConn(c net.Conn, in chan<- C.ConnContext, cache *cache.LruCache[string, bool], additions ...inbound.Addition) { - client := newClient(c.RemoteAddr(), in, additions...) +func HandleConn(c net.Conn, tunnel C.Tunnel, cache *cache.LruCache[string, bool], additions ...inbound.Addition) { + client := newClient(c.RemoteAddr(), tunnel, additions...) defer client.CloseIdleConnections() conn := N.NewBufferedConn(c) @@ -48,7 +48,7 @@ func HandleConn(c net.Conn, in chan<- C.ConnContext, cache *cache.LruCache[strin break // close connection } - in <- inbound.NewHTTPS(request, conn, additions...) + tunnel.HandleTCPConn(inbound.NewHTTPS(request, conn, additions...)) return // hijack connection } @@ -61,7 +61,7 @@ func HandleConn(c net.Conn, in chan<- C.ConnContext, cache *cache.LruCache[strin request.RequestURI = "" if isUpgradeRequest(request) { - handleUpgrade(conn, request, in, additions...) + handleUpgrade(conn, request, tunnel, additions...) return // hijack connection } diff --git a/listener/http/server.go b/listener/http/server.go index 8819af11..0377d3b6 100644 --- a/listener/http/server.go +++ b/listener/http/server.go @@ -30,11 +30,11 @@ func (l *Listener) Close() error { return l.listener.Close() } -func New(addr string, in chan<- C.ConnContext, additions ...inbound.Addition) (*Listener, error) { - return NewWithAuthenticate(addr, in, true, additions...) +func New(addr string, tunnel C.Tunnel, additions ...inbound.Addition) (*Listener, error) { + return NewWithAuthenticate(addr, tunnel, true, additions...) } -func NewWithAuthenticate(addr string, in chan<- C.ConnContext, authenticate bool, additions ...inbound.Addition) (*Listener, error) { +func NewWithAuthenticate(addr string, tunnel C.Tunnel, authenticate bool, additions ...inbound.Addition) (*Listener, error) { if len(additions) == 0 { additions = []inbound.Addition{ inbound.WithInName("DEFAULT-HTTP"), @@ -65,7 +65,7 @@ func NewWithAuthenticate(addr string, in chan<- C.ConnContext, authenticate bool } continue } - go HandleConn(conn, in, c, additions...) + go HandleConn(conn, tunnel, c, additions...) } }() diff --git a/listener/http/upgrade.go b/listener/http/upgrade.go index 90e28f0a..e67928ce 100644 --- a/listener/http/upgrade.go +++ b/listener/http/upgrade.go @@ -25,7 +25,7 @@ func isUpgradeRequest(req *http.Request) bool { return false } -func handleUpgrade(conn net.Conn, request *http.Request, in chan<- C.ConnContext, additions ...inbound.Addition) { +func handleUpgrade(conn net.Conn, request *http.Request, tunnel C.Tunnel, additions ...inbound.Addition) { defer conn.Close() removeProxyHeaders(request.Header) @@ -43,7 +43,7 @@ func handleUpgrade(conn net.Conn, request *http.Request, in chan<- C.ConnContext left, right := net.Pipe() - in <- inbound.NewHTTP(dstAddr, conn.RemoteAddr(), right, additions...) + go tunnel.HandleTCPConn(inbound.NewHTTP(dstAddr, conn.RemoteAddr(), right, additions...)) var bufferedLeft *N.BufferedConn if request.TLS != nil { diff --git a/listener/inbound/base.go b/listener/inbound/base.go index b132ac6c..83695bb1 100644 --- a/listener/inbound/base.go +++ b/listener/inbound/base.go @@ -61,7 +61,7 @@ func (b *Base) RawAddress() string { } // Listen implements constant.InboundListener -func (*Base) Listen(tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAdapter, natTable C.NatTable) error { +func (*Base) Listen(tunnel C.Tunnel) error { return nil } diff --git a/listener/inbound/http.go b/listener/inbound/http.go index a93f9684..99577177 100644 --- a/listener/inbound/http.go +++ b/listener/inbound/http.go @@ -42,9 +42,9 @@ func (h *HTTP) Address() string { } // Listen implements constant.InboundListener -func (h *HTTP) Listen(tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAdapter, natTable C.NatTable) error { +func (h *HTTP) Listen(tunnel C.Tunnel) error { var err error - h.l, err = http.New(h.RawAddress(), tcpIn, h.Additions()...) + h.l, err = http.New(h.RawAddress(), tunnel, h.Additions()...) if err != nil { return err } diff --git a/listener/inbound/hysteria2.go b/listener/inbound/hysteria2.go index 430d0e68..df537a41 100644 --- a/listener/inbound/hysteria2.go +++ b/listener/inbound/hysteria2.go @@ -77,9 +77,9 @@ func (t *Hysteria2) Address() string { } // Listen implements constant.InboundListener -func (t *Hysteria2) Listen(tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAdapter, natTable C.NatTable) error { +func (t *Hysteria2) Listen(tunnel C.Tunnel) error { var err error - t.l, err = sing_hysteria2.New(t.ts, tcpIn, udpIn, t.Additions()...) + t.l, err = sing_hysteria2.New(t.ts, tunnel, t.Additions()...) if err != nil { return err } diff --git a/listener/inbound/mixed.go b/listener/inbound/mixed.go index dbba264c..ce445bda 100644 --- a/listener/inbound/mixed.go +++ b/listener/inbound/mixed.go @@ -50,14 +50,14 @@ func (m *Mixed) Address() string { } // Listen implements constant.InboundListener -func (m *Mixed) Listen(tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAdapter, natTable C.NatTable) error { +func (m *Mixed) Listen(tunnel C.Tunnel) error { var err error - m.l, err = mixed.New(m.RawAddress(), tcpIn, m.Additions()...) + m.l, err = mixed.New(m.RawAddress(), tunnel, m.Additions()...) if err != nil { return err } if m.udp { - m.lUDP, err = socks.NewUDP(m.RawAddress(), udpIn, m.Additions()...) + m.lUDP, err = socks.NewUDP(m.RawAddress(), tunnel, m.Additions()...) if err != nil { return err } diff --git a/listener/inbound/redir.go b/listener/inbound/redir.go index 4b88d895..085bf3a8 100644 --- a/listener/inbound/redir.go +++ b/listener/inbound/redir.go @@ -42,9 +42,9 @@ func (r *Redir) Address() string { } // Listen implements constant.InboundListener -func (r *Redir) Listen(tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAdapter, natTable C.NatTable) error { +func (r *Redir) Listen(tunnel C.Tunnel) error { var err error - r.l, err = redir.New(r.RawAddress(), tcpIn, r.Additions()...) + r.l, err = redir.New(r.RawAddress(), tunnel, r.Additions()...) if err != nil { return err } diff --git a/listener/inbound/shadowsocks.go b/listener/inbound/shadowsocks.go index 4659f4d7..fa5b3082 100644 --- a/listener/inbound/shadowsocks.go +++ b/listener/inbound/shadowsocks.go @@ -59,9 +59,9 @@ func (s *ShadowSocks) Address() string { } // Listen implements constant.InboundListener -func (s *ShadowSocks) Listen(tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAdapter, natTable C.NatTable) error { +func (s *ShadowSocks) Listen(tunnel C.Tunnel) error { var err error - s.l, err = sing_shadowsocks.New(s.ss, tcpIn, udpIn, s.Additions()...) + s.l, err = sing_shadowsocks.New(s.ss, tunnel, s.Additions()...) if err != nil { return err } diff --git a/listener/inbound/socks.go b/listener/inbound/socks.go index aac2ee23..09580a57 100644 --- a/listener/inbound/socks.go +++ b/listener/inbound/socks.go @@ -68,13 +68,13 @@ func (s *Socks) Address() string { } // Listen implements constant.InboundListener -func (s *Socks) Listen(tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAdapter, natTable C.NatTable) error { +func (s *Socks) Listen(tunnel C.Tunnel) error { var err error - if s.stl, err = socks.New(s.RawAddress(), tcpIn, s.Additions()...); err != nil { + if s.stl, err = socks.New(s.RawAddress(), tunnel, s.Additions()...); err != nil { return err } if s.udp { - if s.sul, err = socks.NewUDP(s.RawAddress(), udpIn, s.Additions()...); err != nil { + if s.sul, err = socks.NewUDP(s.RawAddress(), tunnel, s.Additions()...); err != nil { return err } } diff --git a/listener/inbound/tproxy.go b/listener/inbound/tproxy.go index 00cd0849..682188f5 100644 --- a/listener/inbound/tproxy.go +++ b/listener/inbound/tproxy.go @@ -49,14 +49,14 @@ func (t *TProxy) Address() string { } // Listen implements constant.InboundListener -func (t *TProxy) Listen(tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAdapter, natTable C.NatTable) error { +func (t *TProxy) Listen(tunnel C.Tunnel) error { var err error - t.lTCP, err = tproxy.New(t.RawAddress(), tcpIn, t.Additions()...) + t.lTCP, err = tproxy.New(t.RawAddress(), tunnel, t.Additions()...) if err != nil { return err } if t.udp { - t.lUDP, err = tproxy.NewUDP(t.RawAddress(), udpIn, natTable, t.Additions()...) + t.lUDP, err = tproxy.NewUDP(t.RawAddress(), tunnel, t.Additions()...) if err != nil { return err } diff --git a/listener/inbound/tuic.go b/listener/inbound/tuic.go index bf448d31..e7b51392 100644 --- a/listener/inbound/tuic.go +++ b/listener/inbound/tuic.go @@ -73,9 +73,9 @@ func (t *Tuic) Address() string { } // Listen implements constant.InboundListener -func (t *Tuic) Listen(tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAdapter, natTable C.NatTable) error { +func (t *Tuic) Listen(tunnel C.Tunnel) error { var err error - t.l, err = tuic.New(t.ts, tcpIn, udpIn, t.Additions()...) + t.l, err = tuic.New(t.ts, tunnel, t.Additions()...) if err != nil { return err } diff --git a/listener/inbound/tun.go b/listener/inbound/tun.go index eb16d2dd..3151e6b0 100644 --- a/listener/inbound/tun.go +++ b/listener/inbound/tun.go @@ -113,9 +113,9 @@ func (t *Tun) Address() string { } // Listen implements constant.InboundListener -func (t *Tun) Listen(tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAdapter, natTable C.NatTable) error { +func (t *Tun) Listen(tunnel C.Tunnel) error { var err error - t.l, err = sing_tun.New(t.tun, tcpIn, udpIn, t.Additions()...) + t.l, err = sing_tun.New(t.tun, tunnel, t.Additions()...) if err != nil { return err } diff --git a/listener/inbound/tunnel.go b/listener/inbound/tunnel.go index 41d024ef..2af663a5 100644 --- a/listener/inbound/tunnel.go +++ b/listener/inbound/tunnel.go @@ -4,7 +4,7 @@ import ( "fmt" C "github.com/Dreamacro/clash/constant" - "github.com/Dreamacro/clash/listener/tunnel" + LT "github.com/Dreamacro/clash/listener/tunnel" "github.com/Dreamacro/clash/log" ) @@ -21,8 +21,8 @@ func (o TunnelOption) Equal(config C.InboundConfig) bool { type Tunnel struct { *Base config *TunnelOption - ttl *tunnel.Listener - tul *tunnel.PacketConn + ttl *LT.Listener + tul *LT.PacketConn } func NewTunnel(options *TunnelOption) (*Tunnel, error) { @@ -74,16 +74,16 @@ func (t *Tunnel) Address() string { } // Listen implements constant.InboundListener -func (t *Tunnel) Listen(tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAdapter, natTable C.NatTable) error { +func (t *Tunnel) Listen(tunnel C.Tunnel) error { var err error for _, network := range t.config.Network { switch network { case "tcp": - if t.ttl, err = tunnel.New(t.RawAddress(), t.config.Target, t.config.SpecialProxy, tcpIn, t.Additions()...); err != nil { + if t.ttl, err = LT.New(t.RawAddress(), t.config.Target, t.config.SpecialProxy, tunnel, t.Additions()...); err != nil { return err } case "udp": - if t.tul, err = tunnel.NewUDP(t.RawAddress(), t.config.Target, t.config.SpecialProxy, udpIn, t.Additions()...); err != nil { + if t.tul, err = LT.NewUDP(t.RawAddress(), t.config.Target, t.config.SpecialProxy, tunnel, t.Additions()...); err != nil { return err } default: diff --git a/listener/inbound/vmess.go b/listener/inbound/vmess.go index 70e840a5..fa2c30f1 100644 --- a/listener/inbound/vmess.go +++ b/listener/inbound/vmess.go @@ -69,7 +69,7 @@ func (v *Vmess) Address() string { } // Listen implements constant.InboundListener -func (v *Vmess) Listen(tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAdapter, natTable C.NatTable) error { +func (v *Vmess) Listen(tunnel C.Tunnel) error { var err error users := make([]LC.VmessUser, len(v.config.Users)) for i, v := range v.config.Users { @@ -79,7 +79,7 @@ func (v *Vmess) Listen(tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAdapter, AlterID: v.AlterID, } } - v.l, err = sing_vmess.New(v.vs, tcpIn, udpIn, v.Additions()...) + v.l, err = sing_vmess.New(v.vs, tunnel, v.Additions()...) if err != nil { return err } diff --git a/listener/inner/tcp.go b/listener/inner/tcp.go index 9ba87e2f..4a54a82f 100644 --- a/listener/inner/tcp.go +++ b/listener/inner/tcp.go @@ -8,19 +8,19 @@ import ( C "github.com/Dreamacro/clash/constant" ) -var tcpIn chan<- C.ConnContext +var tunnel C.Tunnel -func New(in chan<- C.ConnContext) { - tcpIn = in +func New(t C.Tunnel) { + tunnel = t } func HandleTcp(address string) (conn net.Conn, err error) { - if tcpIn == nil { + if tunnel == nil { return nil, errors.New("tcp uninitialized") } // executor Parsed conn1, conn2 := net.Pipe() context := inbound.NewInner(conn2, address) - tcpIn <- context + go tunnel.HandleTCPConn(context) return conn1, nil } diff --git a/listener/listener.go b/listener/listener.go index b1d59d49..afbcf14c 100644 --- a/listener/listener.go +++ b/listener/listener.go @@ -23,7 +23,7 @@ import ( "github.com/Dreamacro/clash/listener/socks" "github.com/Dreamacro/clash/listener/tproxy" "github.com/Dreamacro/clash/listener/tuic" - "github.com/Dreamacro/clash/listener/tunnel" + LT "github.com/Dreamacro/clash/listener/tunnel" "github.com/Dreamacro/clash/log" "github.com/samber/lo" @@ -42,8 +42,8 @@ var ( tproxyUDPListener *tproxy.UDPListener mixedListener *mixed.Listener mixedUDPLister *socks.UDPListener - tunnelTCPListeners = map[string]*tunnel.Listener{} - tunnelUDPListeners = map[string]*tunnel.PacketConn{} + tunnelTCPListeners = map[string]*LT.Listener{} + tunnelUDPListeners = map[string]*LT.PacketConn{} inboundListeners = map[string]C.InboundListener{} tunLister *sing_tun.Listener shadowSocksListener C.MultiAddrListener @@ -112,7 +112,7 @@ func SetBindAddress(host string) { bindAddress = host } -func ReCreateHTTP(port int, tcpIn chan<- C.ConnContext) { +func ReCreateHTTP(port int, tunnel C.Tunnel) { httpMux.Lock() defer httpMux.Unlock() @@ -137,7 +137,7 @@ func ReCreateHTTP(port int, tcpIn chan<- C.ConnContext) { return } - httpListener, err = http.New(addr, tcpIn) + httpListener, err = http.New(addr, tunnel) if err != nil { log.Errorln("Start HTTP server error: %s", err.Error()) return @@ -146,7 +146,7 @@ func ReCreateHTTP(port int, tcpIn chan<- C.ConnContext) { log.Infoln("HTTP proxy listening at: %s", httpListener.Address()) } -func ReCreateSocks(port int, tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAdapter) { +func ReCreateSocks(port int, tunnel C.Tunnel) { socksMux.Lock() defer socksMux.Unlock() @@ -188,12 +188,12 @@ func ReCreateSocks(port int, tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAd return } - tcpListener, err := socks.New(addr, tcpIn) + tcpListener, err := socks.New(addr, tunnel) if err != nil { return } - udpListener, err := socks.NewUDP(addr, udpIn) + udpListener, err := socks.NewUDP(addr, tunnel) if err != nil { tcpListener.Close() return @@ -205,7 +205,7 @@ func ReCreateSocks(port int, tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAd log.Infoln("SOCKS proxy listening at: %s", socksListener.Address()) } -func ReCreateRedir(port int, tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAdapter, natTable C.NatTable) { +func ReCreateRedir(port int, tunnel C.Tunnel) { redirMux.Lock() defer redirMux.Unlock() @@ -238,12 +238,12 @@ func ReCreateRedir(port int, tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAd return } - redirListener, err = redir.New(addr, tcpIn) + redirListener, err = redir.New(addr, tunnel) if err != nil { return } - redirUDPListener, err = tproxy.NewUDP(addr, udpIn, natTable) + redirUDPListener, err = tproxy.NewUDP(addr, tunnel) if err != nil { log.Warnln("Failed to start Redir UDP Listener: %s", err) } @@ -251,7 +251,7 @@ func ReCreateRedir(port int, tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAd log.Infoln("Redirect proxy listening at: %s", redirListener.Address()) } -func ReCreateShadowSocks(shadowSocksConfig string, tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAdapter) { +func ReCreateShadowSocks(shadowSocksConfig string, tunnel C.Tunnel) { ssMux.Lock() defer ssMux.Unlock() @@ -292,7 +292,7 @@ func ReCreateShadowSocks(shadowSocksConfig string, tcpIn chan<- C.ConnContext, u return } - listener, err := sing_shadowsocks.New(ssConfig, tcpIn, udpIn) + listener, err := sing_shadowsocks.New(ssConfig, tunnel) if err != nil { return } @@ -305,7 +305,7 @@ func ReCreateShadowSocks(shadowSocksConfig string, tcpIn chan<- C.ConnContext, u return } -func ReCreateVmess(vmessConfig string, tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAdapter) { +func ReCreateVmess(vmessConfig string, tunnel C.Tunnel) { vmessMux.Lock() defer vmessMux.Unlock() @@ -344,7 +344,7 @@ func ReCreateVmess(vmessConfig string, tcpIn chan<- C.ConnContext, udpIn chan<- return } - listener, err := sing_vmess.New(vsConfig, tcpIn, udpIn) + listener, err := sing_vmess.New(vsConfig, tunnel) if err != nil { return } @@ -357,7 +357,7 @@ func ReCreateVmess(vmessConfig string, tcpIn chan<- C.ConnContext, udpIn chan<- return } -func ReCreateTuic(config LC.TuicServer, tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAdapter) { +func ReCreateTuic(config LC.TuicServer, tunnel C.Tunnel) { tuicMux.Lock() defer func() { LastTuicConf = config @@ -389,7 +389,7 @@ func ReCreateTuic(config LC.TuicServer, tcpIn chan<- C.ConnContext, udpIn chan<- return } - listener, err := tuic.New(config, tcpIn, udpIn) + listener, err := tuic.New(config, tunnel) if err != nil { return } @@ -402,7 +402,7 @@ func ReCreateTuic(config LC.TuicServer, tcpIn chan<- C.ConnContext, udpIn chan<- return } -func ReCreateTProxy(port int, tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAdapter, natTable C.NatTable) { +func ReCreateTProxy(port int, tunnel C.Tunnel) { tproxyMux.Lock() defer tproxyMux.Unlock() @@ -435,12 +435,12 @@ func ReCreateTProxy(port int, tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketA return } - tproxyListener, err = tproxy.New(addr, tcpIn) + tproxyListener, err = tproxy.New(addr, tunnel) if err != nil { return } - tproxyUDPListener, err = tproxy.NewUDP(addr, udpIn, natTable) + tproxyUDPListener, err = tproxy.NewUDP(addr, tunnel) if err != nil { log.Warnln("Failed to start TProxy UDP Listener: %s", err) } @@ -448,7 +448,7 @@ func ReCreateTProxy(port int, tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketA log.Infoln("TProxy server listening at: %s", tproxyListener.Address()) } -func ReCreateMixed(port int, tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAdapter) { +func ReCreateMixed(port int, tunnel C.Tunnel) { mixedMux.Lock() defer mixedMux.Unlock() @@ -489,12 +489,12 @@ func ReCreateMixed(port int, tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAd return } - mixedListener, err = mixed.New(addr, tcpIn) + mixedListener, err = mixed.New(addr, tunnel) if err != nil { return } - mixedUDPLister, err = socks.NewUDP(addr, udpIn) + mixedUDPLister, err = socks.NewUDP(addr, tunnel) if err != nil { mixedListener.Close() return @@ -503,7 +503,7 @@ func ReCreateMixed(port int, tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAd log.Infoln("Mixed(http+socks) proxy listening at: %s", mixedListener.Address()) } -func ReCreateTun(tunConf LC.Tun, tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAdapter) { +func ReCreateTun(tunConf LC.Tun, tunnel C.Tunnel) { tunMux.Lock() defer func() { LastTunConf = tunConf @@ -531,7 +531,7 @@ func ReCreateTun(tunConf LC.Tun, tcpIn chan<- C.ConnContext, udpIn chan<- C.Pack return } - lister, err := sing_tun.New(tunConf, tcpIn, udpIn) + lister, err := sing_tun.New(tunConf, tunnel) if err != nil { return } @@ -573,7 +573,7 @@ func ReCreateRedirToTun(ifaceNames []string) { log.Infoln("Attached tc ebpf program to interfaces %v", tcProgram.RawNICs()) } -func ReCreateAutoRedir(ifaceNames []string, tcpIn chan<- C.ConnContext, _ chan<- C.PacketAdapter) { +func ReCreateAutoRedir(ifaceNames []string, tunnel C.Tunnel) { autoRedirMux.Lock() defer autoRedirMux.Unlock() @@ -614,7 +614,7 @@ func ReCreateAutoRedir(ifaceNames []string, tcpIn chan<- C.ConnContext, _ chan<- addr := genAddr("*", C.TcpAutoRedirPort, true) - autoRedirListener, err = autoredir.New(addr, tcpIn) + autoRedirListener, err = autoredir.New(addr, tunnel) if err != nil { return } @@ -629,7 +629,7 @@ func ReCreateAutoRedir(ifaceNames []string, tcpIn chan<- C.ConnContext, _ chan<- log.Infoln("Auto redirect proxy listening at: %s, attached tc ebpf program to interfaces %v", autoRedirListener.Address(), autoRedirProgram.RawNICs()) } -func PatchTunnel(tunnels []LC.Tunnel, tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAdapter) { +func PatchTunnel(tunnels []LC.Tunnel, tunnel C.Tunnel) { tunnelMux.Lock() defer tunnelMux.Unlock() @@ -699,7 +699,7 @@ func PatchTunnel(tunnels []LC.Tunnel, tcpIn chan<- C.ConnContext, udpIn chan<- C for _, elm := range needCreate { key := fmt.Sprintf("%s/%s/%s", elm.addr, elm.target, elm.proxy) if elm.network == "tcp" { - l, err := tunnel.New(elm.addr, elm.target, elm.proxy, tcpIn) + l, err := LT.New(elm.addr, elm.target, elm.proxy, tunnel) if err != nil { log.Errorln("Start tunnel %s error: %s", elm.target, err.Error()) continue @@ -707,7 +707,7 @@ func PatchTunnel(tunnels []LC.Tunnel, tcpIn chan<- C.ConnContext, udpIn chan<- C tunnelTCPListeners[key] = l log.Infoln("Tunnel(tcp/%s) proxy %s listening at: %s", elm.target, elm.proxy, tunnelTCPListeners[key].Address()) } else { - l, err := tunnel.NewUDP(elm.addr, elm.target, elm.proxy, udpIn) + l, err := LT.NewUDP(elm.addr, elm.target, elm.proxy, tunnel) if err != nil { log.Errorln("Start tunnel %s error: %s", elm.target, err.Error()) continue @@ -718,7 +718,7 @@ func PatchTunnel(tunnels []LC.Tunnel, tcpIn chan<- C.ConnContext, udpIn chan<- C } } -func PatchInboundListeners(newListenerMap map[string]C.InboundListener, tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAdapter, natTable C.NatTable, dropOld bool) { +func PatchInboundListeners(newListenerMap map[string]C.InboundListener, tunnel C.Tunnel, dropOld bool) { inboundMux.Lock() defer inboundMux.Unlock() @@ -730,7 +730,7 @@ func PatchInboundListeners(newListenerMap map[string]C.InboundListener, tcpIn ch continue } } - if err := newListener.Listen(tcpIn, udpIn, natTable); err != nil { + if err := newListener.Listen(tunnel); err != nil { log.Errorln("Listener %s listen err: %s", name, err.Error()) continue } diff --git a/listener/mixed/mixed.go b/listener/mixed/mixed.go index 7241927d..d2ede096 100644 --- a/listener/mixed/mixed.go +++ b/listener/mixed/mixed.go @@ -36,7 +36,7 @@ func (l *Listener) Close() error { return l.listener.Close() } -func New(addr string, in chan<- C.ConnContext, additions ...inbound.Addition) (*Listener, error) { +func New(addr string, tunnel C.Tunnel, additions ...inbound.Addition) (*Listener, error) { if len(additions) == 0 { additions = []inbound.Addition{ inbound.WithInName("DEFAULT-MIXED"), @@ -62,14 +62,14 @@ func New(addr string, in chan<- C.ConnContext, additions ...inbound.Addition) (* } continue } - go handleConn(c, in, ml.cache, additions...) + go handleConn(c, tunnel, ml.cache, additions...) } }() return ml, nil } -func handleConn(conn net.Conn, in chan<- C.ConnContext, cache *cache.LruCache[string, bool], additions ...inbound.Addition) { +func handleConn(conn net.Conn, tunnel C.Tunnel, cache *cache.LruCache[string, bool], additions ...inbound.Addition) { N.TCPKeepAlive(conn) bufConn := N.NewBufferedConn(conn) @@ -80,10 +80,10 @@ func handleConn(conn net.Conn, in chan<- C.ConnContext, cache *cache.LruCache[st switch head[0] { case socks4.Version: - socks.HandleSocks4(bufConn, in, additions...) + socks.HandleSocks4(bufConn, tunnel, additions...) case socks5.Version: - socks.HandleSocks5(bufConn, in, additions...) + socks.HandleSocks5(bufConn, tunnel, additions...) default: - http.HandleConn(bufConn, in, cache, additions...) + http.HandleConn(bufConn, tunnel, cache, additions...) } } diff --git a/listener/redir/tcp.go b/listener/redir/tcp.go index 9a843af8..6419760f 100644 --- a/listener/redir/tcp.go +++ b/listener/redir/tcp.go @@ -30,7 +30,7 @@ func (l *Listener) Close() error { return l.listener.Close() } -func New(addr string, in chan<- C.ConnContext, additions ...inbound.Addition) (*Listener, error) { +func New(addr string, tunnel C.Tunnel, additions ...inbound.Addition) (*Listener, error) { if len(additions) == 0 { additions = []inbound.Addition{ inbound.WithInName("DEFAULT-REDIR"), @@ -55,18 +55,19 @@ func New(addr string, in chan<- C.ConnContext, additions ...inbound.Addition) (* } continue } - go handleRedir(c, in, additions...) + go handleRedir(c, tunnel, additions...) } }() return rl, nil } -func handleRedir(conn net.Conn, in chan<- C.ConnContext, additions ...inbound.Addition) { + +func handleRedir(conn net.Conn, tunnel C.Tunnel, additions ...inbound.Addition) { target, err := parserPacket(conn) if err != nil { conn.Close() return } N.TCPKeepAlive(conn) - in <- inbound.NewSocket(target, conn, C.REDIR, additions...) + tunnel.HandleTCPConn(inbound.NewSocket(target, conn, C.REDIR, additions...)) } diff --git a/listener/shadowsocks/tcp.go b/listener/shadowsocks/tcp.go index 2d0958a0..8959e6ba 100644 --- a/listener/shadowsocks/tcp.go +++ b/listener/shadowsocks/tcp.go @@ -22,7 +22,7 @@ type Listener struct { var _listener *Listener -func New(config LC.ShadowsocksServer, tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAdapter) (*Listener, error) { +func New(config LC.ShadowsocksServer, tunnel C.Tunnel) (*Listener, error) { pickCipher, err := core.PickCipher(config.Cipher, nil, config.Password) if err != nil { return nil, err @@ -36,7 +36,7 @@ func New(config LC.ShadowsocksServer, tcpIn chan<- C.ConnContext, udpIn chan<- C if config.Udp { //UDP - ul, err := NewUDP(addr, pickCipher, udpIn) + ul, err := NewUDP(addr, pickCipher, tunnel) if err != nil { return nil, err } @@ -60,7 +60,7 @@ func New(config LC.ShadowsocksServer, tcpIn chan<- C.ConnContext, udpIn chan<- C continue } N.TCPKeepAlive(c) - go sl.HandleConn(c, tcpIn) + go sl.HandleConn(c, tunnel) } }() } @@ -99,7 +99,7 @@ func (l *Listener) AddrList() (addrList []net.Addr) { return } -func (l *Listener) HandleConn(conn net.Conn, in chan<- C.ConnContext, additions ...inbound.Addition) { +func (l *Listener) HandleConn(conn net.Conn, tunnel C.Tunnel, additions ...inbound.Addition) { conn = l.pickCipher.StreamConn(conn) conn = N.NewDeadlineConn(conn) // embed ss can't handle readDeadline correctly @@ -108,12 +108,12 @@ func (l *Listener) HandleConn(conn net.Conn, in chan<- C.ConnContext, additions _ = conn.Close() return } - in <- inbound.NewSocket(target, conn, C.SHADOWSOCKS, additions...) + tunnel.HandleTCPConn(inbound.NewSocket(target, conn, C.SHADOWSOCKS, additions...)) } -func HandleShadowSocks(conn net.Conn, in chan<- C.ConnContext, additions ...inbound.Addition) bool { +func HandleShadowSocks(conn net.Conn, tunnel C.Tunnel, additions ...inbound.Addition) bool { if _listener != nil && _listener.pickCipher != nil { - go _listener.HandleConn(conn, in, additions...) + go _listener.HandleConn(conn, tunnel, additions...) return true } return false diff --git a/listener/shadowsocks/udp.go b/listener/shadowsocks/udp.go index af610431..cc055853 100644 --- a/listener/shadowsocks/udp.go +++ b/listener/shadowsocks/udp.go @@ -17,7 +17,7 @@ type UDPListener struct { closed bool } -func NewUDP(addr string, pickCipher core.Cipher, in chan<- C.PacketAdapter) (*UDPListener, error) { +func NewUDP(addr string, pickCipher core.Cipher, tunnel C.Tunnel) (*UDPListener, error) { l, err := net.ListenPacket("udp", addr) if err != nil { return nil, err @@ -42,7 +42,7 @@ func NewUDP(addr string, pickCipher core.Cipher, in chan<- C.PacketAdapter) (*UD } continue } - handleSocksUDP(conn, in, data, put, remoteAddr) + handleSocksUDP(conn, tunnel, data, put, remoteAddr) } }() @@ -58,7 +58,7 @@ func (l *UDPListener) LocalAddr() net.Addr { return l.packetConn.LocalAddr() } -func handleSocksUDP(pc net.PacketConn, in chan<- C.PacketAdapter, buf []byte, put func(), addr net.Addr, additions ...inbound.Addition) { +func handleSocksUDP(pc net.PacketConn, tunnel C.Tunnel, buf []byte, put func(), addr net.Addr, additions ...inbound.Addition) { tgtAddr := socks5.SplitAddr(buf) if tgtAddr == nil { // Unresolved UDP packet, return buffer to the pool @@ -76,8 +76,5 @@ func handleSocksUDP(pc net.PacketConn, in chan<- C.PacketAdapter, buf []byte, pu payload: payload, put: put, } - select { - case in <- inbound.NewPacket(target, packet, C.SHADOWSOCKS, additions...): - default: - } + tunnel.HandleUDPPacket(inbound.NewPacket(target, packet, C.SHADOWSOCKS, additions...)) } diff --git a/listener/sing/sing.go b/listener/sing/sing.go index d5731bbf..a9bee564 100644 --- a/listener/sing/sing.go +++ b/listener/sing/sing.go @@ -28,43 +28,12 @@ import ( const UDPTimeout = 5 * time.Minute type ListenerHandler struct { - TcpIn chan<- C.ConnContext - UdpIn chan<- C.PacketAdapter + Tunnel C.Tunnel Type C.Type Additions []inbound.Addition UDPTimeout time.Duration } -type waitCloseConn struct { - N.ExtendedConn - wg *sync.WaitGroup - close sync.Once - rAddr net.Addr -} - -func (c *waitCloseConn) Close() error { // call from handleTCPConn(connCtx C.ConnContext) - c.close.Do(func() { - c.wg.Done() - }) - return c.ExtendedConn.Close() -} - -func (c *waitCloseConn) RemoteAddr() net.Addr { - return c.rAddr -} - -func (c *waitCloseConn) Upstream() any { - return c.ExtendedConn -} - -func (c *waitCloseConn) ReaderReplaceable() bool { - return true -} - -func (c *waitCloseConn) WriterReplaceable() bool { - return true -} - func UpstreamMetadata(metadata M.Metadata) M.Metadata { return M.Metadata{ Source: metadata.Source, @@ -117,14 +86,14 @@ func (h *ListenerHandler) NewConnection(ctx context.Context, conn net.Conn, meta return h.ParseSpecialFqdn(ctx, conn, metadata) } target := socks5.ParseAddr(metadata.Destination.String()) - wg := &sync.WaitGroup{} - defer wg.Wait() // this goroutine must exit after conn.Close() - wg.Add(1) if deadline.NeedAdditionalReadDeadline(conn) { conn = N.NewDeadlineConn(conn) // conn from sing should check NeedAdditionalReadDeadline } - h.TcpIn <- inbound.NewSocket(target, &waitCloseConn{ExtendedConn: N.NewExtendedConn(conn), wg: wg, rAddr: metadata.Source.TCPAddr()}, h.Type, combineAdditions(ctx, h.Additions)...) + connCtx := inbound.NewSocket(target, conn, h.Type, combineAdditions(ctx, h.Additions)...) + inbound.WithSrcAddr(metadata.Source.TCPAddr()).Apply(connCtx.Metadata()) // set srcAddr from sing's metadata + + h.Tunnel.HandleTCPConn(connCtx) // this goroutine must exit after conn unused return nil } @@ -177,10 +146,8 @@ func (h *ListenerHandler) NewPacketConnection(ctx context.Context, conn network. lAddr: conn.LocalAddr(), buff: buff, } - select { - case h.UdpIn <- inbound.NewPacket(target, packet, h.Type, combineAdditions(ctx, h.Additions)...): - default: - } + + h.Tunnel.HandleUDPPacket(inbound.NewPacket(target, packet, h.Type, combineAdditions(ctx, h.Additions)...)) } return nil } diff --git a/listener/sing_hysteria2/server.go b/listener/sing_hysteria2/server.go index 4e0a7c07..3628a2ce 100644 --- a/listener/sing_hysteria2/server.go +++ b/listener/sing_hysteria2/server.go @@ -32,7 +32,7 @@ type Listener struct { services []*hysteria2.Service[string] } -func New(config LC.Hysteria2Server, tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAdapter, additions ...inbound.Addition) (*Listener, error) { +func New(config LC.Hysteria2Server, tunnel C.Tunnel, additions ...inbound.Addition) (*Listener, error) { var sl *Listener var err error if len(additions) == 0 { @@ -43,8 +43,7 @@ func New(config LC.Hysteria2Server, tcpIn chan<- C.ConnContext, udpIn chan<- C.P } h := &sing.ListenerHandler{ - TcpIn: tcpIn, - UdpIn: udpIn, + Tunnel: tunnel, Type: C.HYSTERIA2, Additions: additions, } diff --git a/listener/sing_shadowsocks/server.go b/listener/sing_shadowsocks/server.go index d0e137a7..51baeaa1 100644 --- a/listener/sing_shadowsocks/server.go +++ b/listener/sing_shadowsocks/server.go @@ -35,7 +35,7 @@ type Listener struct { var _listener *Listener -func New(config LC.ShadowsocksServer, tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAdapter, additions ...inbound.Addition) (C.MultiAddrListener, error) { +func New(config LC.ShadowsocksServer, tunnel C.Tunnel, additions ...inbound.Addition) (C.MultiAddrListener, error) { var sl *Listener var err error if len(additions) == 0 { @@ -51,8 +51,7 @@ func New(config LC.ShadowsocksServer, tcpIn chan<- C.ConnContext, udpIn chan<- C udpTimeout := int64(sing.UDPTimeout.Seconds()) h := &sing.ListenerHandler{ - TcpIn: tcpIn, - UdpIn: udpIn, + Tunnel: tunnel, Type: C.SHADOWSOCKS, Additions: additions, } @@ -68,7 +67,7 @@ func New(config LC.ShadowsocksServer, tcpIn chan<- C.ConnContext, udpIn chan<- C sl.service, err = shadowaead_2022.NewServiceWithPassword(config.Cipher, config.Password, udpTimeout, h, ntp.Now) default: err = fmt.Errorf("shadowsocks: unsupported method: %s", config.Cipher) - return embedSS.New(config, tcpIn, udpIn) + return embedSS.New(config, tunnel) } if err != nil { return nil, err @@ -148,7 +147,7 @@ func New(config LC.ShadowsocksServer, tcpIn chan<- C.ConnContext, udpIn chan<- C } N.TCPKeepAlive(c) - go sl.HandleConn(c, tcpIn) + go sl.HandleConn(c, tunnel) } }() } @@ -188,7 +187,7 @@ func (l *Listener) AddrList() (addrList []net.Addr) { return } -func (l *Listener) HandleConn(conn net.Conn, in chan<- C.ConnContext, additions ...inbound.Addition) { +func (l *Listener) HandleConn(conn net.Conn, tunnel C.Tunnel, additions ...inbound.Addition) { ctx := sing.WithAdditions(context.TODO(), additions...) err := l.service.NewConnection(ctx, conn, M.Metadata{ Protocol: "shadowsocks", @@ -200,10 +199,10 @@ func (l *Listener) HandleConn(conn net.Conn, in chan<- C.ConnContext, additions } } -func HandleShadowSocks(conn net.Conn, in chan<- C.ConnContext, additions ...inbound.Addition) bool { +func HandleShadowSocks(conn net.Conn, tunnel C.Tunnel, additions ...inbound.Addition) bool { if _listener != nil && _listener.service != nil { - go _listener.HandleConn(conn, in, additions...) + go _listener.HandleConn(conn, tunnel, additions...) return true } - return embedSS.HandleShadowSocks(conn, in, additions...) + return embedSS.HandleShadowSocks(conn, tunnel, additions...) } diff --git a/listener/sing_tun/server.go b/listener/sing_tun/server.go index 66fe8cd5..4ca6fc30 100644 --- a/listener/sing_tun/server.go +++ b/listener/sing_tun/server.go @@ -88,7 +88,7 @@ func checkTunName(tunName string) (ok bool) { return true } -func New(options LC.Tun, tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAdapter, additions ...inbound.Addition) (l *Listener, err error) { +func New(options LC.Tun, tunnel C.Tunnel, additions ...inbound.Addition) (l *Listener, err error) { if len(additions) == 0 { additions = []inbound.Addition{ inbound.WithInName("DEFAULT-TUN"), @@ -152,8 +152,7 @@ func New(options LC.Tun, tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAdapte handler := &ListenerHandler{ ListenerHandler: sing.ListenerHandler{ - TcpIn: tcpIn, - UdpIn: udpIn, + Tunnel: tunnel, Type: C.TUN, Additions: additions, UDPTimeout: time.Second * time.Duration(udpTimeout), diff --git a/listener/sing_vmess/server.go b/listener/sing_vmess/server.go index 06f3e051..d28213c8 100644 --- a/listener/sing_vmess/server.go +++ b/listener/sing_vmess/server.go @@ -27,7 +27,7 @@ type Listener struct { var _listener *Listener -func New(config LC.VmessServer, tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAdapter, additions ...inbound.Addition) (sl *Listener, err error) { +func New(config LC.VmessServer, tunnel C.Tunnel, additions ...inbound.Addition) (sl *Listener, err error) { if len(additions) == 0 { additions = []inbound.Addition{ inbound.WithInName("DEFAULT-VMESS"), @@ -38,8 +38,7 @@ func New(config LC.VmessServer, tcpIn chan<- C.ConnContext, udpIn chan<- C.Packe }() } h := &sing.ListenerHandler{ - TcpIn: tcpIn, - UdpIn: udpIn, + Tunnel: tunnel, Type: C.VMESS, Additions: additions, } @@ -87,7 +86,7 @@ func New(config LC.VmessServer, tcpIn chan<- C.ConnContext, udpIn chan<- C.Packe } N.TCPKeepAlive(c) - go sl.HandleConn(c, tcpIn) + go sl.HandleConn(c, tunnel) } }() } @@ -122,7 +121,7 @@ func (l *Listener) AddrList() (addrList []net.Addr) { return } -func (l *Listener) HandleConn(conn net.Conn, in chan<- C.ConnContext, additions ...inbound.Addition) { +func (l *Listener) HandleConn(conn net.Conn, tunnel C.Tunnel, additions ...inbound.Addition) { ctx := sing.WithAdditions(context.TODO(), additions...) err := l.service.NewConnection(ctx, conn, metadata.Metadata{ Protocol: "vmess", @@ -134,9 +133,9 @@ func (l *Listener) HandleConn(conn net.Conn, in chan<- C.ConnContext, additions } } -func HandleVmess(conn net.Conn, in chan<- C.ConnContext, additions ...inbound.Addition) bool { +func HandleVmess(conn net.Conn, tunnel C.Tunnel, additions ...inbound.Addition) bool { if _listener != nil && _listener.service != nil { - go _listener.HandleConn(conn, in, additions...) + go _listener.HandleConn(conn, tunnel, additions...) return true } return false diff --git a/listener/socks/tcp.go b/listener/socks/tcp.go index 2fd252a3..89b23562 100644 --- a/listener/socks/tcp.go +++ b/listener/socks/tcp.go @@ -34,7 +34,7 @@ func (l *Listener) Close() error { return l.listener.Close() } -func New(addr string, in chan<- C.ConnContext, additions ...inbound.Addition) (*Listener, error) { +func New(addr string, tunnel C.Tunnel, additions ...inbound.Addition) (*Listener, error) { if len(additions) == 0 { additions = []inbound.Addition{ inbound.WithInName("DEFAULT-SOCKS"), @@ -59,14 +59,14 @@ func New(addr string, in chan<- C.ConnContext, additions ...inbound.Addition) (* } continue } - go handleSocks(c, in, additions...) + go handleSocks(c, tunnel, additions...) } }() return sl, nil } -func handleSocks(conn net.Conn, in chan<- C.ConnContext, additions ...inbound.Addition) { +func handleSocks(conn net.Conn, tunnel C.Tunnel, additions ...inbound.Addition) { N.TCPKeepAlive(conn) bufConn := N.NewBufferedConn(conn) head, err := bufConn.Peek(1) @@ -77,24 +77,24 @@ func handleSocks(conn net.Conn, in chan<- C.ConnContext, additions ...inbound.Ad switch head[0] { case socks4.Version: - HandleSocks4(bufConn, in, additions...) + HandleSocks4(bufConn, tunnel, additions...) case socks5.Version: - HandleSocks5(bufConn, in, additions...) + HandleSocks5(bufConn, tunnel, additions...) default: conn.Close() } } -func HandleSocks4(conn net.Conn, in chan<- C.ConnContext, additions ...inbound.Addition) { +func HandleSocks4(conn net.Conn, tunnel C.Tunnel, additions ...inbound.Addition) { addr, _, err := socks4.ServerHandshake(conn, authStore.Authenticator()) if err != nil { conn.Close() return } - in <- inbound.NewSocket(socks5.ParseAddr(addr), conn, C.SOCKS4, additions...) + tunnel.HandleTCPConn(inbound.NewSocket(socks5.ParseAddr(addr), conn, C.SOCKS4, additions...)) } -func HandleSocks5(conn net.Conn, in chan<- C.ConnContext, additions ...inbound.Addition) { +func HandleSocks5(conn net.Conn, tunnel C.Tunnel, additions ...inbound.Addition) { target, command, err := socks5.ServerHandshake(conn, authStore.Authenticator()) if err != nil { conn.Close() @@ -105,5 +105,5 @@ func HandleSocks5(conn net.Conn, in chan<- C.ConnContext, additions ...inbound.A io.Copy(io.Discard, conn) return } - in <- inbound.NewSocket(target, conn, C.SOCKS5, additions...) + tunnel.HandleTCPConn(inbound.NewSocket(target, conn, C.SOCKS5, additions...)) } diff --git a/listener/socks/udp.go b/listener/socks/udp.go index 31858f74..2f786e95 100644 --- a/listener/socks/udp.go +++ b/listener/socks/udp.go @@ -33,7 +33,7 @@ func (l *UDPListener) Close() error { return l.packetConn.Close() } -func NewUDP(addr string, in chan<- C.PacketAdapter, additions ...inbound.Addition) (*UDPListener, error) { +func NewUDP(addr string, tunnel C.Tunnel, additions ...inbound.Addition) (*UDPListener, error) { if len(additions) == 0 { additions = []inbound.Addition{ inbound.WithInName("DEFAULT-SOCKS"), @@ -66,14 +66,14 @@ func NewUDP(addr string, in chan<- C.PacketAdapter, additions ...inbound.Additio } continue } - handleSocksUDP(l, in, data, put, remoteAddr, additions...) + handleSocksUDP(l, tunnel, data, put, remoteAddr, additions...) } }() return sl, nil } -func handleSocksUDP(pc net.PacketConn, in chan<- C.PacketAdapter, buf []byte, put func(), addr net.Addr, additions ...inbound.Addition) { +func handleSocksUDP(pc net.PacketConn, tunnel C.Tunnel, buf []byte, put func(), addr net.Addr, additions ...inbound.Addition) { target, payload, err := socks5.DecodeUDPPacket(buf) if err != nil { // Unresolved UDP packet, return buffer to the pool @@ -88,8 +88,5 @@ func handleSocksUDP(pc net.PacketConn, in chan<- C.PacketAdapter, buf []byte, pu payload: payload, put: put, } - select { - case in <- inbound.NewPacket(target, packet, C.SOCKS5, additions...): - default: - } + tunnel.HandleUDPPacket(inbound.NewPacket(target, packet, C.SOCKS5, additions...)) } diff --git a/listener/tproxy/packet.go b/listener/tproxy/packet.go index b73339a1..24fff09a 100644 --- a/listener/tproxy/packet.go +++ b/listener/tproxy/packet.go @@ -13,11 +13,10 @@ import ( ) type packet struct { - pc net.PacketConn - lAddr netip.AddrPort - buf []byte - in chan<- C.PacketAdapter - natTable C.NatTable + pc net.PacketConn + lAddr netip.AddrPort + buf []byte + tunnel C.Tunnel } func (c *packet) Data() []byte { @@ -26,7 +25,7 @@ func (c *packet) Data() []byte { // WriteBack opens a new socket binding `addr` to write UDP packet back func (c *packet) WriteBack(b []byte, addr net.Addr) (n int, err error) { - tc, err := createOrGetLocalConn(addr, c.LocalAddr(), c.in, c.natTable) + tc, err := createOrGetLocalConn(addr, c.LocalAddr(), c.tunnel) if err != nil { n = 0 return @@ -52,9 +51,10 @@ func (c *packet) InAddr() net.Addr { // this function listen at rAddr and write to lAddr // for here, rAddr is the ip/port client want to access // lAddr is the ip/port client opened -func createOrGetLocalConn(rAddr, lAddr net.Addr, in chan<- C.PacketAdapter, natTable C.NatTable) (*net.UDPConn, error) { +func createOrGetLocalConn(rAddr, lAddr net.Addr, tunnel C.Tunnel) (*net.UDPConn, error) { remote := rAddr.String() local := lAddr.String() + natTable := tunnel.NatTable() localConn := natTable.GetForLocalConn(local, remote) // localConn not exist if localConn == nil { @@ -76,7 +76,7 @@ func createOrGetLocalConn(rAddr, lAddr net.Addr, in chan<- C.PacketAdapter, natT natTable.DeleteLockForLocalConn(local, remote) cond.Broadcast() }() - conn, err := listenLocalConn(rAddr, lAddr, in, natTable) + conn, err := listenLocalConn(rAddr, lAddr, tunnel) if err != nil { log.Errorln("listenLocalConn failed with error: %s, packet loss (rAddr[%T]=%s lAddr[%T]=%s)", err.Error(), rAddr, remote, lAddr, local) return nil, err @@ -90,7 +90,7 @@ func createOrGetLocalConn(rAddr, lAddr net.Addr, in chan<- C.PacketAdapter, natT // this function listen at rAddr // and send what received to program itself, then send to real remote -func listenLocalConn(rAddr, lAddr net.Addr, in chan<- C.PacketAdapter, natTable C.NatTable) (*net.UDPConn, error) { +func listenLocalConn(rAddr, lAddr net.Addr, tunnel C.Tunnel) (*net.UDPConn, error) { additions := []inbound.Addition{ inbound.WithInName("DEFAULT-TPROXY"), inbound.WithSpecialRules(""), @@ -113,7 +113,7 @@ func listenLocalConn(rAddr, lAddr net.Addr, in chan<- C.PacketAdapter, natTable } // since following localPackets are pass through this socket which listen rAddr // I choose current listener as packet's packet conn - handlePacketConn(lc, in, natTable, buf[:br], lAddr.(*net.UDPAddr).AddrPort(), rAddr.(*net.UDPAddr).AddrPort(), additions...) + handlePacketConn(lc, tunnel, buf[:br], lAddr.(*net.UDPAddr).AddrPort(), rAddr.(*net.UDPAddr).AddrPort(), additions...) } }() return lc, nil diff --git a/listener/tproxy/tproxy.go b/listener/tproxy/tproxy.go index 8c868609..319d5c30 100644 --- a/listener/tproxy/tproxy.go +++ b/listener/tproxy/tproxy.go @@ -31,13 +31,13 @@ func (l *Listener) Close() error { return l.listener.Close() } -func (l *Listener) handleTProxy(conn net.Conn, in chan<- C.ConnContext, additions ...inbound.Addition) { +func (l *Listener) handleTProxy(conn net.Conn, tunnel C.Tunnel, additions ...inbound.Addition) { target := socks5.ParseAddrToSocksAddr(conn.LocalAddr()) N.TCPKeepAlive(conn) - in <- inbound.NewSocket(target, conn, C.TPROXY, additions...) + tunnel.HandleTCPConn(inbound.NewSocket(target, conn, C.TPROXY, additions...)) } -func New(addr string, in chan<- C.ConnContext, additions ...inbound.Addition) (*Listener, error) { +func New(addr string, tunnel C.Tunnel, additions ...inbound.Addition) (*Listener, error) { if len(additions) == 0 { additions = []inbound.Addition{ inbound.WithInName("DEFAULT-TPROXY"), @@ -74,7 +74,7 @@ func New(addr string, in chan<- C.ConnContext, additions ...inbound.Addition) (* } continue } - go rl.handleTProxy(c, in, additions...) + go rl.handleTProxy(c, tunnel, additions...) } }() diff --git a/listener/tproxy/udp.go b/listener/tproxy/udp.go index d3727180..c8460def 100644 --- a/listener/tproxy/udp.go +++ b/listener/tproxy/udp.go @@ -32,7 +32,7 @@ func (l *UDPListener) Close() error { return l.packetConn.Close() } -func NewUDP(addr string, in chan<- C.PacketAdapter, natTable C.NatTable, additions ...inbound.Addition) (*UDPListener, error) { +func NewUDP(addr string, tunnel C.Tunnel, additions ...inbound.Addition) (*UDPListener, error) { if len(additions) == 0 { additions = []inbound.Addition{ inbound.WithInName("DEFAULT-TPROXY"), @@ -83,24 +83,20 @@ func NewUDP(addr string, in chan<- C.PacketAdapter, natTable C.NatTable, additio // try to unmap 4in6 address lAddr = netip.AddrPortFrom(lAddr.Addr().Unmap(), lAddr.Port()) } - handlePacketConn(l, in, natTable, buf[:n], lAddr, rAddr, additions...) + handlePacketConn(l, tunnel, buf[:n], lAddr, rAddr, additions...) } }() return rl, nil } -func handlePacketConn(pc net.PacketConn, in chan<- C.PacketAdapter, natTable C.NatTable, buf []byte, lAddr, rAddr netip.AddrPort, additions ...inbound.Addition) { +func handlePacketConn(pc net.PacketConn, tunnel C.Tunnel, buf []byte, lAddr, rAddr netip.AddrPort, additions ...inbound.Addition) { target := socks5.AddrFromStdAddrPort(rAddr) pkt := &packet{ - pc: pc, - lAddr: lAddr, - buf: buf, - in: in, - natTable: natTable, - } - select { - case in <- inbound.NewPacket(target, pkt, C.TPROXY, additions...): - default: + pc: pc, + lAddr: lAddr, + buf: buf, + tunnel: tunnel, } + tunnel.HandleUDPPacket(inbound.NewPacket(target, pkt, C.TPROXY, additions...)) } diff --git a/listener/tuic/server.go b/listener/tuic/server.go index 125c53e1..bfead7f3 100644 --- a/listener/tuic/server.go +++ b/listener/tuic/server.go @@ -31,7 +31,7 @@ type Listener struct { servers []*tuic.Server } -func New(config LC.TuicServer, tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAdapter, additions ...inbound.Addition) (*Listener, error) { +func New(config LC.TuicServer, tunnel C.Tunnel, additions ...inbound.Addition) (*Listener, error) { if len(additions) == 0 { additions = []inbound.Addition{ inbound.WithInName("DEFAULT-TUIC"), @@ -39,8 +39,7 @@ func New(config LC.TuicServer, tcpIn chan<- C.ConnContext, udpIn chan<- C.Packet } } h := &sing.ListenerHandler{ - TcpIn: tcpIn, - UdpIn: udpIn, + Tunnel: tunnel, Type: C.TUIC, Additions: additions, } @@ -106,7 +105,7 @@ func New(config LC.TuicServer, tcpIn chan<- C.ConnContext, udpIn chan<- C.Packet }() return nil } - tcpIn <- connCtx + go tunnel.HandleTCPConn(connCtx) return nil } handleUdpFn := func(addr socks5.Addr, packet C.UDPPacket, _additions ...inbound.Addition) error { @@ -115,10 +114,7 @@ func New(config LC.TuicServer, tcpIn chan<- C.ConnContext, udpIn chan<- C.Packet newAdditions = slices.Clone(additions) newAdditions = append(newAdditions, _additions...) } - select { - case udpIn <- inbound.NewPacket(addr, packet, C.TUIC, newAdditions...): - default: - } + tunnel.HandleUDPPacket(inbound.NewPacket(addr, packet, C.TUIC, newAdditions...)) return nil } diff --git a/listener/tunnel/tcp.go b/listener/tunnel/tcp.go index d660d2b8..8cc527fb 100644 --- a/listener/tunnel/tcp.go +++ b/listener/tunnel/tcp.go @@ -34,14 +34,14 @@ func (l *Listener) Close() error { return l.listener.Close() } -func (l *Listener) handleTCP(conn net.Conn, in chan<- C.ConnContext, additions ...inbound.Addition) { +func (l *Listener) handleTCP(conn net.Conn, tunnel C.Tunnel, additions ...inbound.Addition) { N.TCPKeepAlive(conn) ctx := inbound.NewSocket(l.target, conn, C.TUNNEL, additions...) ctx.Metadata().SpecialProxy = l.proxy - in <- ctx + tunnel.HandleTCPConn(ctx) } -func New(addr, target, proxy string, in chan<- C.ConnContext, additions ...inbound.Addition) (*Listener, error) { +func New(addr, target, proxy string, tunnel C.Tunnel, additions ...inbound.Addition) (*Listener, error) { l, err := inbound.Listen("tcp", addr) if err != nil { return nil, err @@ -68,7 +68,7 @@ func New(addr, target, proxy string, in chan<- C.ConnContext, additions ...inbou } continue } - go rl.handleTCP(c, in, additions...) + go rl.handleTCP(c, tunnel, additions...) } }() diff --git a/listener/tunnel/udp.go b/listener/tunnel/udp.go index 0795084c..c2f1dcc3 100644 --- a/listener/tunnel/udp.go +++ b/listener/tunnel/udp.go @@ -34,7 +34,7 @@ func (l *PacketConn) Close() error { return l.conn.Close() } -func NewUDP(addr, target, proxy string, in chan<- C.PacketAdapter, additions ...inbound.Addition) (*PacketConn, error) { +func NewUDP(addr, target, proxy string, tunnel C.Tunnel, additions ...inbound.Addition) (*PacketConn, error) { l, err := net.ListenPacket("udp", addr) if err != nil { return nil, err @@ -62,14 +62,14 @@ func NewUDP(addr, target, proxy string, in chan<- C.PacketAdapter, additions ... } continue } - sl.handleUDP(l, in, buf[:n], remoteAddr, additions...) + sl.handleUDP(l, tunnel, buf[:n], remoteAddr, additions...) } }() return sl, nil } -func (l *PacketConn) handleUDP(pc net.PacketConn, in chan<- C.PacketAdapter, buf []byte, addr net.Addr, additions ...inbound.Addition) { +func (l *PacketConn) handleUDP(pc net.PacketConn, tunnel C.Tunnel, buf []byte, addr net.Addr, additions ...inbound.Addition) { packet := &packet{ pc: pc, rAddr: addr, @@ -78,8 +78,5 @@ func (l *PacketConn) handleUDP(pc net.PacketConn, in chan<- C.PacketAdapter, buf ctx := inbound.NewPacket(l.target, packet, C.TUNNEL, additions...) ctx.Metadata().SpecialProxy = l.proxy - select { - case in <- ctx: - default: - } + tunnel.HandleUDPPacket(ctx) } diff --git a/tunnel/tunnel.go b/tunnel/tunnel.go index ff64915a..1e73a833 100644 --- a/tunnel/tunnel.go +++ b/tunnel/tunnel.go @@ -49,6 +49,25 @@ var ( fakeIPRange netip.Prefix ) +type tunnel struct{} + +var Tunnel C.Tunnel = tunnel{} + +func (t tunnel) HandleTCPConn(connCtx C.ConnContext) { + handleTCPConn(connCtx) +} + +func (t tunnel) HandleUDPPacket(packet C.PacketAdapter) { + select { + case udpQueue <- packet: + default: + } +} + +func (t tunnel) NatTable() C.NatTable { + return natTable +} + func OnSuspend() { status.Store(Suspend) } @@ -90,11 +109,13 @@ func init() { } // TCPIn return fan-in queue +// Deprecated: using Tunnel instead func TCPIn() chan<- C.ConnContext { return tcpQueue } // UDPIn return fan-in udp queue +// Deprecated: using Tunnel instead func UDPIn() chan<- C.PacketAdapter { return udpQueue } @@ -197,10 +218,6 @@ func isHandle(t C.Type) bool { func processUDP() { queue := udpQueue for conn := range queue { - if !isHandle(conn.Metadata().Type) { - conn.Drop() - continue - } handleUDPConn(conn) } } @@ -216,10 +233,6 @@ func process() { queue := tcpQueue for conn := range queue { - if !isHandle(conn.Metadata().Type) { - _ = conn.Conn().Close() - continue - } go handleTCPConn(conn) } } @@ -284,6 +297,11 @@ func resolveMetadata(ctx C.PlainContext, metadata *C.Metadata) (proxy C.Proxy, r } func handleUDPConn(packet C.PacketAdapter) { + if !isHandle(packet.Metadata().Type) { + packet.Drop() + return + } + metadata := packet.Metadata() if !metadata.Valid() { packet.Drop() @@ -409,6 +427,11 @@ func handleUDPConn(packet C.PacketAdapter) { } func handleTCPConn(connCtx C.ConnContext) { + if !isHandle(connCtx.Metadata().Type) { + _ = connCtx.Conn().Close() + return + } + defer func(conn net.Conn) { _ = conn.Close() }(connCtx.Conn())