From 9ceaf20584830db6f0548f2e2e08bcbf988d5c58 Mon Sep 17 00:00:00 2001 From: Larvan2 <78135608+Larvan2@users.noreply.github.com> Date: Thu, 31 Aug 2023 19:56:20 +0800 Subject: [PATCH] fix: concurrent map writes #707 --- adapter/adapter.go | 102 ++++++++++++++++++++++++++------------------- 1 file changed, 58 insertions(+), 44 deletions(-) diff --git a/adapter/adapter.go b/adapter/adapter.go index 6cc79c3a..c7351061 100644 --- a/adapter/adapter.go +++ b/adapter/adapter.go @@ -10,6 +10,7 @@ import ( "net/netip" "net/url" "strconv" + "sync" "time" "github.com/Dreamacro/clash/common/atomic" @@ -36,7 +37,7 @@ type Proxy struct { history *queue.Queue[C.DelayHistory] alive *atomic.Bool url string - extra map[string]*extraProxyState + extra sync.Map } // Alive implements C.Proxy @@ -46,10 +47,8 @@ func (p *Proxy) Alive() bool { // AliveForTestUrl implements C.Proxy func (p *Proxy) AliveForTestUrl(url string) bool { - if p.extra != nil { - if state, ok := p.extra[url]; ok { - return state.alive.Load() - } + if state, ok := p.extra.Load(url); ok { + return state.(*extraProxyState).alive.Load() } return p.alive.Load() @@ -88,16 +87,16 @@ func (p *Proxy) DelayHistory() []C.DelayHistory { for _, item := range queueM { histories = append(histories, item) } + return histories } // DelayHistoryForTestUrl implements C.Proxy func (p *Proxy) DelayHistoryForTestUrl(url string) []C.DelayHistory { var queueM []C.DelayHistory - if p.extra != nil { - if state, ok := p.extra[url]; ok { - queueM = state.history.Copy() - } + + if state, ok := p.extra.Load(url); ok { + queueM = state.(*extraProxyState).history.Copy() } if queueM == nil { @@ -112,19 +111,25 @@ func (p *Proxy) DelayHistoryForTestUrl(url string) []C.DelayHistory { } func (p *Proxy) ExtraDelayHistory() map[string][]C.DelayHistory { - extra := map[string][]C.DelayHistory{} - if p.extra != nil && len(p.extra) != 0 { - for testUrl, option := range p.extra { - histories := []C.DelayHistory{} - queueM := option.history.Copy() - for _, item := range queueM { - histories = append(histories, item) - } + extraHistory := map[string][]C.DelayHistory{} - extra[testUrl] = histories + p.extra.Range(func(k, v interface{}) bool { + + testUrl := k.(string) + state := v.(*extraProxyState) + + histories := []C.DelayHistory{} + queueM := state.history.Copy() + + for _, item := range queueM { + histories = append(histories, item) } - } - return extra + + extraHistory[testUrl] = histories + + return true + }) + return extraHistory } // LastDelay return last history record. if proxy is not alive, return the max value of uint16. @@ -149,11 +154,9 @@ func (p *Proxy) LastDelayForTestUrl(url string) (delay uint16) { alive := p.alive.Load() history := p.history.Last() - if p.extra != nil { - if state, ok := p.extra[url]; ok { - alive = state.alive.Load() - history = state.history.Last() - } + if state, ok := p.extra.Load(url); ok { + alive = state.(*extraProxyState).alive.Load() + history = state.(*extraProxyState).history.Last() } if !alive { @@ -214,23 +217,19 @@ func (p *Proxy) URLTest(ctx context.Context, url string, expectedStatus utils.In record.Delay = t } - if p.extra == nil { - p.extra = map[string]*extraProxyState{} - } - - state, ok := p.extra[url] + state, ok := p.extra.Load(url) if !ok { state = &extraProxyState{ history: queue.New[C.DelayHistory](defaultHistoriesNum), alive: atomic.NewBool(true), } - p.extra[url] = state + p.extra.Store(url, state) } - state.alive.Store(alive) - state.history.Put(record) - if state.history.Len() > defaultHistoriesNum { - state.history.Pop() + state.(*extraProxyState).alive.Store(alive) + state.(*extraProxyState).history.Put(record) + if state.(*extraProxyState).history.Len() > defaultHistoriesNum { + state.(*extraProxyState).history.Pop() } default: log.Debugln("health check result will be discarded, url: %s alive: %t, delay: %d", url, alive, t) @@ -307,7 +306,12 @@ func (p *Proxy) URLTest(ctx context.Context, url string, expectedStatus utils.In } func NewProxy(adapter C.ProxyAdapter) *Proxy { - return &Proxy{adapter, queue.New[C.DelayHistory](defaultHistoriesNum), atomic.NewBool(true), "", map[string]*extraProxyState{}} + return &Proxy{ + ProxyAdapter: adapter, + history: queue.New[C.DelayHistory](defaultHistoriesNum), + alive: atomic.NewBool(true), + url: "", + extra: sync.Map{}} } func urlToMetadata(rawURL string) (addr C.Metadata, err error) { @@ -350,14 +354,24 @@ func (p *Proxy) determineFinalStoreType(store C.DelayHistoryStoreType, url strin return C.OriginalHistory } - if p.extra == nil { - store = C.ExtraHistory - } else { - if _, ok := p.extra[url]; ok { - store = C.ExtraHistory - } else if len(p.extra) < 2*C.DefaultMaxHealthCheckUrlNum { - store = C.ExtraHistory - } + length := 0 + p.extra.Range(func(_, _ interface{}) bool { + length++ + return length < 2*C.DefaultMaxHealthCheckUrlNum + }) + + if length == 0 { + return C.ExtraHistory } + + _, ok := p.extra.Load(url) + if ok { + return C.ExtraHistory + } + + if length < 2*C.DefaultMaxHealthCheckUrlNum { + return C.ExtraHistory + } + return store }