fix: concurrent map writes #707

This commit is contained in:
Larvan2 2023-08-31 19:56:20 +08:00 committed by wwqgtxx
parent 54fee7bd3a
commit 9ceaf20584

View file

@ -10,6 +10,7 @@ import (
"net/netip"
"net/url"
"strconv"
"sync"
"time"
"github.com/Dreamacro/clash/common/atomic"
@ -36,7 +37,7 @@ type Proxy struct {
history *queue.Queue[C.DelayHistory]
alive *atomic.Bool
url string
extra map[string]*extraProxyState
extra sync.Map
}
// Alive implements C.Proxy
@ -46,10 +47,8 @@ func (p *Proxy) Alive() bool {
// AliveForTestUrl implements C.Proxy
func (p *Proxy) AliveForTestUrl(url string) bool {
if p.extra != nil {
if state, ok := p.extra[url]; ok {
return state.alive.Load()
}
if state, ok := p.extra.Load(url); ok {
return state.(*extraProxyState).alive.Load()
}
return p.alive.Load()
@ -88,16 +87,16 @@ func (p *Proxy) DelayHistory() []C.DelayHistory {
for _, item := range queueM {
histories = append(histories, item)
}
return histories
}
// DelayHistoryForTestUrl implements C.Proxy
func (p *Proxy) DelayHistoryForTestUrl(url string) []C.DelayHistory {
var queueM []C.DelayHistory
if p.extra != nil {
if state, ok := p.extra[url]; ok {
queueM = state.history.Copy()
}
if state, ok := p.extra.Load(url); ok {
queueM = state.(*extraProxyState).history.Copy()
}
if queueM == nil {
@ -112,19 +111,25 @@ func (p *Proxy) DelayHistoryForTestUrl(url string) []C.DelayHistory {
}
func (p *Proxy) ExtraDelayHistory() map[string][]C.DelayHistory {
extra := map[string][]C.DelayHistory{}
if p.extra != nil && len(p.extra) != 0 {
for testUrl, option := range p.extra {
histories := []C.DelayHistory{}
queueM := option.history.Copy()
for _, item := range queueM {
histories = append(histories, item)
}
extraHistory := map[string][]C.DelayHistory{}
extra[testUrl] = histories
p.extra.Range(func(k, v interface{}) bool {
testUrl := k.(string)
state := v.(*extraProxyState)
histories := []C.DelayHistory{}
queueM := state.history.Copy()
for _, item := range queueM {
histories = append(histories, item)
}
}
return extra
extraHistory[testUrl] = histories
return true
})
return extraHistory
}
// LastDelay return last history record. if proxy is not alive, return the max value of uint16.
@ -149,11 +154,9 @@ func (p *Proxy) LastDelayForTestUrl(url string) (delay uint16) {
alive := p.alive.Load()
history := p.history.Last()
if p.extra != nil {
if state, ok := p.extra[url]; ok {
alive = state.alive.Load()
history = state.history.Last()
}
if state, ok := p.extra.Load(url); ok {
alive = state.(*extraProxyState).alive.Load()
history = state.(*extraProxyState).history.Last()
}
if !alive {
@ -214,23 +217,19 @@ func (p *Proxy) URLTest(ctx context.Context, url string, expectedStatus utils.In
record.Delay = t
}
if p.extra == nil {
p.extra = map[string]*extraProxyState{}
}
state, ok := p.extra[url]
state, ok := p.extra.Load(url)
if !ok {
state = &extraProxyState{
history: queue.New[C.DelayHistory](defaultHistoriesNum),
alive: atomic.NewBool(true),
}
p.extra[url] = state
p.extra.Store(url, state)
}
state.alive.Store(alive)
state.history.Put(record)
if state.history.Len() > defaultHistoriesNum {
state.history.Pop()
state.(*extraProxyState).alive.Store(alive)
state.(*extraProxyState).history.Put(record)
if state.(*extraProxyState).history.Len() > defaultHistoriesNum {
state.(*extraProxyState).history.Pop()
}
default:
log.Debugln("health check result will be discarded, url: %s alive: %t, delay: %d", url, alive, t)
@ -307,7 +306,12 @@ func (p *Proxy) URLTest(ctx context.Context, url string, expectedStatus utils.In
}
func NewProxy(adapter C.ProxyAdapter) *Proxy {
return &Proxy{adapter, queue.New[C.DelayHistory](defaultHistoriesNum), atomic.NewBool(true), "", map[string]*extraProxyState{}}
return &Proxy{
ProxyAdapter: adapter,
history: queue.New[C.DelayHistory](defaultHistoriesNum),
alive: atomic.NewBool(true),
url: "",
extra: sync.Map{}}
}
func urlToMetadata(rawURL string) (addr C.Metadata, err error) {
@ -350,14 +354,24 @@ func (p *Proxy) determineFinalStoreType(store C.DelayHistoryStoreType, url strin
return C.OriginalHistory
}
if p.extra == nil {
store = C.ExtraHistory
} else {
if _, ok := p.extra[url]; ok {
store = C.ExtraHistory
} else if len(p.extra) < 2*C.DefaultMaxHealthCheckUrlNum {
store = C.ExtraHistory
}
length := 0
p.extra.Range(func(_, _ interface{}) bool {
length++
return length < 2*C.DefaultMaxHealthCheckUrlNum
})
if length == 0 {
return C.ExtraHistory
}
_, ok := p.extra.Load(url)
if ok {
return C.ExtraHistory
}
if length < 2*C.DefaultMaxHealthCheckUrlNum {
return C.ExtraHistory
}
return store
}