fix: remove cyclic dependent to make tuic's Finalizer work
This commit is contained in:
parent
bc5ab3120f
commit
0aefa3be85
7 changed files with 209 additions and 159 deletions
|
@ -56,7 +56,7 @@ type TuicOption struct {
|
||||||
// DialContext implements C.ProxyAdapter
|
// DialContext implements C.ProxyAdapter
|
||||||
func (t *Tuic) DialContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (C.Conn, error) {
|
func (t *Tuic) DialContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (C.Conn, error) {
|
||||||
opts = t.Base.DialOptions(opts...)
|
opts = t.Base.DialOptions(opts...)
|
||||||
conn, err := t.client.DialContext(ctx, metadata, opts...)
|
conn, err := t.client.DialContext(ctx, metadata, t.dial, opts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -66,7 +66,7 @@ func (t *Tuic) DialContext(ctx context.Context, metadata *C.Metadata, opts ...di
|
||||||
// ListenPacketContext implements C.ProxyAdapter
|
// ListenPacketContext implements C.ProxyAdapter
|
||||||
func (t *Tuic) ListenPacketContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (_ C.PacketConn, err error) {
|
func (t *Tuic) ListenPacketContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (_ C.PacketConn, err error) {
|
||||||
opts = t.Base.DialOptions(opts...)
|
opts = t.Base.DialOptions(opts...)
|
||||||
pc, err := t.client.ListenPacketContext(ctx, metadata, opts...)
|
pc, err := t.client.ListenPacketContext(ctx, metadata, t.dial, opts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -205,7 +205,6 @@ func NewTuic(option TuicOption) (*Tuic, error) {
|
||||||
clientMaxOpenStreams = 1
|
clientMaxOpenStreams = 1
|
||||||
}
|
}
|
||||||
clientOption := &tuic.ClientOption{
|
clientOption := &tuic.ClientOption{
|
||||||
DialFn: t.dial,
|
|
||||||
TlsConfig: tlsConfig,
|
TlsConfig: tlsConfig,
|
||||||
QuicConfig: quicConfig,
|
QuicConfig: quicConfig,
|
||||||
Host: host,
|
Host: host,
|
||||||
|
@ -219,7 +218,7 @@ func NewTuic(option TuicOption) (*Tuic, error) {
|
||||||
MaxOpenStreams: clientMaxOpenStreams,
|
MaxOpenStreams: clientMaxOpenStreams,
|
||||||
}
|
}
|
||||||
|
|
||||||
t.client = tuic.NewClientPool(clientOption)
|
t.client = tuic.NewPoolClient(clientOption)
|
||||||
|
|
||||||
return t, nil
|
return t, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,8 +12,8 @@ import (
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
|
||||||
|
|
||||||
|
CN "github.com/Dreamacro/clash/common/net"
|
||||||
"github.com/Dreamacro/clash/component/dialer"
|
"github.com/Dreamacro/clash/component/dialer"
|
||||||
"github.com/Dreamacro/clash/component/resolver"
|
"github.com/Dreamacro/clash/component/resolver"
|
||||||
C "github.com/Dreamacro/clash/constant"
|
C "github.com/Dreamacro/clash/constant"
|
||||||
|
@ -220,7 +220,7 @@ func (w *WireGuard) DialContext(ctx context.Context, metadata *C.Metadata, opts
|
||||||
if conn == nil {
|
if conn == nil {
|
||||||
return nil, E.New("conn is nil")
|
return nil, E.New("conn is nil")
|
||||||
}
|
}
|
||||||
return NewConn(&wgConn{conn, w}, w), nil
|
return NewConn(CN.NewRefConn(conn, w), w), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WireGuard) ListenPacketContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (_ C.PacketConn, err error) {
|
func (w *WireGuard) ListenPacketContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (_ C.PacketConn, err error) {
|
||||||
|
@ -249,90 +249,5 @@ func (w *WireGuard) ListenPacketContext(ctx context.Context, metadata *C.Metadat
|
||||||
if pc == nil {
|
if pc == nil {
|
||||||
return nil, E.New("packetConn is nil")
|
return nil, E.New("packetConn is nil")
|
||||||
}
|
}
|
||||||
return newPacketConn(&wgPacketConn{pc, w}, w), nil
|
return newPacketConn(CN.NewRefPacketConn(pc, w), w), nil
|
||||||
}
|
|
||||||
|
|
||||||
type wgConn struct {
|
|
||||||
conn net.Conn
|
|
||||||
wg *WireGuard
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *wgConn) Read(b []byte) (n int, err error) {
|
|
||||||
defer runtime.KeepAlive(c.wg)
|
|
||||||
return c.conn.Read(b)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *wgConn) Write(b []byte) (n int, err error) {
|
|
||||||
defer runtime.KeepAlive(c.wg)
|
|
||||||
return c.conn.Write(b)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *wgConn) Close() error {
|
|
||||||
defer runtime.KeepAlive(c.wg)
|
|
||||||
return c.conn.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *wgConn) LocalAddr() net.Addr {
|
|
||||||
defer runtime.KeepAlive(c.wg)
|
|
||||||
return c.conn.LocalAddr()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *wgConn) RemoteAddr() net.Addr {
|
|
||||||
defer runtime.KeepAlive(c.wg)
|
|
||||||
return c.conn.RemoteAddr()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *wgConn) SetDeadline(t time.Time) error {
|
|
||||||
defer runtime.KeepAlive(c.wg)
|
|
||||||
return c.conn.SetDeadline(t)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *wgConn) SetReadDeadline(t time.Time) error {
|
|
||||||
defer runtime.KeepAlive(c.wg)
|
|
||||||
return c.conn.SetReadDeadline(t)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *wgConn) SetWriteDeadline(t time.Time) error {
|
|
||||||
defer runtime.KeepAlive(c.wg)
|
|
||||||
return c.conn.SetWriteDeadline(t)
|
|
||||||
}
|
|
||||||
|
|
||||||
type wgPacketConn struct {
|
|
||||||
pc net.PacketConn
|
|
||||||
wg *WireGuard
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pc *wgPacketConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) {
|
|
||||||
defer runtime.KeepAlive(pc.wg)
|
|
||||||
return pc.pc.ReadFrom(p)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pc *wgPacketConn) WriteTo(p []byte, addr net.Addr) (n int, err error) {
|
|
||||||
defer runtime.KeepAlive(pc.wg)
|
|
||||||
return pc.pc.WriteTo(p, addr)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pc *wgPacketConn) Close() error {
|
|
||||||
defer runtime.KeepAlive(pc.wg)
|
|
||||||
return pc.pc.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pc *wgPacketConn) LocalAddr() net.Addr {
|
|
||||||
defer runtime.KeepAlive(pc.wg)
|
|
||||||
return pc.pc.LocalAddr()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pc *wgPacketConn) SetDeadline(t time.Time) error {
|
|
||||||
defer runtime.KeepAlive(pc.wg)
|
|
||||||
return pc.pc.SetDeadline(t)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pc *wgPacketConn) SetReadDeadline(t time.Time) error {
|
|
||||||
defer runtime.KeepAlive(pc.wg)
|
|
||||||
return pc.pc.SetReadDeadline(t)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pc *wgPacketConn) SetWriteDeadline(t time.Time) error {
|
|
||||||
defer runtime.KeepAlive(pc.wg)
|
|
||||||
return pc.pc.SetWriteDeadline(t)
|
|
||||||
}
|
}
|
||||||
|
|
100
common/net/refconn.go
Normal file
100
common/net/refconn.go
Normal file
|
@ -0,0 +1,100 @@
|
||||||
|
package net
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net"
|
||||||
|
"runtime"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type refConn struct {
|
||||||
|
conn net.Conn
|
||||||
|
ref any
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *refConn) Read(b []byte) (n int, err error) {
|
||||||
|
defer runtime.KeepAlive(c.ref)
|
||||||
|
return c.conn.Read(b)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *refConn) Write(b []byte) (n int, err error) {
|
||||||
|
defer runtime.KeepAlive(c.ref)
|
||||||
|
return c.conn.Write(b)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *refConn) Close() error {
|
||||||
|
defer runtime.KeepAlive(c.ref)
|
||||||
|
return c.conn.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *refConn) LocalAddr() net.Addr {
|
||||||
|
defer runtime.KeepAlive(c.ref)
|
||||||
|
return c.conn.LocalAddr()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *refConn) RemoteAddr() net.Addr {
|
||||||
|
defer runtime.KeepAlive(c.ref)
|
||||||
|
return c.conn.RemoteAddr()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *refConn) SetDeadline(t time.Time) error {
|
||||||
|
defer runtime.KeepAlive(c.ref)
|
||||||
|
return c.conn.SetDeadline(t)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *refConn) SetReadDeadline(t time.Time) error {
|
||||||
|
defer runtime.KeepAlive(c.ref)
|
||||||
|
return c.conn.SetReadDeadline(t)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *refConn) SetWriteDeadline(t time.Time) error {
|
||||||
|
defer runtime.KeepAlive(c.ref)
|
||||||
|
return c.conn.SetWriteDeadline(t)
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewRefConn(conn net.Conn, ref any) net.Conn {
|
||||||
|
return &refConn{conn: conn, ref: ref}
|
||||||
|
}
|
||||||
|
|
||||||
|
type refPacketConn struct {
|
||||||
|
pc net.PacketConn
|
||||||
|
ref any
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pc *refPacketConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) {
|
||||||
|
defer runtime.KeepAlive(pc.ref)
|
||||||
|
return pc.pc.ReadFrom(p)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pc *refPacketConn) WriteTo(p []byte, addr net.Addr) (n int, err error) {
|
||||||
|
defer runtime.KeepAlive(pc.ref)
|
||||||
|
return pc.pc.WriteTo(p, addr)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pc *refPacketConn) Close() error {
|
||||||
|
defer runtime.KeepAlive(pc.ref)
|
||||||
|
return pc.pc.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pc *refPacketConn) LocalAddr() net.Addr {
|
||||||
|
defer runtime.KeepAlive(pc.ref)
|
||||||
|
return pc.pc.LocalAddr()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pc *refPacketConn) SetDeadline(t time.Time) error {
|
||||||
|
defer runtime.KeepAlive(pc.ref)
|
||||||
|
return pc.pc.SetDeadline(t)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pc *refPacketConn) SetReadDeadline(t time.Time) error {
|
||||||
|
defer runtime.KeepAlive(pc.ref)
|
||||||
|
return pc.pc.SetReadDeadline(t)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pc *refPacketConn) SetWriteDeadline(t time.Time) error {
|
||||||
|
defer runtime.KeepAlive(pc.ref)
|
||||||
|
return pc.pc.SetWriteDeadline(t)
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewRefPacketConn(pc net.PacketConn, ref any) net.PacketConn {
|
||||||
|
return &refPacketConn{pc: pc, ref: ref}
|
||||||
|
}
|
|
@ -19,6 +19,7 @@ import (
|
||||||
"github.com/Dreamacro/clash/common/pool"
|
"github.com/Dreamacro/clash/common/pool"
|
||||||
"github.com/Dreamacro/clash/component/dialer"
|
"github.com/Dreamacro/clash/component/dialer"
|
||||||
C "github.com/Dreamacro/clash/constant"
|
C "github.com/Dreamacro/clash/constant"
|
||||||
|
"github.com/Dreamacro/clash/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -26,9 +27,9 @@ var (
|
||||||
TooManyOpenStreams = errors.New("tuic: too many open streams")
|
TooManyOpenStreams = errors.New("tuic: too many open streams")
|
||||||
)
|
)
|
||||||
|
|
||||||
type ClientOption struct {
|
type DialFunc func(ctx context.Context, opts ...dialer.Option) (pc net.PacketConn, addr net.Addr, err error)
|
||||||
DialFn func(ctx context.Context, opts ...dialer.Option) (pc net.PacketConn, addr net.Addr, err error)
|
|
||||||
|
|
||||||
|
type ClientOption struct {
|
||||||
TlsConfig *tls.Config
|
TlsConfig *tls.Config
|
||||||
QuicConfig *quic.Config
|
QuicConfig *quic.Config
|
||||||
Host string
|
Host string
|
||||||
|
@ -42,7 +43,7 @@ type ClientOption struct {
|
||||||
MaxOpenStreams int64
|
MaxOpenStreams int64
|
||||||
}
|
}
|
||||||
|
|
||||||
type Client struct {
|
type clientImpl struct {
|
||||||
*ClientOption
|
*ClientOption
|
||||||
udp bool
|
udp bool
|
||||||
|
|
||||||
|
@ -55,18 +56,17 @@ type Client struct {
|
||||||
udpInputMap sync.Map
|
udpInputMap sync.Map
|
||||||
|
|
||||||
// only ready for PoolClient
|
// only ready for PoolClient
|
||||||
poolRef *PoolClient
|
|
||||||
optionRef any
|
optionRef any
|
||||||
lastVisited time.Time
|
lastVisited time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Client) getQuicConn(ctx context.Context) (quic.Connection, error) {
|
func (t *clientImpl) getQuicConn(ctx context.Context, dialFn DialFunc, opts ...dialer.Option) (quic.Connection, error) {
|
||||||
t.connMutex.Lock()
|
t.connMutex.Lock()
|
||||||
defer t.connMutex.Unlock()
|
defer t.connMutex.Unlock()
|
||||||
if t.quicConn != nil {
|
if t.quicConn != nil {
|
||||||
return t.quicConn, nil
|
return t.quicConn, nil
|
||||||
}
|
}
|
||||||
pc, addr, err := t.DialFn(ctx)
|
pc, addr, err := dialFn(ctx, opts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -97,7 +97,7 @@ func (t *Client) getQuicConn(ctx context.Context) (quic.Connection, error) {
|
||||||
return quicConn, nil
|
return quicConn, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Client) sendAuthentication(quicConn quic.Connection) (err error) {
|
func (t *clientImpl) sendAuthentication(quicConn quic.Connection) (err error) {
|
||||||
defer func() {
|
defer func() {
|
||||||
t.deferQuicConn(quicConn, err)
|
t.deferQuicConn(quicConn, err)
|
||||||
}()
|
}()
|
||||||
|
@ -122,7 +122,7 @@ func (t *Client) sendAuthentication(quicConn quic.Connection) (err error) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Client) parseUDP(quicConn quic.Connection) (err error) {
|
func (t *clientImpl) parseUDP(quicConn quic.Connection) (err error) {
|
||||||
defer func() {
|
defer func() {
|
||||||
t.deferQuicConn(quicConn, err)
|
t.deferQuicConn(quicConn, err)
|
||||||
}()
|
}()
|
||||||
|
@ -199,45 +199,50 @@ func (t *Client) parseUDP(quicConn quic.Connection) (err error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Client) deferQuicConn(quicConn quic.Connection, err error) {
|
func (t *clientImpl) deferQuicConn(quicConn quic.Connection, err error) {
|
||||||
var netError net.Error
|
var netError net.Error
|
||||||
if err != nil && errors.As(err, &netError) {
|
if err != nil && errors.As(err, &netError) {
|
||||||
t.connMutex.Lock()
|
t.forceClose(quicConn, err)
|
||||||
defer t.connMutex.Unlock()
|
|
||||||
if t.quicConn == quicConn {
|
|
||||||
t.forceClose(err, true)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Client) forceClose(err error, locked bool) {
|
func (t *clientImpl) forceClose(quicConn quic.Connection, err error) {
|
||||||
if !locked {
|
|
||||||
t.connMutex.Lock()
|
t.connMutex.Lock()
|
||||||
defer t.connMutex.Unlock()
|
defer t.connMutex.Unlock()
|
||||||
|
if quicConn == nil {
|
||||||
|
quicConn = t.quicConn
|
||||||
}
|
}
|
||||||
quicConn := t.quicConn
|
|
||||||
if quicConn != nil {
|
if quicConn != nil {
|
||||||
_ = quicConn.CloseWithError(ProtocolError, err.Error())
|
if quicConn == t.quicConn {
|
||||||
t.udpInputMap.Range(func(key, value any) bool {
|
t.quicConn = nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
errStr := ""
|
||||||
|
if err != nil {
|
||||||
|
errStr = err.Error()
|
||||||
|
}
|
||||||
|
if quicConn != nil {
|
||||||
|
_ = quicConn.CloseWithError(ProtocolError, errStr)
|
||||||
|
}
|
||||||
|
udpInputMap := &t.udpInputMap
|
||||||
|
udpInputMap.Range(func(key, value any) bool {
|
||||||
if conn, ok := value.(net.Conn); ok {
|
if conn, ok := value.(net.Conn); ok {
|
||||||
_ = conn.Close()
|
_ = conn.Close()
|
||||||
}
|
}
|
||||||
t.udpInputMap.Delete(key)
|
udpInputMap.Delete(key)
|
||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
t.quicConn = nil
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Client) Close() {
|
func (t *clientImpl) Close() {
|
||||||
t.closed.Store(true)
|
t.closed.Store(true)
|
||||||
if t.openStreams.Load() == 0 {
|
if t.openStreams.Load() == 0 {
|
||||||
t.forceClose(ClientClosed, false)
|
t.forceClose(nil, ClientClosed)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Client) DialContext(ctx context.Context, metadata *C.Metadata) (net.Conn, error) {
|
func (t *clientImpl) DialContext(ctx context.Context, metadata *C.Metadata, dialFn DialFunc, opts ...dialer.Option) (net.Conn, error) {
|
||||||
quicConn, err := t.getQuicConn(ctx)
|
quicConn, err := t.getQuicConn(ctx, dialFn, opts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -264,12 +269,11 @@ func (t *Client) DialContext(ctx context.Context, metadata *C.Metadata) (net.Con
|
||||||
Stream: quicStream,
|
Stream: quicStream,
|
||||||
lAddr: quicConn.LocalAddr(),
|
lAddr: quicConn.LocalAddr(),
|
||||||
rAddr: quicConn.RemoteAddr(),
|
rAddr: quicConn.RemoteAddr(),
|
||||||
ref: t,
|
|
||||||
closeDeferFn: func() {
|
closeDeferFn: func() {
|
||||||
time.AfterFunc(C.DefaultTCPTimeout, func() {
|
time.AfterFunc(C.DefaultTCPTimeout, func() {
|
||||||
openStreams := t.openStreams.Add(-1)
|
openStreams := t.openStreams.Add(-1)
|
||||||
if openStreams == 0 && t.closed.Load() {
|
if openStreams == 0 && t.closed.Load() {
|
||||||
t.forceClose(ClientClosed, false)
|
t.forceClose(quicConn, ClientClosed)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
},
|
},
|
||||||
|
@ -335,8 +339,8 @@ func (conn *earlyConn) Read(b []byte) (n int, err error) {
|
||||||
return conn.BufferedConn.Read(b)
|
return conn.BufferedConn.Read(b)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Client) ListenPacketContext(ctx context.Context, metadata *C.Metadata) (net.PacketConn, error) {
|
func (t *clientImpl) ListenPacketContext(ctx context.Context, metadata *C.Metadata, dialFn DialFunc, opts ...dialer.Option) (net.PacketConn, error) {
|
||||||
quicConn, err := t.getQuicConn(ctx)
|
quicConn, err := t.getQuicConn(ctx, dialFn, opts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -362,14 +366,13 @@ func (t *Client) ListenPacketContext(ctx context.Context, metadata *C.Metadata)
|
||||||
inputConn: N.NewBufferedConn(pipe2),
|
inputConn: N.NewBufferedConn(pipe2),
|
||||||
udpRelayMode: t.UdpRelayMode,
|
udpRelayMode: t.UdpRelayMode,
|
||||||
maxUdpRelayPacketSize: t.MaxUdpRelayPacketSize,
|
maxUdpRelayPacketSize: t.MaxUdpRelayPacketSize,
|
||||||
ref: t,
|
|
||||||
deferQuicConnFn: t.deferQuicConn,
|
deferQuicConnFn: t.deferQuicConn,
|
||||||
closeDeferFn: func() {
|
closeDeferFn: func() {
|
||||||
t.udpInputMap.Delete(connId)
|
t.udpInputMap.Delete(connId)
|
||||||
time.AfterFunc(C.DefaultUDPTimeout, func() {
|
time.AfterFunc(C.DefaultUDPTimeout, func() {
|
||||||
openStreams := t.openStreams.Add(-1)
|
openStreams := t.openStreams.Add(-1)
|
||||||
if openStreams == 0 && t.closed.Load() {
|
if openStreams == 0 && t.closed.Load() {
|
||||||
t.forceClose(ClientClosed, false)
|
t.forceClose(quicConn, ClientClosed)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
},
|
},
|
||||||
|
@ -377,15 +380,42 @@ func (t *Client) ListenPacketContext(ctx context.Context, metadata *C.Metadata)
|
||||||
return pc, nil
|
return pc, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type Client struct {
|
||||||
|
*clientImpl // use an independent pointer to let Finalizer can work no matter somewhere handle an influence in clientImpl inner
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *Client) DialContext(ctx context.Context, metadata *C.Metadata, dialFn DialFunc, opts ...dialer.Option) (net.Conn, error) {
|
||||||
|
conn, err := t.clientImpl.DialContext(ctx, metadata, dialFn, opts...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return N.NewRefConn(conn, t), err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *Client) ListenPacketContext(ctx context.Context, metadata *C.Metadata, dialFn DialFunc, opts ...dialer.Option) (net.PacketConn, error) {
|
||||||
|
pc, err := t.clientImpl.ListenPacketContext(ctx, metadata, dialFn, opts...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return N.NewRefPacketConn(pc, t), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *Client) forceClose() {
|
||||||
|
t.clientImpl.forceClose(nil, ClientClosed)
|
||||||
|
}
|
||||||
|
|
||||||
func NewClient(clientOption *ClientOption, udp bool) *Client {
|
func NewClient(clientOption *ClientOption, udp bool) *Client {
|
||||||
c := &Client{
|
ci := &clientImpl{
|
||||||
ClientOption: clientOption,
|
ClientOption: clientOption,
|
||||||
udp: udp,
|
udp: udp,
|
||||||
}
|
}
|
||||||
|
c := &Client{ci}
|
||||||
runtime.SetFinalizer(c, closeClient)
|
runtime.SetFinalizer(c, closeClient)
|
||||||
|
log.Debugln("New Tuic Client at %p", c)
|
||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
|
||||||
func closeClient(client *Client) {
|
func closeClient(client *Client) {
|
||||||
client.forceClose(ClientClosed, false)
|
log.Debugln("Close Tuic Client at %p", client)
|
||||||
|
client.forceClose()
|
||||||
}
|
}
|
||||||
|
|
|
@ -58,8 +58,6 @@ type quicStreamConn struct {
|
||||||
lAddr net.Addr
|
lAddr net.Addr
|
||||||
rAddr net.Addr
|
rAddr net.Addr
|
||||||
|
|
||||||
ref any
|
|
||||||
|
|
||||||
closeDeferFn func()
|
closeDeferFn func()
|
||||||
|
|
||||||
closeOnce sync.Once
|
closeOnce sync.Once
|
||||||
|
@ -117,8 +115,6 @@ type quicStreamPacketConn struct {
|
||||||
udpRelayMode string
|
udpRelayMode string
|
||||||
maxUdpRelayPacketSize int
|
maxUdpRelayPacketSize int
|
||||||
|
|
||||||
ref any
|
|
||||||
|
|
||||||
deferQuicConnFn func(quicConn quic.Connection, err error)
|
deferQuicConnFn func(quicConn quic.Connection, err error)
|
||||||
closeDeferFn func()
|
closeDeferFn func()
|
||||||
writeClosed *atomic.Bool
|
writeClosed *atomic.Bool
|
||||||
|
|
|
@ -9,8 +9,10 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/Dreamacro/clash/common/generics/list"
|
"github.com/Dreamacro/clash/common/generics/list"
|
||||||
|
N "github.com/Dreamacro/clash/common/net"
|
||||||
"github.com/Dreamacro/clash/component/dialer"
|
"github.com/Dreamacro/clash/component/dialer"
|
||||||
C "github.com/Dreamacro/clash/constant"
|
C "github.com/Dreamacro/clash/constant"
|
||||||
|
"github.com/Dreamacro/clash/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
type dialResult struct {
|
type dialResult struct {
|
||||||
|
@ -31,29 +33,35 @@ type PoolClient struct {
|
||||||
udpClientsMutex *sync.Mutex
|
udpClientsMutex *sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *PoolClient) DialContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (net.Conn, error) {
|
func (t *PoolClient) DialContext(ctx context.Context, metadata *C.Metadata, dialFn DialFunc, opts ...dialer.Option) (net.Conn, error) {
|
||||||
conn, err := t.getClient(false, opts...).DialContext(ctx, metadata)
|
newDialFn := func(ctx context.Context, opts ...dialer.Option) (pc net.PacketConn, addr net.Addr, err error) {
|
||||||
|
return t.dial(ctx, dialFn, opts...)
|
||||||
|
}
|
||||||
|
conn, err := t.getClient(false, opts...).DialContext(ctx, metadata, newDialFn, opts...)
|
||||||
if errors.Is(err, TooManyOpenStreams) {
|
if errors.Is(err, TooManyOpenStreams) {
|
||||||
conn, err = t.newClient(false, opts...).DialContext(ctx, metadata)
|
conn, err = t.newClient(false, opts...).DialContext(ctx, metadata, newDialFn, opts...)
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return conn, err
|
return N.NewRefConn(conn, t), err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *PoolClient) ListenPacketContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (net.PacketConn, error) {
|
func (t *PoolClient) ListenPacketContext(ctx context.Context, metadata *C.Metadata, dialFn DialFunc, opts ...dialer.Option) (net.PacketConn, error) {
|
||||||
pc, err := t.getClient(true, opts...).ListenPacketContext(ctx, metadata)
|
newDialFn := func(ctx context.Context, opts ...dialer.Option) (pc net.PacketConn, addr net.Addr, err error) {
|
||||||
|
return t.dial(ctx, dialFn, opts...)
|
||||||
|
}
|
||||||
|
pc, err := t.getClient(true, opts...).ListenPacketContext(ctx, metadata, newDialFn, opts...)
|
||||||
if errors.Is(err, TooManyOpenStreams) {
|
if errors.Is(err, TooManyOpenStreams) {
|
||||||
pc, err = t.newClient(false, opts...).ListenPacketContext(ctx, metadata)
|
pc, err = t.newClient(false, opts...).ListenPacketContext(ctx, metadata, newDialFn, opts...)
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return pc, nil
|
return N.NewRefPacketConn(pc, t), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *PoolClient) dial(ctx context.Context, opts ...dialer.Option) (pc net.PacketConn, addr net.Addr, err error) {
|
func (t *PoolClient) dial(ctx context.Context, dialFn DialFunc, opts ...dialer.Option) (pc net.PacketConn, addr net.Addr, err error) {
|
||||||
var o any = *dialer.ApplyOptions(opts...)
|
var o any = *dialer.ApplyOptions(opts...)
|
||||||
|
|
||||||
t.dialResultMutex.Lock()
|
t.dialResultMutex.Lock()
|
||||||
|
@ -63,7 +71,7 @@ func (t *PoolClient) dial(ctx context.Context, opts ...dialer.Option) (pc net.Pa
|
||||||
return dr.pc, dr.addr, dr.err
|
return dr.pc, dr.addr, dr.err
|
||||||
}
|
}
|
||||||
|
|
||||||
pc, addr, err = t.DialFn(ctx, opts...)
|
pc, addr, err = dialFn(ctx, opts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
@ -102,7 +110,6 @@ func (t *PoolClient) newClient(udp bool, opts ...dialer.Option) *Client {
|
||||||
defer clientsMutex.Unlock()
|
defer clientsMutex.Unlock()
|
||||||
|
|
||||||
client := NewClient(t.newClientOption, udp)
|
client := NewClient(t.newClientOption, udp)
|
||||||
client.poolRef = t // make sure pool has a reference
|
|
||||||
client.optionRef = o
|
client.optionRef = o
|
||||||
client.lastVisited = time.Now()
|
client.lastVisited = time.Now()
|
||||||
|
|
||||||
|
@ -141,7 +148,12 @@ func (t *PoolClient) getClient(udp bool, opts ...dialer.Option) *Client {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if client.openStreams.Load() == 0 && time.Now().Sub(client.lastVisited) > 30*time.Minute {
|
it = it.Next()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
for it := clients.Front(); it != nil; {
|
||||||
|
client := it.Value
|
||||||
|
if client != bestClient && client.openStreams.Load() == 0 && time.Now().Sub(client.lastVisited) > 30*time.Minute {
|
||||||
client.Close()
|
client.Close()
|
||||||
next := it.Next()
|
next := it.Next()
|
||||||
clients.Remove(it)
|
clients.Remove(it)
|
||||||
|
@ -150,7 +162,6 @@ func (t *PoolClient) getClient(udp bool, opts ...dialer.Option) *Client {
|
||||||
}
|
}
|
||||||
it = it.Next()
|
it = it.Next()
|
||||||
}
|
}
|
||||||
}()
|
|
||||||
|
|
||||||
if bestClient == nil {
|
if bestClient == nil {
|
||||||
return t.newClient(udp, opts...)
|
return t.newClient(udp, opts...)
|
||||||
|
@ -160,7 +171,7 @@ func (t *PoolClient) getClient(udp bool, opts ...dialer.Option) *Client {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewClientPool(clientOption *ClientOption) *PoolClient {
|
func NewPoolClient(clientOption *ClientOption) *PoolClient {
|
||||||
p := &PoolClient{
|
p := &PoolClient{
|
||||||
ClientOption: clientOption,
|
ClientOption: clientOption,
|
||||||
dialResultMap: make(map[any]dialResult),
|
dialResultMap: make(map[any]dialResult),
|
||||||
|
@ -171,12 +182,13 @@ func NewClientPool(clientOption *ClientOption) *PoolClient {
|
||||||
udpClientsMutex: &sync.Mutex{},
|
udpClientsMutex: &sync.Mutex{},
|
||||||
}
|
}
|
||||||
newClientOption := *clientOption
|
newClientOption := *clientOption
|
||||||
newClientOption.DialFn = p.dial
|
|
||||||
p.newClientOption = &newClientOption
|
p.newClientOption = &newClientOption
|
||||||
runtime.SetFinalizer(p, closeClientPool)
|
runtime.SetFinalizer(p, closeClientPool)
|
||||||
|
log.Debugln("New Tuic PoolClient at %p", p)
|
||||||
return p
|
return p
|
||||||
}
|
}
|
||||||
|
|
||||||
func closeClientPool(client *PoolClient) {
|
func closeClientPool(client *PoolClient) {
|
||||||
|
log.Debugln("Close Tuic PoolClient at %p", client)
|
||||||
client.forceClose()
|
client.forceClose()
|
||||||
}
|
}
|
||||||
|
|
|
@ -143,7 +143,6 @@ func (s *serverHandler) parsePacket(packet Packet, udpRelayMode string) (err err
|
||||||
inputConn: nil,
|
inputConn: nil,
|
||||||
udpRelayMode: udpRelayMode,
|
udpRelayMode: udpRelayMode,
|
||||||
maxUdpRelayPacketSize: s.MaxUdpRelayPacketSize,
|
maxUdpRelayPacketSize: s.MaxUdpRelayPacketSize,
|
||||||
ref: s,
|
|
||||||
deferQuicConnFn: nil,
|
deferQuicConnFn: nil,
|
||||||
closeDeferFn: nil,
|
closeDeferFn: nil,
|
||||||
writeClosed: writeClosed,
|
writeClosed: writeClosed,
|
||||||
|
@ -173,7 +172,6 @@ func (s *serverHandler) handleStream() (err error) {
|
||||||
Stream: quicStream,
|
Stream: quicStream,
|
||||||
lAddr: s.quicConn.LocalAddr(),
|
lAddr: s.quicConn.LocalAddr(),
|
||||||
rAddr: s.quicConn.RemoteAddr(),
|
rAddr: s.quicConn.RemoteAddr(),
|
||||||
ref: s,
|
|
||||||
}
|
}
|
||||||
conn := N.NewBufferedConn(stream)
|
conn := N.NewBufferedConn(stream)
|
||||||
connect, err := ReadConnect(conn)
|
connect, err := ReadConnect(conn)
|
||||||
|
|
Loading…
Reference in a new issue