Improve: Dial would reset proxy alive status

This commit is contained in:
gVisor bot 2019-03-16 00:43:16 +08:00
parent 4456aaa4b5
commit 86e97cc596
9 changed files with 103 additions and 122 deletions

View file

@ -2,6 +2,9 @@ package adapters
import ( import (
"encoding/json" "encoding/json"
"net"
"net/http"
"time"
C "github.com/Dreamacro/clash/constant" C "github.com/Dreamacro/clash/constant"
) )
@ -19,8 +22,62 @@ func (b *Base) Type() C.AdapterType {
return b.tp return b.tp
} }
func (b *Base) Destroy() {}
func (b *Base) MarshalJSON() ([]byte, error) { func (b *Base) MarshalJSON() ([]byte, error) {
return json.Marshal(map[string]string{ return json.Marshal(map[string]string{
"type": b.Type().String(), "type": b.Type().String(),
}) })
} }
type Proxy struct {
C.ProxyAdapter
alive bool
}
func (p *Proxy) Alive() bool {
return p.alive
}
func (p *Proxy) Dial(metadata *C.Metadata) (net.Conn, error) {
conn, err := p.ProxyAdapter.Dial(metadata)
p.alive = err == nil
return conn, err
}
// URLTest get the delay for the specified URL
func (p *Proxy) URLTest(url string) (t int16, err error) {
addr, err := urlToMetadata(url)
if err != nil {
return
}
start := time.Now()
instance, err := p.ProxyAdapter.Dial(&addr)
if err != nil {
return
}
defer instance.Close()
transport := &http.Transport{
Dial: func(string, string) (net.Conn, error) {
return instance, nil
},
// from http.DefaultTransport
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
}
client := http.Client{Transport: transport}
resp, err := client.Get(url)
if err != nil {
return
}
resp.Body.Close()
t = int16(time.Since(start) / time.Millisecond)
return
}
func NewProxy(adapter C.ProxyAdapter) *Proxy {
return &Proxy{adapter, true}
}

View file

