From 855df99b046976fdaf2ee7b868bf35565ae0e73c Mon Sep 17 00:00:00 2001 From: gVisor bot Date: Mon, 2 May 2022 13:50:10 +0800 Subject: [PATCH] refactor: Unified active health detection, supported by load balancing policy group --- adapter/outboundgroup/fallback.go | 44 ++------------------ adapter/outboundgroup/groupbase.go | 60 ++++++++++++++++++++++++---- adapter/outboundgroup/loadbalance.go | 3 ++ adapter/outboundgroup/urltest.go | 49 +++++------------------ 4 files changed, 69 insertions(+), 87 deletions(-) diff --git a/adapter/outboundgroup/fallback.go b/adapter/outboundgroup/fallback.go index 37c38e7e..bd8c968a 100644 --- a/adapter/outboundgroup/fallback.go +++ b/adapter/outboundgroup/fallback.go @@ -3,10 +3,6 @@ package outboundgroup import ( "context" "encoding/json" - "github.com/Dreamacro/clash/log" - "go.uber.org/atomic" - "time" - "github.com/Dreamacro/clash/adapter/outbound" "github.com/Dreamacro/clash/component/dialer" C "github.com/Dreamacro/clash/constant" @@ -15,9 +11,7 @@ import ( type Fallback struct { *GroupBase - disableUDP bool - failedTimes *atomic.Int32 - failedTime *atomic.Int64 + disableUDP bool } func (f *Fallback) Now() string { @@ -31,8 +25,7 @@ func (f *Fallback) DialContext(ctx context.Context, metadata *C.Metadata, opts . c, err := proxy.DialContext(ctx, metadata, f.Base.DialOptions(opts...)...) if err == nil { c.AppendToChains(f) - f.failedTimes.Store(-1) - f.failedTime.Store(-1) + f.onDialSuccess() } else { f.onDialFailed() } @@ -46,8 +39,7 @@ func (f *Fallback) ListenPacketContext(ctx context.Context, metadata *C.Metadata pc, err := proxy.ListenPacketContext(ctx, metadata, f.Base.DialOptions(opts...)...) if err == nil { pc.AppendToChains(f) - f.failedTimes.Store(-1) - f.failedTime.Store(-1) + f.onDialSuccess() } else { f.onDialFailed() } @@ -55,32 +47,6 @@ func (f *Fallback) ListenPacketContext(ctx context.Context, metadata *C.Metadata return pc, err } -func (f *Fallback) onDialFailed() { - if f.failedTime.Load() == -1 { - log.Warnln("%s first failed", f.Name()) - now := time.Now().UnixMilli() - f.failedTime.Store(now) - f.failedTimes.Store(1) - } else { - if f.failedTime.Load()-time.Now().UnixMilli() > 5*time.Second.Milliseconds() { - f.failedTimes.Store(-1) - f.failedTime.Store(-1) - } else { - failedCount := f.failedTimes.Inc() - log.Warnln("%s failed count: %d", f.Name(), failedCount) - if failedCount >= 5 { - log.Warnln("because %s failed multiple times, active health check", f.Name()) - for _, proxyProvider := range f.providers { - go proxyProvider.HealthCheck() - } - - f.failedTimes.Store(-1) - f.failedTime.Store(-1) - } - } - } -} - // SupportUDP implements C.ProxyAdapter func (f *Fallback) SupportUDP() bool { if f.disableUDP { @@ -133,8 +99,6 @@ func NewFallback(option *GroupCommonOption, providers []provider.ProxyProvider) option.Filter, providers, }), - disableUDP: option.DisableUDP, - failedTimes: atomic.NewInt32(-1), - failedTime: atomic.NewInt64(-1), + disableUDP: option.DisableUDP, } } diff --git a/adapter/outboundgroup/groupbase.go b/adapter/outboundgroup/groupbase.go index e934bc47..79d23c2a 100644 --- a/adapter/outboundgroup/groupbase.go +++ b/adapter/outboundgroup/groupbase.go @@ -5,17 +5,22 @@ import ( C "github.com/Dreamacro/clash/constant" "github.com/Dreamacro/clash/constant/provider" types "github.com/Dreamacro/clash/constant/provider" + "github.com/Dreamacro/clash/log" "github.com/Dreamacro/clash/tunnel" "github.com/dlclark/regexp2" + "go.uber.org/atomic" "sync" + "time" ) type GroupBase struct { *outbound.Base - filter *regexp2.Regexp - providers []provider.ProxyProvider - versions sync.Map // map[string]uint - proxies sync.Map // map[string][]C.Proxy + filter *regexp2.Regexp + providers []provider.ProxyProvider + versions sync.Map // map[string]uint + proxies sync.Map // map[string][]C.Proxy + failedTimes *atomic.Int32 + failedTime *atomic.Int64 } type GroupBaseOption struct { @@ -30,9 +35,11 @@ func NewGroupBase(opt GroupBaseOption) *GroupBase { filter = regexp2.MustCompile(opt.filter, 0) } return &GroupBase{ - Base: outbound.NewBase(opt.BaseOption), - filter: filter, - providers: opt.providers, + Base: outbound.NewBase(opt.BaseOption), + filter: filter, + providers: opt.providers, + failedTimes: atomic.NewInt32(-1), + failedTime: atomic.NewInt64(-1), } } @@ -96,3 +103,42 @@ func (gb *GroupBase) GetProxies(touch bool) []C.Proxy { } return proxies } + +func (gb *GroupBase) onDialFailed() { + if gb.failedTime.Load() == -1 { + log.Warnln("%s first failed", gb.Name()) + now := time.Now().UnixMilli() + gb.failedTime.Store(now) + gb.failedTimes.Store(1) + } else { + if gb.failedTime.Load()-time.Now().UnixMilli() > gb.failedIntervalTime() { + gb.failedTimes.Store(-1) + gb.failedTime.Store(-1) + } else { + failedCount := gb.failedTimes.Inc() + log.Warnln("%s failed count: %d", gb.Name(), failedCount) + if failedCount >= gb.maxFailedTimes() { + log.Warnln("because %s failed multiple times, active health check", gb.Name()) + for _, proxyProvider := range gb.providers { + go proxyProvider.HealthCheck() + } + + gb.failedTimes.Store(-1) + gb.failedTime.Store(-1) + } + } + } +} + +func (gb *GroupBase) failedIntervalTime() int64 { + return 5 * time.Second.Milliseconds() +} + +func (gb *GroupBase) onDialSuccess() { + gb.failedTimes.Store(-1) + gb.failedTime.Store(-1) +} + +func (gb *GroupBase) maxFailedTimes() int32 { + return 5 +} diff --git a/adapter/outboundgroup/loadbalance.go b/adapter/outboundgroup/loadbalance.go index 5e952390..f0342c56 100644 --- a/adapter/outboundgroup/loadbalance.go +++ b/adapter/outboundgroup/loadbalance.go @@ -71,6 +71,9 @@ func (lb *LoadBalance) DialContext(ctx context.Context, metadata *C.Metadata, op defer func() { if err == nil { c.AppendToChains(lb) + lb.onDialSuccess() + } else { + lb.onDialFailed() } }() diff --git a/adapter/outboundgroup/urltest.go b/adapter/outboundgroup/urltest.go index f0d181b8..97d28bb9 100644 --- a/adapter/outboundgroup/urltest.go +++ b/adapter/outboundgroup/urltest.go @@ -3,8 +3,6 @@ package outboundgroup import ( "context" "encoding/json" - "github.com/Dreamacro/clash/log" - "go.uber.org/atomic" "time" "github.com/Dreamacro/clash/adapter/outbound" @@ -24,12 +22,10 @@ func urlTestWithTolerance(tolerance uint16) urlTestOption { type URLTest struct { *GroupBase - tolerance uint16 - disableUDP bool - fastNode C.Proxy - fastSingle *singledo.Single[C.Proxy] - failedTimes *atomic.Int32 - failedTime *atomic.Int64 + tolerance uint16 + disableUDP bool + fastNode C.Proxy + fastSingle *singledo.Single[C.Proxy] } func (u *URLTest) Now() string { @@ -54,11 +50,11 @@ func (u *URLTest) ListenPacketContext(ctx context.Context, metadata *C.Metadata, pc, err := u.fast(true).ListenPacketContext(ctx, metadata, u.Base.DialOptions(opts...)...) if err == nil { pc.AppendToChains(u) - u.failedTimes.Store(-1) - u.failedTime.Store(-1) + u.onDialSuccess() } else { u.onDialFailed() } + return pc, err } @@ -123,32 +119,6 @@ func (u *URLTest) MarshalJSON() ([]byte, error) { }) } -func (u *URLTest) onDialFailed() { - if u.failedTime.Load() == -1 { - log.Warnln("%s first failed", u.Name()) - now := time.Now().UnixMilli() - u.failedTime.Store(now) - u.failedTimes.Store(1) - } else { - if u.failedTime.Load()-time.Now().UnixMilli() > 5*1000 { - u.failedTimes.Store(-1) - u.failedTime.Store(-1) - } else { - failedCount := u.failedTimes.Inc() - log.Warnln("%s failed count: %d", u.Name(), failedCount) - if failedCount >= 5 { - log.Warnln("because %s failed multiple times, active health check", u.Name()) - for _, proxyProvider := range u.providers { - go proxyProvider.HealthCheck() - } - - u.failedTimes.Store(-1) - u.failedTime.Store(-1) - } - } - } -} - func parseURLTestOption(config map[string]any) []urlTestOption { opts := []urlTestOption{} @@ -171,13 +141,12 @@ func NewURLTest(option *GroupCommonOption, providers []provider.ProxyProvider, o Interface: option.Interface, RoutingMark: option.RoutingMark, }, + option.Filter, providers, }), - fastSingle: singledo.NewSingle[C.Proxy](time.Second * 10), - disableUDP: option.DisableUDP, - failedTimes: atomic.NewInt32(-1), - failedTime: atomic.NewInt64(-1), + fastSingle: singledo.NewSingle[C.Proxy](time.Second * 10), + disableUDP: option.DisableUDP, } for _, option := range options {