When testing the delay through REST API, determine whether to store the delay data based on certain conditions instead of discarding it directly (#609)

This commit is contained in:
wzdnzd 2023-06-07 11:04:03 +08:00 committed by Larvan2
parent e0faffbfbd
commit da04e00767
4 changed files with 53 additions and 16 deletions

View file

@ -34,6 +34,7 @@ type Proxy struct {
C.ProxyAdapter C.ProxyAdapter
history *queue.Queue[C.DelayHistory] history *queue.Queue[C.DelayHistory]
alive *atomic.Bool alive *atomic.Bool
url string
extra map[string]*extraProxyState extra map[string]*extraProxyState
} }
@ -112,14 +113,14 @@ func (p *Proxy) DelayHistoryForTestUrl(url string) []C.DelayHistory {
func (p *Proxy) ExtraDelayHistory() map[string][]C.DelayHistory { func (p *Proxy) ExtraDelayHistory() map[string][]C.DelayHistory {
extra := map[string][]C.DelayHistory{} extra := map[string][]C.DelayHistory{}
if p.extra != nil && len(p.extra) != 0 { if p.extra != nil && len(p.extra) != 0 {
for url, option := range p.extra { for testUrl, option := range p.extra {
histories := []C.DelayHistory{} histories := []C.DelayHistory{}
queueM := option.history.Copy() queueM := option.history.Copy()
for _, item := range queueM { for _, item := range queueM {
histories = append(histories, item) histories = append(histories, item)
} }
extra[url] = histories extra[testUrl] = histories
} }
} }
return extra return extra
@ -187,6 +188,8 @@ func (p *Proxy) MarshalJSON() ([]byte, error) {
func (p *Proxy) URLTest(ctx context.Context, url string, expectedStatus utils.IntRanges[uint16], store C.DelayHistoryStoreType) (t uint16, err error) { func (p *Proxy) URLTest(ctx context.Context, url string, expectedStatus utils.IntRanges[uint16], store C.DelayHistoryStoreType) (t uint16, err error) {
defer func() { defer func() {
alive := err == nil alive := err == nil
store = p.determineFinalStoreType(store, url)
switch store { switch store {
case C.OriginalHistory: case C.OriginalHistory:
p.alive.Store(alive) p.alive.Store(alive)
@ -198,6 +201,11 @@ func (p *Proxy) URLTest(ctx context.Context, url string, expectedStatus utils.In
if p.history.Len() > defaultHistoriesNum { if p.history.Len() > defaultHistoriesNum {
p.history.Pop() p.history.Pop()
} }
// test URL configured by the proxy provider
if len(p.url) == 0 {
p.url = url
}
case C.ExtraHistory: case C.ExtraHistory:
record := C.DelayHistory{Time: time.Now()} record := C.DelayHistory{Time: time.Now()}
if alive { if alive {
@ -297,7 +305,7 @@ func (p *Proxy) URLTest(ctx context.Context, url string, expectedStatus utils.In
} }
func NewProxy(adapter C.ProxyAdapter) *Proxy { func NewProxy(adapter C.ProxyAdapter) *Proxy {
return &Proxy{adapter, queue.New[C.DelayHistory](defaultHistoriesNum), atomic.NewBool(true), map[string]*extraProxyState{}} return &Proxy{adapter, queue.New[C.DelayHistory](defaultHistoriesNum), atomic.NewBool(true), "", map[string]*extraProxyState{}}
} }
func urlToMetadata(rawURL string) (addr C.Metadata, err error) { func urlToMetadata(rawURL string) (addr C.Metadata, err error) {
@ -326,3 +334,24 @@ func urlToMetadata(rawURL string) (addr C.Metadata, err error) {
} }
return return
} }
func (p *Proxy) determineFinalStoreType(store C.DelayHistoryStoreType, url string) C.DelayHistoryStoreType {
if store != C.DropHistory {
return store
}
if len(p.url) == 0 || url == p.url {
return C.OriginalHistory
}
if p.extra == nil {
store = C.ExtraHistory
} else {
if _, ok := p.extra[url]; ok {
store = C.ExtraHistory
} else if len(p.extra) < 2*C.DefaultMaxHealthCheckUrlNum {
store = C.ExtraHistory
}
}
return store
}

View file

@ -17,7 +17,6 @@ var (
errFormat = errors.New("format error") errFormat = errors.New("format error")
errType = errors.New("unsupported type") errType = errors.New("unsupported type")
errMissProxy = errors.New("`use` or `proxies` missing") errMissProxy = errors.New("`use` or `proxies` missing")
errMissHealthCheck = errors.New("`url` or `interval` missing")
errDuplicateProvider = errors.New("duplicate provider name") errDuplicateProvider = errors.New("duplicate provider name")
) )
@ -81,11 +80,8 @@ func ParseProxyGroup(config map[string]any, proxyMap map[string]C.Proxy, provide
return nil, fmt.Errorf("%s: %w", groupName, errDuplicateProvider) return nil, fmt.Errorf("%s: %w", groupName, errDuplicateProvider)
} }
hc := provider.NewHealthCheck(ps, "", 0, true, nil) var url string
pd, err := provider.NewCompatibleProvider(groupName, ps, hc) var interval uint
if err != nil {
return nil, fmt.Errorf("%s: %w", groupName, err)
}
// select don't need health check // select don't need health check
if groupOption.Type != "select" && groupOption.Type != "relay" { if groupOption.Type != "select" && groupOption.Type != "relay" {
@ -97,7 +93,14 @@ func ParseProxyGroup(config map[string]any, proxyMap map[string]C.Proxy, provide
groupOption.Interval = 300 groupOption.Interval = 300
} }
pd.RegisterHealthCheckTask(groupOption.URL, expectedStatus, "", uint(groupOption.Interval)) url = groupOption.URL
interval = uint(groupOption.Interval)
}
hc := provider.NewHealthCheck(ps, url, interval, true, expectedStatus)
pd, err := provider.NewCompatibleProvider(groupName, ps, hc)
if err != nil {
return nil, fmt.Errorf("%s: %w", groupName, err)
} }
providers = append(providers, pd) providers = append(providers, pd)

View file

@ -18,7 +18,6 @@ import (
const ( const (
defaultURLTestTimeout = time.Second * 5 defaultURLTestTimeout = time.Second * 5
defaultMaxTestUrlNum = 6
) )
type HealthCheckOption struct { type HealthCheckOption struct {
@ -105,8 +104,8 @@ func (hc *HealthCheck) registerHealthCheckTask(url string, expectedStatus utils.
} }
// due to the time-consuming nature of health checks, a maximum of defaultMaxTestURLNum URLs can be set for testing // due to the time-consuming nature of health checks, a maximum of defaultMaxTestURLNum URLs can be set for testing
if len(hc.extra) > defaultMaxTestUrlNum { if len(hc.extra) > C.DefaultMaxHealthCheckUrlNum {
log.Debugln("skip add url: %s to health check because it has reached the maximum limit: %d", url, defaultMaxTestUrlNum) log.Debugln("skip add url: %s to health check because it has reached the maximum limit: %d", url, C.DefaultMaxHealthCheckUrlNum)
return return
} }
@ -220,6 +219,11 @@ func (hc *HealthCheck) close() {
} }
func NewHealthCheck(proxies []C.Proxy, url string, interval uint, lazy bool, expectedStatus utils.IntRanges[uint16]) *HealthCheck { func NewHealthCheck(proxies []C.Proxy, url string, interval uint, lazy bool, expectedStatus utils.IntRanges[uint16]) *HealthCheck {
if len(url) == 0 {
interval = 0
expectedStatus = nil
}
return &HealthCheck{ return &HealthCheck{
proxies: proxies, proxies: proxies,
url: url, url: url,

View file

@ -44,6 +44,7 @@ const (
DefaultTCPTimeout = 5 * time.Second DefaultTCPTimeout = 5 * time.Second
DefaultUDPTimeout = DefaultTCPTimeout DefaultUDPTimeout = DefaultTCPTimeout
DefaultTLSTimeout = DefaultTCPTimeout DefaultTLSTimeout = DefaultTCPTimeout
DefaultMaxHealthCheckUrlNum = 16
) )
var ErrNotSupport = errors.New("no support") var ErrNotSupport = errors.New("no support")