@ -10,14 +10,9 @@ import (
C "github.com/Dreamacro/clash/constant" C "github.com/Dreamacro/clash/constant"
) )
type proxy struct {
RawProxy C.Proxy
Valid bool
}
type Fallback struct { type Fallback struct {
*Base *Base
proxies []*proxy proxies []C.Proxy
rawURL string rawURL string
interval time.Duration interval time.Duration
done chan struct{} done chan struct{}
@ -31,36 +26,19 @@ type FallbackOption struct {
} }
func (f *Fallback) Now() string { func (f *Fallback) Now() string {
_, proxy := f.findNextValidProxy(0) proxy := f.findAliveProxy()
if proxy != nil { return proxy.Name()
return proxy.RawProxy.Name()
}
return f.proxies[0].RawProxy.Name()
} }
func (f *Fallback) Dial(metadata *C.Metadata) (net.Conn, error) { func (f *Fallback) Dial(metadata *C.Metadata) (net.Conn, error) {
idx := 0 proxy := f.findAliveProxy()
var proxy *proxy return proxy.Dial(metadata)
for {
idx, proxy = f.findNextValidProxy(idx)
if proxy == nil {
break
}
adapter, err := proxy.RawProxy.Dial(metadata)
if err != nil {
proxy.Valid = false
idx++
continue
}
return adapter, err
}
return f.proxies[0].RawProxy.Dial(metadata)
} }
func (f *Fallback) MarshalJSON() ([]byte, error) { func (f *Fallback) MarshalJSON() ([]byte, error) {
var all []string var all []string
for _, proxy := range f.proxies { for _, proxy := range f.proxies {
all = append(all, proxy.RawProxy.Name()) all = append(all, proxy.Name())
} }
return json.Marshal(map[string]interface{}{ return json.Marshal(map[string]interface{}{
"type": f.Type().String(), "type": f.Type().String(),
@ -69,7 +47,7 @@ func (f *Fallback) MarshalJSON() ([]byte, error) {
}) })
} }
func (f *Fallback) Close() { func (f *Fallback) Destroy() {
f.done <- struct{}{} f.done <- struct{}{}
} }
@ -87,13 +65,13 @@ Loop:
} }
} }
func (f *Fallback) findNextValidProxy(start int) (int, *proxy) { func (f *Fallback) findAliveProxy() C.Proxy {
for i := start; i < len(f.proxies); i++ { for _, proxy := range f.proxies {
if f.proxies[i].Valid { if proxy.Alive() {
return i, f.proxies[i] return proxy
} }
} }
return -1, nil return f.proxies[0]
} }
func (f *Fallback) validTest() { func (f *Fallback) validTest() {
@ -101,9 +79,8 @@ func (f *Fallback) validTest() {
wg.Add(len(f.proxies)) wg.Add(len(f.proxies))
for _, p := range f.proxies { for _, p := range f.proxies {
go func(p *proxy) { go func(p C.Proxy) {
_, err := DelayTest(p.RawProxy, f.rawURL) p.URLTest(f.rawURL)
p.Valid = err == nil
wg.Done() wg.Done()
}(p) }(p)
} }
@ -122,20 +99,13 @@ func NewFallback(option FallbackOption, proxies []C.Proxy) (*Fallback, error) {
} }
interval := time.Duration(option.Interval) * time.Second interval := time.Duration(option.Interval) * time.Second
warpperProxies := make([]*proxy, len(proxies))
for idx := range proxies {
warpperProxies[idx] = &proxy{
RawProxy: proxies[idx],
Valid: true,
}
}
Fallback := &Fallback{ Fallback := &Fallback{
Base: &Base{ Base: &Base{
name: option.Name, name: option.Name,
tp: C.Fallback, tp: C.Fallback,
}, },
proxies: warpperProxies, proxies: proxies,
rawURL: option.URL, rawURL: option.URL,
interval: interval, interval: interval,
done: make(chan struct{}), done: make(chan struct{}),

View file

@ -51,12 +51,12 @@ func jumpHash(key uint64, buckets int32) int32 {
func (lb *LoadBalance) Dial(metadata *C.Metadata) (net.Conn, error) { func (lb *LoadBalance) Dial(metadata *C.Metadata) (net.Conn, error) {
key := uint64(murmur3.Sum32([]byte(getKey(metadata)))) key := uint64(murmur3.Sum32([]byte(getKey(metadata))))
buckets := int32(len(lb.proxies)) buckets := int32(len(lb.proxies))
for i := 0; i < lb.maxRetry; i++ { for i := 0; i < lb.maxRetry; i, key = i+1, key+1 {
idx := jumpHash(key, buckets) idx := jumpHash(key, buckets)
if proxy, err := lb.proxies[idx].Dial(metadata); err == nil { proxy := lb.proxies[idx]
return proxy, nil if proxy.Alive() {
return proxy.Dial(metadata)
} }
key++
} }
return lb.proxies[0].Dial(metadata) return lb.proxies[0].Dial(metadata)

View file

@ -1,6 +1,7 @@
package adapters package adapters
import ( import (
"context"
"encoding/json" "encoding/json"
"errors" "errors"
"net" "net"
@ -9,6 +10,7 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/Dreamacro/clash/common/picker"
C "github.com/Dreamacro/clash/constant" C "github.com/Dreamacro/clash/constant"
) )
@ -54,7 +56,7 @@ func (u *URLTest) MarshalJSON() ([]byte, error) {
}) })
} }
func (u *URLTest) Close() { func (u *URLTest) Destroy() {
u.done <- struct{}{} u.done <- struct{}{}
} }
@ -81,12 +83,12 @@ func (u *URLTest) speedTest() {
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
wg.Add(len(u.proxies)) wg.Add(len(u.proxies))
c := make(chan interface{}) c := make(chan interface{})
fast := selectFast(c) fast := picker.SelectFast(context.Background(), c)
timer := time.NewTimer(u.interval) timer := time.NewTimer(u.interval)
for _, p := range u.proxies { for _, p := range u.proxies {
go func(p C.Proxy) { go func(p C.Proxy) {
_, err := DelayTest(p, u.rawURL) _, err := p.URLTest(u.rawURL)
if err == nil { if err == nil {
c <- p c <- p
} }

View file

@ -4,7 +4,6 @@ import (
"crypto/tls" "crypto/tls"
"fmt" "fmt"
"net" "net"
"net/http"
"net/url" "net/url"
"sync" "sync"
"time" "time"
@ -21,39 +20,6 @@ var (
once sync.Once once sync.Once
) )
// DelayTest get the delay for the specified URL
func DelayTest(proxy C.Proxy, url string) (t int16, err error) {
addr, err := urlToMetadata(url)
if err != nil {
return
}
start := time.Now()
instance, err := proxy.Dial(&addr)
if err != nil {
return
}
defer instance.Close()
transport := &http.Transport{
Dial: func(string, string) (net.Conn, error) {
return instance, nil
},
// from http.DefaultTransport
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
}
client := http.Client{Transport: transport}
resp, err := client.Get(url)
if err != nil {
return
}
resp.Body.Close()
t = int16(time.Since(start) / time.Millisecond)
return
}
func urlToMetadata(rawURL string) (addr C.Metadata, err error) { func urlToMetadata(rawURL string) (addr C.Metadata, err error) {
u, err := url.Parse(rawURL) u, err := url.Parse(rawURL)
if err != nil { if err != nil {
@ -81,21 +47,6 @@ func urlToMetadata(rawURL string) (addr C.Metadata, err error) {
return return
} }
func selectFast(in chan interface{}) chan interface{} {
out := make(chan interface{})
go func() {
p, open := <-in
if open {
out <- p
}
close(out)
for range in {
}
}()
return out
}
func tcpKeepAlive(c net.Conn) { func tcpKeepAlive(c net.Conn) {
if tcp, ok := c.(*net.TCPConn); ok { if tcp, ok := c.(*net.TCPConn); ok {
tcp.SetKeepAlive(true) tcp.SetKeepAlive(true)

View file

@ -184,8 +184,8 @@ func parseProxies(cfg *rawConfig) (map[string]C.Proxy, error) {
decoder := structure.NewDecoder(structure.Option{TagName: "proxy", WeaklyTypedInput: true}) decoder := structure.NewDecoder(structure.Option{TagName: "proxy", WeaklyTypedInput: true})
proxies["DIRECT"] = adapters.NewDirect() proxies["DIRECT"] = adapters.NewProxy(adapters.NewDirect())
proxies["REJECT"] = adapters.NewReject() proxies["REJECT"] = adapters.NewProxy(adapters.NewReject())
// parse proxy // parse proxy
for idx, mapping := range proxiesConfig { for idx, mapping := range proxiesConfig {
@ -194,7 +194,7 @@ func parseProxies(cfg *rawConfig) (map[string]C.Proxy, error) {
return nil, fmt.Errorf("Proxy %d missing type", idx) return nil, fmt.Errorf("Proxy %d missing type", idx)
} }
var proxy C.Proxy var proxy C.ProxyAdapter
err := fmt.Errorf("can't parse") err := fmt.Errorf("can't parse")
switch proxyType { switch proxyType {
case "ss": case "ss":
@ -236,7 +236,7 @@ func parseProxies(cfg *rawConfig) (map[string]C.Proxy, error) {
if _, exist := proxies[proxy.Name()]; exist { if _, exist := proxies[proxy.Name()]; exist {
return nil, fmt.Errorf("Proxy %s is the duplicate name", proxy.Name()) return nil, fmt.Errorf("Proxy %s is the duplicate name", proxy.Name())
} }
proxies[proxy.Name()] = proxy proxies[proxy.Name()] = adapters.NewProxy(proxy)
} }
// parse proxy group // parse proxy group
@ -250,7 +250,7 @@ func parseProxies(cfg *rawConfig) (map[string]C.Proxy, error) {
if _, exist := proxies[groupName]; exist { if _, exist := proxies[groupName]; exist {
return nil, fmt.Errorf("ProxyGroup %s: the duplicate name", groupName) return nil, fmt.Errorf("ProxyGroup %s: the duplicate name", groupName)
} }
var group C.Proxy var group C.ProxyAdapter
ps := []C.Proxy{} ps := []C.Proxy{}
err := fmt.Errorf("can't parse") err := fmt.Errorf("can't parse")
@ -307,7 +307,7 @@ func parseProxies(cfg *rawConfig) (map[string]C.Proxy, error) {
if err != nil { if err != nil {
return nil, fmt.Errorf("Proxy %s: %s", groupName, err.Error()) return nil, fmt.Errorf("Proxy %s: %s", groupName, err.Error())
} }
proxies[groupName] = group proxies[groupName] = adapters.NewProxy(group)
} }
ps := []C.Proxy{} ps := []C.Proxy{}
@ -315,7 +315,8 @@ func parseProxies(cfg *rawConfig) (map[string]C.Proxy, error) {
ps = append(ps, v) ps = append(ps, v)
} }
proxies["GLOBAL"], _ = adapters.NewSelector("GLOBAL", ps) global, _ := adapters.NewSelector("GLOBAL", ps)
proxies["GLOBAL"] = adapters.NewProxy(global)
return proxies, nil return proxies, nil
} }

View file

@ -23,13 +23,20 @@ type ServerAdapter interface {
Close() Close()
} }
type Proxy interface { type ProxyAdapter interface {
Name() string Name() string
Type() AdapterType Type() AdapterType
Dial(metadata *Metadata) (net.Conn, error) Dial(metadata *Metadata) (net.Conn, error)
Destroy()
MarshalJSON() ([]byte, error) MarshalJSON() ([]byte, error)
} }
type Proxy interface {
ProxyAdapter
Alive() bool
URLTest(url string) (int16, error)
}
// AdapterType is enum of adapter type // AdapterType is enum of adapter type
type AdapterType int type AdapterType int

View file

@ -1,7 +1,6 @@
package executor package executor
import ( import (
adapters "github.com/Dreamacro/clash/adapters/outbound"
"github.com/Dreamacro/clash/config" "github.com/Dreamacro/clash/config"
C "github.com/Dreamacro/clash/constant" C "github.com/Dreamacro/clash/constant"
"github.com/Dreamacro/clash/dns" "github.com/Dreamacro/clash/dns"
@ -66,14 +65,9 @@ func updateProxies(proxies map[string]C.Proxy) {
tunnel := T.Instance() tunnel := T.Instance()
oldProxies := tunnel.Proxies() oldProxies := tunnel.Proxies()
// close old goroutine // close proxy group goroutine
for _, proxy := range oldProxies { for _, proxy := range oldProxies {
switch raw := proxy.(type) { proxy.Destroy()
case *adapters.URLTest:
raw.Close()
case *adapters.Fallback:
raw.Close()
}
} }
tunnel.UpdateProxies(proxies) tunnel.UpdateProxies(proxies)

View file

@ -81,12 +81,11 @@ func updateProxy(w http.ResponseWriter, r *http.Request) {
return return
} }
proxy := r.Context().Value(CtxKeyProxy).(C.Proxy) proxy := r.Context().Value(CtxKeyProxy).(*A.Proxy)
selector, ok := proxy.ProxyAdapter.(*A.Selector)
selector, ok := proxy.(*A.Selector)
if !ok { if !ok {
render.Status(r, http.StatusBadRequest) render.Status(r, http.StatusBadRequest)
render.JSON(w, r, ErrBadRequest) render.JSON(w, r, newError("Must be a Selector"))
return return
} }
@ -113,7 +112,7 @@ func getProxyDelay(w http.ResponseWriter, r *http.Request) {
sigCh := make(chan int16) sigCh := make(chan int16)
go func() { go func() {
t, err := A.DelayTest(proxy, url) t, err := proxy.URLTest(url)
if err != nil { if err != nil {
sigCh <- 0 sigCh <- 0
} }