Improve: Dial would reset proxy alive status

This commit is contained in:
Dreamacro 2019-03-16 00:43:16 +08:00
parent 8c608f5d7a
commit acf55a7f64
9 changed files with 103 additions and 122 deletions

View file

@ -2,6 +2,9 @@ package adapters
import (
"encoding/json"
"net"
"net/http"
"time"
C "github.com/Dreamacro/clash/constant"
)
@ -19,8 +22,62 @@ func (b *Base) Type() C.AdapterType {
return b.tp
}
func (b *Base) Destroy() {}
func (b *Base) MarshalJSON() ([]byte, error) {
return json.Marshal(map[string]string{
"type": b.Type().String(),
})
}
type Proxy struct {
C.ProxyAdapter
alive bool
}
func (p *Proxy) Alive() bool {
return p.alive
}
func (p *Proxy) Dial(metadata *C.Metadata) (net.Conn, error) {
conn, err := p.ProxyAdapter.Dial(metadata)
p.alive = err == nil
return conn, err
}
// URLTest get the delay for the specified URL
func (p *Proxy) URLTest(url string) (t int16, err error) {
addr, err := urlToMetadata(url)
if err != nil {
return
}
start := time.Now()
instance, err := p.ProxyAdapter.Dial(&addr)
if err != nil {
return
}
defer instance.Close()
transport := &http.Transport{
Dial: func(string, string) (net.Conn, error) {
return instance, nil
},
// from http.DefaultTransport
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
}
client := http.Client{Transport: transport}
resp, err := client.Get(url)
if err != nil {
return
}
resp.Body.Close()
t = int16(time.Since(start) / time.Millisecond)
return
}
func NewProxy(adapter C.ProxyAdapter) *Proxy {
return &Proxy{adapter, true}
}

View file

