chore: tuic use a simple client pool
This commit is contained in:
parent
29df286610
commit
69b223041c
2 changed files with 102 additions and 57 deletions
|
@ -17,6 +17,7 @@ import (
|
||||||
|
|
||||||
"github.com/metacubex/quic-go"
|
"github.com/metacubex/quic-go"
|
||||||
|
|
||||||
|
"github.com/Dreamacro/clash/common/generics/list"
|
||||||
"github.com/Dreamacro/clash/component/dialer"
|
"github.com/Dreamacro/clash/component/dialer"
|
||||||
tlsC "github.com/Dreamacro/clash/component/tls"
|
tlsC "github.com/Dreamacro/clash/component/tls"
|
||||||
C "github.com/Dreamacro/clash/constant"
|
C "github.com/Dreamacro/clash/constant"
|
||||||
|
@ -25,8 +26,8 @@ import (
|
||||||
|
|
||||||
type Tuic struct {
|
type Tuic struct {
|
||||||
*Base
|
*Base
|
||||||
getClient func(udp bool, opts ...dialer.Option) *tuic.Client
|
newClient func(udp bool, opts ...dialer.Option) *tuic.Client
|
||||||
removeClient func(udp bool, opts ...dialer.Option)
|
getClient func(udp bool, opts ...dialer.Option) *tuic.Client
|
||||||
}
|
}
|
||||||
|
|
||||||
type TuicOption struct {
|
type TuicOption struct {
|
||||||
|
@ -57,7 +58,7 @@ type TuicOption struct {
|
||||||
// DialContext implements C.ProxyAdapter
|
// DialContext implements C.ProxyAdapter
|
||||||
func (t *Tuic) DialContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (C.Conn, error) {
|
func (t *Tuic) DialContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (C.Conn, error) {
|
||||||
opts = t.Base.DialOptions(opts...)
|
opts = t.Base.DialOptions(opts...)
|
||||||
conn, err := t.getClient(false, opts...).DialContext(ctx, metadata, func(ctx context.Context) (net.PacketConn, net.Addr, error) {
|
dialFn := func(ctx context.Context) (net.PacketConn, net.Addr, error) {
|
||||||
pc, err := dialer.ListenPacket(ctx, "udp", "", opts...)
|
pc, err := dialer.ListenPacket(ctx, "udp", "", opts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
|
@ -67,12 +68,12 @@ func (t *Tuic) DialContext(ctx context.Context, metadata *C.Metadata, opts ...di
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
return pc, addr, err
|
return pc, addr, err
|
||||||
})
|
}
|
||||||
|
conn, err := t.getClient(false, opts...).DialContext(ctx, metadata, dialFn)
|
||||||
|
if errors.Is(err, tuic.TooManyOpenStreams) {
|
||||||
|
conn, err = t.newClient(false, opts...).DialContext(ctx, metadata, dialFn)
|
||||||
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, tuic.TooManyOpenStreams) {
|
|
||||||
t.removeClient(false, opts...)
|
|
||||||
return t.DialContext(ctx, metadata, opts...)
|
|
||||||
}
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return NewConn(conn, t), err
|
return NewConn(conn, t), err
|
||||||
|
@ -81,7 +82,7 @@ func (t *Tuic) DialContext(ctx context.Context, metadata *C.Metadata, opts ...di
|
||||||
// ListenPacketContext implements C.ProxyAdapter
|
// ListenPacketContext implements C.ProxyAdapter
|
||||||
func (t *Tuic) ListenPacketContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (_ C.PacketConn, err error) {
|
func (t *Tuic) ListenPacketContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (_ C.PacketConn, err error) {
|
||||||
opts = t.Base.DialOptions(opts...)
|
opts = t.Base.DialOptions(opts...)
|
||||||
pc, err := t.getClient(true, opts...).ListenPacketContext(ctx, metadata, func(ctx context.Context) (net.PacketConn, net.Addr, error) {
|
dialFn := func(ctx context.Context) (net.PacketConn, net.Addr, error) {
|
||||||
pc, err := dialer.ListenPacket(ctx, "udp", "", opts...)
|
pc, err := dialer.ListenPacket(ctx, "udp", "", opts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
|
@ -91,12 +92,12 @@ func (t *Tuic) ListenPacketContext(ctx context.Context, metadata *C.Metadata, op
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
return pc, addr, err
|
return pc, addr, err
|
||||||
})
|
}
|
||||||
|
pc, err := t.getClient(true, opts...).ListenPacketContext(ctx, metadata, dialFn)
|
||||||
|
if errors.Is(err, tuic.TooManyOpenStreams) {
|
||||||
|
pc, err = t.newClient(false, opts...).ListenPacketContext(ctx, metadata, dialFn)
|
||||||
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, tuic.TooManyOpenStreams) {
|
|
||||||
t.removeClient(true, opts...)
|
|
||||||
return t.ListenPacketContext(ctx, metadata, opts...)
|
|
||||||
}
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return newPacketConn(pc, t), nil
|
return newPacketConn(pc, t), nil
|
||||||
|
@ -194,37 +195,23 @@ func NewTuic(option TuicOption) (*Tuic, error) {
|
||||||
tlsConfig.ServerName = ""
|
tlsConfig.ServerName = ""
|
||||||
}
|
}
|
||||||
tkn := tuic.GenTKN(option.Token)
|
tkn := tuic.GenTKN(option.Token)
|
||||||
tcpClientMap := make(map[any]*tuic.Client)
|
tcpClients := list.New[*tuic.Client]()
|
||||||
tcpClientMapMutex := &sync.Mutex{}
|
tcpClientsMutex := &sync.Mutex{}
|
||||||
udpClientMap := make(map[any]*tuic.Client)
|
udpClients := list.New[*tuic.Client]()
|
||||||
udpClientMapMutex := &sync.Mutex{}
|
udpClientsMutex := &sync.Mutex{}
|
||||||
getClient := func(udp bool, opts ...dialer.Option) *tuic.Client {
|
newClient := func(udp bool, opts ...dialer.Option) *tuic.Client {
|
||||||
clientMap := tcpClientMap
|
clients := tcpClients
|
||||||
clientMapMutex := tcpClientMapMutex
|
clientsMutex := tcpClientsMutex
|
||||||
if udp {
|
if udp {
|
||||||
clientMap = udpClientMap
|
clients = udpClients
|
||||||
clientMapMutex = udpClientMapMutex
|
clientsMutex = udpClientsMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
o := *dialer.ApplyOptions(opts...)
|
var o any = *dialer.ApplyOptions(opts...)
|
||||||
|
|
||||||
|
clientsMutex.Lock()
|
||||||
|
defer clientsMutex.Unlock()
|
||||||
|
|
||||||
clientMapMutex.Lock()
|
|
||||||
defer clientMapMutex.Unlock()
|
|
||||||
for key := range clientMap {
|
|
||||||
client := clientMap[key]
|
|
||||||
if client == nil {
|
|
||||||
delete(clientMap, key) // It is safe in Golang
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if key == o {
|
|
||||||
client.LastVisited = time.Now()
|
|
||||||
return client
|
|
||||||
}
|
|
||||||
if time.Now().Sub(client.LastVisited) > 30*time.Minute {
|
|
||||||
delete(clientMap, key)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
client := &tuic.Client{
|
client := &tuic.Client{
|
||||||
TlsConfig: tlsConfig,
|
TlsConfig: tlsConfig,
|
||||||
QuicConfig: quicConfig,
|
QuicConfig: quicConfig,
|
||||||
|
@ -235,26 +222,60 @@ func NewTuic(option TuicOption) (*Tuic, error) {
|
||||||
ReduceRtt: option.ReduceRtt,
|
ReduceRtt: option.ReduceRtt,
|
||||||
RequestTimeout: option.RequestTimeout,
|
RequestTimeout: option.RequestTimeout,
|
||||||
MaxUdpRelayPacketSize: option.MaxUdpRelayPacketSize,
|
MaxUdpRelayPacketSize: option.MaxUdpRelayPacketSize,
|
||||||
|
Key: o,
|
||||||
LastVisited: time.Now(),
|
LastVisited: time.Now(),
|
||||||
UDP: udp,
|
UDP: udp,
|
||||||
}
|
}
|
||||||
clientMap[o] = client
|
clients.PushFront(client)
|
||||||
runtime.SetFinalizer(client, closeTuicClient)
|
runtime.SetFinalizer(client, closeTuicClient)
|
||||||
return client
|
return client
|
||||||
}
|
}
|
||||||
removeClient := func(udp bool, opts ...dialer.Option) {
|
getClient := func(udp bool, opts ...dialer.Option) *tuic.Client {
|
||||||
clientMap := tcpClientMap
|
clients := tcpClients
|
||||||
clientMapMutex := tcpClientMapMutex
|
clientsMutex := tcpClientsMutex
|
||||||
if udp {
|
if udp {
|
||||||
clientMap = udpClientMap
|
clients = udpClients
|
||||||
clientMapMutex = udpClientMapMutex
|
clientsMutex = udpClientsMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
o := *dialer.ApplyOptions(opts...)
|
var o any = *dialer.ApplyOptions(opts...)
|
||||||
|
var bestClient *tuic.Client
|
||||||
|
|
||||||
clientMapMutex.Lock()
|
func() {
|
||||||
defer clientMapMutex.Unlock()
|
clientsMutex.Lock()
|
||||||
delete(clientMap, o)
|
defer clientsMutex.Unlock()
|
||||||
|
for it := clients.Front(); it != nil; {
|
||||||
|
client := it.Value
|
||||||
|
if client == nil {
|
||||||
|
next := it.Next()
|
||||||
|
clients.Remove(it)
|
||||||
|
it = next
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if client.Key == o {
|
||||||
|
if bestClient == nil {
|
||||||
|
bestClient = client
|
||||||
|
} else {
|
||||||
|
if client.OpenStreams.Load() < bestClient.OpenStreams.Load() {
|
||||||
|
bestClient = client
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if time.Now().Sub(client.LastVisited) > 30*time.Minute {
|
||||||
|
next := it.Next()
|
||||||
|
clients.Remove(it)
|
||||||
|
it = next
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
it = it.Next()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
if bestClient == nil {
|
||||||
|
return newClient(udp, opts...)
|
||||||
|
} else {
|
||||||
|
return bestClient
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return &Tuic{
|
return &Tuic{
|
||||||
|
@ -266,8 +287,8 @@ func NewTuic(option TuicOption) (*Tuic, error) {
|
||||||
iface: option.Interface,
|
iface: option.Interface,
|
||||||
prefer: C.NewDNSPrefer(option.IPVersion),
|
prefer: C.NewDNSPrefer(option.IPVersion),
|
||||||
},
|
},
|
||||||
getClient: getClient,
|
newClient: newClient,
|
||||||
removeClient: removeClient,
|
getClient: getClient,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -39,13 +39,14 @@ type Client struct {
|
||||||
RequestTimeout int
|
RequestTimeout int
|
||||||
MaxUdpRelayPacketSize int
|
MaxUdpRelayPacketSize int
|
||||||
|
|
||||||
|
Key any
|
||||||
LastVisited time.Time
|
LastVisited time.Time
|
||||||
UDP bool
|
UDP bool
|
||||||
|
|
||||||
quicConn quic.Connection
|
quicConn quic.Connection
|
||||||
connMutex sync.Mutex
|
connMutex sync.Mutex
|
||||||
|
|
||||||
openStreams atomic.Int32
|
OpenStreams atomic.Int32
|
||||||
|
|
||||||
udpInputMap sync.Map
|
udpInputMap sync.Map
|
||||||
}
|
}
|
||||||
|
@ -242,9 +243,9 @@ func (t *Client) DialContext(ctx context.Context, metadata *C.Metadata, dialFn f
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
openStreams := t.openStreams.Add(1)
|
openStreams := t.OpenStreams.Add(1)
|
||||||
if openStreams >= MaxOpenStreams {
|
if openStreams >= MaxOpenStreams {
|
||||||
t.openStreams.Add(-1)
|
t.OpenStreams.Add(-1)
|
||||||
return nil, TooManyOpenStreams
|
return nil, TooManyOpenStreams
|
||||||
}
|
}
|
||||||
stream, err := func() (stream *quicStreamConn, err error) {
|
stream, err := func() (stream *quicStreamConn, err error) {
|
||||||
|
@ -300,6 +301,9 @@ type quicStreamConn struct {
|
||||||
lAddr net.Addr
|
lAddr net.Addr
|
||||||
rAddr net.Addr
|
rAddr net.Addr
|
||||||
client *Client
|
client *Client
|
||||||
|
|
||||||
|
closeOnce sync.Once
|
||||||
|
closeErr error
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *quicStreamConn) Write(p []byte) (n int, err error) {
|
func (q *quicStreamConn) Write(p []byte) (n int, err error) {
|
||||||
|
@ -309,8 +313,15 @@ func (q *quicStreamConn) Write(p []byte) (n int, err error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *quicStreamConn) Close() error {
|
func (q *quicStreamConn) Close() error {
|
||||||
|
q.closeOnce.Do(func() {
|
||||||
|
q.closeErr = q.close()
|
||||||
|
})
|
||||||
|
return q.closeErr
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *quicStreamConn) close() error {
|
||||||
defer time.AfterFunc(C.DefaultTCPTimeout, func() {
|
defer time.AfterFunc(C.DefaultTCPTimeout, func() {
|
||||||
q.client.openStreams.Add(-1)
|
q.client.OpenStreams.Add(-1)
|
||||||
})
|
})
|
||||||
|
|
||||||
// https://github.com/cloudflare/cloudflared/commit/ed2bac026db46b239699ac5ce4fcf122d7cab2cd
|
// https://github.com/cloudflare/cloudflared/commit/ed2bac026db46b239699ac5ce4fcf122d7cab2cd
|
||||||
|
@ -342,6 +353,11 @@ func (t *Client) ListenPacketContext(ctx context.Context, metadata *C.Metadata,
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
openStreams := t.OpenStreams.Add(1)
|
||||||
|
if openStreams >= MaxOpenStreams {
|
||||||
|
t.OpenStreams.Add(-1)
|
||||||
|
return nil, TooManyOpenStreams
|
||||||
|
}
|
||||||
|
|
||||||
pipe1, pipe2 := net.Pipe()
|
pipe1, pipe2 := net.Pipe()
|
||||||
inputCh := make(chan udpData)
|
inputCh := make(chan udpData)
|
||||||
|
@ -380,16 +396,21 @@ type quicStreamPacketConn struct {
|
||||||
|
|
||||||
closeOnce sync.Once
|
closeOnce sync.Once
|
||||||
closeErr error
|
closeErr error
|
||||||
|
closed bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *quicStreamPacketConn) Close() error {
|
func (q *quicStreamPacketConn) Close() error {
|
||||||
q.closeOnce.Do(func() {
|
q.closeOnce.Do(func() {
|
||||||
|
q.closed = true
|
||||||
q.closeErr = q.close()
|
q.closeErr = q.close()
|
||||||
})
|
})
|
||||||
return q.closeErr
|
return q.closeErr
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *quicStreamPacketConn) close() (err error) {
|
func (q *quicStreamPacketConn) close() (err error) {
|
||||||
|
defer time.AfterFunc(C.DefaultTCPTimeout, func() {
|
||||||
|
q.client.OpenStreams.Add(-1)
|
||||||
|
})
|
||||||
defer func() {
|
defer func() {
|
||||||
q.client.deferQuicConn(q.quicConn, err)
|
q.client.deferQuicConn(q.quicConn, err)
|
||||||
}()
|
}()
|
||||||
|
@ -441,6 +462,9 @@ func (q *quicStreamPacketConn) WriteTo(p []byte, addr net.Addr) (n int, err erro
|
||||||
if len(p) > q.client.MaxUdpRelayPacketSize {
|
if len(p) > q.client.MaxUdpRelayPacketSize {
|
||||||
return 0, fmt.Errorf("udp packet too large(%d > %d)", len(p), q.client.MaxUdpRelayPacketSize)
|
return 0, fmt.Errorf("udp packet too large(%d > %d)", len(p), q.client.MaxUdpRelayPacketSize)
|
||||||
}
|
}
|
||||||
|
if q.closed {
|
||||||
|
return 0, net.ErrClosed
|
||||||
|
}
|
||||||
defer func() {
|
defer func() {
|
||||||
q.client.deferQuicConn(q.quicConn, err)
|
q.client.deferQuicConn(q.quicConn, err)
|
||||||
}()
|
}()
|
||||||
|
|
Loading…
Reference in a new issue