[Feature] Proxy stores delay data of different URLs. And supports specifying different test URLs and expected statue by group (#588)

Co-authored-by: Larvan2 <78135608+Larvan2@users.noreply.github.com>
Co-authored-by: wwqgtxx <wwqgtxx@gmail.com>
This commit is contained in:
wzdnzd 2023-06-04 11:51:30 +08:00 committed by GitHub
parent 03d0c8620e
commit 3ef81afc76
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
21 changed files with 570 additions and 252 deletions

View file

@ -3,6 +3,7 @@ package adapter
import (
"context"
"encoding/json"
"errors"
"fmt"
"net"
"net/http"
@ -12,16 +13,28 @@ import (
"github.com/Dreamacro/clash/common/atomic"
"github.com/Dreamacro/clash/common/queue"
"github.com/Dreamacro/clash/common/utils"
"github.com/Dreamacro/clash/component/dialer"
C "github.com/Dreamacro/clash/constant"
"github.com/Dreamacro/clash/log"
)
var UnifiedDelay = atomic.NewBool(false)
const (
defaultHistoriesNum = 10
)
type extraProxyState struct {
history *queue.Queue[C.DelayHistory]
alive *atomic.Bool
}
type Proxy struct {
C.ProxyAdapter
history *queue.Queue[C.DelayHistory]
alive *atomic.Bool
extra map[string]*extraProxyState
}
// Alive implements C.Proxy
@ -29,6 +42,17 @@ func (p *Proxy) Alive() bool {
return p.alive.Load()
}
// AliveForTestUrl implements C.Proxy
func (p *Proxy) AliveForTestUrl(url string) bool {
if p.extra != nil {
if state, ok := p.extra[url]; ok {
return state.alive.Load()
}
}
return p.alive.Load()
}
// Dial implements C.Proxy
func (p *Proxy) Dial(metadata *C.Metadata) (C.Conn, error) {
ctx, cancel := context.WithTimeout(context.Background(), C.DefaultTCPTimeout)
@ -65,6 +89,42 @@ func (p *Proxy) DelayHistory() []C.DelayHistory {
return histories
}
// DelayHistoryForTestUrl implements C.Proxy
func (p *Proxy) DelayHistoryForTestUrl(url string) []C.DelayHistory {
var queueM []C.DelayHistory
if p.extra != nil {
if state, ok := p.extra[url]; ok {
queueM = state.history.Copy()
}
}
if queueM == nil {
queueM = p.history.Copy()
}
histories := []C.DelayHistory{}
for _, item := range queueM {
histories = append(histories, item)
}
return histories
}
func (p *Proxy) ExtraDelayHistory() map[string][]C.DelayHistory {
extra := map[string][]C.DelayHistory{}
if p.extra != nil && len(p.extra) != 0 {
for url, option := range p.extra {
histories := []C.DelayHistory{}
queueM := option.history.Copy()
for _, item := range queueM {
histories = append(histories, item)
}
extra[url] = histories
}
}
return extra
}
// LastDelay return last history record. if proxy is not alive, return the max value of uint16.
// implements C.Proxy
func (p *Proxy) LastDelay() (delay uint16) {
@ -80,6 +140,30 @@ func (p *Proxy) LastDelay() (delay uint16) {
return history.Delay
}
// LastDelayForTestUrl implements C.Proxy
func (p *Proxy) LastDelayForTestUrl(url string) (delay uint16) {
var max uint16 = 0xffff
alive := p.alive.Load()
history := p.history.Last()
if p.extra != nil {
if state, ok := p.extra[url]; ok {
alive = state.alive.Load()
history = state.history.Last()
}
}
if !alive {
return max
}
if history.Delay == 0 {
return max
}
return history.Delay
}
// MarshalJSON implements C.ProxyAdapter
func (p *Proxy) MarshalJSON() ([]byte, error) {
inner, err := p.ProxyAdapter.MarshalJSON()
@ -90,6 +174,7 @@ func (p *Proxy) MarshalJSON() ([]byte, error) {
mapping := map[string]any{}
_ = json.Unmarshal(inner, &mapping)
mapping["history"] = p.DelayHistory()
mapping["extra"] = p.ExtraDelayHistory()
mapping["name"] = p.Name()
mapping["udp"] = p.SupportUDP()
mapping["xudp"] = p.SupportXUDP()
@ -99,16 +184,46 @@ func (p *Proxy) MarshalJSON() ([]byte, error) {
// URLTest get the delay for the specified URL
// implements C.Proxy
func (p *Proxy) URLTest(ctx context.Context, url string) (t uint16, err error) {
func (p *Proxy) URLTest(ctx context.Context, url string, expectedStatus utils.IntRanges[uint16], store C.DelayHistoryStoreType) (t uint16, err error) {
defer func() {
p.alive.Store(err == nil)
record := C.DelayHistory{Time: time.Now()}
if err == nil {
record.Delay = t
}
p.history.Put(record)
if p.history.Len() > 10 {
p.history.Pop()
alive := err == nil
switch store {
case C.OriginalHistory:
p.alive.Store(alive)
record := C.DelayHistory{Time: time.Now()}
if alive {
record.Delay = t
}
p.history.Put(record)
if p.history.Len() > defaultHistoriesNum {
p.history.Pop()
}
case C.ExtraHistory:
record := C.DelayHistory{Time: time.Now()}
if alive {
record.Delay = t
}
if p.extra == nil {
p.extra = map[string]*extraProxyState{}
}
state, ok := p.extra[url]
if !ok {
state = &extraProxyState{
history: queue.New[C.DelayHistory](defaultHistoriesNum),
alive: atomic.NewBool(true),
}
p.extra[url] = state
}
state.alive.Store(alive)
state.history.Put(record)
if state.history.Len() > defaultHistoriesNum {
state.history.Pop()
}
default:
log.Debugln("health check result will be discarded, url: %s alive: %t, delay: %d", url, alive, t)
}
}()
@ -172,12 +287,17 @@ func (p *Proxy) URLTest(ctx context.Context, url string) (t uint16, err error) {
}
}
if !expectedStatus.Check(uint16(resp.StatusCode)) {
// maybe another value should be returned for differentiation
err = errors.New("response status is inconsistent with the expected status")
}
t = uint16(time.Since(start) / time.Millisecond)
return
}
func NewProxy(adapter C.ProxyAdapter) *Proxy {
return &Proxy{adapter, queue.New[C.DelayHistory](10), atomic.NewBool(true)}
return &Proxy{adapter, queue.New[C.DelayHistory](defaultHistoriesNum), atomic.NewBool(true), map[string]*extraProxyState{}}
}
func urlToMetadata(rawURL string) (addr C.Metadata, err error) {

View file

@ -9,6 +9,7 @@ import (
"github.com/Dreamacro/clash/adapter/outbound"
"github.com/Dreamacro/clash/common/callback"
N "github.com/Dreamacro/clash/common/net"
"github.com/Dreamacro/clash/common/utils"
"github.com/Dreamacro/clash/component/dialer"
C "github.com/Dreamacro/clash/constant"
"github.com/Dreamacro/clash/constant/provider"
@ -16,9 +17,10 @@ import (
type Fallback struct {
*GroupBase
disableUDP bool
testUrl string
selected string
disableUDP bool
testUrl string
selected string
expectedStatus string
}
func (f *Fallback) Now() string {
@ -82,9 +84,11 @@ func (f *Fallback) MarshalJSON() ([]byte, error) {
all = append(all, proxy.Name())
}
return json.Marshal(map[string]any{
"type": f.Type().String(),
"now": f.Now(),
"all": all,
"type": f.Type().String(),
"now": f.Now(),
"all": all,
"testUrl": f.testUrl,
"expected": f.expectedStatus,
})
}
@ -98,12 +102,14 @@ func (f *Fallback) findAliveProxy(touch bool) C.Proxy {
proxies := f.GetProxies(touch)
for _, proxy := range proxies {
if len(f.selected) == 0 {
if proxy.Alive() {
// if proxy.Alive() {
if proxy.AliveForTestUrl(f.testUrl) {
return proxy
}
} else {
if proxy.Name() == f.selected {
if proxy.Alive() {
// if proxy.Alive() {
if proxy.AliveForTestUrl(f.testUrl) {
return proxy
} else {
f.selected = ""
@ -129,10 +135,12 @@ func (f *Fallback) Set(name string) error {
}
f.selected = name
if !p.Alive() {
// if !p.Alive() {
if !p.AliveForTestUrl(f.testUrl) {
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*time.Duration(5000))
defer cancel()
_, _ = p.URLTest(ctx, f.testUrl)
expectedStatus, _ := utils.NewIntRanges[uint16](f.expectedStatus)
_, _ = p.URLTest(ctx, f.testUrl, expectedStatus, C.ExtraHistory)
}
return nil
@ -156,7 +164,8 @@ func NewFallback(option *GroupCommonOption, providers []provider.ProxyProvider)
option.ExcludeType,
providers,
}),
disableUDP: option.DisableUDP,
testUrl: option.URL,
disableUDP: option.DisableUDP,
testUrl: option.URL,
expectedStatus: option.ExpectedStatus,
}
}

View file

@ -9,6 +9,7 @@ import (
"github.com/Dreamacro/clash/adapter/outbound"
"github.com/Dreamacro/clash/common/atomic"
"github.com/Dreamacro/clash/common/utils"
C "github.com/Dreamacro/clash/constant"
"github.com/Dreamacro/clash/constant/provider"
types "github.com/Dreamacro/clash/constant/provider"
@ -192,7 +193,7 @@ func (gb *GroupBase) GetProxies(touch bool) []C.Proxy {
return proxies
}
func (gb *GroupBase) URLTest(ctx context.Context, url string) (map[string]uint16, error) {
func (gb *GroupBase) URLTest(ctx context.Context, url string, expectedStatus utils.IntRanges[uint16]) (map[string]uint16, error) {
var wg sync.WaitGroup
var lock sync.Mutex
mp := map[string]uint16{}
@ -201,7 +202,7 @@ func (gb *GroupBase) URLTest(ctx context.Context, url string) (map[string]uint16
proxy := proxy
wg.Add(1)
go func() {
delay, err := proxy.URLTest(ctx, url)
delay, err := proxy.URLTest(ctx, url, expectedStatus, C.DropHistory)
if err == nil {
lock.Lock()
mp[proxy.Name()] = delay

View file

@ -25,8 +25,10 @@ type strategyFn = func(proxies []C.Proxy, metadata *C.Metadata, touch bool) C.Pr
type LoadBalance struct {
*GroupBase
disableUDP bool
strategyFn strategyFn
disableUDP bool
strategyFn strategyFn
testUrl string
expectedStatus string
}
var errStrategy = errors.New("unsupported strategy")
@ -129,7 +131,7 @@ func (lb *LoadBalance) IsL3Protocol(metadata *C.Metadata) bool {
return lb.Unwrap(metadata, false).IsL3Protocol(metadata)
}
func strategyRoundRobin() strategyFn {
func strategyRoundRobin(url string) strategyFn {
idx := 0
idxMutex := sync.Mutex{}
return func(proxies []C.Proxy, metadata *C.Metadata, touch bool) C.Proxy {
@ -148,7 +150,8 @@ func strategyRoundRobin() strategyFn {
for ; i < length; i++ {
id := (idx + i) % length
proxy := proxies[id]
if proxy.Alive() {
// if proxy.Alive() {
if proxy.AliveForTestUrl(url) {
i++
return proxy
}
@ -158,7 +161,7 @@ func strategyRoundRobin() strategyFn {
}
}
func strategyConsistentHashing() strategyFn {
func strategyConsistentHashing(url string) strategyFn {
maxRetry := 5
return func(proxies []C.Proxy, metadata *C.Metadata, touch bool) C.Proxy {
key := uint64(murmur3.Sum32([]byte(getKey(metadata))))
@ -166,14 +169,16 @@ func strategyConsistentHashing() strategyFn {
for i := 0; i < maxRetry; i, key = i+1, key+1 {
idx := jumpHash(key, buckets)
proxy := proxies[idx]
if proxy.Alive() {
// if proxy.Alive() {
if proxy.AliveForTestUrl(url) {
return proxy
}
}
// when availability is poor, traverse the entire list to get the available nodes
for _, proxy := range proxies {
if proxy.Alive() {
// if proxy.Alive() {
if proxy.AliveForTestUrl(url) {
return proxy
}
}
@ -182,7 +187,7 @@ func strategyConsistentHashing() strategyFn {
}
}
func strategyStickySessions() strategyFn {
func strategyStickySessions(url string) strategyFn {
ttl := time.Minute * 10
maxRetry := 5
lruCache := cache.New[uint64, int](
@ -199,7 +204,8 @@ func strategyStickySessions() strategyFn {
nowIdx := idx
for i := 1; i < maxRetry; i++ {
proxy := proxies[nowIdx]
if proxy.Alive() {
// if proxy.Alive() {
if proxy.AliveForTestUrl(url) {
if nowIdx != idx {
lruCache.Delete(key)
lruCache.Set(key, nowIdx)
@ -230,8 +236,10 @@ func (lb *LoadBalance) MarshalJSON() ([]byte, error) {
all = append(all, proxy.Name())
}
return json.Marshal(map[string]any{
"type": lb.Type().String(),
"all": all,
"type": lb.Type().String(),
"all": all,
"testUrl": lb.testUrl,
"expectedStatus": lb.expectedStatus,
})
}
@ -239,11 +247,11 @@ func NewLoadBalance(option *GroupCommonOption, providers []provider.ProxyProvide
var strategyFn strategyFn
switch strategy {
case "consistent-hashing":
strategyFn = strategyConsistentHashing()
strategyFn = strategyConsistentHashing(option.URL)
case "round-robin":
strategyFn = strategyRoundRobin()
strategyFn = strategyRoundRobin(option.URL)
case "sticky-sessions":
strategyFn = strategyStickySessions()
strategyFn = strategyStickySessions(option.URL)
default:
return nil, fmt.Errorf("%w: %s", errStrategy, strategy)
}
@ -260,7 +268,9 @@ func NewLoadBalance(option *GroupCommonOption, providers []provider.ProxyProvide
option.ExcludeType,
providers,
}),
strategyFn: strategyFn,
disableUDP: option.DisableUDP,
strategyFn: strategyFn,
disableUDP: option.DisableUDP,
testUrl: option.URL,
expectedStatus: option.ExpectedStatus,
}, nil
}

View file

@ -3,17 +3,19 @@ package outboundgroup
import (
"errors"
"fmt"
"strings"
"github.com/Dreamacro/clash/adapter/outbound"
"github.com/Dreamacro/clash/adapter/provider"
"github.com/Dreamacro/clash/common/structure"
"github.com/Dreamacro/clash/common/utils"
C "github.com/Dreamacro/clash/constant"
types "github.com/Dreamacro/clash/constant/provider"
)
var (
errFormat = errors.New("format error")
errType = errors.New("unsupport type")
errType = errors.New("unsupported type")
errMissProxy = errors.New("`use` or `proxies` missing")
errMissHealthCheck = errors.New("`url` or `interval` missing")
errDuplicateProvider = errors.New("duplicate provider name")
@ -21,17 +23,18 @@ var (
type GroupCommonOption struct {
outbound.BasicOption
Name string `group:"name"`
Type string `group:"type"`
Proxies []string `group:"proxies,omitempty"`
Use []string `group:"use,omitempty"`
URL string `group:"url,omitempty"`
Interval int `group:"interval,omitempty"`
Lazy bool `group:"lazy,omitempty"`
DisableUDP bool `group:"disable-udp,omitempty"`
Filter string `group:"filter,omitempty"`
ExcludeFilter string `group:"exclude-filter,omitempty"`
ExcludeType string `group:"exclude-type,omitempty"`
Name string `group:"name"`
Type string `group:"type"`
Proxies []string `group:"proxies,omitempty"`
Use []string `group:"use,omitempty"`
URL string `group:"url,omitempty"`
Interval int `group:"interval,omitempty"`
Lazy bool `group:"lazy,omitempty"`
DisableUDP bool `group:"disable-udp,omitempty"`
Filter string `group:"filter,omitempty"`
ExcludeFilter string `group:"exclude-filter,omitempty"`
ExcludeType string `group:"exclude-type,omitempty"`
ExpectedStatus string `group:"expected-status,omitempty"`
}
func ParseProxyGroup(config map[string]any, proxyMap map[string]C.Proxy, providersMap map[string]types.ProxyProvider) (C.ProxyAdapter, error) {
@ -56,6 +59,18 @@ func ParseProxyGroup(config map[string]any, proxyMap map[string]C.Proxy, provide
return nil, errMissProxy
}
expectedStatus, err := utils.NewIntRanges[uint16](groupOption.ExpectedStatus)
if err != nil {
return nil, err
}
status := strings.TrimSpace(groupOption.ExpectedStatus)
if status == "" {
status = "*"
}
groupOption.ExpectedStatus = status
testUrl := groupOption.URL
if len(groupOption.Proxies) != 0 {
ps, err := getProxies(proxyMap, groupOption.Proxies)
if err != nil {
@ -66,17 +81,14 @@ func ParseProxyGroup(config map[string]any, proxyMap map[string]C.Proxy, provide
return nil, errDuplicateProvider
}
// select don't need health check
if groupOption.Type == "select" || groupOption.Type == "relay" {
hc := provider.NewHealthCheck(ps, "", 0, true)
pd, err := provider.NewCompatibleProvider(groupName, ps, hc)
if err != nil {
return nil, err
}
hc := provider.NewHealthCheck(ps, "", 0, true)
pd, err := provider.NewCompatibleProvider(groupName, ps, hc)
if err != nil {
return nil, err
}
providers = append(providers, pd)
providersMap[groupName] = pd
} else {
// select don't need health check
if groupOption.Type != "select" && groupOption.Type != "relay" {
if groupOption.URL == "" {
groupOption.URL = "https://cp.cloudflare.com/generate_204"
}
@ -85,15 +97,11 @@ func ParseProxyGroup(config map[string]any, proxyMap map[string]C.Proxy, provide
groupOption.Interval = 300
}
hc := provider.NewHealthCheck(ps, groupOption.URL, uint(groupOption.Interval), groupOption.Lazy)
pd, err := provider.NewCompatibleProvider(groupName, ps, hc)
if err != nil {
return nil, err
}
providers = append(providers, pd)
providersMap[groupName] = pd
pd.RegisterHealthCheckTask(groupOption.URL, expectedStatus, "", uint(groupOption.Interval))
}
providers = append(providers, pd)
providersMap[groupName] = pd
}
if len(groupOption.Use) != 0 {
@ -101,6 +109,10 @@ func ParseProxyGroup(config map[string]any, proxyMap map[string]C.Proxy, provide
if err != nil {
return nil, err
}
// different proxy groups use different test URL
addTestUrlToProviders(list, testUrl, expectedStatus, groupOption.Filter, uint(groupOption.Interval))
providers = append(providers, list...)
} else {
groupOption.Filter = ""
@ -154,3 +166,13 @@ func getProviders(mapping map[string]types.ProxyProvider, list []string) ([]type
}
return ps, nil
}
func addTestUrlToProviders(providers []types.ProxyProvider, url string, expectedStatus utils.IntRanges[uint16], filter string, interval uint) {
if len(providers) == 0 || len(url) == 0 {
return
}
for _, pd := range providers {
pd.RegisterHealthCheckTask(url, expectedStatus, filter, interval)
}
}

View file

@ -25,12 +25,13 @@ func urlTestWithTolerance(tolerance uint16) urlTestOption {
type URLTest struct {
*GroupBase
selected string
testUrl string
tolerance uint16
disableUDP bool
fastNode C.Proxy
fastSingle *singledo.Single[C.Proxy]
selected string
testUrl string
expectedStatus string
tolerance uint16
disableUDP bool
fastNode C.Proxy
fastSingle *singledo.Single[C.Proxy]
}
func (u *URLTest) Now() string {
@ -112,7 +113,8 @@ func (u *URLTest) fast(touch bool) C.Proxy {
elm, _, shared := u.fastSingle.Do(func() (C.Proxy, error) {
fast := proxies[0]
min := fast.LastDelay()
// min := fast.LastDelay()
min := fast.LastDelayForTestUrl(u.testUrl)
fastNotExist := true
for _, proxy := range proxies[1:] {
@ -120,11 +122,13 @@ func (u *URLTest) fast(touch bool) C.Proxy {
fastNotExist = false
}
if !proxy.Alive() {
// if !proxy.Alive() {
if !proxy.AliveForTestUrl(u.testUrl) {
continue
}
delay := proxy.LastDelay()
// delay := proxy.LastDelay()
delay := proxy.LastDelayForTestUrl(u.testUrl)
if delay < min {
fast = proxy
min = delay
@ -132,7 +136,8 @@ func (u *URLTest) fast(touch bool) C.Proxy {
}
// tolerance
if u.fastNode == nil || fastNotExist || !u.fastNode.Alive() || u.fastNode.LastDelay() > fast.LastDelay()+u.tolerance {
// if u.fastNode == nil || fastNotExist || !u.fastNode.Alive() || u.fastNode.LastDelay() > fast.LastDelay()+u.tolerance {
if u.fastNode == nil || fastNotExist || !u.fastNode.AliveForTestUrl(u.testUrl) || u.fastNode.LastDelayForTestUrl(u.testUrl) > fast.LastDelayForTestUrl(u.testUrl)+u.tolerance {
u.fastNode = fast
}
return u.fastNode, nil
@ -164,9 +169,11 @@ func (u *URLTest) MarshalJSON() ([]byte, error) {
all = append(all, proxy.Name())
}
return json.Marshal(map[string]any{
"type": u.Type().String(),
"now": u.Now(),
"all": all,
"type": u.Type().String(),
"now": u.Now(),
"all": all,
"testUrl": u.testUrl,
"expected": u.expectedStatus,
})
}
@ -198,9 +205,10 @@ func NewURLTest(option *GroupCommonOption, providers []provider.ProxyProvider, o
option.ExcludeType,
providers,
}),
fastSingle: singledo.NewSingle[C.Proxy](time.Second * 10),
disableUDP: option.DisableUDP,
testUrl: option.URL,
fastSingle: singledo.NewSingle[C.Proxy](time.Second * 10),
disableUDP: option.DisableUDP,
testUrl: option.URL,
expectedStatus: option.ExpectedStatus,
}
for _, option := range options {

View file

@ -2,6 +2,8 @@ package provider
import (
"context"
"strings"
"sync"
"time"
"github.com/Dreamacro/clash/common/atomic"
@ -10,10 +12,13 @@ import (
"github.com/Dreamacro/clash/common/utils"
C "github.com/Dreamacro/clash/constant"
"github.com/Dreamacro/clash/log"
"github.com/dlclark/regexp2"
)
const (
defaultURLTestTimeout = time.Second * 5
defaultMaxTestUrlNum = 6
)
type HealthCheckOption struct {
@ -21,8 +26,16 @@ type HealthCheckOption struct {
Interval uint
}
type extraOption struct {
expectedStatus utils.IntRanges[uint16]
filters map[string]struct{}
}
type HealthCheck struct {
url string
extra map[string]*extraOption
mu sync.Mutex
started *atomic.Bool
proxies []C.Proxy
interval uint
lazy bool
@ -32,7 +45,13 @@ type HealthCheck struct {
}
func (hc *HealthCheck) process() {
if hc.started.Load() {
log.Warnln("Skip start health check timer due to it's started")
return
}
ticker := time.NewTicker(time.Duration(hc.interval) * time.Second)
hc.start()
for {
select {
case <-ticker.C:
@ -44,6 +63,7 @@ func (hc *HealthCheck) process() {
}
case <-hc.done:
ticker.Stop()
hc.stop()
return
}
}
@ -53,6 +73,63 @@ func (hc *HealthCheck) setProxy(proxies []C.Proxy) {
hc.proxies = proxies
}
func (hc *HealthCheck) registerHealthCheckTask(url string, expectedStatus utils.IntRanges[uint16], filter string, interval uint) {
url = strings.TrimSpace(url)
if len(url) == 0 || url == hc.url {
log.Debugln("ignore invalid health check url: %s", url)
return
}
hc.mu.Lock()
defer hc.mu.Unlock()
// if the provider has not set up health checks, then modify it to be the same as the group's interval
if hc.interval == 0 {
hc.interval = interval
}
if hc.extra == nil {
hc.extra = make(map[string]*extraOption)
}
// prioritize the use of previously registered configurations, especially those from provider
if _, ok := hc.extra[url]; ok {
// provider default health check does not set filter
if url != hc.url && len(filter) != 0 {
splitAndAddFiltersToExtra(filter, hc.extra[url])
}
log.Debugln("health check url: %s exists", url)
return
}
// due to the time-consuming nature of health checks, a maximum of defaultMaxTestURLNum URLs can be set for testing
if len(hc.extra) > defaultMaxTestUrlNum {
log.Debugln("skip add url: %s to health check because it has reached the maximum limit: %d", url, defaultMaxTestUrlNum)
return
}
option := &extraOption{filters: map[string]struct{}{}, expectedStatus: expectedStatus}
splitAndAddFiltersToExtra(filter, option)
hc.extra[url] = option
if hc.auto() && !hc.started.Load() {
go hc.process()
}
}
func splitAndAddFiltersToExtra(filter string, option *extraOption) {
filter = strings.TrimSpace(filter)
if len(filter) != 0 {
for _, regex := range strings.Split(filter, "`") {
regex = strings.TrimSpace(regex)
if len(regex) != 0 {
option.filters[regex] = struct{}{}
}
}
}
}
func (hc *HealthCheck) auto() bool {
return hc.interval != 0
}
@ -61,29 +138,78 @@ func (hc *HealthCheck) touch() {
hc.lastTouch.Store(time.Now().Unix())
}
func (hc *HealthCheck) start() {
hc.started.Store(true)
}
func (hc *HealthCheck) stop() {
hc.started.Store(false)
}
func (hc *HealthCheck) check() {
_, _, _ = hc.singleDo.Do(func() (struct{}, error) {
id := utils.NewUUIDV4().String()
log.Debugln("Start New Health Checking {%s}", id)
b, _ := batch.New[bool](context.Background(), batch.WithConcurrencyNum[bool](10))
for _, proxy := range hc.proxies {
p := proxy
b.Go(p.Name(), func() (bool, error) {
ctx, cancel := context.WithTimeout(context.Background(), defaultURLTestTimeout)
defer cancel()
log.Debugln("Health Checking %s {%s}", p.Name(), id)
_, _ = p.URLTest(ctx, hc.url)
log.Debugln("Health Checked %s : %t %d ms {%s}", p.Name(), p.Alive(), p.LastDelay(), id)
return false, nil
})
}
// execute default health check
hc.execute(b, hc.url, id, nil)
// execute extra health check
if len(hc.extra) != 0 {
for url, option := range hc.extra {
hc.execute(b, url, id, option)
}
}
b.Wait()
log.Debugln("Finish A Health Checking {%s}", id)
return struct{}{}, nil
})
}
func (hc *HealthCheck) execute(b *batch.Batch[bool], url, uid string, option *extraOption) {
url = strings.TrimSpace(url)
if len(url) == 0 {
log.Debugln("Health Check has been skipped due to testUrl is empty, {%s}", uid)
return
}
var filterReg *regexp2.Regexp
var store = C.OriginalHistory
var expectedStatus utils.IntRanges[uint16]
if option != nil {
store = C.ExtraHistory
expectedStatus = option.expectedStatus
if len(option.filters) != 0 {
filters := make([]string, 0, len(option.filters))
for filter := range option.filters {
filters = append(filters, filter)
}
filterReg = regexp2.MustCompile(strings.Join(filters, "|"), 0)
}
}
for _, proxy := range hc.proxies {
// skip proxies that do not require health check
if filterReg != nil {
if match, _ := filterReg.FindStringMatch(proxy.Name()); match == nil {
continue
}
}
p := proxy
b.Go(p.Name(), func() (bool, error) {
ctx, cancel := context.WithTimeout(context.Background(), defaultURLTestTimeout)
defer cancel()
log.Debugln("Health Checking, proxy: %s, url: %s, id: {%s}", p.Name(), url, uid)
_, _ = p.URLTest(ctx, url, expectedStatus, store)
log.Debugln("Health Checked, proxy: %s, url: %s, alive: %t, delay: %d ms uid: {%s}", p.Name(), url, p.AliveForTestUrl(url), p.LastDelayForTestUrl(url), uid)
return false, nil
})
}
}
func (hc *HealthCheck) close() {
hc.done <- struct{}{}
}
@ -92,6 +218,8 @@ func NewHealthCheck(proxies []C.Proxy, url string, interval uint, lazy bool) *He
return &HealthCheck{
proxies: proxies,
url: url,
extra: map[string]*extraOption{},
started: atomic.NewBool(false),
interval: interval,
lazy: lazy,
lastTouch: atomic.NewInt64(0),

View file

@ -12,6 +12,7 @@ import (
"github.com/Dreamacro/clash/adapter"
"github.com/Dreamacro/clash/common/convert"
"github.com/Dreamacro/clash/common/utils"
clashHttp "github.com/Dreamacro/clash/component/http"
"github.com/Dreamacro/clash/component/resource"
C "github.com/Dreamacro/clash/constant"
@ -50,6 +51,7 @@ func (pp *proxySetProvider) MarshalJSON() ([]byte, error) {
"type": pp.Type().String(),
"vehicleType": pp.VehicleType().String(),
"proxies": pp.Proxies(),
"testUrl": pp.healthCheck.url,
"updatedAt": pp.UpdatedAt,
"subscriptionInfo": pp.subscriptionInfo,
})
@ -98,6 +100,10 @@ func (pp *proxySetProvider) Touch() {
pp.healthCheck.touch()
}
func (pp *proxySetProvider) RegisterHealthCheckTask(url string, expectedStatus utils.IntRanges[uint16], filter string, interval uint) {
pp.healthCheck.registerHealthCheckTask(url, expectedStatus, filter, interval)
}
func (pp *proxySetProvider) setProxies(proxies []C.Proxy) {
pp.proxies = proxies
pp.healthCheck.setProxy(proxies)
@ -210,6 +216,7 @@ func (cp *compatibleProvider) MarshalJSON() ([]byte, error) {
"type": cp.Type().String(),
"vehicleType": cp.VehicleType().String(),
"proxies": cp.Proxies(),
"testUrl": cp.healthCheck.url,
})
}
@ -249,6 +256,10 @@ func (cp *compatibleProvider) Touch() {
cp.healthCheck.touch()
}
func (cp *compatibleProvider) RegisterHealthCheckTask(url string, expectedStatus utils.IntRanges[uint16], filter string, interval uint) {
cp.healthCheck.registerHealthCheckTask(url, expectedStatus, filter, interval)
}
func stopCompatibleProvider(pd *CompatibleProvider) {
pd.healthCheck.close()
}

View file

@ -9,36 +9,36 @@ type Range[T constraints.Ordered] struct {
end T
}
func NewRange[T constraints.Ordered](start, end T) *Range[T] {
func NewRange[T constraints.Ordered](start, end T) Range[T] {
if start > end {
return &Range[T]{
return Range[T]{
start: end,
end: start,
}
}
return &Range[T]{
return Range[T]{
start: start,
end: end,
}
}
func (r *Range[T]) Contains(t T) bool {
func (r Range[T]) Contains(t T) bool {
return t >= r.start && t <= r.end
}
func (r *Range[T]) LeftContains(t T) bool {
func (r Range[T]) LeftContains(t T) bool {
return t >= r.start && t < r.end
}
func (r *Range[T]) RightContains(t T) bool {
func (r Range[T]) RightContains(t T) bool {
return t > r.start && t <= r.end
}
func (r *Range[T]) Start() T {
func (r Range[T]) Start() T {
return r.start
}
func (r *Range[T]) End() T {
func (r Range[T]) End() T {
return r.end
}

77
common/utils/ranges.go Normal file
View file

@ -0,0 +1,77 @@
package utils
import (
"errors"
"fmt"
"strconv"
"strings"
"golang.org/x/exp/constraints"
)
type IntRanges[T constraints.Integer] []Range[T]
var errIntRanges = errors.New("intRanges error")
func NewIntRanges[T constraints.Integer](expected string) (IntRanges[T], error) {
// example: 200 or 200/302 or 200-400 or 200/204/401-429/501-503
expected = strings.TrimSpace(expected)
if len(expected) == 0 || expected == "*" {
return nil, nil
}
list := strings.Split(expected, "/")
if len(list) > 28 {
return nil, fmt.Errorf("%w, too many ranges to use, maximum support 28 ranges", errIntRanges)
}
return NewIntRangesFromList[T](list)
}
func NewIntRangesFromList[T constraints.Integer](list []string) (IntRanges[T], error) {
var ranges IntRanges[T]
for _, s := range list {
if s == "" {
continue
}
status := strings.Split(s, "-")
statusLen := len(status)
if statusLen > 2 {
return nil, errIntRanges
}
start, err := strconv.ParseInt(strings.Trim(status[0], "[ ]"), 10, 64)
if err != nil {
return nil, errIntRanges
}
switch statusLen {
case 1:
ranges = append(ranges, NewRange(T(start), T(start)))
case 2:
end, err := strconv.ParseUint(strings.Trim(status[1], "[ ]"), 10, 64)
if err != nil {
return nil, errIntRanges
}
ranges = append(ranges, NewRange(T(start), T(end)))
}
}
return ranges, nil
}
func (ranges IntRanges[T]) Check(status T) bool {
if ranges == nil || len(ranges) == 0 {
return true
}
for _, segment := range ranges {
if segment.Contains(status) {
return true
}
}
return false
}

View file

@ -10,11 +10,11 @@ import (
type SnifferConfig struct {
OverrideDest bool
Ports []utils.Range[uint16]
Ports utils.IntRanges[uint16]
}
type BaseSniffer struct {
ports []utils.Range[uint16]
ports utils.IntRanges[uint16]
supportNetworkType constant.NetWork
}
@ -35,15 +35,10 @@ func (bs *BaseSniffer) SupportNetwork() constant.NetWork {
// SupportPort implements sniffer.Sniffer
func (bs *BaseSniffer) SupportPort(port uint16) bool {
for _, portRange := range bs.ports {
if portRange.Contains(port) {
return true
}
}
return false
return bs.ports.Check(port)
}
func NewBaseSniffer(ports []utils.Range[uint16], networkType constant.NetWork) *BaseSniffer {
func NewBaseSniffer(ports utils.IntRanges[uint16], networkType constant.NetWork) *BaseSniffer {
return &BaseSniffer{
ports: ports,
supportNetworkType: networkType,

View file

@ -34,11 +34,9 @@ type HTTPSniffer struct {
var _ sniffer.Sniffer = (*HTTPSniffer)(nil)
func NewHTTPSniffer(snifferConfig SnifferConfig) (*HTTPSniffer, error) {
ports := make([]utils.Range[uint16], 0)
if len(snifferConfig.Ports) == 0 {
ports = append(ports, *utils.NewRange[uint16](80, 80))
} else {
ports = append(ports, snifferConfig.Ports...)
ports := snifferConfig.Ports
if len(ports) == 0 {
ports = utils.IntRanges[uint16]{utils.NewRange[uint16](80, 80)}
}
return &HTTPSniffer{
BaseSniffer: NewBaseSniffer(ports, C.TCP),

View file

@ -22,11 +22,9 @@ type TLSSniffer struct {
}
func NewTLSSniffer(snifferConfig SnifferConfig) (*TLSSniffer, error) {
ports := make([]utils.Range[uint16], 0)
if len(snifferConfig.Ports) == 0 {
ports = append(ports, *utils.NewRange[uint16](443, 443))
} else {
ports = append(ports, snifferConfig.Ports...)
ports := snifferConfig.Ports
if len(ports) == 0 {
ports = utils.IntRanges[uint16]{utils.NewRange[uint16](443, 443)}
}
return &TLSSniffer{
BaseSniffer: NewBaseSniffer(ports, C.TCP),

View file

@ -17,7 +17,7 @@ import (
var trustCerts []*x509.Certificate
var certPool *x509.CertPool
var mutex sync.RWMutex
var errNotMacth error = errors.New("certificate fingerprints do not match")
var errNotMatch = errors.New("certificate fingerprints do not match")
func AddCertificate(certificate string) error {
mutex.Lock()
@ -79,7 +79,7 @@ func verifyFingerprint(fingerprint *[32]byte) func(rawCerts [][]byte, verifiedCh
}
}
}
return errNotMacth
return errNotMatch
}
}

View file

@ -9,7 +9,6 @@ import (
"net/url"
"os"
"regexp"
"strconv"
"strings"
"time"
@ -1304,7 +1303,7 @@ func parseSniffer(snifferRaw RawSniffer) (*Sniffer, error) {
if len(snifferRaw.Sniff) != 0 {
for sniffType, sniffConfig := range snifferRaw.Sniff {
find := false
ports, err := parsePortRange(sniffConfig.Ports)
ports, err := utils.NewIntRangesFromList[uint16](sniffConfig.Ports)
if err != nil {
return nil, err
}
@ -1331,7 +1330,7 @@ func parseSniffer(snifferRaw RawSniffer) (*Sniffer, error) {
// Deprecated: Use Sniff instead
log.Warnln("Deprecated: Use Sniff instead")
}
globalPorts, err := parsePortRange(snifferRaw.Ports)
globalPorts, err := utils.NewIntRangesFromList[uint16](snifferRaw.Ports)
if err != nil {
return nil, err
}
@ -1376,28 +1375,3 @@ func parseSniffer(snifferRaw RawSniffer) (*Sniffer, error) {
return sniffer, nil
}
func parsePortRange(portRanges []string) ([]utils.Range[uint16], error) {
ports := make([]utils.Range[uint16], 0)
for _, portRange := range portRanges {
portRaws := strings.Split(portRange, "-")
p, err := strconv.ParseUint(portRaws[0], 10, 16)
if err != nil {
return nil, fmt.Errorf("%s format error", portRange)
}
start := uint16(p)
if len(portRaws) > 1 {
p, err = strconv.ParseUint(portRaws[1], 10, 16)
if err != nil {
return nil, fmt.Errorf("%s format error", portRange)
}
end := uint16(p)
ports = append(ports, *utils.NewRange(start, end))
} else {
ports = append(ports, *utils.NewRange(start, start))
}
}
return ports, nil
}

View file

@ -10,6 +10,7 @@ import (
"time"
N "github.com/Dreamacro/clash/common/net"
"github.com/Dreamacro/clash/common/utils"
"github.com/Dreamacro/clash/component/dialer"
)
@ -132,7 +133,7 @@ type ProxyAdapter interface {
}
type Group interface {
URLTest(ctx context.Context, url string) (mp map[string]uint16, err error)
URLTest(ctx context.Context, url string, expectedStatus utils.IntRanges[uint16]) (mp map[string]uint16, err error)
GetProxies(touch bool) []Proxy
Touch()
}
@ -142,12 +143,23 @@ type DelayHistory struct {
Delay uint16 `json:"delay"`
}
type DelayHistoryStoreType int
const (
OriginalHistory DelayHistoryStoreType = iota
ExtraHistory
DropHistory
)
type Proxy interface {
ProxyAdapter
Alive() bool
AliveForTestUrl(url string) bool
DelayHistory() []DelayHistory
ExtraDelayHistory() map[string][]DelayHistory
LastDelay() uint16
URLTest(ctx context.Context, url string) (uint16, error)
LastDelayForTestUrl(url string) uint16
URLTest(ctx context.Context, url string, expectedStatus utils.IntRanges[uint16], store DelayHistoryStoreType) (uint16, error)
// Deprecated: use DialContext instead.
Dial(metadata *Metadata) (Conn, error)

View file

@ -1,6 +1,7 @@
package provider
import (
"github.com/Dreamacro/clash/common/utils"
"github.com/Dreamacro/clash/constant"
)
@ -71,6 +72,7 @@ type ProxyProvider interface {
Touch()
HealthCheck()
Version() uint32
RegisterHealthCheckTask(url string, expectedStatus utils.IntRanges[uint16], filter string, interval uint)
}
// RuleProvider interface

View file

@ -2,14 +2,16 @@ package route
import (
"context"
"github.com/Dreamacro/clash/adapter"
C "github.com/Dreamacro/clash/constant"
"github.com/Dreamacro/clash/tunnel"
"github.com/go-chi/chi/v5"
"github.com/go-chi/render"
"net/http"
"strconv"
"time"
"github.com/Dreamacro/clash/adapter"
"github.com/Dreamacro/clash/common/utils"
C "github.com/Dreamacro/clash/constant"
"github.com/Dreamacro/clash/tunnel"
)
func GroupRouter() http.Handler {
@ -64,10 +66,17 @@ func getGroupDelay(w http.ResponseWriter, r *http.Request) {
return
}
expectedStatus, err := utils.NewIntRanges[uint16](query.Get("expected"))
if err != nil {
render.Status(r, http.StatusBadRequest)
render.JSON(w, r, ErrBadRequest)
return
}
ctx, cancel := context.WithTimeout(r.Context(), time.Millisecond*time.Duration(timeout))
defer cancel()
dm, err := group.URLTest(ctx, url)
dm, err := group.URLTest(ctx, url, expectedStatus)
if err != nil {
render.Status(r, http.StatusGatewayTimeout)

View file

@ -9,6 +9,7 @@ import (
"github.com/Dreamacro/clash/adapter"
"github.com/Dreamacro/clash/adapter/outboundgroup"
"github.com/Dreamacro/clash/common/utils"
"github.com/Dreamacro/clash/component/profile/cachefile"
C "github.com/Dreamacro/clash/constant"
"github.com/Dreamacro/clash/tunnel"
@ -112,12 +113,19 @@ func getProxyDelay(w http.ResponseWriter, r *http.Request) {
return
}
expectedStatus, err := utils.NewIntRanges[uint16](query.Get("expected"))
if err != nil {
render.Status(r, http.StatusBadRequest)
render.JSON(w, r, ErrBadRequest)
return
}
proxy := r.Context().Value(CtxKeyProxy).(C.Proxy)
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*time.Duration(timeout))
defer cancel()
delay, err := proxy.URLTest(ctx, url)
delay, err := proxy.URLTest(ctx, url, expectedStatus, C.DropHistory)
if ctx.Err() != nil {
render.Status(r, http.StatusGatewayTimeout)
render.JSON(w, r, ErrRequestTimeout)
@ -126,7 +134,11 @@ func getProxyDelay(w http.ResponseWriter, r *http.Request) {
if err != nil || delay == 0 {
render.Status(r, http.StatusServiceUnavailable)
render.JSON(w, r, newError("An error occurred in the delay test"))
if err != nil && delay != 0 {
render.JSON(w, r, err)
} else {
render.JSON(w, r, newError("An error occurred in the delay test"))
}
return
}

View file

@ -3,7 +3,6 @@ package common
import (
"fmt"
"strconv"
"strings"
"github.com/Dreamacro/clash/common/utils"
C "github.com/Dreamacro/clash/constant"
@ -11,10 +10,10 @@ import (
type Port struct {
*Base
adapter string
port string
ruleType C.RuleType
portList []utils.Range[uint16]
adapter string
port string
ruleType C.RuleType
portRanges utils.IntRanges[uint16]
}
func (p *Port) RuleType() C.RuleType {
@ -43,61 +42,25 @@ func (p *Port) Payload() string {
func (p *Port) matchPortReal(portRef string) bool {
port, _ := strconv.Atoi(portRef)
for _, pr := range p.portList {
if pr.Contains(uint16(port)) {
return true
}
}
return false
return p.portRanges.Check(uint16(port))
}
func NewPort(port string, adapter string, ruleType C.RuleType) (*Port, error) {
ports := strings.Split(port, "/")
if len(ports) > 28 {
return nil, fmt.Errorf("%s, too many ports to use, maximum support 28 ports", errPayload.Error())
portRanges, err := utils.NewIntRanges[uint16](port)
if err != nil {
return nil, fmt.Errorf("%w, %s", errPayload, err.Error())
}
var portRange []utils.Range[uint16]
for _, p := range ports {
if p == "" {
continue
}
subPorts := strings.Split(p, "-")
subPortsLen := len(subPorts)
if subPortsLen > 2 {
return nil, errPayload
}
portStart, err := strconv.ParseUint(strings.Trim(subPorts[0], "[ ]"), 10, 16)
if err != nil {
return nil, errPayload
}
switch subPortsLen {
case 1:
portRange = append(portRange, *utils.NewRange(uint16(portStart), uint16(portStart)))
case 2:
portEnd, err := strconv.ParseUint(strings.Trim(subPorts[1], "[ ]"), 10, 16)
if err != nil {
return nil, errPayload
}
portRange = append(portRange, *utils.NewRange(uint16(portStart), uint16(portEnd)))
}
}
if len(portRange) == 0 {
if len(portRanges) == 0 {
return nil, errPayload
}
return &Port{
Base: &Base{},
adapter: adapter,
port: port,
ruleType: ruleType,
portList: portRange,
Base: &Base{},
adapter: adapter,
port: port,
ruleType: ruleType,
portRanges: portRanges,
}, nil
}

View file

@ -2,57 +2,28 @@ package common
import (
"fmt"
"runtime"
"github.com/Dreamacro/clash/common/utils"
C "github.com/Dreamacro/clash/constant"
"github.com/Dreamacro/clash/log"
"runtime"
"strconv"
"strings"
)
type Uid struct {
*Base
uids []utils.Range[uint32]
uids utils.IntRanges[uint32]
oUid string
adapter string
}
func NewUid(oUid, adapter string) (*Uid, error) {
//if len(_uids) > 28 {
// return nil, fmt.Errorf("%s, too many uid to use, maximum support 28 uid", errPayload.Error())
//}
if !(runtime.GOOS == "linux" || runtime.GOOS == "android") {
return nil, fmt.Errorf("uid rule not support this platform")
}
var uidRange []utils.Range[uint32]
for _, u := range strings.Split(oUid, "/") {
if u == "" {
continue
}
subUids := strings.Split(u, "-")
subUidsLen := len(subUids)
if subUidsLen > 2 {
return nil, errPayload
}
uidStart, err := strconv.ParseUint(strings.Trim(subUids[0], "[ ]"), 10, 32)
if err != nil {
return nil, errPayload
}
switch subUidsLen {
case 1:
uidRange = append(uidRange, *utils.NewRange(uint32(uidStart), uint32(uidStart)))
case 2:
uidEnd, err := strconv.ParseUint(strings.Trim(subUids[1], "[ ]"), 10, 32)
if err != nil {
return nil, errPayload
}
uidRange = append(uidRange, *utils.NewRange(uint32(uidStart), uint32(uidEnd)))
}
uidRange, err := utils.NewIntRanges[uint32](oUid)
if err != nil {
return nil, fmt.Errorf("%w, %s", errPayload, err.Error())
}
if len(uidRange) == 0 {
@ -72,10 +43,8 @@ func (u *Uid) RuleType() C.RuleType {
func (u *Uid) Match(metadata *C.Metadata) (bool, string) {
if metadata.Uid != 0 {
for _, uid := range u.uids {
if uid.Contains(metadata.Uid) {
return true, u.adapter
}
if u.uids.Check(metadata.Uid) {
return true, u.adapter
}
}
log.Warnln("[UID] could not get uid from %s", metadata.String())