From 86e97cc59669bf0b9ffb190bea848b402aa8a113 Mon Sep 17 00:00:00 2001 From: gVisor bot Date: Sat, 16 Mar 2019 00:43:16 +0800 Subject: [PATCH] Improve: `Dial` would reset proxy alive status --- adapters/outbound/base.go | 57 ++++++++++++++++++++++++++++++ adapters/outbound/fallback.go | 60 ++++++++------------------------ adapters/outbound/loadbalance.go | 8 ++--- adapters/outbound/urltest.go | 8 +++-- adapters/outbound/util.go | 49 -------------------------- config/config.go | 15 ++++---- constant/adapters.go | 9 ++++- hub/executor/executor.go | 10 ++---- hub/route/proxies.go | 9 +++-- 9 files changed, 103 insertions(+), 122 deletions(-) diff --git a/adapters/outbound/base.go b/adapters/outbound/base.go index 9701d0e1..34bade77 100644 --- a/adapters/outbound/base.go +++ b/adapters/outbound/base.go @@ -2,6 +2,9 @@ package adapters import ( "encoding/json" + "net" + "net/http" + "time" C "github.com/Dreamacro/clash/constant" ) @@ -19,8 +22,62 @@ func (b *Base) Type() C.AdapterType { return b.tp } +func (b *Base) Destroy() {} + func (b *Base) MarshalJSON() ([]byte, error) { return json.Marshal(map[string]string{ "type": b.Type().String(), }) } + +type Proxy struct { + C.ProxyAdapter + alive bool +} + +func (p *Proxy) Alive() bool { + return p.alive +} + +func (p *Proxy) Dial(metadata *C.Metadata) (net.Conn, error) { + conn, err := p.ProxyAdapter.Dial(metadata) + p.alive = err == nil + return conn, err +} + +// URLTest get the delay for the specified URL +func (p *Proxy) URLTest(url string) (t int16, err error) { + addr, err := urlToMetadata(url) + if err != nil { + return + } + + start := time.Now() + instance, err := p.ProxyAdapter.Dial(&addr) + if err != nil { + return + } + defer instance.Close() + transport := &http.Transport{ + Dial: func(string, string) (net.Conn, error) { + return instance, nil + }, + // from http.DefaultTransport + MaxIdleConns: 100, + IdleConnTimeout: 90 * time.Second, + TLSHandshakeTimeout: 10 * time.Second, + ExpectContinueTimeout: 1 * time.Second, + } + client := http.Client{Transport: transport} + resp, err := client.Get(url) + if err != nil { + return + } + resp.Body.Close() + t = int16(time.Since(start) / time.Millisecond) + return +} + +func NewProxy(adapter C.ProxyAdapter) *Proxy { + return &Proxy{adapter, true} +} diff --git a/adapters/outbound/fallback.go b/adapters/outbound/fallback.go index 00073010..78f573ae 100644 --- a/adapters/outbound/fallback.go +++ b/adapters/outbound/fallback.go @@ -10,14 +10,9 @@ import ( C "github.com/Dreamacro/clash/constant" ) -type proxy struct { - RawProxy C.Proxy - Valid bool -} - type Fallback struct { *Base - proxies []*proxy + proxies []C.Proxy rawURL string interval time.Duration done chan struct{} @@ -31,36 +26,19 @@ type FallbackOption struct { } func (f *Fallback) Now() string { - _, proxy := f.findNextValidProxy(0) - if proxy != nil { - return proxy.RawProxy.Name() - } - return f.proxies[0].RawProxy.Name() + proxy := f.findAliveProxy() + return proxy.Name() } func (f *Fallback) Dial(metadata *C.Metadata) (net.Conn, error) { - idx := 0 - var proxy *proxy - for { - idx, proxy = f.findNextValidProxy(idx) - if proxy == nil { - break - } - adapter, err := proxy.RawProxy.Dial(metadata) - if err != nil { - proxy.Valid = false - idx++ - continue - } - return adapter, err - } - return f.proxies[0].RawProxy.Dial(metadata) + proxy := f.findAliveProxy() + return proxy.Dial(metadata) } func (f *Fallback) MarshalJSON() ([]byte, error) { var all []string for _, proxy := range f.proxies { - all = append(all, proxy.RawProxy.Name()) + all = append(all, proxy.Name()) } return json.Marshal(map[string]interface{}{ "type": f.Type().String(), @@ -69,7 +47,7 @@ func (f *Fallback) MarshalJSON() ([]byte, error) { }) } -func (f *Fallback) Close() { +func (f *Fallback) Destroy() { f.done <- struct{}{} } @@ -87,13 +65,13 @@ Loop: } } -func (f *Fallback) findNextValidProxy(start int) (int, *proxy) { - for i := start; i < len(f.proxies); i++ { - if f.proxies[i].Valid { - return i, f.proxies[i] +func (f *Fallback) findAliveProxy() C.Proxy { + for _, proxy := range f.proxies { + if proxy.Alive() { + return proxy } } - return -1, nil + return f.proxies[0] } func (f *Fallback) validTest() { @@ -101,9 +79,8 @@ func (f *Fallback) validTest() { wg.Add(len(f.proxies)) for _, p := range f.proxies { - go func(p *proxy) { - _, err := DelayTest(p.RawProxy, f.rawURL) - p.Valid = err == nil + go func(p C.Proxy) { + p.URLTest(f.rawURL) wg.Done() }(p) } @@ -122,20 +99,13 @@ func NewFallback(option FallbackOption, proxies []C.Proxy) (*Fallback, error) { } interval := time.Duration(option.Interval) * time.Second - warpperProxies := make([]*proxy, len(proxies)) - for idx := range proxies { - warpperProxies[idx] = &proxy{ - RawProxy: proxies[idx], - Valid: true, - } - } Fallback := &Fallback{ Base: &Base{ name: option.Name, tp: C.Fallback, }, - proxies: warpperProxies, + proxies: proxies, rawURL: option.URL, interval: interval, done: make(chan struct{}), diff --git a/adapters/outbound/loadbalance.go b/adapters/outbound/loadbalance.go index 0ba08731..9c183ccf 100644 --- a/adapters/outbound/loadbalance.go +++ b/adapters/outbound/loadbalance.go @@ -51,12 +51,12 @@ func jumpHash(key uint64, buckets int32) int32 { func (lb *LoadBalance) Dial(metadata *C.Metadata) (net.Conn, error) { key := uint64(murmur3.Sum32([]byte(getKey(metadata)))) buckets := int32(len(lb.proxies)) - for i := 0; i < lb.maxRetry; i++ { + for i := 0; i < lb.maxRetry; i, key = i+1, key+1 { idx := jumpHash(key, buckets) - if proxy, err := lb.proxies[idx].Dial(metadata); err == nil { - return proxy, nil + proxy := lb.proxies[idx] + if proxy.Alive() { + return proxy.Dial(metadata) } - key++ } return lb.proxies[0].Dial(metadata) diff --git a/adapters/outbound/urltest.go b/adapters/outbound/urltest.go index 5f5cfb4e..6cce88a3 100644 --- a/adapters/outbound/urltest.go +++ b/adapters/outbound/urltest.go @@ -1,6 +1,7 @@ package adapters import ( + "context" "encoding/json" "errors" "net" @@ -9,6 +10,7 @@ import ( "sync/atomic" "time" + "github.com/Dreamacro/clash/common/picker" C "github.com/Dreamacro/clash/constant" ) @@ -54,7 +56,7 @@ func (u *URLTest) MarshalJSON() ([]byte, error) { }) } -func (u *URLTest) Close() { +func (u *URLTest) Destroy() { u.done <- struct{}{} } @@ -81,12 +83,12 @@ func (u *URLTest) speedTest() { wg := sync.WaitGroup{} wg.Add(len(u.proxies)) c := make(chan interface{}) - fast := selectFast(c) + fast := picker.SelectFast(context.Background(), c) timer := time.NewTimer(u.interval) for _, p := range u.proxies { go func(p C.Proxy) { - _, err := DelayTest(p, u.rawURL) + _, err := p.URLTest(u.rawURL) if err == nil { c <- p } diff --git a/adapters/outbound/util.go b/adapters/outbound/util.go index 5c07e955..363bcc3e 100644 --- a/adapters/outbound/util.go +++ b/adapters/outbound/util.go @@ -4,7 +4,6 @@ import ( "crypto/tls" "fmt" "net" - "net/http" "net/url" "sync" "time" @@ -21,39 +20,6 @@ var ( once sync.Once ) -// DelayTest get the delay for the specified URL -func DelayTest(proxy C.Proxy, url string) (t int16, err error) { - addr, err := urlToMetadata(url) - if err != nil { - return - } - - start := time.Now() - instance, err := proxy.Dial(&addr) - if err != nil { - return - } - defer instance.Close() - transport := &http.Transport{ - Dial: func(string, string) (net.Conn, error) { - return instance, nil - }, - // from http.DefaultTransport - MaxIdleConns: 100, - IdleConnTimeout: 90 * time.Second, - TLSHandshakeTimeout: 10 * time.Second, - ExpectContinueTimeout: 1 * time.Second, - } - client := http.Client{Transport: transport} - resp, err := client.Get(url) - if err != nil { - return - } - resp.Body.Close() - t = int16(time.Since(start) / time.Millisecond) - return -} - func urlToMetadata(rawURL string) (addr C.Metadata, err error) { u, err := url.Parse(rawURL) if err != nil { @@ -81,21 +47,6 @@ func urlToMetadata(rawURL string) (addr C.Metadata, err error) { return } -func selectFast(in chan interface{}) chan interface{} { - out := make(chan interface{}) - go func() { - p, open := <-in - if open { - out <- p - } - close(out) - for range in { - } - }() - - return out -} - func tcpKeepAlive(c net.Conn) { if tcp, ok := c.(*net.TCPConn); ok { tcp.SetKeepAlive(true) diff --git a/config/config.go b/config/config.go index 40fa889f..1c948952 100644 --- a/config/config.go +++ b/config/config.go @@ -184,8 +184,8 @@ func parseProxies(cfg *rawConfig) (map[string]C.Proxy, error) { decoder := structure.NewDecoder(structure.Option{TagName: "proxy", WeaklyTypedInput: true}) - proxies["DIRECT"] = adapters.NewDirect() - proxies["REJECT"] = adapters.NewReject() + proxies["DIRECT"] = adapters.NewProxy(adapters.NewDirect()) + proxies["REJECT"] = adapters.NewProxy(adapters.NewReject()) // parse proxy for idx, mapping := range proxiesConfig { @@ -194,7 +194,7 @@ func parseProxies(cfg *rawConfig) (map[string]C.Proxy, error) { return nil, fmt.Errorf("Proxy %d missing type", idx) } - var proxy C.Proxy + var proxy C.ProxyAdapter err := fmt.Errorf("can't parse") switch proxyType { case "ss": @@ -236,7 +236,7 @@ func parseProxies(cfg *rawConfig) (map[string]C.Proxy, error) { if _, exist := proxies[proxy.Name()]; exist { return nil, fmt.Errorf("Proxy %s is the duplicate name", proxy.Name()) } - proxies[proxy.Name()] = proxy + proxies[proxy.Name()] = adapters.NewProxy(proxy) } // parse proxy group @@ -250,7 +250,7 @@ func parseProxies(cfg *rawConfig) (map[string]C.Proxy, error) { if _, exist := proxies[groupName]; exist { return nil, fmt.Errorf("ProxyGroup %s: the duplicate name", groupName) } - var group C.Proxy + var group C.ProxyAdapter ps := []C.Proxy{} err := fmt.Errorf("can't parse") @@ -307,7 +307,7 @@ func parseProxies(cfg *rawConfig) (map[string]C.Proxy, error) { if err != nil { return nil, fmt.Errorf("Proxy %s: %s", groupName, err.Error()) } - proxies[groupName] = group + proxies[groupName] = adapters.NewProxy(group) } ps := []C.Proxy{} @@ -315,7 +315,8 @@ func parseProxies(cfg *rawConfig) (map[string]C.Proxy, error) { ps = append(ps, v) } - proxies["GLOBAL"], _ = adapters.NewSelector("GLOBAL", ps) + global, _ := adapters.NewSelector("GLOBAL", ps) + proxies["GLOBAL"] = adapters.NewProxy(global) return proxies, nil } diff --git a/constant/adapters.go b/constant/adapters.go index e3ce76e4..0fee6aea 100644 --- a/constant/adapters.go +++ b/constant/adapters.go @@ -23,13 +23,20 @@ type ServerAdapter interface { Close() } -type Proxy interface { +type ProxyAdapter interface { Name() string Type() AdapterType Dial(metadata *Metadata) (net.Conn, error) + Destroy() MarshalJSON() ([]byte, error) } +type Proxy interface { + ProxyAdapter + Alive() bool + URLTest(url string) (int16, error) +} + // AdapterType is enum of adapter type type AdapterType int diff --git a/hub/executor/executor.go b/hub/executor/executor.go index 390d4304..936bf4c6 100644 --- a/hub/executor/executor.go +++ b/hub/executor/executor.go @@ -1,7 +1,6 @@ package executor import ( - adapters "github.com/Dreamacro/clash/adapters/outbound" "github.com/Dreamacro/clash/config" C "github.com/Dreamacro/clash/constant" "github.com/Dreamacro/clash/dns" @@ -66,14 +65,9 @@ func updateProxies(proxies map[string]C.Proxy) { tunnel := T.Instance() oldProxies := tunnel.Proxies() - // close old goroutine + // close proxy group goroutine for _, proxy := range oldProxies { - switch raw := proxy.(type) { - case *adapters.URLTest: - raw.Close() - case *adapters.Fallback: - raw.Close() - } + proxy.Destroy() } tunnel.UpdateProxies(proxies) diff --git a/hub/route/proxies.go b/hub/route/proxies.go index 3da30a24..77255b7a 100644 --- a/hub/route/proxies.go +++ b/hub/route/proxies.go @@ -81,12 +81,11 @@ func updateProxy(w http.ResponseWriter, r *http.Request) { return } - proxy := r.Context().Value(CtxKeyProxy).(C.Proxy) - - selector, ok := proxy.(*A.Selector) + proxy := r.Context().Value(CtxKeyProxy).(*A.Proxy) + selector, ok := proxy.ProxyAdapter.(*A.Selector) if !ok { render.Status(r, http.StatusBadRequest) - render.JSON(w, r, ErrBadRequest) + render.JSON(w, r, newError("Must be a Selector")) return } @@ -113,7 +112,7 @@ func getProxyDelay(w http.ResponseWriter, r *http.Request) { sigCh := make(chan int16) go func() { - t, err := A.DelayTest(proxy, url) + t, err := proxy.URLTest(url) if err != nil { sigCh <- 0 }