From 0fb0e490f813f0d943a1e2c6c7db7ce0055653fc Mon Sep 17 00:00:00 2001 From: Skyxim Date: Sun, 16 Oct 2022 13:12:49 +0800 Subject: [PATCH] fix: when connection refused active health test --- adapter/outboundgroup/fallback.go | 2 +- adapter/outboundgroup/groupbase.go | 46 ++++++++++++++++++---------- adapter/outboundgroup/loadbalance.go | 6 ++-- adapter/outboundgroup/urltest.go | 5 +-- component/dialer/dialer.go | 10 ++++++ 5 files changed, 47 insertions(+), 22 deletions(-) diff --git a/adapter/outboundgroup/fallback.go b/adapter/outboundgroup/fallback.go index 3e4c8927..ea9e13ae 100644 --- a/adapter/outboundgroup/fallback.go +++ b/adapter/outboundgroup/fallback.go @@ -31,7 +31,7 @@ func (f *Fallback) DialContext(ctx context.Context, metadata *C.Metadata, opts . c.AppendToChains(f) f.onDialSuccess() } else { - f.onDialFailed() + f.onDialFailed(proxy.Type(), err) } return c, err diff --git a/adapter/outboundgroup/groupbase.go b/adapter/outboundgroup/groupbase.go index 7754d220..e96a5500 100644 --- a/adapter/outboundgroup/groupbase.go +++ b/adapter/outboundgroup/groupbase.go @@ -11,6 +11,7 @@ import ( "github.com/Dreamacro/clash/tunnel" "github.com/dlclark/regexp2" "go.uber.org/atomic" + "strings" "sync" "time" ) @@ -136,8 +137,13 @@ func (gb *GroupBase) URLTest(ctx context.Context, url string) (map[string]uint16 } } -func (gb *GroupBase) onDialFailed() { - if gb.failedTesting.Load() { +func (gb *GroupBase) onDialFailed(adapterType C.AdapterType, err error) { + if adapterType == C.Direct || adapterType == C.Compatible || adapterType == C.Reject || adapterType == C.Pass { + return + } + + if strings.Contains(err.Error(), "connection refused") { + go gb.healthCheck() return } @@ -157,26 +163,34 @@ func (gb *GroupBase) onDialFailed() { log.Debugln("ProxyGroup: %s failed count: %d", gb.Name(), gb.failedTimes) if gb.failedTimes >= gb.maxFailedTimes() { - gb.failedTesting.Store(true) log.Warnln("because %s failed multiple times, active health check", gb.Name()) - wg := sync.WaitGroup{} - for _, proxyProvider := range gb.providers { - wg.Add(1) - proxyProvider := proxyProvider - go func() { - defer wg.Done() - proxyProvider.HealthCheck() - }() - } - - wg.Wait() - gb.failedTesting.Store(false) - gb.failedTimes = 0 + gb.healthCheck() } } }() } +func (gb *GroupBase) healthCheck() { + if gb.failedTesting.Load() { + return + } + + gb.failedTesting.Store(true) + wg := sync.WaitGroup{} + for _, proxyProvider := range gb.providers { + wg.Add(1) + proxyProvider := proxyProvider + go func() { + defer wg.Done() + proxyProvider.HealthCheck() + }() + } + + wg.Wait() + gb.failedTesting.Store(false) + gb.failedTimes = 0 +} + func (gb *GroupBase) failedIntervalTime() int64 { return 5 * time.Second.Milliseconds() } diff --git a/adapter/outboundgroup/loadbalance.go b/adapter/outboundgroup/loadbalance.go index 430aed82..83af1e4f 100644 --- a/adapter/outboundgroup/loadbalance.go +++ b/adapter/outboundgroup/loadbalance.go @@ -82,17 +82,17 @@ func jumpHash(key uint64, buckets int32) int32 { // DialContext implements C.ProxyAdapter func (lb *LoadBalance) DialContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (c C.Conn, err error) { + proxy := lb.Unwrap(metadata) + defer func() { if err == nil { c.AppendToChains(lb) lb.onDialSuccess() } else { - lb.onDialFailed() + lb.onDialFailed(proxy.Type(), err) } }() - proxy := lb.Unwrap(metadata) - c, err = proxy.DialContext(ctx, metadata, lb.Base.DialOptions(opts...)...) return } diff --git a/adapter/outboundgroup/urltest.go b/adapter/outboundgroup/urltest.go index 9386e1ef..c4c5e219 100644 --- a/adapter/outboundgroup/urltest.go +++ b/adapter/outboundgroup/urltest.go @@ -34,12 +34,13 @@ func (u *URLTest) Now() string { // DialContext implements C.ProxyAdapter func (u *URLTest) DialContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (c C.Conn, err error) { - c, err = u.fast(true).DialContext(ctx, metadata, u.Base.DialOptions(opts...)...) + proxy := u.fast(true) + c, err = proxy.DialContext(ctx, metadata, u.Base.DialOptions(opts...)...) if err == nil { c.AppendToChains(u) u.onDialSuccess() } else { - u.onDialFailed() + u.onDialFailed(proxy.Type(), err) } return c, err } diff --git a/component/dialer/dialer.go b/component/dialer/dialer.go index fa1b17a2..ea4d2ece 100644 --- a/component/dialer/dialer.go +++ b/component/dialer/dialer.go @@ -290,6 +290,7 @@ func concurrentDialContext(ctx context.Context, network string, ips []netip.Addr connCount := len(ips) var fallback dialResult + var primaryError error for i := 0; i < connCount; i++ { select { case res := <-results: @@ -303,6 +304,7 @@ func concurrentDialContext(ctx context.Context, network string, ips []netip.Addr } } else { if res.isPrimary { + primaryError = res.error preferCount.Add(-1) if preferCount.Load() == 0 && fallback.done && fallback.error == nil { return fallback.Conn, nil @@ -321,6 +323,14 @@ func concurrentDialContext(ctx context.Context, network string, ips []netip.Addr return fallback.Conn, nil } + if primaryError != nil { + return nil, primaryError + } + + if fallback.error != nil { + return nil, fallback.error + } + return nil, fmt.Errorf("all ips %v tcp shake hands failed", ips) }