chore: decrease goroutine used in core tunnel

This commit is contained in:
wwqgtxx 2023-09-28 18:59:31 +08:00
parent 21fb5f75b8
commit e0458a8fde
42 changed files with 252 additions and 269 deletions

View file

@ -1,6 +1,8 @@
package inbound package inbound
import ( import (
"net"
C "github.com/Dreamacro/clash/constant" C "github.com/Dreamacro/clash/constant"
) )
@ -33,3 +35,12 @@ func WithSpecialProxy(specialProxy string) Addition {
metadata.SpecialProxy = specialProxy metadata.SpecialProxy = specialProxy
} }
} }
func WithSrcAddr(addr net.Addr) Addition {
return func(metadata *C.Metadata) {
if ip, port, err := parseAddr(addr); err == nil {
metadata.SrcIP = ip
metadata.SrcPort = port
}
}
}

View file

@ -16,7 +16,7 @@ type MultiAddrListener interface {
type InboundListener interface { type InboundListener interface {
Name() string Name() string
Listen(tcpIn chan<- ConnContext, udpIn chan<- PacketAdapter, natTable NatTable) error Listen(tunnel Tunnel) error
Close() error Close() error
Address() string Address() string
RawAddress() string RawAddress() string

10
constant/tunnel.go Normal file
View file

@ -0,0 +1,10 @@
package constant
type Tunnel interface {
// HandleTCPConn will handle a tcp connection blocking
HandleTCPConn(connCtx ConnContext)
// HandleUDPPacket will handle a udp packet nonblocking
HandleUDPPacket(packet PacketAdapter)
// NatTable return nat table
NatTable() NatTable
}

View file

@ -118,7 +118,7 @@ func ApplyConfig(cfg *config.Config, force bool) {
} }
func initInnerTcp() { func initInnerTcp() {
inner.New(tunnel.TCPIn()) inner.New(tunnel.Tunnel)
} }
func GetGeneral() *config.General { func GetGeneral() *config.General {
@ -157,11 +157,7 @@ func GetGeneral() *config.General {
} }
func updateListeners(general *config.General, listeners map[string]C.InboundListener, force bool) { func updateListeners(general *config.General, listeners map[string]C.InboundListener, force bool) {
tcpIn := tunnel.TCPIn() listener.PatchInboundListeners(listeners, tunnel.Tunnel, true)
udpIn := tunnel.UDPIn()
natTable := tunnel.NatTable()
listener.PatchInboundListeners(listeners, tcpIn, udpIn, natTable, true)
if !force { if !force {
return return
} }
@ -171,15 +167,15 @@ func updateListeners(general *config.General, listeners map[string]C.InboundList
bindAddress := general.BindAddress bindAddress := general.BindAddress
listener.SetBindAddress(bindAddress) listener.SetBindAddress(bindAddress)
listener.ReCreateHTTP(general.Port, tcpIn) listener.ReCreateHTTP(general.Port, tunnel.Tunnel)
listener.ReCreateSocks(general.SocksPort, tcpIn, udpIn) listener.ReCreateSocks(general.SocksPort, tunnel.Tunnel)
listener.ReCreateRedir(general.RedirPort, tcpIn, udpIn, natTable) listener.ReCreateRedir(general.RedirPort, tunnel.Tunnel)
listener.ReCreateAutoRedir(general.EBpf.AutoRedir, tcpIn, udpIn) listener.ReCreateAutoRedir(general.EBpf.AutoRedir, tunnel.Tunnel)
listener.ReCreateTProxy(general.TProxyPort, tcpIn, udpIn, natTable) listener.ReCreateTProxy(general.TProxyPort, tunnel.Tunnel)
listener.ReCreateMixed(general.MixedPort, tcpIn, udpIn) listener.ReCreateMixed(general.MixedPort, tunnel.Tunnel)
listener.ReCreateShadowSocks(general.ShadowSocksConfig, tcpIn, udpIn) listener.ReCreateShadowSocks(general.ShadowSocksConfig, tunnel.Tunnel)
listener.ReCreateVmess(general.VmessConfig, tcpIn, udpIn) listener.ReCreateVmess(general.VmessConfig, tunnel.Tunnel)
listener.ReCreateTuic(LC.TuicServer(general.TuicServer), tcpIn, udpIn) listener.ReCreateTuic(general.TuicServer, tunnel.Tunnel)
} }
func updateExperimental(c *config.Config) { func updateExperimental(c *config.Config) {
@ -339,7 +335,7 @@ func updateTun(general *config.General) {
if general == nil { if general == nil {
return return
} }
listener.ReCreateTun(LC.Tun(general.Tun), tunnel.TCPIn(), tunnel.UDPIn()) listener.ReCreateTun(general.Tun, tunnel.Tunnel)
listener.ReCreateRedirToTun(general.Tun.RedirectToTun) listener.ReCreateRedirToTun(general.Tun.RedirectToTun)
} }
@ -367,7 +363,7 @@ func updateSniffer(sniffer *config.Sniffer) {
} }
func updateTunnels(tunnels []LC.Tunnel) { func updateTunnels(tunnels []LC.Tunnel) {
listener.PatchTunnel(tunnels, tunnel.TCPIn(), tunnel.UDPIn()) listener.PatchTunnel(tunnels, tunnel.Tunnel)
} }
func updateGeneral(general *config.General) { func updateGeneral(general *config.General) {

View file

@ -249,19 +249,15 @@ func patchConfigs(w http.ResponseWriter, r *http.Request) {
ports := P.GetPorts() ports := P.GetPorts()
tcpIn := tunnel.TCPIn() P.ReCreateHTTP(pointerOrDefault(general.Port, ports.Port), tunnel.Tunnel)
udpIn := tunnel.UDPIn() P.ReCreateSocks(pointerOrDefault(general.SocksPort, ports.SocksPort), tunnel.Tunnel)
natTable := tunnel.NatTable() P.ReCreateRedir(pointerOrDefault(general.RedirPort, ports.RedirPort), tunnel.Tunnel)
P.ReCreateTProxy(pointerOrDefault(general.TProxyPort, ports.TProxyPort), tunnel.Tunnel)
P.ReCreateHTTP(pointerOrDefault(general.Port, ports.Port), tcpIn) P.ReCreateMixed(pointerOrDefault(general.MixedPort, ports.MixedPort), tunnel.Tunnel)
P.ReCreateSocks(pointerOrDefault(general.SocksPort, ports.SocksPort), tcpIn, udpIn) P.ReCreateTun(pointerOrDefaultTun(general.Tun, P.LastTunConf), tunnel.Tunnel)
P.ReCreateRedir(pointerOrDefault(general.RedirPort, ports.RedirPort), tcpIn, udpIn, natTable) P.ReCreateShadowSocks(pointerOrDefaultString(general.ShadowSocksConfig, ports.ShadowSocksConfig), tunnel.Tunnel)
P.ReCreateTProxy(pointerOrDefault(general.TProxyPort, ports.TProxyPort), tcpIn, udpIn, natTable) P.ReCreateVmess(pointerOrDefaultString(general.VmessConfig, ports.VmessConfig), tunnel.Tunnel)
P.ReCreateMixed(pointerOrDefault(general.MixedPort, ports.MixedPort), tcpIn, udpIn) P.ReCreateTuic(pointerOrDefaultTuicServer(general.TuicServer, P.LastTuicConf), tunnel.Tunnel)
P.ReCreateTun(pointerOrDefaultTun(general.Tun, P.LastTunConf), tcpIn, udpIn)
P.ReCreateShadowSocks(pointerOrDefaultString(general.ShadowSocksConfig, ports.ShadowSocksConfig), tcpIn, udpIn)
P.ReCreateVmess(pointerOrDefaultString(general.VmessConfig, ports.VmessConfig), tcpIn, udpIn)
P.ReCreateTuic(pointerOrDefaultTuicServer(general.TuicServer, P.LastTuicConf), tcpIn, udpIn)
if general.Mode != nil { if general.Mode != nil {
tunnel.SetMode(*general.Mode) tunnel.SetMode(*general.Mode)

View file

@ -43,7 +43,7 @@ func (l *Listener) SetLookupFunc(lookupFunc func(netip.AddrPort) (socks5.Addr, e
l.lookupFunc = lookupFunc l.lookupFunc = lookupFunc
} }
func (l *Listener) handleRedir(conn net.Conn, in chan<- C.ConnContext) { func (l *Listener) handleRedir(conn net.Conn, tunnel C.Tunnel) {
if l.lookupFunc == nil { if l.lookupFunc == nil {
log.Errorln("[Auto Redirect] lookup function is nil") log.Errorln("[Auto Redirect] lookup function is nil")
return return
@ -58,10 +58,10 @@ func (l *Listener) handleRedir(conn net.Conn, in chan<- C.ConnContext) {
N.TCPKeepAlive(conn) N.TCPKeepAlive(conn)
in <- inbound.NewSocket(target, conn, C.REDIR, l.additions...) tunnel.HandleTCPConn(inbound.NewSocket(target, conn, C.REDIR, l.additions...))
} }
func New(addr string, in chan<- C.ConnContext, additions ...inbound.Addition) (*Listener, error) { func New(addr string, tunnel C.Tunnel, additions ...inbound.Addition) (*Listener, error) {
if len(additions) == 0 { if len(additions) == 0 {
additions = []inbound.Addition{ additions = []inbound.Addition{
inbound.WithInName("DEFAULT-REDIR"), inbound.WithInName("DEFAULT-REDIR"),
@ -87,7 +87,7 @@ func New(addr string, in chan<- C.ConnContext, additions ...inbound.Addition) (*
} }
continue continue
} }
go rl.handleRedir(c, in) go rl.handleRedir(c, tunnel)
} }
}() }()

View file

@ -12,7 +12,7 @@ import (
"github.com/Dreamacro/clash/transport/socks5" "github.com/Dreamacro/clash/transport/socks5"
) )
func newClient(source net.Addr, in chan<- C.ConnContext, additions ...inbound.Addition) *http.Client { func newClient(source net.Addr, tunnel C.Tunnel, additions ...inbound.Addition) *http.Client {
return &http.Client{ return &http.Client{
Transport: &http.Transport{ Transport: &http.Transport{
// from http.DefaultTransport // from http.DefaultTransport
@ -32,7 +32,7 @@ func newClient(source net.Addr, in chan<- C.ConnContext, additions ...inbound.Ad
left, right := net.Pipe() left, right := net.Pipe()
in <- inbound.NewHTTP(dstAddr, source, right, additions...) go tunnel.HandleTCPConn(inbound.NewHTTP(dstAddr, source, right, additions...))
return left, nil return left, nil
}, },

View file

@ -14,8 +14,8 @@ import (
"github.com/Dreamacro/clash/log" "github.com/Dreamacro/clash/log"
) )
func HandleConn(c net.Conn, in chan<- C.ConnContext, cache *cache.LruCache[string, bool], additions ...inbound.Addition) { func HandleConn(c net.Conn, tunnel C.Tunnel, cache *cache.LruCache[string, bool], additions ...inbound.Addition) {
client := newClient(c.RemoteAddr(), in, additions...) client := newClient(c.RemoteAddr(), tunnel, additions...)
defer client.CloseIdleConnections() defer client.CloseIdleConnections()
conn := N.NewBufferedConn(c) conn := N.NewBufferedConn(c)
@ -48,7 +48,7 @@ func HandleConn(c net.Conn, in chan<- C.ConnContext, cache *cache.LruCache[strin
break // close connection break // close connection
} }
in <- inbound.NewHTTPS(request, conn, additions...) tunnel.HandleTCPConn(inbound.NewHTTPS(request, conn, additions...))
return // hijack connection return // hijack connection
} }
@ -61,7 +61,7 @@ func HandleConn(c net.Conn, in chan<- C.ConnContext, cache *cache.LruCache[strin
request.RequestURI = "" request.RequestURI = ""
if isUpgradeRequest(request) { if isUpgradeRequest(request) {
handleUpgrade(conn, request, in, additions...) handleUpgrade(conn, request, tunnel, additions...)
return // hijack connection return // hijack connection
} }

View file

@ -30,11 +30,11 @@ func (l *Listener) Close() error {
return l.listener.Close() return l.listener.Close()
} }
func New(addr string, in chan<- C.ConnContext, additions ...inbound.Addition) (*Listener, error) { func New(addr string, tunnel C.Tunnel, additions ...inbound.Addition) (*Listener, error) {
return NewWithAuthenticate(addr, in, true, additions...) return NewWithAuthenticate(addr, tunnel, true, additions...)
} }
func NewWithAuthenticate(addr string, in chan<- C.ConnContext, authenticate bool, additions ...inbound.Addition) (*Listener, error) { func NewWithAuthenticate(addr string, tunnel C.Tunnel, authenticate bool, additions ...inbound.Addition) (*Listener, error) {
if len(additions) == 0 { if len(additions) == 0 {
additions = []inbound.Addition{ additions = []inbound.Addition{
inbound.WithInName("DEFAULT-HTTP"), inbound.WithInName("DEFAULT-HTTP"),
@ -65,7 +65,7 @@ func NewWithAuthenticate(addr string, in chan<- C.ConnContext, authenticate bool
} }
continue continue
} }
go HandleConn(conn, in, c, additions...) go HandleConn(conn, tunnel, c, additions...)
} }
}() }()

View file

@ -25,7 +25,7 @@ func isUpgradeRequest(req *http.Request) bool {
return false return false
} }
func handleUpgrade(conn net.Conn, request *http.Request, in chan<- C.ConnContext, additions ...inbound.Addition) { func handleUpgrade(conn net.Conn, request *http.Request, tunnel C.Tunnel, additions ...inbound.Addition) {
defer conn.Close() defer conn.Close()
removeProxyHeaders(request.Header) removeProxyHeaders(request.Header)
@ -43,7 +43,7 @@ func handleUpgrade(conn net.Conn, request *http.Request, in chan<- C.ConnContext
left, right := net.Pipe() left, right := net.Pipe()
in <- inbound.NewHTTP(dstAddr, conn.RemoteAddr(), right, additions...) go tunnel.HandleTCPConn(inbound.NewHTTP(dstAddr, conn.RemoteAddr(), right, additions...))
var bufferedLeft *N.BufferedConn var bufferedLeft *N.BufferedConn
if request.TLS != nil { if request.TLS != nil {

View file

@ -61,7 +61,7 @@ func (b *Base) RawAddress() string {
} }
// Listen implements constant.InboundListener // Listen implements constant.InboundListener
func (*Base) Listen(tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAdapter, natTable C.NatTable) error { func (*Base) Listen(tunnel C.Tunnel) error {
return nil return nil
} }

View file

@ -42,9 +42,9 @@ func (h *HTTP) Address() string {
} }
// Listen implements constant.InboundListener // Listen implements constant.InboundListener
func (h *HTTP) Listen(tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAdapter, natTable C.NatTable) error { func (h *HTTP) Listen(tunnel C.Tunnel) error {
var err error var err error
h.l, err = http.New(h.RawAddress(), tcpIn, h.Additions()...) h.l, err = http.New(h.RawAddress(), tunnel, h.Additions()...)
if err != nil { if err != nil {
return err return err
} }

View file

@ -77,9 +77,9 @@ func (t *Hysteria2) Address() string {
} }
// Listen implements constant.InboundListener // Listen implements constant.InboundListener
func (t *Hysteria2) Listen(tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAdapter, natTable C.NatTable) error { func (t *Hysteria2) Listen(tunnel C.Tunnel) error {
var err error var err error
t.l, err = sing_hysteria2.New(t.ts, tcpIn, udpIn, t.Additions()...) t.l, err = sing_hysteria2.New(t.ts, tunnel, t.Additions()...)
if err != nil { if err != nil {
return err return err
} }

View file

@ -50,14 +50,14 @@ func (m *Mixed) Address() string {
} }
// Listen implements constant.InboundListener // Listen implements constant.InboundListener
func (m *Mixed) Listen(tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAdapter, natTable C.NatTable) error { func (m *Mixed) Listen(tunnel C.Tunnel) error {
var err error var err error
m.l, err = mixed.New(m.RawAddress(), tcpIn, m.Additions()...) m.l, err = mixed.New(m.RawAddress(), tunnel, m.Additions()...)
if err != nil { if err != nil {
return err return err
} }
if m.udp { if m.udp {
m.lUDP, err = socks.NewUDP(m.RawAddress(), udpIn, m.Additions()...) m.lUDP, err = socks.NewUDP(m.RawAddress(), tunnel, m.Additions()...)
if err != nil { if err != nil {
return err return err
} }

View file

@ -42,9 +42,9 @@ func (r *Redir) Address() string {
} }
// Listen implements constant.InboundListener // Listen implements constant.InboundListener
func (r *Redir) Listen(tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAdapter, natTable C.NatTable) error { func (r *Redir) Listen(tunnel C.Tunnel) error {
var err error var err error
r.l, err = redir.New(r.RawAddress(), tcpIn, r.Additions()...) r.l, err = redir.New(r.RawAddress(), tunnel, r.Additions()...)
if err != nil { if err != nil {
return err return err
} }

View file

@ -59,9 +59,9 @@ func (s *ShadowSocks) Address() string {
} }
// Listen implements constant.InboundListener // Listen implements constant.InboundListener
func (s *ShadowSocks) Listen(tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAdapter, natTable C.NatTable) error { func (s *ShadowSocks) Listen(tunnel C.Tunnel) error {
var err error var err error
s.l, err = sing_shadowsocks.New(s.ss, tcpIn, udpIn, s.Additions()...) s.l, err = sing_shadowsocks.New(s.ss, tunnel, s.Additions()...)
if err != nil { if err != nil {
return err return err
} }

View file

@ -68,13 +68,13 @@ func (s *Socks) Address() string {
} }
// Listen implements constant.InboundListener // Listen implements constant.InboundListener
func (s *Socks) Listen(tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAdapter, natTable C.NatTable) error { func (s *Socks) Listen(tunnel C.Tunnel) error {
var err error var err error
if s.stl, err = socks.New(s.RawAddress(), tcpIn, s.Additions()...); err != nil { if s.stl, err = socks.New(s.RawAddress(), tunnel, s.Additions()...); err != nil {
return err return err
} }
if s.udp { if s.udp {
if s.sul, err = socks.NewUDP(s.RawAddress(), udpIn, s.Additions()...); err != nil { if s.sul, err = socks.NewUDP(s.RawAddress(), tunnel, s.Additions()...); err != nil {
return err return err
} }
} }

View file

@ -49,14 +49,14 @@ func (t *TProxy) Address() string {
} }
// Listen implements constant.InboundListener // Listen implements constant.InboundListener
func (t *TProxy) Listen(tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAdapter, natTable C.NatTable) error { func (t *TProxy) Listen(tunnel C.Tunnel) error {
var err error var err error
t.lTCP, err = tproxy.New(t.RawAddress(), tcpIn, t.Additions()...) t.lTCP, err = tproxy.New(t.RawAddress(), tunnel, t.Additions()...)
if err != nil { if err != nil {
return err return err
} }
if t.udp { if t.udp {
t.lUDP, err = tproxy.NewUDP(t.RawAddress(), udpIn, natTable, t.Additions()...) t.lUDP, err = tproxy.NewUDP(t.RawAddress(), tunnel, t.Additions()...)
if err != nil { if err != nil {
return err return err
} }

View file

@ -73,9 +73,9 @@ func (t *Tuic) Address() string {
} }
// Listen implements constant.InboundListener // Listen implements constant.InboundListener
func (t *Tuic) Listen(tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAdapter, natTable C.NatTable) error { func (t *Tuic) Listen(tunnel C.Tunnel) error {
var err error var err error
t.l, err = tuic.New(t.ts, tcpIn, udpIn, t.Additions()...) t.l, err = tuic.New(t.ts, tunnel, t.Additions()...)
if err != nil { if err != nil {
return err return err
} }

View file

@ -113,9 +113,9 @@ func (t *Tun) Address() string {
} }
// Listen implements constant.InboundListener // Listen implements constant.InboundListener
func (t *Tun) Listen(tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAdapter, natTable C.NatTable) error { func (t *Tun) Listen(tunnel C.Tunnel) error {
var err error var err error
t.l, err = sing_tun.New(t.tun, tcpIn, udpIn, t.Additions()...) t.l, err = sing_tun.New(t.tun, tunnel, t.Additions()...)
if err != nil { if err != nil {
return err return err
} }

View file

@ -4,7 +4,7 @@ import (
"fmt" "fmt"
C "github.com/Dreamacro/clash/constant" C "github.com/Dreamacro/clash/constant"
"github.com/Dreamacro/clash/listener/tunnel" LT "github.com/Dreamacro/clash/listener/tunnel"
"github.com/Dreamacro/clash/log" "github.com/Dreamacro/clash/log"
) )
@ -21,8 +21,8 @@ func (o TunnelOption) Equal(config C.InboundConfig) bool {
type Tunnel struct { type Tunnel struct {
*Base *Base
config *TunnelOption config *TunnelOption
ttl *tunnel.Listener ttl *LT.Listener
tul *tunnel.PacketConn tul *LT.PacketConn
} }
func NewTunnel(options *TunnelOption) (*Tunnel, error) { func NewTunnel(options *TunnelOption) (*Tunnel, error) {
@ -74,16 +74,16 @@ func (t *Tunnel) Address() string {
} }
// Listen implements constant.InboundListener // Listen implements constant.InboundListener
func (t *Tunnel) Listen(tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAdapter, natTable C.NatTable) error { func (t *Tunnel) Listen(tunnel C.Tunnel) error {
var err error var err error
for _, network := range t.config.Network { for _, network := range t.config.Network {
switch network { switch network {
case "tcp": case "tcp":
if t.ttl, err = tunnel.New(t.RawAddress(), t.config.Target, t.config.SpecialProxy, tcpIn, t.Additions()...); err != nil { if t.ttl, err = LT.New(t.RawAddress(), t.config.Target, t.config.SpecialProxy, tunnel, t.Additions()...); err != nil {
return err return err
} }
case "udp": case "udp":
if t.tul, err = tunnel.NewUDP(t.RawAddress(), t.config.Target, t.config.SpecialProxy, udpIn, t.Additions()...); err != nil { if t.tul, err = LT.NewUDP(t.RawAddress(), t.config.Target, t.config.SpecialProxy, tunnel, t.Additions()...); err != nil {
return err return err
} }
default: default:

View file

@ -69,7 +69,7 @@ func (v *Vmess) Address() string {
} }
// Listen implements constant.InboundListener // Listen implements constant.InboundListener
func (v *Vmess) Listen(tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAdapter, natTable C.NatTable) error { func (v *Vmess) Listen(tunnel C.Tunnel) error {
var err error var err error
users := make([]LC.VmessUser, len(v.config.Users)) users := make([]LC.VmessUser, len(v.config.Users))
for i, v := range v.config.Users { for i, v := range v.config.Users {
@ -79,7 +79,7 @@ func (v *Vmess) Listen(tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAdapter,
AlterID: v.AlterID, AlterID: v.AlterID,
} }
} }
v.l, err = sing_vmess.New(v.vs, tcpIn, udpIn, v.Additions()...) v.l, err = sing_vmess.New(v.vs, tunnel, v.Additions()...)
if err != nil { if err != nil {
return err return err
} }

View file

@ -8,19 +8,19 @@ import (
C "github.com/Dreamacro/clash/constant" C "github.com/Dreamacro/clash/constant"
) )
var tcpIn chan<- C.ConnContext var tunnel C.Tunnel
func New(in chan<- C.ConnContext) { func New(t C.Tunnel) {
tcpIn = in tunnel = t
} }
func HandleTcp(address string) (conn net.Conn, err error) { func HandleTcp(address string) (conn net.Conn, err error) {
if tcpIn == nil { if tunnel == nil {
return nil, errors.New("tcp uninitialized") return nil, errors.New("tcp uninitialized")
} }
// executor Parsed // executor Parsed
conn1, conn2 := net.Pipe() conn1, conn2 := net.Pipe()
context := inbound.NewInner(conn2, address) context := inbound.NewInner(conn2, address)
tcpIn <- context go tunnel.HandleTCPConn(context)
return conn1, nil return conn1, nil
} }

View file

@ -23,7 +23,7 @@ import (
"github.com/Dreamacro/clash/listener/socks" "github.com/Dreamacro/clash/listener/socks"
"github.com/Dreamacro/clash/listener/tproxy" "github.com/Dreamacro/clash/listener/tproxy"
"github.com/Dreamacro/clash/listener/tuic" "github.com/Dreamacro/clash/listener/tuic"
"github.com/Dreamacro/clash/listener/tunnel" LT "github.com/Dreamacro/clash/listener/tunnel"
"github.com/Dreamacro/clash/log" "github.com/Dreamacro/clash/log"
"github.com/samber/lo" "github.com/samber/lo"
@ -42,8 +42,8 @@ var (
tproxyUDPListener *tproxy.UDPListener tproxyUDPListener *tproxy.UDPListener
mixedListener *mixed.Listener mixedListener *mixed.Listener
mixedUDPLister *socks.UDPListener mixedUDPLister *socks.UDPListener
tunnelTCPListeners = map[string]*tunnel.Listener{} tunnelTCPListeners = map[string]*LT.Listener{}
tunnelUDPListeners = map[string]*tunnel.PacketConn{} tunnelUDPListeners = map[string]*LT.PacketConn{}
inboundListeners = map[string]C.InboundListener{} inboundListeners = map[string]C.InboundListener{}
tunLister *sing_tun.Listener tunLister *sing_tun.Listener
shadowSocksListener C.MultiAddrListener shadowSocksListener C.MultiAddrListener
@ -112,7 +112,7 @@ func SetBindAddress(host string) {
bindAddress = host bindAddress = host
} }
func ReCreateHTTP(port int, tcpIn chan<- C.ConnContext) { func ReCreateHTTP(port int, tunnel C.Tunnel) {
httpMux.Lock() httpMux.Lock()
defer httpMux.Unlock() defer httpMux.Unlock()
@ -137,7 +137,7 @@ func ReCreateHTTP(port int, tcpIn chan<- C.ConnContext) {
return return
} }
httpListener, err = http.New(addr, tcpIn) httpListener, err = http.New(addr, tunnel)
if err != nil { if err != nil {
log.Errorln("Start HTTP server error: %s", err.Error()) log.Errorln("Start HTTP server error: %s", err.Error())
return return
@ -146,7 +146,7 @@ func ReCreateHTTP(port int, tcpIn chan<- C.ConnContext) {
log.Infoln("HTTP proxy listening at: %s", httpListener.Address()) log.Infoln("HTTP proxy listening at: %s", httpListener.Address())
} }
func ReCreateSocks(port int, tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAdapter) { func ReCreateSocks(port int, tunnel C.Tunnel) {
socksMux.Lock() socksMux.Lock()
defer socksMux.Unlock() defer socksMux.Unlock()
@ -188,12 +188,12 @@ func ReCreateSocks(port int, tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAd
return return
} }
tcpListener, err := socks.New(addr, tcpIn) tcpListener, err := socks.New(addr, tunnel)
if err != nil { if err != nil {
return return
} }
udpListener, err := socks.NewUDP(addr, udpIn) udpListener, err := socks.NewUDP(addr, tunnel)
if err != nil { if err != nil {
tcpListener.Close() tcpListener.Close()
return return
@ -205,7 +205,7 @@ func ReCreateSocks(port int, tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAd
log.Infoln("SOCKS proxy listening at: %s", socksListener.Address()) log.Infoln("SOCKS proxy listening at: %s", socksListener.Address())
} }
func ReCreateRedir(port int, tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAdapter, natTable C.NatTable) { func ReCreateRedir(port int, tunnel C.Tunnel) {
redirMux.Lock() redirMux.Lock()
defer redirMux.Unlock() defer redirMux.Unlock()
@ -238,12 +238,12 @@ func ReCreateRedir(port int, tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAd
return return
} }
redirListener, err = redir.New(addr, tcpIn) redirListener, err = redir.New(addr, tunnel)
if err != nil { if err != nil {
return return
} }
redirUDPListener, err = tproxy.NewUDP(addr, udpIn, natTable) redirUDPListener, err = tproxy.NewUDP(addr, tunnel)
if err != nil { if err != nil {
log.Warnln("Failed to start Redir UDP Listener: %s", err) log.Warnln("Failed to start Redir UDP Listener: %s", err)
} }
@ -251,7 +251,7 @@ func ReCreateRedir(port int, tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAd
log.Infoln("Redirect proxy listening at: %s", redirListener.Address()) log.Infoln("Redirect proxy listening at: %s", redirListener.Address())
} }
func ReCreateShadowSocks(shadowSocksConfig string, tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAdapter) { func ReCreateShadowSocks(shadowSocksConfig string, tunnel C.Tunnel) {
ssMux.Lock() ssMux.Lock()
defer ssMux.Unlock() defer ssMux.Unlock()
@ -292,7 +292,7 @@ func ReCreateShadowSocks(shadowSocksConfig string, tcpIn chan<- C.ConnContext, u
return return
} }
listener, err := sing_shadowsocks.New(ssConfig, tcpIn, udpIn) listener, err := sing_shadowsocks.New(ssConfig, tunnel)
if err != nil { if err != nil {
return return
} }
@ -305,7 +305,7 @@ func ReCreateShadowSocks(shadowSocksConfig string, tcpIn chan<- C.ConnContext, u
return return
} }
func ReCreateVmess(vmessConfig string, tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAdapter) { func ReCreateVmess(vmessConfig string, tunnel C.Tunnel) {
vmessMux.Lock() vmessMux.Lock()
defer vmessMux.Unlock() defer vmessMux.Unlock()
@ -344,7 +344,7 @@ func ReCreateVmess(vmessConfig string, tcpIn chan<- C.ConnContext, udpIn chan<-
return return
} }
listener, err := sing_vmess.New(vsConfig, tcpIn, udpIn) listener, err := sing_vmess.New(vsConfig, tunnel)
if err != nil { if err != nil {
return return
} }
@ -357,7 +357,7 @@ func ReCreateVmess(vmessConfig string, tcpIn chan<- C.ConnContext, udpIn chan<-
return return
} }
func ReCreateTuic(config LC.TuicServer, tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAdapter) { func ReCreateTuic(config LC.TuicServer, tunnel C.Tunnel) {
tuicMux.Lock() tuicMux.Lock()
defer func() { defer func() {
LastTuicConf = config LastTuicConf = config
@ -389,7 +389,7 @@ func ReCreateTuic(config LC.TuicServer, tcpIn chan<- C.ConnContext, udpIn chan<-
return return
} }
listener, err := tuic.New(config, tcpIn, udpIn) listener, err := tuic.New(config, tunnel)
if err != nil { if err != nil {
return return
} }
@ -402,7 +402,7 @@ func ReCreateTuic(config LC.TuicServer, tcpIn chan<- C.ConnContext, udpIn chan<-
return return
} }
func ReCreateTProxy(port int, tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAdapter, natTable C.NatTable) { func ReCreateTProxy(port int, tunnel C.Tunnel) {
tproxyMux.Lock() tproxyMux.Lock()
defer tproxyMux.Unlock() defer tproxyMux.Unlock()
@ -435,12 +435,12 @@ func ReCreateTProxy(port int, tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketA
return return
} }
tproxyListener, err = tproxy.New(addr, tcpIn) tproxyListener, err = tproxy.New(addr, tunnel)
if err != nil { if err != nil {
return return
} }
tproxyUDPListener, err = tproxy.NewUDP(addr, udpIn, natTable) tproxyUDPListener, err = tproxy.NewUDP(addr, tunnel)
if err != nil { if err != nil {
log.Warnln("Failed to start TProxy UDP Listener: %s", err) log.Warnln("Failed to start TProxy UDP Listener: %s", err)
} }
@ -448,7 +448,7 @@ func ReCreateTProxy(port int, tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketA
log.Infoln("TProxy server listening at: %s", tproxyListener.Address()) log.Infoln("TProxy server listening at: %s", tproxyListener.Address())
} }
func ReCreateMixed(port int, tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAdapter) { func ReCreateMixed(port int, tunnel C.Tunnel) {
mixedMux.Lock() mixedMux.Lock()
defer mixedMux.Unlock() defer mixedMux.Unlock()
@ -489,12 +489,12 @@ func ReCreateMixed(port int, tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAd
return return
} }
mixedListener, err = mixed.New(addr, tcpIn) mixedListener, err = mixed.New(addr, tunnel)
if err != nil { if err != nil {
return return
} }
mixedUDPLister, err = socks.NewUDP(addr, udpIn) mixedUDPLister, err = socks.NewUDP(addr, tunnel)
if err != nil { if err != nil {
mixedListener.Close() mixedListener.Close()
return return
@ -503,7 +503,7 @@ func ReCreateMixed(port int, tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAd
log.Infoln("Mixed(http+socks) proxy listening at: %s", mixedListener.Address()) log.Infoln("Mixed(http+socks) proxy listening at: %s", mixedListener.Address())
} }
func ReCreateTun(tunConf LC.Tun, tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAdapter) { func ReCreateTun(tunConf LC.Tun, tunnel C.Tunnel) {
tunMux.Lock() tunMux.Lock()
defer func() { defer func() {
LastTunConf = tunConf LastTunConf = tunConf
@ -531,7 +531,7 @@ func ReCreateTun(tunConf LC.Tun, tcpIn chan<- C.ConnContext, udpIn chan<- C.Pack
return return
} }
lister, err := sing_tun.New(tunConf, tcpIn, udpIn) lister, err := sing_tun.New(tunConf, tunnel)
if err != nil { if err != nil {
return return
} }
@ -573,7 +573,7 @@ func ReCreateRedirToTun(ifaceNames []string) {
log.Infoln("Attached tc ebpf program to interfaces %v", tcProgram.RawNICs()) log.Infoln("Attached tc ebpf program to interfaces %v", tcProgram.RawNICs())
} }
func ReCreateAutoRedir(ifaceNames []string, tcpIn chan<- C.ConnContext, _ chan<- C.PacketAdapter) { func ReCreateAutoRedir(ifaceNames []string, tunnel C.Tunnel) {
autoRedirMux.Lock() autoRedirMux.Lock()
defer autoRedirMux.Unlock() defer autoRedirMux.Unlock()
@ -614,7 +614,7 @@ func ReCreateAutoRedir(ifaceNames []string, tcpIn chan<- C.ConnContext, _ chan<-
addr := genAddr("*", C.TcpAutoRedirPort, true) addr := genAddr("*", C.TcpAutoRedirPort, true)
autoRedirListener, err = autoredir.New(addr, tcpIn) autoRedirListener, err = autoredir.New(addr, tunnel)
if err != nil { if err != nil {
return return
} }
@ -629,7 +629,7 @@ func ReCreateAutoRedir(ifaceNames []string, tcpIn chan<- C.ConnContext, _ chan<-
log.Infoln("Auto redirect proxy listening at: %s, attached tc ebpf program to interfaces %v", autoRedirListener.Address(), autoRedirProgram.RawNICs()) log.Infoln("Auto redirect proxy listening at: %s, attached tc ebpf program to interfaces %v", autoRedirListener.Address(), autoRedirProgram.RawNICs())
} }
func PatchTunnel(tunnels []LC.Tunnel, tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAdapter) { func PatchTunnel(tunnels []LC.Tunnel, tunnel C.Tunnel) {
tunnelMux.Lock() tunnelMux.Lock()
defer tunnelMux.Unlock() defer tunnelMux.Unlock()
@ -699,7 +699,7 @@ func PatchTunnel(tunnels []LC.Tunnel, tcpIn chan<- C.ConnContext, udpIn chan<- C
for _, elm := range needCreate { for _, elm := range needCreate {
key := fmt.Sprintf("%s/%s/%s", elm.addr, elm.target, elm.proxy) key := fmt.Sprintf("%s/%s/%s", elm.addr, elm.target, elm.proxy)
if elm.network == "tcp" { if elm.network == "tcp" {
l, err := tunnel.New(elm.addr, elm.target, elm.proxy, tcpIn) l, err := LT.New(elm.addr, elm.target, elm.proxy, tunnel)
if err != nil { if err != nil {
log.Errorln("Start tunnel %s error: %s", elm.target, err.Error()) log.Errorln("Start tunnel %s error: %s", elm.target, err.Error())
continue continue
@ -707,7 +707,7 @@ func PatchTunnel(tunnels []LC.Tunnel, tcpIn chan<- C.ConnContext, udpIn chan<- C
tunnelTCPListeners[key] = l tunnelTCPListeners[key] = l
log.Infoln("Tunnel(tcp/%s) proxy %s listening at: %s", elm.target, elm.proxy, tunnelTCPListeners[key].Address()) log.Infoln("Tunnel(tcp/%s) proxy %s listening at: %s", elm.target, elm.proxy, tunnelTCPListeners[key].Address())
} else { } else {
l, err := tunnel.NewUDP(elm.addr, elm.target, elm.proxy, udpIn) l, err := LT.NewUDP(elm.addr, elm.target, elm.proxy, tunnel)
if err != nil { if err != nil {
log.Errorln("Start tunnel %s error: %s", elm.target, err.Error()) log.Errorln("Start tunnel %s error: %s", elm.target, err.Error())
continue continue
@ -718,7 +718,7 @@ func PatchTunnel(tunnels []LC.Tunnel, tcpIn chan<- C.ConnContext, udpIn chan<- C
} }
} }
func PatchInboundListeners(newListenerMap map[string]C.InboundListener, tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAdapter, natTable C.NatTable, dropOld bool) { func PatchInboundListeners(newListenerMap map[string]C.InboundListener, tunnel C.Tunnel, dropOld bool) {
inboundMux.Lock() inboundMux.Lock()
defer inboundMux.Unlock() defer inboundMux.Unlock()
@ -730,7 +730,7 @@ func PatchInboundListeners(newListenerMap map[string]C.InboundListener, tcpIn ch
continue continue
} }
} }
if err := newListener.Listen(tcpIn, udpIn, natTable); err != nil { if err := newListener.Listen(tunnel); err != nil {
log.Errorln("Listener %s listen err: %s", name, err.Error()) log.Errorln("Listener %s listen err: %s", name, err.Error())
continue continue
} }

View file

@ -36,7 +36,7 @@ func (l *Listener) Close() error {
return l.listener.Close() return l.listener.Close()
} }
func New(addr string, in chan<- C.ConnContext, additions ...inbound.Addition) (*Listener, error) { func New(addr string, tunnel C.Tunnel, additions ...inbound.Addition) (*Listener, error) {
if len(additions) == 0 { if len(additions) == 0 {
additions = []inbound.Addition{ additions = []inbound.Addition{
inbound.WithInName("DEFAULT-MIXED"), inbound.WithInName("DEFAULT-MIXED"),
@ -62,14 +62,14 @@ func New(addr string, in chan<- C.ConnContext, additions ...inbound.Addition) (*
} }
continue continue
} }
go handleConn(c, in, ml.cache, additions...) go handleConn(c, tunnel, ml.cache, additions...)
} }
}() }()
return ml, nil return ml, nil
} }
func handleConn(conn net.Conn, in chan<- C.ConnContext, cache *cache.LruCache[string, bool], additions ...inbound.Addition) { func handleConn(conn net.Conn, tunnel C.Tunnel, cache *cache.LruCache[string, bool], additions ...inbound.Addition) {
N.TCPKeepAlive(conn) N.TCPKeepAlive(conn)
bufConn := N.NewBufferedConn(conn) bufConn := N.NewBufferedConn(conn)
@ -80,10 +80,10 @@ func handleConn(conn net.Conn, in chan<- C.ConnContext, cache *cache.LruCache[st
switch head[0] { switch head[0] {
case socks4.Version: case socks4.Version:
socks.HandleSocks4(bufConn, in, additions...) socks.HandleSocks4(bufConn, tunnel, additions...)
case socks5.Version: case socks5.Version:
socks.HandleSocks5(bufConn, in, additions...) socks.HandleSocks5(bufConn, tunnel, additions...)
default: default:
http.HandleConn(bufConn, in, cache, additions...) http.HandleConn(bufConn, tunnel, cache, additions...)
} }
} }

View file

@ -30,7 +30,7 @@ func (l *Listener) Close() error {
return l.listener.Close() return l.listener.Close()
} }
func New(addr string, in chan<- C.ConnContext, additions ...inbound.Addition) (*Listener, error) { func New(addr string, tunnel C.Tunnel, additions ...inbound.Addition) (*Listener, error) {
if len(additions) == 0 { if len(additions) == 0 {
additions = []inbound.Addition{ additions = []inbound.Addition{
inbound.WithInName("DEFAULT-REDIR"), inbound.WithInName("DEFAULT-REDIR"),
@ -55,18 +55,19 @@ func New(addr string, in chan<- C.ConnContext, additions ...inbound.Addition) (*
} }
continue continue
} }
go handleRedir(c, in, additions...) go handleRedir(c, tunnel, additions...)
} }
}() }()
return rl, nil return rl, nil
} }
func handleRedir(conn net.Conn, in chan<- C.ConnContext, additions ...inbound.Addition) {
func handleRedir(conn net.Conn, tunnel C.Tunnel, additions ...inbound.Addition) {
target, err := parserPacket(conn) target, err := parserPacket(conn)
if err != nil { if err != nil {
conn.Close() conn.Close()
return return
} }
N.TCPKeepAlive(conn) N.TCPKeepAlive(conn)
in <- inbound.NewSocket(target, conn, C.REDIR, additions...) tunnel.HandleTCPConn(inbound.NewSocket(target, conn, C.REDIR, additions...))
} }

View file

@ -22,7 +22,7 @@ type Listener struct {
var _listener *Listener var _listener *Listener
func New(config LC.ShadowsocksServer, tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAdapter) (*Listener, error) { func New(config LC.ShadowsocksServer, tunnel C.Tunnel) (*Listener, error) {
pickCipher, err := core.PickCipher(config.Cipher, nil, config.Password) pickCipher, err := core.PickCipher(config.Cipher, nil, config.Password)
if err != nil { if err != nil {
return nil, err return nil, err
@ -36,7 +36,7 @@ func New(config LC.ShadowsocksServer, tcpIn chan<- C.ConnContext, udpIn chan<- C
if config.Udp { if config.Udp {
//UDP //UDP
ul, err := NewUDP(addr, pickCipher, udpIn) ul, err := NewUDP(addr, pickCipher, tunnel)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -60,7 +60,7 @@ func New(config LC.ShadowsocksServer, tcpIn chan<- C.ConnContext, udpIn chan<- C
continue continue
} }
N.TCPKeepAlive(c) N.TCPKeepAlive(c)
go sl.HandleConn(c, tcpIn) go sl.HandleConn(c, tunnel)
} }
}() }()
} }
@ -99,7 +99,7 @@ func (l *Listener) AddrList() (addrList []net.Addr) {
return return
} }
func (l *Listener) HandleConn(conn net.Conn, in chan<- C.ConnContext, additions ...inbound.Addition) { func (l *Listener) HandleConn(conn net.Conn, tunnel C.Tunnel, additions ...inbound.Addition) {
conn = l.pickCipher.StreamConn(conn) conn = l.pickCipher.StreamConn(conn)
conn = N.NewDeadlineConn(conn) // embed ss can't handle readDeadline correctly conn = N.NewDeadlineConn(conn) // embed ss can't handle readDeadline correctly
@ -108,12 +108,12 @@ func (l *Listener) HandleConn(conn net.Conn, in chan<- C.ConnContext, additions
_ = conn.Close() _ = conn.Close()
return return
} }
in <- inbound.NewSocket(target, conn, C.SHADOWSOCKS, additions...) tunnel.HandleTCPConn(inbound.NewSocket(target, conn, C.SHADOWSOCKS, additions...))
} }
func HandleShadowSocks(conn net.Conn, in chan<- C.ConnContext, additions ...inbound.Addition) bool { func HandleShadowSocks(conn net.Conn, tunnel C.Tunnel, additions ...inbound.Addition) bool {
if _listener != nil && _listener.pickCipher != nil { if _listener != nil && _listener.pickCipher != nil {
go _listener.HandleConn(conn, in, additions...) go _listener.HandleConn(conn, tunnel, additions...)
return true return true
} }
return false return false

View file

@ -17,7 +17,7 @@ type UDPListener struct {
closed bool closed bool
} }
func NewUDP(addr string, pickCipher core.Cipher, in chan<- C.PacketAdapter) (*UDPListener, error) { func NewUDP(addr string, pickCipher core.Cipher, tunnel C.Tunnel) (*UDPListener, error) {
l, err := net.ListenPacket("udp", addr) l, err := net.ListenPacket("udp", addr)
if err != nil { if err != nil {
return nil, err return nil, err
@ -42,7 +42,7 @@ func NewUDP(addr string, pickCipher core.Cipher, in chan<- C.PacketAdapter) (*UD
} }
continue continue
} }
handleSocksUDP(conn, in, data, put, remoteAddr) handleSocksUDP(conn, tunnel, data, put, remoteAddr)
} }
}() }()
@ -58,7 +58,7 @@ func (l *UDPListener) LocalAddr() net.Addr {
return l.packetConn.LocalAddr() return l.packetConn.LocalAddr()
} }
func handleSocksUDP(pc net.PacketConn, in chan<- C.PacketAdapter, buf []byte, put func(), addr net.Addr, additions ...inbound.Addition) { func handleSocksUDP(pc net.PacketConn, tunnel C.Tunnel, buf []byte, put func(), addr net.Addr, additions ...inbound.Addition) {
tgtAddr := socks5.SplitAddr(buf) tgtAddr := socks5.SplitAddr(buf)
if tgtAddr == nil { if tgtAddr == nil {
// Unresolved UDP packet, return buffer to the pool // Unresolved UDP packet, return buffer to the pool
@ -76,8 +76,5 @@ func handleSocksUDP(pc net.PacketConn, in chan<- C.PacketAdapter, buf []byte, pu
payload: payload, payload: payload,
put: put, put: put,
} }
select { tunnel.HandleUDPPacket(inbound.NewPacket(target, packet, C.SHADOWSOCKS, additions...))
case in <- inbound.NewPacket(target, packet, C.SHADOWSOCKS, additions...):
default:
}
} }

View file

@ -28,43 +28,12 @@ import (
const UDPTimeout = 5 * time.Minute const UDPTimeout = 5 * time.Minute
type ListenerHandler struct { type ListenerHandler struct {
TcpIn chan<- C.ConnContext Tunnel C.Tunnel
UdpIn chan<- C.PacketAdapter
Type C.Type Type C.Type
Additions []inbound.Addition Additions []inbound.Addition
UDPTimeout time.Duration UDPTimeout time.Duration
} }
type waitCloseConn struct {
N.ExtendedConn
wg *sync.WaitGroup
close sync.Once
rAddr net.Addr
}
func (c *waitCloseConn) Close() error { // call from handleTCPConn(connCtx C.ConnContext)
c.close.Do(func() {
c.wg.Done()
})
return c.ExtendedConn.Close()
}
func (c *waitCloseConn) RemoteAddr() net.Addr {
return c.rAddr
}
func (c *waitCloseConn) Upstream() any {
return c.ExtendedConn
}
func (c *waitCloseConn) ReaderReplaceable() bool {
return true
}
func (c *waitCloseConn) WriterReplaceable() bool {
return true
}
func UpstreamMetadata(metadata M.Metadata) M.Metadata { func UpstreamMetadata(metadata M.Metadata) M.Metadata {
return M.Metadata{ return M.Metadata{
Source: metadata.Source, Source: metadata.Source,
@ -117,14 +86,14 @@ func (h *ListenerHandler) NewConnection(ctx context.Context, conn net.Conn, meta
return h.ParseSpecialFqdn(ctx, conn, metadata) return h.ParseSpecialFqdn(ctx, conn, metadata)
} }
target := socks5.ParseAddr(metadata.Destination.String()) target := socks5.ParseAddr(metadata.Destination.String())
wg := &sync.WaitGroup{}
defer wg.Wait() // this goroutine must exit after conn.Close()
wg.Add(1)
if deadline.NeedAdditionalReadDeadline(conn) { if deadline.NeedAdditionalReadDeadline(conn) {
conn = N.NewDeadlineConn(conn) // conn from sing should check NeedAdditionalReadDeadline conn = N.NewDeadlineConn(conn) // conn from sing should check NeedAdditionalReadDeadline
} }
h.TcpIn <- inbound.NewSocket(target, &waitCloseConn{ExtendedConn: N.NewExtendedConn(conn), wg: wg, rAddr: metadata.Source.TCPAddr()}, h.Type, combineAdditions(ctx, h.Additions)...) connCtx := inbound.NewSocket(target, conn, h.Type, combineAdditions(ctx, h.Additions)...)
inbound.WithSrcAddr(metadata.Source.TCPAddr()).Apply(connCtx.Metadata()) // set srcAddr from sing's metadata
h.Tunnel.HandleTCPConn(connCtx) // this goroutine must exit after conn unused
return nil return nil
} }
@ -177,10 +146,8 @@ func (h *ListenerHandler) NewPacketConnection(ctx context.Context, conn network.
lAddr: conn.LocalAddr(), lAddr: conn.LocalAddr(),
buff: buff, buff: buff,
} }
select {
case h.UdpIn <- inbound.NewPacket(target, packet, h.Type, combineAdditions(ctx, h.Additions)...): h.Tunnel.HandleUDPPacket(inbound.NewPacket(target, packet, h.Type, combineAdditions(ctx, h.Additions)...))
default:
}
} }
return nil return nil
} }

View file

@ -32,7 +32,7 @@ type Listener struct {
services []*hysteria2.Service[string] services []*hysteria2.Service[string]
} }
func New(config LC.Hysteria2Server, tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAdapter, additions ...inbound.Addition) (*Listener, error) { func New(config LC.Hysteria2Server, tunnel C.Tunnel, additions ...inbound.Addition) (*Listener, error) {
var sl *Listener var sl *Listener
var err error var err error
if len(additions) == 0 { if len(additions) == 0 {
@ -43,8 +43,7 @@ func New(config LC.Hysteria2Server, tcpIn chan<- C.ConnContext, udpIn chan<- C.P
} }
h := &sing.ListenerHandler{ h := &sing.ListenerHandler{
TcpIn: tcpIn, Tunnel: tunnel,
UdpIn: udpIn,
Type: C.HYSTERIA2, Type: C.HYSTERIA2,
Additions: additions, Additions: additions,
} }

View file

@ -35,7 +35,7 @@ type Listener struct {
var _listener *Listener var _listener *Listener
func New(config LC.ShadowsocksServer, tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAdapter, additions ...inbound.Addition) (C.MultiAddrListener, error) { func New(config LC.ShadowsocksServer, tunnel C.Tunnel, additions ...inbound.Addition) (C.MultiAddrListener, error) {
var sl *Listener var sl *Listener
var err error var err error
if len(additions) == 0 { if len(additions) == 0 {
@ -51,8 +51,7 @@ func New(config LC.ShadowsocksServer, tcpIn chan<- C.ConnContext, udpIn chan<- C
udpTimeout := int64(sing.UDPTimeout.Seconds()) udpTimeout := int64(sing.UDPTimeout.Seconds())
h := &sing.ListenerHandler{ h := &sing.ListenerHandler{
TcpIn: tcpIn, Tunnel: tunnel,
UdpIn: udpIn,
Type: C.SHADOWSOCKS, Type: C.SHADOWSOCKS,
Additions: additions, Additions: additions,
} }
@ -68,7 +67,7 @@ func New(config LC.ShadowsocksServer, tcpIn chan<- C.ConnContext, udpIn chan<- C
sl.service, err = shadowaead_2022.NewServiceWithPassword(config.Cipher, config.Password, udpTimeout, h, ntp.Now) sl.service, err = shadowaead_2022.NewServiceWithPassword(config.Cipher, config.Password, udpTimeout, h, ntp.Now)
default: default:
err = fmt.Errorf("shadowsocks: unsupported method: %s", config.Cipher) err = fmt.Errorf("shadowsocks: unsupported method: %s", config.Cipher)
return embedSS.New(config, tcpIn, udpIn) return embedSS.New(config, tunnel)
} }
if err != nil { if err != nil {
return nil, err return nil, err
@ -148,7 +147,7 @@ func New(config LC.ShadowsocksServer, tcpIn chan<- C.ConnContext, udpIn chan<- C
} }
N.TCPKeepAlive(c) N.TCPKeepAlive(c)
go sl.HandleConn(c, tcpIn) go sl.HandleConn(c, tunnel)
} }
}() }()
} }
@ -188,7 +187,7 @@ func (l *Listener) AddrList() (addrList []net.Addr) {
return return
} }
func (l *Listener) HandleConn(conn net.Conn, in chan<- C.ConnContext, additions ...inbound.Addition) { func (l *Listener) HandleConn(conn net.Conn, tunnel C.Tunnel, additions ...inbound.Addition) {
ctx := sing.WithAdditions(context.TODO(), additions...) ctx := sing.WithAdditions(context.TODO(), additions...)
err := l.service.NewConnection(ctx, conn, M.Metadata{ err := l.service.NewConnection(ctx, conn, M.Metadata{
Protocol: "shadowsocks", Protocol: "shadowsocks",
@ -200,10 +199,10 @@ func (l *Listener) HandleConn(conn net.Conn, in chan<- C.ConnContext, additions
} }
} }
func HandleShadowSocks(conn net.Conn, in chan<- C.ConnContext, additions ...inbound.Addition) bool { func HandleShadowSocks(conn net.Conn, tunnel C.Tunnel, additions ...inbound.Addition) bool {
if _listener != nil && _listener.service != nil { if _listener != nil && _listener.service != nil {
go _listener.HandleConn(conn, in, additions...) go _listener.HandleConn(conn, tunnel, additions...)
return true return true
} }
return embedSS.HandleShadowSocks(conn, in, additions...) return embedSS.HandleShadowSocks(conn, tunnel, additions...)
} }

View file

@ -88,7 +88,7 @@ func checkTunName(tunName string) (ok bool) {
return true return true
} }
func New(options LC.Tun, tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAdapter, additions ...inbound.Addition) (l *Listener, err error) { func New(options LC.Tun, tunnel C.Tunnel, additions ...inbound.Addition) (l *Listener, err error) {
if len(additions) == 0 { if len(additions) == 0 {
additions = []inbound.Addition{ additions = []inbound.Addition{
inbound.WithInName("DEFAULT-TUN"), inbound.WithInName("DEFAULT-TUN"),
@ -152,8 +152,7 @@ func New(options LC.Tun, tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAdapte
handler := &ListenerHandler{ handler := &ListenerHandler{
ListenerHandler: sing.ListenerHandler{ ListenerHandler: sing.ListenerHandler{
TcpIn: tcpIn, Tunnel: tunnel,
UdpIn: udpIn,
Type: C.TUN, Type: C.TUN,
Additions: additions, Additions: additions,
UDPTimeout: time.Second * time.Duration(udpTimeout), UDPTimeout: time.Second * time.Duration(udpTimeout),

View file

@ -27,7 +27,7 @@ type Listener struct {
var _listener *Listener var _listener *Listener
func New(config LC.VmessServer, tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAdapter, additions ...inbound.Addition) (sl *Listener, err error) { func New(config LC.VmessServer, tunnel C.Tunnel, additions ...inbound.Addition) (sl *Listener, err error) {
if len(additions) == 0 { if len(additions) == 0 {
additions = []inbound.Addition{ additions = []inbound.Addition{
inbound.WithInName("DEFAULT-VMESS"), inbound.WithInName("DEFAULT-VMESS"),
@ -38,8 +38,7 @@ func New(config LC.VmessServer, tcpIn chan<- C.ConnContext, udpIn chan<- C.Packe
}() }()
} }
h := &sing.ListenerHandler{ h := &sing.ListenerHandler{
TcpIn: tcpIn, Tunnel: tunnel,
UdpIn: udpIn,
Type: C.VMESS, Type: C.VMESS,
Additions: additions, Additions: additions,
} }
@ -87,7 +86,7 @@ func New(config LC.VmessServer, tcpIn chan<- C.ConnContext, udpIn chan<- C.Packe
} }
N.TCPKeepAlive(c) N.TCPKeepAlive(c)
go sl.HandleConn(c, tcpIn) go sl.HandleConn(c, tunnel)
} }
}() }()
} }
@ -122,7 +121,7 @@ func (l *Listener) AddrList() (addrList []net.Addr) {
return return
} }
func (l *Listener) HandleConn(conn net.Conn, in chan<- C.ConnContext, additions ...inbound.Addition) { func (l *Listener) HandleConn(conn net.Conn, tunnel C.Tunnel, additions ...inbound.Addition) {
ctx := sing.WithAdditions(context.TODO(), additions...) ctx := sing.WithAdditions(context.TODO(), additions...)
err := l.service.NewConnection(ctx, conn, metadata.Metadata{ err := l.service.NewConnection(ctx, conn, metadata.Metadata{
Protocol: "vmess", Protocol: "vmess",
@ -134,9 +133,9 @@ func (l *Listener) HandleConn(conn net.Conn, in chan<- C.ConnContext, additions
} }
} }
func HandleVmess(conn net.Conn, in chan<- C.ConnContext, additions ...inbound.Addition) bool { func HandleVmess(conn net.Conn, tunnel C.Tunnel, additions ...inbound.Addition) bool {
if _listener != nil && _listener.service != nil { if _listener != nil && _listener.service != nil {
go _listener.HandleConn(conn, in, additions...) go _listener.HandleConn(conn, tunnel, additions...)
return true return true
} }
return false return false

View file

@ -34,7 +34,7 @@ func (l *Listener) Close() error {
return l.listener.Close() return l.listener.Close()
} }
func New(addr string, in chan<- C.ConnContext, additions ...inbound.Addition) (*Listener, error) { func New(addr string, tunnel C.Tunnel, additions ...inbound.Addition) (*Listener, error) {
if len(additions) == 0 { if len(additions) == 0 {
additions = []inbound.Addition{ additions = []inbound.Addition{
inbound.WithInName("DEFAULT-SOCKS"), inbound.WithInName("DEFAULT-SOCKS"),
@ -59,14 +59,14 @@ func New(addr string, in chan<- C.ConnContext, additions ...inbound.Addition) (*
} }
continue continue
} }
go handleSocks(c, in, additions...) go handleSocks(c, tunnel, additions...)
} }
}() }()
return sl, nil return sl, nil
} }
func handleSocks(conn net.Conn, in chan<- C.ConnContext, additions ...inbound.Addition) { func handleSocks(conn net.Conn, tunnel C.Tunnel, additions ...inbound.Addition) {
N.TCPKeepAlive(conn) N.TCPKeepAlive(conn)
bufConn := N.NewBufferedConn(conn) bufConn := N.NewBufferedConn(conn)
head, err := bufConn.Peek(1) head, err := bufConn.Peek(1)
@ -77,24 +77,24 @@ func handleSocks(conn net.Conn, in chan<- C.ConnContext, additions ...inbound.Ad
switch head[0] { switch head[0] {
case socks4.Version: case socks4.Version:
HandleSocks4(bufConn, in, additions...) HandleSocks4(bufConn, tunnel, additions...)
case socks5.Version: case socks5.Version:
HandleSocks5(bufConn, in, additions...) HandleSocks5(bufConn, tunnel, additions...)
default: default:
conn.Close() conn.Close()
} }
} }
func HandleSocks4(conn net.Conn, in chan<- C.ConnContext, additions ...inbound.Addition) { func HandleSocks4(conn net.Conn, tunnel C.Tunnel, additions ...inbound.Addition) {
addr, _, err := socks4.ServerHandshake(conn, authStore.Authenticator()) addr, _, err := socks4.ServerHandshake(conn, authStore.Authenticator())
if err != nil { if err != nil {
conn.Close() conn.Close()
return return
} }
in <- inbound.NewSocket(socks5.ParseAddr(addr), conn, C.SOCKS4, additions...) tunnel.HandleTCPConn(inbound.NewSocket(socks5.ParseAddr(addr), conn, C.SOCKS4, additions...))
} }
func HandleSocks5(conn net.Conn, in chan<- C.ConnContext, additions ...inbound.Addition) { func HandleSocks5(conn net.Conn, tunnel C.Tunnel, additions ...inbound.Addition) {
target, command, err := socks5.ServerHandshake(conn, authStore.Authenticator()) target, command, err := socks5.ServerHandshake(conn, authStore.Authenticator())
if err != nil { if err != nil {
conn.Close() conn.Close()
@ -105,5 +105,5 @@ func HandleSocks5(conn net.Conn, in chan<- C.ConnContext, additions ...inbound.A
io.Copy(io.Discard, conn) io.Copy(io.Discard, conn)
return return
} }
in <- inbound.NewSocket(target, conn, C.SOCKS5, additions...) tunnel.HandleTCPConn(inbound.NewSocket(target, conn, C.SOCKS5, additions...))
} }

View file

@ -33,7 +33,7 @@ func (l *UDPListener) Close() error {
return l.packetConn.Close() return l.packetConn.Close()
} }
func NewUDP(addr string, in chan<- C.PacketAdapter, additions ...inbound.Addition) (*UDPListener, error) { func NewUDP(addr string, tunnel C.Tunnel, additions ...inbound.Addition) (*UDPListener, error) {
if len(additions) == 0 { if len(additions) == 0 {
additions = []inbound.Addition{ additions = []inbound.Addition{
inbound.WithInName("DEFAULT-SOCKS"), inbound.WithInName("DEFAULT-SOCKS"),
@ -66,14 +66,14 @@ func NewUDP(addr string, in chan<- C.PacketAdapter, additions ...inbound.Additio
} }
continue continue
} }
handleSocksUDP(l, in, data, put, remoteAddr, additions...) handleSocksUDP(l, tunnel, data, put, remoteAddr, additions...)
} }
}() }()
return sl, nil return sl, nil
} }
func handleSocksUDP(pc net.PacketConn, in chan<- C.PacketAdapter, buf []byte, put func(), addr net.Addr, additions ...inbound.Addition) { func handleSocksUDP(pc net.PacketConn, tunnel C.Tunnel, buf []byte, put func(), addr net.Addr, additions ...inbound.Addition) {
target, payload, err := socks5.DecodeUDPPacket(buf) target, payload, err := socks5.DecodeUDPPacket(buf)
if err != nil { if err != nil {
// Unresolved UDP packet, return buffer to the pool // Unresolved UDP packet, return buffer to the pool
@ -88,8 +88,5 @@ func handleSocksUDP(pc net.PacketConn, in chan<- C.PacketAdapter, buf []byte, pu
payload: payload, payload: payload,
put: put, put: put,
} }
select { tunnel.HandleUDPPacket(inbound.NewPacket(target, packet, C.SOCKS5, additions...))
case in <- inbound.NewPacket(target, packet, C.SOCKS5, additions...):
default:
}
} }

View file

@ -13,11 +13,10 @@ import (
) )
type packet struct { type packet struct {
pc net.PacketConn pc net.PacketConn
lAddr netip.AddrPort lAddr netip.AddrPort
buf []byte buf []byte
in chan<- C.PacketAdapter tunnel C.Tunnel
natTable C.NatTable
} }
func (c *packet) Data() []byte { func (c *packet) Data() []byte {
@ -26,7 +25,7 @@ func (c *packet) Data() []byte {
// WriteBack opens a new socket binding `addr` to write UDP packet back // WriteBack opens a new socket binding `addr` to write UDP packet back
func (c *packet) WriteBack(b []byte, addr net.Addr) (n int, err error) { func (c *packet) WriteBack(b []byte, addr net.Addr) (n int, err error) {
tc, err := createOrGetLocalConn(addr, c.LocalAddr(), c.in, c.natTable) tc, err := createOrGetLocalConn(addr, c.LocalAddr(), c.tunnel)
if err != nil { if err != nil {
n = 0 n = 0
return return
@ -52,9 +51,10 @@ func (c *packet) InAddr() net.Addr {
// this function listen at rAddr and write to lAddr // this function listen at rAddr and write to lAddr
// for here, rAddr is the ip/port client want to access // for here, rAddr is the ip/port client want to access
// lAddr is the ip/port client opened // lAddr is the ip/port client opened
func createOrGetLocalConn(rAddr, lAddr net.Addr, in chan<- C.PacketAdapter, natTable C.NatTable) (*net.UDPConn, error) { func createOrGetLocalConn(rAddr, lAddr net.Addr, tunnel C.Tunnel) (*net.UDPConn, error) {
remote := rAddr.String() remote := rAddr.String()
local := lAddr.String() local := lAddr.String()
natTable := tunnel.NatTable()
localConn := natTable.GetForLocalConn(local, remote) localConn := natTable.GetForLocalConn(local, remote)
// localConn not exist // localConn not exist
if localConn == nil { if localConn == nil {
@ -76,7 +76,7 @@ func createOrGetLocalConn(rAddr, lAddr net.Addr, in chan<- C.PacketAdapter, natT
natTable.DeleteLockForLocalConn(local, remote) natTable.DeleteLockForLocalConn(local, remote)
cond.Broadcast() cond.Broadcast()
}() }()
conn, err := listenLocalConn(rAddr, lAddr, in, natTable) conn, err := listenLocalConn(rAddr, lAddr, tunnel)
if err != nil { if err != nil {
log.Errorln("listenLocalConn failed with error: %s, packet loss (rAddr[%T]=%s lAddr[%T]=%s)", err.Error(), rAddr, remote, lAddr, local) log.Errorln("listenLocalConn failed with error: %s, packet loss (rAddr[%T]=%s lAddr[%T]=%s)", err.Error(), rAddr, remote, lAddr, local)
return nil, err return nil, err
@ -90,7 +90,7 @@ func createOrGetLocalConn(rAddr, lAddr net.Addr, in chan<- C.PacketAdapter, natT
// this function listen at rAddr // this function listen at rAddr
// and send what received to program itself, then send to real remote // and send what received to program itself, then send to real remote
func listenLocalConn(rAddr, lAddr net.Addr, in chan<- C.PacketAdapter, natTable C.NatTable) (*net.UDPConn, error) { func listenLocalConn(rAddr, lAddr net.Addr, tunnel C.Tunnel) (*net.UDPConn, error) {
additions := []inbound.Addition{ additions := []inbound.Addition{
inbound.WithInName("DEFAULT-TPROXY"), inbound.WithInName("DEFAULT-TPROXY"),
inbound.WithSpecialRules(""), inbound.WithSpecialRules(""),
@ -113,7 +113,7 @@ func listenLocalConn(rAddr, lAddr net.Addr, in chan<- C.PacketAdapter, natTable
} }
// since following localPackets are pass through this socket which listen rAddr // since following localPackets are pass through this socket which listen rAddr
// I choose current listener as packet's packet conn // I choose current listener as packet's packet conn
handlePacketConn(lc, in, natTable, buf[:br], lAddr.(*net.UDPAddr).AddrPort(), rAddr.(*net.UDPAddr).AddrPort(), additions...) handlePacketConn(lc, tunnel, buf[:br], lAddr.(*net.UDPAddr).AddrPort(), rAddr.(*net.UDPAddr).AddrPort(), additions...)
} }
}() }()
return lc, nil return lc, nil

View file

@ -31,13 +31,13 @@ func (l *Listener) Close() error {
return l.listener.Close() return l.listener.Close()
} }
func (l *Listener) handleTProxy(conn net.Conn, in chan<- C.ConnContext, additions ...inbound.Addition) { func (l *Listener) handleTProxy(conn net.Conn, tunnel C.Tunnel, additions ...inbound.Addition) {
target := socks5.ParseAddrToSocksAddr(conn.LocalAddr()) target := socks5.ParseAddrToSocksAddr(conn.LocalAddr())
N.TCPKeepAlive(conn) N.TCPKeepAlive(conn)
in <- inbound.NewSocket(target, conn, C.TPROXY, additions...) tunnel.HandleTCPConn(inbound.NewSocket(target, conn, C.TPROXY, additions...))
} }
func New(addr string, in chan<- C.ConnContext, additions ...inbound.Addition) (*Listener, error) { func New(addr string, tunnel C.Tunnel, additions ...inbound.Addition) (*Listener, error) {
if len(additions) == 0 { if len(additions) == 0 {
additions = []inbound.Addition{ additions = []inbound.Addition{
inbound.WithInName("DEFAULT-TPROXY"), inbound.WithInName("DEFAULT-TPROXY"),
@ -74,7 +74,7 @@ func New(addr string, in chan<- C.ConnContext, additions ...inbound.Addition) (*
} }
continue continue
} }
go rl.handleTProxy(c, in, additions...) go rl.handleTProxy(c, tunnel, additions...)
} }
}() }()

View file

@ -32,7 +32,7 @@ func (l *UDPListener) Close() error {
return l.packetConn.Close() return l.packetConn.Close()
} }
func NewUDP(addr string, in chan<- C.PacketAdapter, natTable C.NatTable, additions ...inbound.Addition) (*UDPListener, error) { func NewUDP(addr string, tunnel C.Tunnel, additions ...inbound.Addition) (*UDPListener, error) {
if len(additions) == 0 { if len(additions) == 0 {
additions = []inbound.Addition{ additions = []inbound.Addition{
inbound.WithInName("DEFAULT-TPROXY"), inbound.WithInName("DEFAULT-TPROXY"),
@ -83,24 +83,20 @@ func NewUDP(addr string, in chan<- C.PacketAdapter, natTable C.NatTable, additio
// try to unmap 4in6 address // try to unmap 4in6 address
lAddr = netip.AddrPortFrom(lAddr.Addr().Unmap(), lAddr.Port()) lAddr = netip.AddrPortFrom(lAddr.Addr().Unmap(), lAddr.Port())
} }
handlePacketConn(l, in, natTable, buf[:n], lAddr, rAddr, additions...) handlePacketConn(l, tunnel, buf[:n], lAddr, rAddr, additions...)
} }
}() }()
return rl, nil return rl, nil
} }
func handlePacketConn(pc net.PacketConn, in chan<- C.PacketAdapter, natTable C.NatTable, buf []byte, lAddr, rAddr netip.AddrPort, additions ...inbound.Addition) { func handlePacketConn(pc net.PacketConn, tunnel C.Tunnel, buf []byte, lAddr, rAddr netip.AddrPort, additions ...inbound.Addition) {
target := socks5.AddrFromStdAddrPort(rAddr) target := socks5.AddrFromStdAddrPort(rAddr)
pkt := &packet{ pkt := &packet{
pc: pc, pc: pc,
lAddr: lAddr, lAddr: lAddr,
buf: buf, buf: buf,
in: in, tunnel: tunnel,
natTable: natTable,
}
select {
case in <- inbound.NewPacket(target, pkt, C.TPROXY, additions...):
default:
} }
tunnel.HandleUDPPacket(inbound.NewPacket(target, pkt, C.TPROXY, additions...))
} }

View file

@ -31,7 +31,7 @@ type Listener struct {
servers []*tuic.Server servers []*tuic.Server
} }
func New(config LC.TuicServer, tcpIn chan<- C.ConnContext, udpIn chan<- C.PacketAdapter, additions ...inbound.Addition) (*Listener, error) { func New(config LC.TuicServer, tunnel C.Tunnel, additions ...inbound.Addition) (*Listener, error) {
if len(additions) == 0 { if len(additions) == 0 {
additions = []inbound.Addition{ additions = []inbound.Addition{
inbound.WithInName("DEFAULT-TUIC"), inbound.WithInName("DEFAULT-TUIC"),
@ -39,8 +39,7 @@ func New(config LC.TuicServer, tcpIn chan<- C.ConnContext, udpIn chan<- C.Packet
} }
} }
h := &sing.ListenerHandler{ h := &sing.ListenerHandler{
TcpIn: tcpIn, Tunnel: tunnel,
UdpIn: udpIn,
Type: C.TUIC, Type: C.TUIC,
Additions: additions, Additions: additions,
} }
@ -106,7 +105,7 @@ func New(config LC.TuicServer, tcpIn chan<- C.ConnContext, udpIn chan<- C.Packet
}() }()
return nil return nil
} }
tcpIn <- connCtx go tunnel.HandleTCPConn(connCtx)
return nil return nil
} }
handleUdpFn := func(addr socks5.Addr, packet C.UDPPacket, _additions ...inbound.Addition) error { handleUdpFn := func(addr socks5.Addr, packet C.UDPPacket, _additions ...inbound.Addition) error {
@ -115,10 +114,7 @@ func New(config LC.TuicServer, tcpIn chan<- C.ConnContext, udpIn chan<- C.Packet
newAdditions = slices.Clone(additions) newAdditions = slices.Clone(additions)
newAdditions = append(newAdditions, _additions...) newAdditions = append(newAdditions, _additions...)
} }
select { tunnel.HandleUDPPacket(inbound.NewPacket(addr, packet, C.TUIC, newAdditions...))
case udpIn <- inbound.NewPacket(addr, packet, C.TUIC, newAdditions...):
default:
}
return nil return nil
} }

View file

@ -34,14 +34,14 @@ func (l *Listener) Close() error {
return l.listener.Close() return l.listener.Close()
} }
func (l *Listener) handleTCP(conn net.Conn, in chan<- C.ConnContext, additions ...inbound.Addition) { func (l *Listener) handleTCP(conn net.Conn, tunnel C.Tunnel, additions ...inbound.Addition) {
N.TCPKeepAlive(conn) N.TCPKeepAlive(conn)
ctx := inbound.NewSocket(l.target, conn, C.TUNNEL, additions...) ctx := inbound.NewSocket(l.target, conn, C.TUNNEL, additions...)
ctx.Metadata().SpecialProxy = l.proxy ctx.Metadata().SpecialProxy = l.proxy
in <- ctx tunnel.HandleTCPConn(ctx)
} }
func New(addr, target, proxy string, in chan<- C.ConnContext, additions ...inbound.Addition) (*Listener, error) { func New(addr, target, proxy string, tunnel C.Tunnel, additions ...inbound.Addition) (*Listener, error) {
l, err := inbound.Listen("tcp", addr) l, err := inbound.Listen("tcp", addr)
if err != nil { if err != nil {
return nil, err return nil, err
@ -68,7 +68,7 @@ func New(addr, target, proxy string, in chan<- C.ConnContext, additions ...inbou
} }
continue continue
} }
go rl.handleTCP(c, in, additions...) go rl.handleTCP(c, tunnel, additions...)
} }
}() }()

View file

@ -34,7 +34,7 @@ func (l *PacketConn) Close() error {
return l.conn.Close() return l.conn.Close()
} }
func NewUDP(addr, target, proxy string, in chan<- C.PacketAdapter, additions ...inbound.Addition) (*PacketConn, error) { func NewUDP(addr, target, proxy string, tunnel C.Tunnel, additions ...inbound.Addition) (*PacketConn, error) {
l, err := net.ListenPacket("udp", addr) l, err := net.ListenPacket("udp", addr)
if err != nil { if err != nil {
return nil, err return nil, err
@ -62,14 +62,14 @@ func NewUDP(addr, target, proxy string, in chan<- C.PacketAdapter, additions ...
} }
continue continue
} }
sl.handleUDP(l, in, buf[:n], remoteAddr, additions...) sl.handleUDP(l, tunnel, buf[:n], remoteAddr, additions...)
} }
}() }()
return sl, nil return sl, nil
} }
func (l *PacketConn) handleUDP(pc net.PacketConn, in chan<- C.PacketAdapter, buf []byte, addr net.Addr, additions ...inbound.Addition) { func (l *PacketConn) handleUDP(pc net.PacketConn, tunnel C.Tunnel, buf []byte, addr net.Addr, additions ...inbound.Addition) {
packet := &packet{ packet := &packet{
pc: pc, pc: pc,
rAddr: addr, rAddr: addr,
@ -78,8 +78,5 @@ func (l *PacketConn) handleUDP(pc net.PacketConn, in chan<- C.PacketAdapter, buf
ctx := inbound.NewPacket(l.target, packet, C.TUNNEL, additions...) ctx := inbound.NewPacket(l.target, packet, C.TUNNEL, additions...)
ctx.Metadata().SpecialProxy = l.proxy ctx.Metadata().SpecialProxy = l.proxy
select { tunnel.HandleUDPPacket(ctx)
case in <- ctx:
default:
}
} }

View file

@ -49,6 +49,25 @@ var (
fakeIPRange netip.Prefix fakeIPRange netip.Prefix
) )
type tunnel struct{}
var Tunnel C.Tunnel = tunnel{}
func (t tunnel) HandleTCPConn(connCtx C.ConnContext) {
handleTCPConn(connCtx)
}
func (t tunnel) HandleUDPPacket(packet C.PacketAdapter) {
select {
case udpQueue <- packet:
default:
}
}
func (t tunnel) NatTable() C.NatTable {
return natTable
}
func OnSuspend() { func OnSuspend() {
status.Store(Suspend) status.Store(Suspend)
} }
@ -90,11 +109,13 @@ func init() {
} }
// TCPIn return fan-in queue // TCPIn return fan-in queue
// Deprecated: using Tunnel instead
func TCPIn() chan<- C.ConnContext { func TCPIn() chan<- C.ConnContext {
return tcpQueue return tcpQueue
} }
// UDPIn return fan-in udp queue // UDPIn return fan-in udp queue
// Deprecated: using Tunnel instead
func UDPIn() chan<- C.PacketAdapter { func UDPIn() chan<- C.PacketAdapter {
return udpQueue return udpQueue
} }
@ -197,10 +218,6 @@ func isHandle(t C.Type) bool {
func processUDP() { func processUDP() {
queue := udpQueue queue := udpQueue
for conn := range queue { for conn := range queue {
if !isHandle(conn.Metadata().Type) {
conn.Drop()
continue
}
handleUDPConn(conn) handleUDPConn(conn)
} }
} }
@ -216,10 +233,6 @@ func process() {
queue := tcpQueue queue := tcpQueue
for conn := range queue { for conn := range queue {
if !isHandle(conn.Metadata().Type) {
_ = conn.Conn().Close()
continue
}
go handleTCPConn(conn) go handleTCPConn(conn)
} }
} }
@ -284,6 +297,11 @@ func resolveMetadata(ctx C.PlainContext, metadata *C.Metadata) (proxy C.Proxy, r
} }
func handleUDPConn(packet C.PacketAdapter) { func handleUDPConn(packet C.PacketAdapter) {
if !isHandle(packet.Metadata().Type) {
packet.Drop()
return
}
metadata := packet.Metadata() metadata := packet.Metadata()
if !metadata.Valid() { if !metadata.Valid() {
packet.Drop() packet.Drop()
@ -409,6 +427,11 @@ func handleUDPConn(packet C.PacketAdapter) {
} }
func handleTCPConn(connCtx C.ConnContext) { func handleTCPConn(connCtx C.ConnContext) {
if !isHandle(connCtx.Metadata().Type) {
_ = connCtx.Conn().Close()
return
}
defer func(conn net.Conn) { defer func(conn net.Conn) {
_ = conn.Close() _ = conn.Close()
}(connCtx.Conn()) }(connCtx.Conn())