healthcheck latency of the provider is also stored in the extra, without compromising rest api compatibility

This commit is contained in:
tommytag 2023-12-06 17:11:24 +08:00
parent 2d7538aca6
commit f63acc0202
7 changed files with 69 additions and 109 deletions

View file

@ -26,21 +26,20 @@ const (
defaultHistoriesNum = 10
)
type extraProxyState struct {
history *queue.Queue[C.DelayHistory]
type internalProxyState struct {
alive atomic.Bool
history *queue.Queue[C.DelayHistory]
}
type Proxy struct {
C.ProxyAdapter
history *queue.Queue[C.DelayHistory]
alive atomic.Bool
url string
extra *xsync.MapOf[string, *extraProxyState]
extra *xsync.MapOf[string, *internalProxyState]
}
// AliveForTestUrl implements C.Proxy
func (p *Proxy) AliveForTestUrl(url string) bool {
// Alive implements C.Proxy
func (p *Proxy) Alive(url string) bool {
if state, ok := p.extra.Load(url); ok {
return state.alive.Load()
}
@ -48,10 +47,6 @@ func (p *Proxy) AliveForTestUrl(url string) bool {
return p.alive.Load()
}
func (p *Proxy) OriginalHealthCheckUrl(url string) {
p.url = url
}
// Dial implements C.Proxy
func (p *Proxy) Dial(metadata *C.Metadata) (C.Conn, error) {
ctx, cancel := context.WithTimeout(context.Background(), C.DefaultTCPTimeout)
@ -81,7 +76,7 @@ func (p *Proxy) ListenPacketContext(ctx context.Context, metadata *C.Metadata, o
// DelayHistory implements C.Proxy
func (p *Proxy) DelayHistory() []C.DelayHistory {
queueM := p.history.Copy()
histories := []C.DelayHistory{}
var histories []C.DelayHistory
for _, item := range queueM {
histories = append(histories, item)
}
@ -97,71 +92,52 @@ func (p *Proxy) DelayHistoryForTestUrl(url string) []C.DelayHistory {
queueM = state.history.Copy()
}
if queueM == nil {
queueM = p.history.Copy()
}
histories := []C.DelayHistory{}
var histories []C.DelayHistory
for _, item := range queueM {
histories = append(histories, item)
}
return histories
}
func (p *Proxy) ExtraDelayHistory() map[string][]C.DelayHistory {
extraHistory := map[string][]C.DelayHistory{}
p.extra.Range(func(k string, v *extraProxyState) bool {
// ExtraDelayHistories return all delay histories for each test URL
// implements C.Proxy
func (p *Proxy) ExtraDelayHistories() map[string]C.ProxyState {
histories := map[string]C.ProxyState{}
p.extra.Range(func(k string, v *internalProxyState) bool {
testUrl := k
state := v
histories := []C.DelayHistory{}
queueM := state.history.Copy()
var history []C.DelayHistory
for _, item := range queueM {
histories = append(histories, item)
history = append(history, item)
}
extraHistory[testUrl] = histories
histories[testUrl] = C.ProxyState{
Alive: state.alive.Load(),
History: history,
}
return true
})
return extraHistory
return histories
}
// LastDelay return last history record. if proxy is not alive, return the max value of uint16.
// LastDelayForTestUrl return last history record of the specified URL. if proxy is not alive, return the max value of uint16.
// implements C.Proxy
func (p *Proxy) LastDelay() (delay uint16) {
var max uint16 = 0xffff
if !p.alive.Load() {
return max
}
history := p.history.Last()
if history.Delay == 0 {
return max
}
return history.Delay
}
// LastDelayForTestUrl implements C.Proxy
func (p *Proxy) LastDelayForTestUrl(url string) (delay uint16) {
var max uint16 = 0xffff
alive := p.alive.Load()
history := p.history.Last()
alive := false
var history C.DelayHistory
if state, ok := p.extra.Load(url); ok {
alive = state.alive.Load()
history = state.history.Last()
}
if !alive {
return max
}
if history.Delay == 0 {
if !alive || history.Delay == 0 {
return max
}
return history.Delay
@ -177,8 +153,8 @@ func (p *Proxy) MarshalJSON() ([]byte, error) {
mapping := map[string]any{}
_ = json.Unmarshal(inner, &mapping)
mapping["history"] = p.DelayHistory()
mapping["extra"] = p.ExtraDelayHistory()
mapping["alive"] = p.AliveForTestUrl(p.url)
mapping["extra"] = p.ExtraDelayHistories()
mapping["alive"] = p.alive.Load()
mapping["name"] = p.Name()
mapping["udp"] = p.SupportUDP()
mapping["xudp"] = p.SupportXUDP()
@ -191,43 +167,32 @@ func (p *Proxy) MarshalJSON() ([]byte, error) {
func (p *Proxy) URLTest(ctx context.Context, url string, expectedStatus utils.IntRanges[uint16]) (t uint16, err error) {
defer func() {
alive := err == nil
if len(p.url) == 0 || url == p.url {
p.alive.Store(alive)
record := C.DelayHistory{Time: time.Now()}
if alive {
record.Delay = t
}
p.history.Put(record)
if p.history.Len() > defaultHistoriesNum {
p.history.Pop()
}
// test URL configured by the proxy provider
if len(p.url) == 0 {
p.url = url
}
} else {
record := C.DelayHistory{Time: time.Now()}
if alive {
record.Delay = t
}
state, ok := p.extra.Load(url)
if !ok {
state = &extraProxyState{
history: queue.New[C.DelayHistory](defaultHistoriesNum),
alive: atomic.NewBool(true),
}
p.extra.Store(url, state)
}
state.alive.Store(alive)
state.history.Put(record)
if state.history.Len() > defaultHistoriesNum {
state.history.Pop()
}
record := C.DelayHistory{Time: time.Now()}
if alive {
record.Delay = t
}
p.alive.Store(alive)
p.history.Put(record)
if p.history.Len() > defaultHistoriesNum {
p.history.Pop()
}
state, ok := p.extra.Load(url)
if !ok {
state = &internalProxyState{
history: queue.New[C.DelayHistory](defaultHistoriesNum),
alive: atomic.NewBool(true),
}
p.extra.Store(url, state)
}
state.alive.Store(alive)
state.history.Put(record)
if state.history.Len() > defaultHistoriesNum {
state.history.Pop()
}
}()
unifiedDelay := UnifiedDelay.Load()
@ -304,8 +269,7 @@ func NewProxy(adapter C.ProxyAdapter) *Proxy {
ProxyAdapter: adapter,
history: queue.New[C.DelayHistory](defaultHistoriesNum),
alive: atomic.NewBool(true),
url: "",
extra: xsync.NewMapOf[string, *extraProxyState]()}
extra: xsync.NewMapOf[string, *internalProxyState]()}
}
func urlToMetadata(rawURL string) (addr C.Metadata, err error) {

View file

@ -102,12 +102,12 @@ func (f *Fallback) findAliveProxy(touch bool) C.Proxy {
proxies := f.GetProxies(touch)
for _, proxy := range proxies {
if len(f.selected) == 0 {
if proxy.AliveForTestUrl(f.testUrl) {
if proxy.Alive(f.testUrl) {
return proxy
}
} else {
if proxy.Name() == f.selected {
if proxy.AliveForTestUrl(f.testUrl) {
if proxy.Alive(f.testUrl) {
return proxy
} else {
f.selected = ""
@ -133,7 +133,7 @@ func (f *Fallback) Set(name string) error {
}
f.selected = name
if !p.AliveForTestUrl(f.testUrl) {
if !p.Alive(f.testUrl) {
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*time.Duration(5000))
defer cancel()
expectedStatus, _ := utils.NewIntRanges[uint16](f.expectedStatus)

View file

@ -150,8 +150,7 @@ func strategyRoundRobin(url string) strategyFn {
for ; i < length; i++ {
id := (idx + i) % length
proxy := proxies[id]
// if proxy.Alive() {
if proxy.AliveForTestUrl(url) {
if proxy.Alive(url) {
i++
return proxy
}
@ -169,16 +168,14 @@ func strategyConsistentHashing(url string) strategyFn {
for i := 0; i < maxRetry; i, key = i+1, key+1 {
idx := jumpHash(key, buckets)
proxy := proxies[idx]
// if proxy.Alive() {
if proxy.AliveForTestUrl(url) {
if proxy.Alive(url) {
return proxy
}
}
// when availability is poor, traverse the entire list to get the available nodes
for _, proxy := range proxies {
// if proxy.Alive() {
if proxy.AliveForTestUrl(url) {
if proxy.Alive(url) {
return proxy
}
}
@ -204,8 +201,7 @@ func strategyStickySessions(url string) strategyFn {
nowIdx := idx
for i := 1; i < maxRetry; i++ {
proxy := proxies[nowIdx]
// if proxy.Alive() {
if proxy.AliveForTestUrl(url) {
if proxy.Alive(url) {
if nowIdx != idx {
lruCache.Delete(key)
lruCache.Set(key, nowIdx)

View file

@ -101,7 +101,7 @@ func (u *URLTest) fast(touch bool) C.Proxy {
proxies := u.GetProxies(touch)
if u.selected != "" {
for _, proxy := range proxies {
if !proxy.AliveForTestUrl(u.testUrl) {
if !proxy.Alive(u.testUrl) {
continue
}
if proxy.Name() == u.selected {
@ -121,7 +121,7 @@ func (u *URLTest) fast(touch bool) C.Proxy {
fastNotExist = false
}
if !proxy.AliveForTestUrl(u.testUrl) {
if !proxy.Alive(u.testUrl) {
continue
}
@ -133,7 +133,7 @@ func (u *URLTest) fast(touch bool) C.Proxy {
}
// tolerance
if u.fastNode == nil || fastNotExist || !u.fastNode.AliveForTestUrl(u.testUrl) || u.fastNode.LastDelayForTestUrl(u.testUrl) > fast.LastDelayForTestUrl(u.testUrl)+u.tolerance {
if u.fastNode == nil || fastNotExist || !u.fastNode.Alive(u.testUrl) || u.fastNode.LastDelayForTestUrl(u.testUrl) > fast.LastDelayForTestUrl(u.testUrl)+u.tolerance {
u.fastNode = fast
}
return u.fastNode, nil

View file

@ -202,7 +202,7 @@ func (hc *HealthCheck) execute(b *batch.Batch[bool], url, uid string, option *ex
defer cancel()
log.Debugln("Health Checking, proxy: %s, url: %s, id: {%s}", p.Name(), url, uid)
_, _ = p.URLTest(ctx, url, expectedStatus)
log.Debugln("Health Checked, proxy: %s, url: %s, alive: %t, delay: %d ms uid: {%s}", p.Name(), url, p.AliveForTestUrl(url), p.LastDelayForTestUrl(url), uid)
log.Debugln("Health Checked, proxy: %s, url: %s, alive: %t, delay: %d ms uid: {%s}", p.Name(), url, p.Alive(url), p.LastDelayForTestUrl(url), uid)
return false, nil
})
}

View file

@ -114,10 +114,6 @@ func (pp *proxySetProvider) RegisterHealthCheckTask(url string, expectedStatus u
func (pp *proxySetProvider) setProxies(proxies []C.Proxy) {
pp.proxies = proxies
for _, proxy := range pp.proxies {
proxy.OriginalHealthCheckUrl(pp.healthCheck.url)
}
pp.healthCheck.setProxy(proxies)
if pp.healthCheck.auto() {
go pp.healthCheck.check()

View file

@ -147,15 +147,19 @@ type DelayHistory struct {
Delay uint16 `json:"delay"`
}
type ProxyState struct {
Alive bool `json:"alive"`
History []DelayHistory `json:"history"`
}
type DelayHistoryStoreType int
type Proxy interface {
ProxyAdapter
AliveForTestUrl(url string) bool
Alive(url string) bool
DelayHistory() []DelayHistory
ExtraDelayHistory() map[string][]DelayHistory
ExtraDelayHistories() map[string]ProxyState
LastDelayForTestUrl(url string) uint16
OriginalHealthCheckUrl(url string)
URLTest(ctx context.Context, url string, expectedStatus utils.IntRanges[uint16]) (uint16, error)
// Deprecated: use DialContext instead.