diff --git a/adapter/outbound/shadowsocks.go b/adapter/outbound/shadowsocks.go index 7ee9e634..44bfe64f 100644 --- a/adapter/outbound/shadowsocks.go +++ b/adapter/outbound/shadowsocks.go @@ -21,7 +21,6 @@ import ( restlsC "github.com/3andne/restls-client-go" "github.com/metacubex/sing-shadowsocks2" - "github.com/sagernet/sing/common/bufio" M "github.com/sagernet/sing/common/metadata" "github.com/sagernet/sing/common/uot" ) @@ -194,7 +193,7 @@ func (ss *ShadowSocks) ListenPacketWithDialer(ctx context.Context, dialer C.Dial if err != nil { return nil, err } - pc = ss.method.DialPacketConn(bufio.NewBindPacketConn(pc, addr)) + pc = ss.method.DialPacketConn(N.NewBindPacketConn(N.NewEnhancePacketConn(pc), addr)) return newPacketConn(pc, ss), nil } diff --git a/common/net/bind.go b/common/net/bind.go index 1e20a8c0..edf51ccb 100644 --- a/common/net/bind.go +++ b/common/net/bind.go @@ -3,34 +3,43 @@ package net import "net" type bindPacketConn struct { - net.PacketConn + EnhancePacketConn rAddr net.Addr } -func (wpc *bindPacketConn) Read(b []byte) (n int, err error) { - n, _, err = wpc.PacketConn.ReadFrom(b) +func (c *bindPacketConn) Read(b []byte) (n int, err error) { + n, _, err = c.EnhancePacketConn.ReadFrom(b) return n, err } -func (wpc *bindPacketConn) Write(b []byte) (n int, err error) { - return wpc.PacketConn.WriteTo(b, wpc.rAddr) +func (c *bindPacketConn) WaitRead() (data []byte, put func(), err error) { + data, put, _, err = c.EnhancePacketConn.WaitReadFrom() + return } -func (wpc *bindPacketConn) RemoteAddr() net.Addr { - return wpc.rAddr +func (c *bindPacketConn) Write(b []byte) (n int, err error) { + return c.EnhancePacketConn.WriteTo(b, c.rAddr) } -func (wpc *bindPacketConn) LocalAddr() net.Addr { - if wpc.PacketConn.LocalAddr() == nil { +func (c *bindPacketConn) RemoteAddr() net.Addr { + return c.rAddr +} + +func (c *bindPacketConn) LocalAddr() net.Addr { + if c.EnhancePacketConn.LocalAddr() == nil { return &net.UDPAddr{IP: net.IPv4zero, Port: 0} } else { - return wpc.PacketConn.LocalAddr() + return c.EnhancePacketConn.LocalAddr() } } -func NewBindPacketConn(pc net.PacketConn, rAddr net.Addr) net.Conn { +func (c *bindPacketConn) Upstream() any { + return c.EnhancePacketConn +} + +func NewBindPacketConn(pc EnhancePacketConn, rAddr net.Addr) net.Conn { return &bindPacketConn{ - PacketConn: pc, - rAddr: rAddr, + EnhancePacketConn: pc, + rAddr: rAddr, } } diff --git a/common/net/deadline/packet.go b/common/net/deadline/packet.go new file mode 100644 index 00000000..4f2e9111 --- /dev/null +++ b/common/net/deadline/packet.go @@ -0,0 +1,172 @@ +package deadline + +import ( + "net" + "os" + "time" + + "github.com/Dreamacro/clash/common/atomic" + "github.com/Dreamacro/clash/common/net/packet" +) + +type readResult struct { + data []byte + put func() + addr net.Addr + err error +} + +type PacketConn struct { + net.PacketConn + deadline atomic.TypedValue[time.Time] + pipeDeadline pipeDeadline + disablePipe atomic.Bool + inRead atomic.Bool + resultCh chan *readResult +} + +func NewPacketConn(pc net.PacketConn) net.PacketConn { + c := &PacketConn{ + PacketConn: pc, + pipeDeadline: makePipeDeadline(), + resultCh: make(chan *readResult, 1), + } + c.resultCh <- nil + if enhancePacketConn, isEnhance := pc.(packet.EnhancePacketConn); isEnhance { + return &EnhancePacketConn{ + PacketConn: c, + enhancePacketConn: enhancePacketConn, + } + } + return c +} + +func (c *PacketConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) { + select { + case result := <-c.resultCh: + if result != nil { + n = copy(p, result.data) + addr = result.addr + err = result.err + c.resultCh <- nil // finish cache read + return + } else { + c.resultCh <- nil + break + } + case <-c.pipeDeadline.wait(): + return 0, nil, os.ErrDeadlineExceeded + } + + if c.disablePipe.Load() { + return c.PacketConn.ReadFrom(p) + } else if c.deadline.Load().IsZero() { + c.inRead.Store(true) + defer c.inRead.Store(false) + n, addr, err = c.PacketConn.ReadFrom(p) + return + } + + <-c.resultCh + go c.pipeReadFrom(len(p)) + + return c.ReadFrom(p) +} + +func (c *PacketConn) pipeReadFrom(size int) { + buffer := make([]byte, size) + n, addr, err := c.PacketConn.ReadFrom(buffer) + buffer = buffer[:n] + c.resultCh <- &readResult{ + data: buffer, + addr: addr, + err: err, + } +} + +func (c *PacketConn) SetReadDeadline(t time.Time) error { + if c.disablePipe.Load() { + return c.PacketConn.SetReadDeadline(t) + } else if c.inRead.Load() { + c.disablePipe.Store(true) + return c.PacketConn.SetReadDeadline(t) + } + c.deadline.Store(t) + c.pipeDeadline.set(t) + return nil +} + +func (c *PacketConn) ReaderReplaceable() bool { + select { + case result := <-c.resultCh: + c.resultCh <- result + if result != nil { + return false // cache reading + } else { + break + } + default: + return false // pipe reading + } + return c.disablePipe.Load() || c.deadline.Load().IsZero() +} + +func (c *PacketConn) WriterReplaceable() bool { + return true +} + +func (c *PacketConn) Upstream() any { + return c.PacketConn +} + +func (c *PacketConn) NeedAdditionalReadDeadline() bool { + return false +} + +type EnhancePacketConn struct { + *PacketConn + enhancePacketConn packet.EnhancePacketConn +} + +func (c *EnhancePacketConn) WaitReadFrom() (data []byte, put func(), addr net.Addr, err error) { + select { + case result := <-c.resultCh: + if result != nil { + data = result.data + put = result.put + addr = result.addr + err = result.err + c.resultCh <- nil // finish cache read + return + } else { + c.resultCh <- nil + break + } + case <-c.pipeDeadline.wait(): + return nil, nil, nil, os.ErrDeadlineExceeded + } + + if c.disablePipe.Load() { + return c.enhancePacketConn.WaitReadFrom() + } else if c.deadline.Load().IsZero() { + c.inRead.Store(true) + defer c.inRead.Store(false) + data, put, addr, err = c.enhancePacketConn.WaitReadFrom() + return + } + + <-c.resultCh + go c.pipeWaitReadFrom() + + return c.WaitReadFrom() +} + +func (c *EnhancePacketConn) pipeWaitReadFrom() { + data, put, addr, err := c.enhancePacketConn.WaitReadFrom() + c.resultCh <- &readResult{ + data: data, + put: put, + addr: addr, + err: err, + } +} diff --git a/common/net/deadline/pipe.go b/common/net/deadline/pipe.go new file mode 100644 index 00000000..2cccfb42 --- /dev/null +++ b/common/net/deadline/pipe.go @@ -0,0 +1,84 @@ +// Copyright 2010 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package deadline + +import ( + "sync" + "time" +) + +// pipeDeadline is an abstraction for handling timeouts. +type pipeDeadline struct { + mu sync.Mutex // Guards timer and cancel + timer *time.Timer + cancel chan struct{} // Must be non-nil +} + +func makePipeDeadline() pipeDeadline { + return pipeDeadline{cancel: make(chan struct{})} +} + +// set sets the point in time when the deadline will time out. +// A timeout event is signaled by closing the channel returned by waiter. +// Once a timeout has occurred, the deadline can be refreshed by specifying a +// t value in the future. +// +// A zero value for t prevents timeout. +func (d *pipeDeadline) set(t time.Time) { + d.mu.Lock() + defer d.mu.Unlock() + + if d.timer != nil && !d.timer.Stop() { + <-d.cancel // Wait for the timer callback to finish and close cancel + } + d.timer = nil + + // Time is zero, then there is no deadline. + closed := isClosedChan(d.cancel) + if t.IsZero() { + if closed { + d.cancel = make(chan struct{}) + } + return + } + + // Time in the future, setup a timer to cancel in the future. + if dur := time.Until(t); dur > 0 { + if closed { + d.cancel = make(chan struct{}) + } + d.timer = time.AfterFunc(dur, func() { + close(d.cancel) + }) + return + } + + // Time in the past, so close immediately. + if !closed { + close(d.cancel) + } +} + +// wait returns a channel that is closed when the deadline is exceeded. +func (d *pipeDeadline) wait() chan struct{} { + d.mu.Lock() + defer d.mu.Unlock() + return d.cancel +} + +func isClosedChan(c <-chan struct{}) bool { + select { + case <-c: + return true + default: + return false + } +} + +func makeFilledChan() chan struct{} { + ch := make(chan struct{}, 1) + ch <- struct{}{} + return ch +} diff --git a/common/net/packet.go b/common/net/packet.go index 43a2bc9c..2895b537 100644 --- a/common/net/packet.go +++ b/common/net/packet.go @@ -4,69 +4,14 @@ import ( "net" "sync" - "github.com/Dreamacro/clash/common/pool" + "github.com/Dreamacro/clash/common/net/deadline" + "github.com/Dreamacro/clash/common/net/packet" ) -type EnhancePacketConn interface { - net.PacketConn - WaitReadFrom() (data []byte, put func(), addr net.Addr, err error) - Upstream() any -} +type EnhancePacketConn = packet.EnhancePacketConn -func NewEnhancePacketConn(pc net.PacketConn) EnhancePacketConn { - if udpConn, isUDPConn := pc.(*net.UDPConn); isUDPConn { - return &enhanceUDPConn{UDPConn: udpConn} - } - return &enhancePacketConn{PacketConn: pc} -} - -type enhancePacketConn struct { - net.PacketConn -} - -func (c *enhancePacketConn) WaitReadFrom() (data []byte, put func(), addr net.Addr, err error) { - return waitReadFrom(c.PacketConn) -} - -func (c *enhancePacketConn) Upstream() any { - return c.PacketConn -} - -func (c *enhancePacketConn) WriterReplaceable() bool { - return true -} - -func (c *enhancePacketConn) ReaderReplaceable() bool { - return true -} - -func (c *enhanceUDPConn) Upstream() any { - return c.UDPConn -} - -func (c *enhanceUDPConn) WriterReplaceable() bool { - return true -} - -func (c *enhanceUDPConn) ReaderReplaceable() bool { - return true -} - -func waitReadFrom(pc net.PacketConn) (data []byte, put func(), addr net.Addr, err error) { - readBuf := pool.Get(pool.UDPBufferSize) - put = func() { - _ = pool.Put(readBuf) - } - var readN int - readN, addr, err = pc.ReadFrom(readBuf) - if readN > 0 { - data = readBuf[:readN] - } else { - put() - put = nil - } - return -} +var NewEnhancePacketConn = packet.NewEnhancePacketConn +var NewDeadlinePacketConn = deadline.NewPacketConn type threadSafePacketConn struct { net.PacketConn diff --git a/common/net/packet/packet.go b/common/net/packet/packet.go new file mode 100644 index 00000000..d823334e --- /dev/null +++ b/common/net/packet/packet.go @@ -0,0 +1,70 @@ +package packet + +import ( + "net" + + "github.com/Dreamacro/clash/common/pool" +) + +type EnhancePacketConn interface { + net.PacketConn + WaitReadFrom() (data []byte, put func(), addr net.Addr, err error) +} + +func NewEnhancePacketConn(pc net.PacketConn) EnhancePacketConn { + if udpConn, isUDPConn := pc.(*net.UDPConn); isUDPConn { + return &enhanceUDPConn{UDPConn: udpConn} + } + if enhancePC, isEnhancePC := pc.(EnhancePacketConn); isEnhancePC { + return enhancePC + } + return &enhancePacketConn{PacketConn: pc} +} + +type enhancePacketConn struct { + net.PacketConn +} + +func (c *enhancePacketConn) WaitReadFrom() (data []byte, put func(), addr net.Addr, err error) { + return waitReadFrom(c.PacketConn) +} + +func (c *enhancePacketConn) Upstream() any { + return c.PacketConn +} + +func (c *enhancePacketConn) WriterReplaceable() bool { + return true +} + +func (c *enhancePacketConn) ReaderReplaceable() bool { + return true +} + +func (c *enhanceUDPConn) Upstream() any { + return c.UDPConn +} + +func (c *enhanceUDPConn) WriterReplaceable() bool { + return true +} + +func (c *enhanceUDPConn) ReaderReplaceable() bool { + return true +} + +func waitReadFrom(pc net.PacketConn) (data []byte, put func(), addr net.Addr, err error) { + readBuf := pool.Get(pool.UDPBufferSize) + put = func() { + _ = pool.Put(readBuf) + } + var readN int + readN, addr, err = pc.ReadFrom(readBuf) + if readN > 0 { + data = readBuf[:readN] + } else { + put() + put = nil + } + return +} diff --git a/common/net/packet_posix.go b/common/net/packet/packet_posix.go similarity index 98% rename from common/net/packet_posix.go rename to common/net/packet/packet_posix.go index 18c72a1c..3c5d23a6 100644 --- a/common/net/packet_posix.go +++ b/common/net/packet/packet_posix.go @@ -1,6 +1,6 @@ //go:build !windows -package net +package packet import ( "io" diff --git a/common/net/packet_windows.go b/common/net/packet/packet_windows.go similarity index 93% rename from common/net/packet_windows.go rename to common/net/packet/packet_windows.go index a5bf75aa..cb4c518b 100644 --- a/common/net/packet_windows.go +++ b/common/net/packet/packet_windows.go @@ -1,6 +1,6 @@ //go:build windows -package net +package packet import ( "net" diff --git a/common/net/sing.go b/common/net/sing.go index aa6f6917..c92008ba 100644 --- a/common/net/sing.go +++ b/common/net/sing.go @@ -23,10 +23,6 @@ func NewDeadlineConn(conn net.Conn) ExtendedConn { return deadline.NewFallbackConn(conn) } -func NewDeadlinePacketConn(pc net.PacketConn) network.NetPacketConn { - return deadline.NewFallbackPacketConn(bufio.NewPacketConn(pc)) -} - func NeedHandshake(conn any) bool { if earlyConn, isEarlyConn := common.Cast[network.EarlyConn](conn); isEarlyConn && earlyConn.NeedHandshake() { return true diff --git a/go.mod b/go.mod index b51e646a..ffa071ae 100644 --- a/go.mod +++ b/go.mod @@ -21,7 +21,7 @@ require ( github.com/mdlayher/netlink v1.7.2 github.com/metacubex/quic-go v0.33.3-0.20230510010206-687b537b6a58 github.com/metacubex/sing-shadowsocks v0.2.2-0.20230509230448-a5157cc00a1c - github.com/metacubex/sing-shadowsocks2 v0.0.0-20230510002911-25e95d677383 + github.com/metacubex/sing-shadowsocks2 v0.0.0-20230511095725-1d6e98507d8c github.com/metacubex/sing-tun v0.1.5-0.20230509224930-30065d4b6376 github.com/metacubex/sing-wireguard v0.0.0-20230426030325-41db09ae771a github.com/miekg/dns v1.1.54 @@ -103,4 +103,4 @@ require ( golang.org/x/text v0.9.0 // indirect golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 // indirect golang.org/x/tools v0.6.0 // indirect -) +) \ No newline at end of file diff --git a/go.sum b/go.sum index abb18231..9bad1a3a 100644 --- a/go.sum +++ b/go.sum @@ -98,8 +98,8 @@ github.com/metacubex/quic-go v0.33.3-0.20230510010206-687b537b6a58 h1:E/sNW9tugF github.com/metacubex/quic-go v0.33.3-0.20230510010206-687b537b6a58/go.mod h1:9nOiGX6kqV3+ZbkDKdTNzdFD726QQHPH6WDb36jUSpA= github.com/metacubex/sing-shadowsocks v0.2.2-0.20230509230448-a5157cc00a1c h1:LpVNvlW/xE+mR8z76xJeYZlYznZXEmU4TeWeuygYdJg= github.com/metacubex/sing-shadowsocks v0.2.2-0.20230509230448-a5157cc00a1c/go.mod h1:4uQQReKMTU7KTfOykVBe/oGJ00pl38d+BYJ99+mx26s= -github.com/metacubex/sing-shadowsocks2 v0.0.0-20230510002911-25e95d677383 h1:YdLeRuENJZ9QL58Kf/qtMp1wZv9VGQJYMqZ2WEF6/FM= -github.com/metacubex/sing-shadowsocks2 v0.0.0-20230510002911-25e95d677383/go.mod h1:r+JnKYxqLJIkRhpT9xb3b11icXsvM6yVjCxr2Smp1Og= +github.com/metacubex/sing-shadowsocks2 v0.0.0-20230511095725-1d6e98507d8c h1:LlKpVuMuccNe3JfKJ+G+JMxiqw4FBSVsK/jQYatSsnY= +github.com/metacubex/sing-shadowsocks2 v0.0.0-20230511095725-1d6e98507d8c/go.mod h1:r+JnKYxqLJIkRhpT9xb3b11icXsvM6yVjCxr2Smp1Og= github.com/metacubex/sing-tun v0.1.5-0.20230509224930-30065d4b6376 h1:zKNsbFQyleMFAP7NJYRew9sEMJuniuODH3V0FdWnEtk= github.com/metacubex/sing-tun v0.1.5-0.20230509224930-30065d4b6376/go.mod h1:BMfG00enVf90/CzcdX9PK3Dymgl7BZqHXJfexEyB7Cc= github.com/metacubex/sing-wireguard v0.0.0-20230426030325-41db09ae771a h1:cWKym33Qvl6HA3hj4/YuYD8hHyqQPb47wT5cJRAPgco= diff --git a/listener/sing/sing.go b/listener/sing/sing.go index 7292df33..fe806e0f 100644 --- a/listener/sing/sing.go +++ b/listener/sing/sing.go @@ -106,7 +106,7 @@ func (h *ListenerHandler) NewPacketConnection(ctx context.Context, conn network. additions = append(additions, ctxAdditions...) } if deadline.NeedAdditionalReadDeadline(conn) { - conn = N.NewDeadlinePacketConn(bufio.NewNetPacketConn(conn)) // conn from sing should check NeedAdditionalReadDeadline + conn = deadline.NewFallbackPacketConn(bufio.NewNetPacketConn(conn)) // conn from sing should check NeedAdditionalReadDeadline } defer func() { _ = conn.Close() }() mutex := sync.Mutex{}