@ -10,14 +10,9 @@ import (
C "github.com/Dreamacro/clash/constant"
)
type proxy struct {
RawProxy C.Proxy
Valid bool
}
type Fallback struct {
*Base
proxies []*proxy
proxies []C.Proxy
rawURL string
interval time.Duration
done chan struct{}
@ -31,36 +26,19 @@ type FallbackOption struct {
}
func (f *Fallback) Now() string {
_, proxy := f.findNextValidProxy(0)
if proxy != nil {
return proxy.RawProxy.Name()
}
return f.proxies[0].RawProxy.Name()
proxy := f.findAliveProxy()
return proxy.Name()
}
func (f *Fallback) Dial(metadata *C.Metadata) (net.Conn, error) {
idx := 0
var proxy *proxy
for {
idx, proxy = f.findNextValidProxy(idx)
if proxy == nil {
break
}
adapter, err := proxy.RawProxy.Dial(metadata)
if err != nil {
proxy.Valid = false
idx++
continue
}
return adapter, err
}
return f.proxies[0].RawProxy.Dial(metadata)
proxy := f.findAliveProxy()
return proxy.Dial(metadata)
}
func (f *Fallback) MarshalJSON() ([]byte, error) {
var all []string
for _, proxy := range f.proxies {
all = append(all, proxy.RawProxy.Name())
all = append(all, proxy.Name())
}
return json.Marshal(map[string]interface{}{
"type": f.Type().String(),
@ -69,7 +47,7 @@ func (f *Fallback) MarshalJSON() ([]byte, error) {
})
}
func (f *Fallback) Close() {
func (f *Fallback) Destroy() {
f.done <- struct{}{}
}
@ -87,13 +65,13 @@ Loop:
}
}
func (f *Fallback) findNextValidProxy(start int) (int, *proxy) {
for i := start; i < len(f.proxies); i++ {
if f.proxies[i].Valid {
return i, f.proxies[i]
func (f *Fallback) findAliveProxy() C.Proxy {
for _, proxy := range f.proxies {
if proxy.Alive() {
return proxy
}
}
return -1, nil
return f.proxies[0]
}
func (f *Fallback) validTest() {
@ -101,9 +79,8 @@ func (f *Fallback) validTest() {
wg.Add(len(f.proxies))
for _, p := range f.proxies {
go func(p *proxy) {
_, err := DelayTest(p.RawProxy, f.rawURL)
p.Valid = err == nil
go func(p C.Proxy) {
p.URLTest(f.rawURL)
wg.Done()
}(p)
}
@ -122,20 +99,13 @@ func NewFallback(option FallbackOption, proxies []C.Proxy) (*Fallback, error) {
}
interval := time.Duration(option.Interval) * time.Second
warpperProxies := make([]*proxy, len(proxies))
for idx := range proxies {
warpperProxies[idx] = &proxy{
RawProxy: proxies[idx],
Valid: true,
}
}
Fallback := &Fallback{
Base: &Base{
name: option.Name,
tp: C.Fallback,
},
proxies: warpperProxies,
proxies: proxies,
rawURL: option.URL,
interval: interval,
done: make(chan struct{}),

View file

@ -51,12 +51,12 @@ func jumpHash(key uint64, buckets int32) int32 {
func (lb *LoadBalance) Dial(metadata *C.Metadata) (net.Conn, error) {
key := uint64(murmur3.Sum32([]byte(getKey(metadata))))
buckets := int32(len(lb.proxies))
for i := 0; i < lb.maxRetry; i++ {
for i := 0; i < lb.maxRetry; i, key = i+1, key+1 {
idx := jumpHash(key, buckets)
if proxy, err := lb.proxies[idx].Dial(metadata); err == nil {
return proxy, nil
proxy := lb.proxies[idx]
if proxy.Alive() {
return proxy.Dial(metadata)
}
key++
}
return lb.proxies[0].Dial(metadata)

View file

@ -1,6 +1,7 @@
package adapters
import (
"context"
"encoding/json"
"errors"
"net"
@ -9,6 +10,7 @@ import (
"sync/atomic"
"time"
"github.com/Dreamacro/clash/common/picker"
C "github.com/Dreamacro/clash/constant"
)
@ -54,7 +56,7 @@ func (u *URLTest) MarshalJSON() ([]byte, error) {
})
}
func (u *URLTest) Close() {
func (u *URLTest) Destroy() {
u.done <- struct{}{}
}
@ -81,12 +83,12 @@ func (u *URLTest) speedTest() {
wg := sync.WaitGroup{}
wg.Add(len(u.proxies))
c := make(chan interface{})
fast := selectFast(c)
fast := picker.SelectFast(context.Background(), c)
timer := time.NewTimer(u.interval)
for _, p := range u.proxies {
go func(p C.Proxy) {
_, err := DelayTest(p, u.rawURL)
_, err := p.URLTest(u.rawURL)
if err == nil {
c <- p
}

View file

@ -4,7 +4,6 @@ import (
"crypto/tls"
"fmt"
"net"
"net/http"
"net/url"
"sync"
"time"
@ -21,39 +20,6 @@ var (
once sync.Once
)
// DelayTest get the delay for the specified URL
func DelayTest(proxy C.Proxy, url string) (t int16, err error) {
addr, err := urlToMetadata(url)
if err != nil {
return
}
start := time.Now()
instance, err := proxy.Dial(&addr)
if err != nil {
return
}
defer instance.Close()
transport := &http.Transport{
Dial: func(string, string) (net.Conn, error) {
return instance, nil
},
// from http.DefaultTransport
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
}
client := http.Client{Transport: transport}
resp, err := client.Get(url)
if err != nil {
return
}
resp.Body.Close()
t = int16(time.Since(start) / time.Millisecond)
return
}
func urlToMetadata(rawURL string) (addr C.Metadata, err error) {
u, err := url.Parse(rawURL)
if err != nil {
@ -81,21 +47,6 @@ func urlToMetadata(rawURL string) (addr C.Metadata, err error) {
return
}
func selectFast(in chan interface{}) chan interface{} {
out := make(chan interface{})
go func() {
p, open := <-in
if open {
out <- p
}
close(out)
for range in {
}
}()
return out
}
func tcpKeepAlive(c net.Conn) {
if tcp, ok := c.(*net.TCPConn); ok {
tcp.SetKeepAlive(true)

View file

@ -184,8 +184,8 @@ func parseProxies(cfg *rawConfig) (map[string]C.Proxy, error) {
decoder := structure.NewDecoder(structure.Option{TagName: "proxy", WeaklyTypedInput: true})
proxies["DIRECT"] = adapters.NewDirect()
proxies["REJECT"] = adapters.NewReject()
proxies["DIRECT"] = adapters.NewProxy(adapters.NewDirect())
proxies["REJECT"] = adapters.NewProxy(adapters.NewReject())
// parse proxy
for idx, mapping := range proxiesConfig {
@ -194,7 +194,7 @@ func parseProxies(cfg *rawConfig) (map[string]C.Proxy, error) {
return nil, fmt.Errorf("Proxy %d missing type", idx)
}
var proxy C.Proxy
var proxy C.ProxyAdapter
err := fmt.Errorf("can't parse")
switch proxyType {
case "ss":
@ -236,7 +236,7 @@ func parseProxies(cfg *rawConfig) (map[string]C.Proxy, error) {
if _, exist := proxies[proxy.Name()]; exist {
return nil, fmt.Errorf("Proxy %s is the duplicate name", proxy.Name())
}
proxies[proxy.Name()] = proxy
proxies[proxy.Name()] = adapters.NewProxy(proxy)
}
// parse proxy group
@ -250,7 +250,7 @@ func parseProxies(cfg *rawConfig) (map[string]C.Proxy, error) {
if _, exist := proxies[groupName]; exist {
return nil, fmt.Errorf("ProxyGroup %s: the duplicate name", groupName)
}
var group C.Proxy
var group C.ProxyAdapter
ps := []C.Proxy{}
err := fmt.Errorf("can't parse")
@ -307,7 +307,7 @@ func parseProxies(cfg *rawConfig) (map[string]C.Proxy, error) {
if err != nil {
return nil, fmt.Errorf("Proxy %s: %s", groupName, err.Error())
}
proxies[groupName] = group
proxies[groupName] = adapters.NewProxy(group)
}
ps := []C.Proxy{}
@ -315,7 +315,8 @@ func parseProxies(cfg *rawConfig) (map[string]C.Proxy, error) {
ps = append(ps, v)
}
proxies["GLOBAL"], _ = adapters.NewSelector("GLOBAL", ps)
global, _ := adapters.NewSelector("GLOBAL", ps)
proxies["GLOBAL"] = adapters.NewProxy(global)
return proxies, nil
}

View file

@ -23,13 +23,20 @@ type ServerAdapter interface {
Close()
}
type Proxy interface {
type ProxyAdapter interface {
Name() string
Type() AdapterType
Dial(metadata *Metadata) (net.Conn, error)
Destroy()
MarshalJSON() ([]byte, error)
}
type Proxy interface {
ProxyAdapter
Alive() bool
URLTest(url string) (int16, error)
}
// AdapterType is enum of adapter type
type AdapterType int

View file

@ -1,7 +1,6 @@
package executor
import (
adapters "github.com/Dreamacro/clash/adapters/outbound"
"github.com/Dreamacro/clash/config"
C "github.com/Dreamacro/clash/constant"
"github.com/Dreamacro/clash/dns"
@ -66,14 +65,9 @@ func updateProxies(proxies map[string]C.Proxy) {
tunnel := T.Instance()
oldProxies := tunnel.Proxies()
// close old goroutine
// close proxy group goroutine
for _, proxy := range oldProxies {
switch raw := proxy.(type) {
case *adapters.URLTest:
raw.Close()
case *adapters.Fallback:
raw.Close()
}
proxy.Destroy()
}
tunnel.UpdateProxies(proxies)

View file

@ -81,12 +81,11 @@ func updateProxy(w http.ResponseWriter, r *http.Request) {
return
}
proxy := r.Context().Value(CtxKeyProxy).(C.Proxy)
selector, ok := proxy.(*A.Selector)
proxy := r.Context().Value(CtxKeyProxy).(*A.Proxy)
selector, ok := proxy.ProxyAdapter.(*A.Selector)
if !ok {
render.Status(r, http.StatusBadRequest)
render.JSON(w, r, ErrBadRequest)
render.JSON(w, r, newError("Must be a Selector"))
return
}
@ -113,7 +112,7 @@ func getProxyDelay(w http.ResponseWriter, r *http.Request) {
sigCh := make(chan int16)
go func() {
t, err := A.DelayTest(proxy, url)
t, err := proxy.URLTest(url)
if err != nil {
sigCh <- 0
}