fix: when connection refused active health test

This commit is contained in:
Skyxim 2022-10-16 13:12:49 +08:00
parent 023e3d0c41
commit 0fb0e490f8
5 changed files with 47 additions and 22 deletions

View file

@ -31,7 +31,7 @@ func (f *Fallback) DialContext(ctx context.Context, metadata *C.Metadata, opts .
c.AppendToChains(f) c.AppendToChains(f)
f.onDialSuccess() f.onDialSuccess()
} else { } else {
f.onDialFailed() f.onDialFailed(proxy.Type(), err)
} }
return c, err return c, err

View file

@ -11,6 +11,7 @@ import (
"github.com/Dreamacro/clash/tunnel" "github.com/Dreamacro/clash/tunnel"
"github.com/dlclark/regexp2" "github.com/dlclark/regexp2"
"go.uber.org/atomic" "go.uber.org/atomic"
"strings"
"sync" "sync"
"time" "time"
) )
@ -136,8 +137,13 @@ func (gb *GroupBase) URLTest(ctx context.Context, url string) (map[string]uint16
} }
} }
func (gb *GroupBase) onDialFailed() { func (gb *GroupBase) onDialFailed(adapterType C.AdapterType, err error) {
if gb.failedTesting.Load() { if adapterType == C.Direct || adapterType == C.Compatible || adapterType == C.Reject || adapterType == C.Pass {
return
}
if strings.Contains(err.Error(), "connection refused") {
go gb.healthCheck()
return return
} }
@ -157,26 +163,34 @@ func (gb *GroupBase) onDialFailed() {
log.Debugln("ProxyGroup: %s failed count: %d", gb.Name(), gb.failedTimes) log.Debugln("ProxyGroup: %s failed count: %d", gb.Name(), gb.failedTimes)
if gb.failedTimes >= gb.maxFailedTimes() { if gb.failedTimes >= gb.maxFailedTimes() {
gb.failedTesting.Store(true)
log.Warnln("because %s failed multiple times, active health check", gb.Name()) log.Warnln("because %s failed multiple times, active health check", gb.Name())
wg := sync.WaitGroup{} gb.healthCheck()
for _, proxyProvider := range gb.providers {
wg.Add(1)
proxyProvider := proxyProvider
go func() {
defer wg.Done()
proxyProvider.HealthCheck()
}()
}
wg.Wait()
gb.failedTesting.Store(false)
gb.failedTimes = 0
} }
} }
}() }()
} }
func (gb *GroupBase) healthCheck() {
if gb.failedTesting.Load() {
return
}
gb.failedTesting.Store(true)
wg := sync.WaitGroup{}
for _, proxyProvider := range gb.providers {
wg.Add(1)
proxyProvider := proxyProvider
go func() {
defer wg.Done()
proxyProvider.HealthCheck()
}()
}
wg.Wait()
gb.failedTesting.Store(false)
gb.failedTimes = 0
}
func (gb *GroupBase) failedIntervalTime() int64 { func (gb *GroupBase) failedIntervalTime() int64 {
return 5 * time.Second.Milliseconds() return 5 * time.Second.Milliseconds()
} }

View file

@ -82,17 +82,17 @@ func jumpHash(key uint64, buckets int32) int32 {
// DialContext implements C.ProxyAdapter // DialContext implements C.ProxyAdapter
func (lb *LoadBalance) DialContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (c C.Conn, err error) { func (lb *LoadBalance) DialContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (c C.Conn, err error) {
proxy := lb.Unwrap(metadata)
defer func() { defer func() {
if err == nil { if err == nil {
c.AppendToChains(lb) c.AppendToChains(lb)
lb.onDialSuccess() lb.onDialSuccess()
} else { } else {
lb.onDialFailed() lb.onDialFailed(proxy.Type(), err)
} }
}() }()
proxy := lb.Unwrap(metadata)
c, err = proxy.DialContext(ctx, metadata, lb.Base.DialOptions(opts...)...) c, err = proxy.DialContext(ctx, metadata, lb.Base.DialOptions(opts...)...)
return return
} }

View file

@ -34,12 +34,13 @@ func (u *URLTest) Now() string {
// DialContext implements C.ProxyAdapter // DialContext implements C.ProxyAdapter
func (u *URLTest) DialContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (c C.Conn, err error) { func (u *URLTest) DialContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (c C.Conn, err error) {
c, err = u.fast(true).DialContext(ctx, metadata, u.Base.DialOptions(opts...)...) proxy := u.fast(true)
c, err = proxy.DialContext(ctx, metadata, u.Base.DialOptions(opts...)...)
if err == nil { if err == nil {
c.AppendToChains(u) c.AppendToChains(u)
u.onDialSuccess() u.onDialSuccess()
} else { } else {
u.onDialFailed() u.onDialFailed(proxy.Type(), err)
} }
return c, err return c, err
} }

View file

@ -290,6 +290,7 @@ func concurrentDialContext(ctx context.Context, network string, ips []netip.Addr
connCount := len(ips) connCount := len(ips)
var fallback dialResult var fallback dialResult
var primaryError error
for i := 0; i < connCount; i++ { for i := 0; i < connCount; i++ {
select { select {
case res := <-results: case res := <-results:
@ -303,6 +304,7 @@ func concurrentDialContext(ctx context.Context, network string, ips []netip.Addr
} }
} else { } else {
if res.isPrimary { if res.isPrimary {
primaryError = res.error
preferCount.Add(-1) preferCount.Add(-1)
if preferCount.Load() == 0 && fallback.done && fallback.error == nil { if preferCount.Load() == 0 && fallback.done && fallback.error == nil {
return fallback.Conn, nil return fallback.Conn, nil
@ -321,6 +323,14 @@ func concurrentDialContext(ctx context.Context, network string, ips []netip.Addr
return fallback.Conn, nil return fallback.Conn, nil
} }
if primaryError != nil {
return nil, primaryError
}
if fallback.error != nil {
return nil, fallback.error
}
return nil, fmt.Errorf("all ips %v tcp shake hands failed", ips) return nil, fmt.Errorf("all ips %v tcp shake hands failed", ips)
} }