refactor: tcp concurrent

This commit is contained in:
Skyxim 2022-04-27 21:37:20 +08:00
parent e11f6f84df
commit 96a32f5038
3 changed files with 97 additions and 48 deletions

View file

@ -4,14 +4,19 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"github.com/Dreamacro/clash/log"
"net" "net"
"net/netip" "net/netip"
"sync"
"github.com/Dreamacro/clash/component/resolver" "github.com/Dreamacro/clash/component/resolver"
) )
var DisableIPv6 = false var (
dialMux sync.Mutex
actualSingleDialContext = singleDialContext
actualDualStackDialContext = dualStackDialContext
DisableIPv6 = false
)
func DialContext(ctx context.Context, network, address string, options ...Option) (net.Conn, error) { func DialContext(ctx context.Context, network, address string, options ...Option) (net.Conn, error) {
opt := &option{ opt := &option{
@ -29,37 +34,9 @@ func DialContext(ctx context.Context, network, address string, options ...Option
switch network { switch network {
case "tcp4", "tcp6", "udp4", "udp6": case "tcp4", "tcp6", "udp4", "udp6":
host, port, err := net.SplitHostPort(address) return actualSingleDialContext(ctx, network, address, opt)
if err != nil {
return nil, err
}
var ip netip.Addr
switch network {
case "tcp4", "udp4":
if !opt.direct {
ip, err = resolver.ResolveIPv4ProxyServerHost(host)
} else {
ip, err = resolver.ResolveIPv4(host)
}
default:
if !opt.direct {
ip, err = resolver.ResolveIPv6ProxyServerHost(host)
} else {
ip, err = resolver.ResolveIPv6(host)
}
}
if err != nil {
return nil, err
}
return dialContext(ctx, network, ip, port, opt)
case "tcp", "udp": case "tcp", "udp":
if TCPConcurrent { return actualDualStackDialContext(ctx, network, address, opt)
return concurrentDialContext(ctx, network, address, opt)
} else {
return dualStackDialContext(ctx, network, address, opt)
}
default: default:
return nil, errors.New("network invalid") return nil, errors.New("network invalid")
} }
@ -97,6 +74,19 @@ func ListenPacket(ctx context.Context, network, address string, options ...Optio
return lc.ListenPacket(ctx, network, address) return lc.ListenPacket(ctx, network, address)
} }
func SetDial(concurrent bool) {
dialMux.Lock()
if concurrent {
actualSingleDialContext = concurrentSingleDialContext
actualDualStackDialContext = concurrentDualStackDialContext
} else {
actualSingleDialContext = singleDialContext
actualDualStackDialContext = concurrentDualStackDialContext
}
dialMux.Unlock()
}
func dialContext(ctx context.Context, network string, destination netip.Addr, port string, opt *option) (net.Conn, error) { func dialContext(ctx context.Context, network string, destination netip.Addr, port string, opt *option) (net.Conn, error) {
dialer := &net.Dialer{} dialer := &net.Dialer{}
if opt.interfaceName != "" { if opt.interfaceName != "" {
@ -196,12 +186,24 @@ func dualStackDialContext(ctx context.Context, network, address string, opt *opt
return nil, errors.New("never touched") return nil, errors.New("never touched")
} }
func concurrentDialContext(ctx context.Context, network, address string, opt *option) (net.Conn, error) { func concurrentDualStackDialContext(ctx context.Context, network, address string, opt *option) (net.Conn, error) {
host, port, err := net.SplitHostPort(address) host, port, err := net.SplitHostPort(address)
if err != nil { if err != nil {
return nil, err return nil, err
} }
var ips []netip.Addr
if opt.direct {
ips, err = resolver.ResolveAllIP(host)
} else {
ips, err = resolver.ResolveAllIPProxyServerHost(host)
}
return concurrentDialContext(ctx, network, ips, port, opt)
}
func concurrentDialContext(ctx context.Context, network string, ips []netip.Addr, port string, opt *option) (net.Conn, error) {
returned := make(chan struct{}) returned := make(chan struct{})
defer close(returned) defer close(returned)
@ -213,13 +215,6 @@ func concurrentDialContext(ctx context.Context, network, address string, opt *op
} }
results := make(chan dialResult) results := make(chan dialResult)
var ips []netip.Addr
if opt.direct {
ips, err = resolver.ResolveAllIP(host)
} else {
ips, err = resolver.ResolveAllIPProxyServerHost(host)
}
tcpRacer := func(ctx context.Context, ip netip.Addr) { tcpRacer := func(ctx context.Context, ip netip.Addr) {
result := dialResult{ip: ip} result := dialResult{ip: ip}
@ -238,7 +233,7 @@ func concurrentDialContext(ctx context.Context, network, address string, opt *op
if ip.Is6() { if ip.Is6() {
v = "6" v = "6"
} }
//log.Debugln("[%s] try use [%s] connected", host, ip.String())
result.Conn, result.error = dialContext(ctx, network+v, ip, port, opt) result.Conn, result.error = dialContext(ctx, network+v, ip, port, opt)
} }
@ -250,15 +245,70 @@ func concurrentDialContext(ctx context.Context, network, address string, opt *op
for res := range results { for res := range results {
connCount-- connCount--
if res.error == nil { if res.error == nil {
log.Debugln("[%s] used [%s] connected", host, res.ip.String())
return res.Conn, nil return res.Conn, nil
} }
//log.Errorln("connect error:%v", res.error)
if connCount == 0 { if connCount == 0 {
//log.Errorln("connect [%s] all ip failed", host)
break break
} }
} }
return nil, errors.New("all ip tcp shakeHands failed")
return nil, errors.New("all ip tcp shake hands failed")
}
func singleDialContext(ctx context.Context, network string, address string, opt *option) (net.Conn, error) {
host, port, err := net.SplitHostPort(address)
if err != nil {
return nil, err
}
var ip netip.Addr
switch network {
case "tcp4", "udp4":
if !opt.direct {
ip, err = resolver.ResolveIPv4ProxyServerHost(host)
} else {
ip, err = resolver.ResolveIPv4(host)
}
default:
if !opt.direct {
ip, err = resolver.ResolveIPv6ProxyServerHost(host)
} else {
ip, err = resolver.ResolveIPv6(host)
}
}
if err != nil {
return nil, err
}
return dialContext(ctx, network, ip, port, opt)
}
func concurrentSingleDialContext(ctx context.Context, network string, address string, opt *option) (net.Conn, error) {
host, port, err := net.SplitHostPort(address)
if err != nil {
return nil, err
}
var ips []netip.Addr
switch network {
case "tcp4", "udp4":
if !opt.direct {
ips, err = resolver.ResolveAllIPv4ProxyServerHost(host)
} else {
ips, err = resolver.ResolveAllIPv4(host)
}
default:
if !opt.direct {
ips, err = resolver.ResolveAllIPv6ProxyServerHost(host)
} else {
ips, err = resolver.ResolveAllIPv6(host)
}
}
if err != nil {
return nil, err
}
return concurrentDialContext(ctx, network, ips, port, opt)
} }

View file

@ -6,7 +6,6 @@ var (
DefaultOptions []Option DefaultOptions []Option
DefaultInterface = atomic.NewString("") DefaultInterface = atomic.NewString("")
DefaultRoutingMark = atomic.NewInt32(0) DefaultRoutingMark = atomic.NewInt32(0)
TCPConcurrent = false
) )
type option struct { type option struct {

View file

@ -251,8 +251,8 @@ func updateGeneral(general *config.General, force bool) {
resolver.DisableIPv6 = true resolver.DisableIPv6 = true
} }
dialer.TCPConcurrent = general.TCPConcurrent if general.TCPConcurrent {
if dialer.TCPConcurrent { dialer.SetDial(general.TCPConcurrent)
log.Infoln("Use tcp concurrent") log.Infoln("Use tcp concurrent")
} }