mihomo/transport/tuic/pool_client.go

213 lines
5.4 KiB
Go
Raw Permalink Normal View History

2022-11-26 23:53:59 +08:00
package tuic
import (
"context"
"errors"
"net"
"runtime"
"sync"
"time"
2023-11-03 21:01:45 +08:00
N "github.com/metacubex/mihomo/common/net"
C "github.com/metacubex/mihomo/constant"
"github.com/metacubex/mihomo/log"
2023-06-03 16:45:35 +08:00
"github.com/metacubex/quic-go"
list "github.com/bahlo/generic-list-go"
2022-11-26 23:53:59 +08:00
)
type dialResult struct {
2023-06-03 16:45:35 +08:00
transport *quic.Transport
addr net.Addr
err error
2022-11-26 23:53:59 +08:00
}
type PoolClient struct {
2023-06-12 17:44:22 +08:00
newClientOptionV4 *ClientOptionV4
newClientOptionV5 *ClientOptionV5
dialResultMap map[C.Dialer]dialResult
dialResultMutex *sync.Mutex
tcpClients *list.List[Client]
tcpClientsMutex *sync.Mutex
udpClients *list.List[Client]
udpClientsMutex *sync.Mutex
2022-11-26 23:53:59 +08:00
}
2022-12-22 09:53:11 +08:00
func (t *PoolClient) DialContextWithDialer(ctx context.Context, metadata *C.Metadata, dialer C.Dialer, dialFn DialFunc) (net.Conn, error) {
2023-06-03 16:45:35 +08:00
newDialFn := func(ctx context.Context, dialer C.Dialer) (transport *quic.Transport, addr net.Addr, err error) {
return t.dial(ctx, dialer, dialFn)
}
conn, err := t.getClient(false, dialer).DialContextWithDialer(ctx, metadata, dialer, newDialFn)
2022-11-26 23:53:59 +08:00
if errors.Is(err, TooManyOpenStreams) {
2023-06-03 16:45:35 +08:00
conn, err = t.newClient(false, dialer).DialContextWithDialer(ctx, metadata, dialer, newDialFn)
2022-12-20 00:11:02 +08:00
}
if err != nil {
return nil, err
}
return N.NewRefConn(conn, t), err
}
2022-12-22 09:53:11 +08:00
func (t *PoolClient) ListenPacketWithDialer(ctx context.Context, metadata *C.Metadata, dialer C.Dialer, dialFn DialFunc) (net.PacketConn, error) {
2023-06-03 16:45:35 +08:00
newDialFn := func(ctx context.Context, dialer C.Dialer) (transport *quic.Transport, addr net.Addr, err error) {
return t.dial(ctx, dialer, dialFn)
}
pc, err := t.getClient(true, dialer).ListenPacketWithDialer(ctx, metadata, dialer, newDialFn)
2022-12-20 00:11:02 +08:00
if errors.Is(err, TooManyOpenStreams) {
2023-06-03 16:45:35 +08:00
pc, err = t.newClient(true, dialer).ListenPacketWithDialer(ctx, metadata, dialer, newDialFn)
2022-12-20 00:11:02 +08:00
}
if err != nil {
return nil, err
}
return N.NewRefPacketConn(pc, t), nil
}
2023-06-03 16:45:35 +08:00
func (t *PoolClient) dial(ctx context.Context, dialer C.Dialer, dialFn DialFunc) (transport *quic.Transport, addr net.Addr, err error) {
2022-11-26 23:53:59 +08:00
t.dialResultMutex.Lock()
2022-12-22 09:53:11 +08:00
dr, ok := t.dialResultMap[dialer]
2022-11-26 23:53:59 +08:00
t.dialResultMutex.Unlock()
if ok {
2023-06-03 16:45:35 +08:00
return dr.transport, dr.addr, dr.err
2022-11-26 23:53:59 +08:00
}
2023-06-03 16:45:35 +08:00
transport, addr, err = dialFn(ctx, dialer)
2022-11-26 23:53:59 +08:00
if err != nil {
return nil, nil, err
}
2023-06-03 16:45:35 +08:00
if _, ok := transport.Conn.(*net.UDPConn); ok { // only cache the system's UDPConn
transport.SetSingleUse(false) // don't close transport in each dial
dr.transport, dr.addr, dr.err = transport, addr, err
t.dialResultMutex.Lock()
t.dialResultMap[dialer] = dr
t.dialResultMutex.Unlock()
}
2022-11-26 23:53:59 +08:00
2023-06-03 16:45:35 +08:00
return transport, addr, err
2022-11-26 23:53:59 +08:00
}
2022-11-28 17:09:25 +08:00
func (t *PoolClient) forceClose() {
2022-11-26 23:53:59 +08:00
t.dialResultMutex.Lock()
defer t.dialResultMutex.Unlock()
for key := range t.dialResultMap {
2023-06-03 16:45:35 +08:00
transport := t.dialResultMap[key].transport
if transport != nil {
_ = transport.Close()
2022-11-26 23:53:59 +08:00
}
delete(t.dialResultMap, key)
}
}
2023-06-12 17:44:22 +08:00
func (t *PoolClient) newClient(udp bool, dialer C.Dialer) (client Client) {
2022-11-26 23:53:59 +08:00
clients := t.tcpClients
clientsMutex := t.tcpClientsMutex
if udp {
clients = t.udpClients
clientsMutex = t.udpClientsMutex
}
clientsMutex.Lock()
defer clientsMutex.Unlock()
2023-06-12 17:44:22 +08:00
if t.newClientOptionV4 != nil {
client = NewClientV4(t.newClientOptionV4, udp, dialer)
} else {
client = NewClientV5(t.newClientOptionV5, udp, dialer)
}
client.SetLastVisited(time.Now())
2022-11-26 23:53:59 +08:00
clients.PushFront(client)
return client
}
2023-06-12 17:44:22 +08:00
func (t *PoolClient) getClient(udp bool, dialer C.Dialer) Client {
2022-11-26 23:53:59 +08:00
clients := t.tcpClients
clientsMutex := t.tcpClientsMutex
if udp {
clients = t.udpClients
clientsMutex = t.udpClientsMutex
}
2023-06-12 17:44:22 +08:00
var bestClient Client
2022-11-26 23:53:59 +08:00
func() {
clientsMutex.Lock()
defer clientsMutex.Unlock()
for it := clients.Front(); it != nil; {
client := it.Value
if client == nil {
next := it.Next()
clients.Remove(it)
it = next
continue
}
2023-06-12 17:44:22 +08:00
if client.DialerRef() == dialer {
2022-11-26 23:53:59 +08:00
if bestClient == nil {
bestClient = client
} else {
2023-06-12 17:44:22 +08:00
if client.OpenStreams() < bestClient.OpenStreams() {
2022-11-26 23:53:59 +08:00
bestClient = client
}
}
}
it = it.Next()
}
}()
for it := clients.Front(); it != nil; {
client := it.Value
2023-06-12 17:44:22 +08:00
if client != bestClient && client.OpenStreams() == 0 && time.Now().Sub(client.LastVisited()) > 30*time.Minute {
client.Close()
next := it.Next()
clients.Remove(it)
it = next
continue
}
it = it.Next()
}
2022-11-26 23:53:59 +08:00
if bestClient == nil {
2022-12-22 09:53:11 +08:00
return t.newClient(udp, dialer)
2022-11-26 23:53:59 +08:00
} else {
2023-06-12 17:44:22 +08:00
bestClient.SetLastVisited(time.Now())
2022-11-26 23:53:59 +08:00
return bestClient
}
}
2023-06-12 17:44:22 +08:00
// NewPoolClientV4 creates a PoolClient that builds its clients from a shallow
// copy of clientOption, so later changes to the caller's struct fields are not
// observed by the pool. closeClientPool is registered as a finalizer on the
// returned pool.
func NewPoolClientV4(clientOption *ClientOptionV4) *PoolClient {
	optionCopy := *clientOption
	pool := &PoolClient{
		newClientOptionV4: &optionCopy,
		dialResultMap:     make(map[C.Dialer]dialResult),
		dialResultMutex:   new(sync.Mutex),
		tcpClients:        list.New[Client](),
		tcpClientsMutex:   new(sync.Mutex),
		udpClients:        list.New[Client](),
		udpClientsMutex:   new(sync.Mutex),
	}
	runtime.SetFinalizer(pool, closeClientPool)
	log.Debugln("New TuicV4 PoolClient at %p", pool)
	return pool
}
func NewPoolClientV5(clientOption *ClientOptionV5) *PoolClient {
2022-11-26 23:53:59 +08:00
p := &PoolClient{
2022-12-22 09:53:11 +08:00
dialResultMap: make(map[C.Dialer]dialResult),
2022-11-26 23:53:59 +08:00
dialResultMutex: &sync.Mutex{},
2023-06-12 17:44:22 +08:00
tcpClients: list.New[Client](),
2022-11-26 23:53:59 +08:00
tcpClientsMutex: &sync.Mutex{},
2023-06-12 17:44:22 +08:00
udpClients: list.New[Client](),
2022-11-26 23:53:59 +08:00
udpClientsMutex: &sync.Mutex{},
}
2022-11-29 09:23:28 +08:00
newClientOption := *clientOption
2023-06-12 17:44:22 +08:00
p.newClientOptionV5 = &newClientOption
2022-11-26 23:53:59 +08:00
runtime.SetFinalizer(p, closeClientPool)
2023-06-12 17:44:22 +08:00
log.Debugln("New TuicV5 PoolClient at %p", p)
2022-11-26 23:53:59 +08:00
return p
}
func closeClientPool(client *PoolClient) {
log.Debugln("Close Tuic PoolClient at %p", client)
2022-11-28 17:09:25 +08:00
client.forceClose()
2022-11-26 23:53:59 +08:00
}