From 6978963270b3afb6fe0736900ef71e51b779a89d Mon Sep 17 00:00:00 2001 From: gVisor bot Date: Sun, 17 Mar 2019 14:52:39 +0800 Subject: [PATCH] Feature: add delay history and improve url-test behavior --- adapters/outbound/base.go | 56 ++++++++++++++++++++++++++-- adapters/outbound/urltest.go | 19 +++++++++- common/queue/queue.go | 71 ++++++++++++++++++++++++++++++++++++ constant/adapters.go | 10 ++++- hub/route/proxies.go | 2 +- 5 files changed, 151 insertions(+), 7 deletions(-) create mode 100644 common/queue/queue.go diff --git a/adapters/outbound/base.go b/adapters/outbound/base.go index 34bade77..f234ced6 100644 --- a/adapters/outbound/base.go +++ b/adapters/outbound/base.go @@ -6,6 +6,7 @@ import ( "net/http" "time" + "github.com/Dreamacro/clash/common/queue" C "github.com/Dreamacro/clash/constant" ) @@ -32,7 +33,8 @@ func (b *Base) MarshalJSON() ([]byte, error) { type Proxy struct { C.ProxyAdapter - alive bool + history *queue.Queue + alive bool } func (p *Proxy) Alive() bool { @@ -45,8 +47,54 @@ func (p *Proxy) Dial(metadata *C.Metadata) (net.Conn, error) { return conn, err } +func (p *Proxy) DelayHistory() []C.DelayHistory { + queue := p.history.Copy() + histories := []C.DelayHistory{} + for _, item := range queue { + histories = append(histories, item.(C.DelayHistory)) + } + return histories +} + +func (p *Proxy) LastDelay() (delay uint16) { + head := p.history.First() + if head == nil { + delay-- + return + } + history := head.(C.DelayHistory) + if history.Delay == 0 { + delay-- + return + } + return history.Delay +} + +func (p *Proxy) MarshalJSON() ([]byte, error) { + inner, err := p.ProxyAdapter.MarshalJSON() + if err != nil { + return inner, err + } + + mapping := map[string]interface{}{} + json.Unmarshal(inner, &mapping) + mapping["history"] = p.DelayHistory() + return json.Marshal(mapping) +} + // URLTest get the delay for the specified URL -func (p *Proxy) URLTest(url string) (t int16, err error) { +func (p *Proxy) URLTest(url string) (t uint16, err error) { + defer func() { + record := C.DelayHistory{Time: time.Now()} + if err == nil { + record.Delay = t + } + p.history.Put(record) + if p.history.Len() > 10 { + p.history.Pop() + } + }() + addr, err := urlToMetadata(url) if err != nil { return @@ -74,10 +122,10 @@ func (p *Proxy) URLTest(url string) (t int16, err error) { return } resp.Body.Close() - t = int16(time.Since(start) / time.Millisecond) + t = uint16(time.Since(start) / time.Millisecond) return } func NewProxy(adapter C.ProxyAdapter) *Proxy { - return &Proxy{adapter, true} + return &Proxy{adapter, queue.New(10), true} } diff --git a/adapters/outbound/urltest.go b/adapters/outbound/urltest.go index 6cce88a3..84fdccbe 100644 --- a/adapters/outbound/urltest.go +++ b/adapters/outbound/urltest.go @@ -38,7 +38,7 @@ func (u *URLTest) Now() string { func (u *URLTest) Dial(metadata *C.Metadata) (net.Conn, error) { a, err := u.fast.Dial(metadata) if err != nil { - go u.speedTest() + u.fallback() } return a, err } @@ -74,6 +74,23 @@ Loop: } } +func (u *URLTest) fallback() { + fast := u.proxies[0] + min := fast.LastDelay() + for _, proxy := range u.proxies[1:] { + if !proxy.Alive() { + continue + } + + delay := proxy.LastDelay() + if delay < min { + fast = proxy + min = delay + } + } + u.fast = fast +} + func (u *URLTest) speedTest() { if atomic.AddInt32(&u.once, 1) != 1 { return diff --git a/common/queue/queue.go b/common/queue/queue.go new file mode 100644 index 00000000..7cb51b85 --- /dev/null +++ b/common/queue/queue.go @@ -0,0 +1,71 @@ +package queue + +import ( + "sync" +) + +// Queue is a simple concurrent safe queue +type Queue struct { + items []interface{} + lock sync.RWMutex +} + +// Put add the item to the queue. +func (q *Queue) Put(items ...interface{}) { + if len(items) == 0 { + return + } + + q.lock.Lock() + q.items = append(q.items, items...) + q.lock.Unlock() +} + +// Pop returns the head of items. +func (q *Queue) Pop() interface{} { + if len(q.items) == 0 { + return nil + } + + q.lock.Lock() + head := q.items[0] + q.items = q.items[1:] + q.lock.Unlock() + return head +} + +// First returns the head of items without deleting. +func (q *Queue) First() interface{} { + if len(q.items) == 0 { + return nil + } + + q.lock.RLock() + head := q.items[0] + q.lock.RUnlock() + return head +} + +// Copy get the copy of queue. +func (q *Queue) Copy() []interface{} { + items := []interface{}{} + q.lock.RLock() + items = append(items, q.items...) + q.lock.RUnlock() + return items +} + +// Len returns the number of items in this queue. +func (q *Queue) Len() int64 { + q.lock.Lock() + defer q.lock.Unlock() + + return int64(len(q.items)) +} + +// New is a constructor for a new concurrent safe queue. +func New(hint int64) *Queue { + return &Queue{ + items: make([]interface{}, 0, hint), + } +} diff --git a/constant/adapters.go b/constant/adapters.go index 0fee6aea..be755554 100644 --- a/constant/adapters.go +++ b/constant/adapters.go @@ -2,6 +2,7 @@ package constant import ( "net" + "time" ) // Adapter Type @@ -31,10 +32,17 @@ type ProxyAdapter interface { MarshalJSON() ([]byte, error) } +type DelayHistory struct { + Time time.Time `json:"time"` + Delay uint16 `json:"delay"` +} + type Proxy interface { ProxyAdapter Alive() bool - URLTest(url string) (int16, error) + DelayHistory() []DelayHistory + LastDelay() uint16 + URLTest(url string) (uint16, error) } // AdapterType is enum of adapter type diff --git a/hub/route/proxies.go b/hub/route/proxies.go index 77255b7a..71221917 100644 --- a/hub/route/proxies.go +++ b/hub/route/proxies.go @@ -110,7 +110,7 @@ func getProxyDelay(w http.ResponseWriter, r *http.Request) { proxy := r.Context().Value(CtxKeyProxy).(C.Proxy) - sigCh := make(chan int16) + sigCh := make(chan uint16) go func() { t, err := proxy.URLTest(url) if err != nil {