From b926f4cf09d6fc4a7983bf830628d9d70e769152 Mon Sep 17 00:00:00 2001 From: Yunhao Zhang Date: Fri, 9 Aug 2019 01:28:37 +0800 Subject: [PATCH] Feature: trace adapters when dialing (#170) --- adapters/outbound/base.go | 38 +++++++++++++++++++++++++-- adapters/outbound/direct.go | 8 +++--- adapters/outbound/fallback.go | 16 +++++++++--- adapters/outbound/http.go | 4 +-- adapters/outbound/loadbalance.go | 23 +++++++++++++---- adapters/outbound/reject.go | 4 +-- adapters/outbound/selector.go | 16 +++++++++--- adapters/outbound/shadowsocks.go | 8 +++--- adapters/outbound/socks5.go | 8 +++--- adapters/outbound/urltest.go | 12 ++++++--- adapters/outbound/vmess.go | 8 +++--- constant/adapters.go | 33 ++++++++++++++++++++++-- tunnel/tunnel.go | 44 ++++++++++++++++++++------------ 13 files changed, 166 insertions(+), 56 deletions(-) diff --git a/adapters/outbound/base.go b/adapters/outbound/base.go index 6fb18069..3254209b 100644 --- a/adapters/outbound/base.go +++ b/adapters/outbound/base.go @@ -30,7 +30,7 @@ func (b *Base) Type() C.AdapterType { return b.tp } -func (b *Base) DialUDP(metadata *C.Metadata) (net.PacketConn, net.Addr, error) { +func (b *Base) DialUDP(metadata *C.Metadata) (C.PacketConn, net.Addr, error) { return nil, nil, errors.New("no support") } @@ -46,6 +46,40 @@ func (b *Base) MarshalJSON() ([]byte, error) { }) } +type conn struct { + net.Conn + chain C.Chain +} + +func (c *conn) Chains() C.Chain { + return c.chain +} + +func (c *conn) AppendToChains(a C.ProxyAdapter) { + c.chain = append(c.chain, a.Name()) +} + +func newConn(c net.Conn, a C.ProxyAdapter) C.Conn { + return &conn{c, []string{a.Name()}} +} + +type packetConn struct { + net.PacketConn + chain C.Chain +} + +func (c *packetConn) Chains() C.Chain { + return c.chain +} + +func (c *packetConn) AppendToChains(a C.ProxyAdapter) { + c.chain = append(c.chain, a.Name()) +} + +func newPacketConn(c net.PacketConn, a C.ProxyAdapter) C.PacketConn { + return &packetConn{c, []string{a.Name()}} +} + type Proxy struct { C.ProxyAdapter history *queue.Queue @@ -56,7 +90,7 @@ func (p *Proxy) Alive() bool { return p.alive } -func (p *Proxy) Dial(metadata *C.Metadata) (net.Conn, error) { +func (p *Proxy) Dial(metadata *C.Metadata) (C.Conn, error) { conn, err := p.ProxyAdapter.Dial(metadata) if err != nil { p.alive = false diff --git a/adapters/outbound/direct.go b/adapters/outbound/direct.go index 491c170e..0f3e6dc4 100644 --- a/adapters/outbound/direct.go +++ b/adapters/outbound/direct.go @@ -10,7 +10,7 @@ type Direct struct { *Base } -func (d *Direct) Dial(metadata *C.Metadata) (net.Conn, error) { +func (d *Direct) Dial(metadata *C.Metadata) (C.Conn, error) { address := net.JoinHostPort(metadata.Host, metadata.DstPort) if metadata.DstIP != nil { address = net.JoinHostPort(metadata.DstIP.String(), metadata.DstPort) @@ -21,10 +21,10 @@ func (d *Direct) Dial(metadata *C.Metadata) (net.Conn, error) { return nil, err } tcpKeepAlive(c) - return c, nil + return newConn(c, d), nil } -func (d *Direct) DialUDP(metadata *C.Metadata) (net.PacketConn, net.Addr, error) { +func (d *Direct) DialUDP(metadata *C.Metadata) (C.PacketConn, net.Addr, error) { pc, err := net.ListenPacket("udp", "") if err != nil { return nil, nil, err @@ -34,7 +34,7 @@ func (d *Direct) DialUDP(metadata *C.Metadata) (net.PacketConn, net.Addr, error) if err != nil { return nil, nil, err } - return pc, addr, nil + return newPacketConn(pc, d), addr, nil } func NewDirect() *Direct { diff --git a/adapters/outbound/fallback.go b/adapters/outbound/fallback.go index 67a6ab08..9b44edeb 100644 --- a/adapters/outbound/fallback.go +++ b/adapters/outbound/fallback.go @@ -31,14 +31,22 @@ func (f *Fallback) Now() string { return proxy.Name() } -func (f *Fallback) Dial(metadata *C.Metadata) (net.Conn, error) { +func (f *Fallback) Dial(metadata *C.Metadata) (C.Conn, error) { proxy := f.findAliveProxy() - return proxy.Dial(metadata) + c, err := proxy.Dial(metadata) + if err == nil { + c.AppendToChains(f) + } + return c, err } -func (f *Fallback) DialUDP(metadata *C.Metadata) (net.PacketConn, net.Addr, error) { +func (f *Fallback) DialUDP(metadata *C.Metadata) (C.PacketConn, net.Addr, error) { proxy := f.findAliveProxy() - return proxy.DialUDP(metadata) + pc, addr, err := proxy.DialUDP(metadata) + if err == nil { + pc.AppendToChains(f) + } + return pc, addr, err } func (f *Fallback) SupportUDP() bool { diff --git a/adapters/outbound/http.go b/adapters/outbound/http.go index a617de5b..9b3791f0 100644 --- a/adapters/outbound/http.go +++ b/adapters/outbound/http.go @@ -35,7 +35,7 @@ type HttpOption struct { SkipCertVerify bool `proxy:"skip-cert-verify,omitempty"` } -func (h *Http) Dial(metadata *C.Metadata) (net.Conn, error) { +func (h *Http) Dial(metadata *C.Metadata) (C.Conn, error) { c, err := dialTimeout("tcp", h.addr, tcpTimeout) if err == nil && h.tls { cc := tls.Client(c, h.tlsConfig) @@ -51,7 +51,7 @@ func (h *Http) Dial(metadata *C.Metadata) (net.Conn, error) { return nil, err } - return c, nil + return newConn(c, h), nil } func (h *Http) shakeHand(metadata *C.Metadata, rw io.ReadWriter) error { diff --git a/adapters/outbound/loadbalance.go b/adapters/outbound/loadbalance.go index 9418863f..c719e8b8 100644 --- a/adapters/outbound/loadbalance.go +++ b/adapters/outbound/loadbalance.go @@ -54,21 +54,34 @@ func jumpHash(key uint64, buckets int32) int32 { return int32(b) } -func (lb *LoadBalance) Dial(metadata *C.Metadata) (net.Conn, error) { +func (lb *LoadBalance) Dial(metadata *C.Metadata) (c C.Conn, err error) { + defer func() { + if err == nil { + c.AppendToChains(lb) + } + }() + key := uint64(murmur3.Sum32([]byte(getKey(metadata)))) buckets := int32(len(lb.proxies)) for i := 0; i < lb.maxRetry; i, key = i+1, key+1 { idx := jumpHash(key, buckets) proxy := lb.proxies[idx] if proxy.Alive() { - return proxy.Dial(metadata) + c, err = proxy.Dial(metadata) + return } } - - return lb.proxies[0].Dial(metadata) + c, err = lb.proxies[0].Dial(metadata) + return } -func (lb *LoadBalance) DialUDP(metadata *C.Metadata) (net.PacketConn, net.Addr, error) { +func (lb *LoadBalance) DialUDP(metadata *C.Metadata) (pc C.PacketConn, addr net.Addr, err error) { + defer func() { + if err == nil { + pc.AppendToChains(lb) + } + }() + key := uint64(murmur3.Sum32([]byte(getKey(metadata)))) buckets := int32(len(lb.proxies)) for i := 0; i < lb.maxRetry; i, key = i+1, key+1 { diff --git a/adapters/outbound/reject.go b/adapters/outbound/reject.go index f78d4f45..de395d58 100644 --- a/adapters/outbound/reject.go +++ b/adapters/outbound/reject.go @@ -12,8 +12,8 @@ type Reject struct { *Base } -func (r *Reject) Dial(metadata *C.Metadata) (net.Conn, error) { - return &NopConn{}, nil +func (r *Reject) Dial(metadata *C.Metadata) (C.Conn, error) { + return newConn(&NopConn{}, r), nil } func NewReject() *Reject { diff --git a/adapters/outbound/selector.go b/adapters/outbound/selector.go index a14a7ce3..31d5a0a5 100644 --- a/adapters/outbound/selector.go +++ b/adapters/outbound/selector.go @@ -20,12 +20,20 @@ type SelectorOption struct { Proxies []string `proxy:"proxies"` } -func (s *Selector) Dial(metadata *C.Metadata) (net.Conn, error) { - return s.selected.Dial(metadata) +func (s *Selector) Dial(metadata *C.Metadata) (C.Conn, error) { + c, err := s.selected.Dial(metadata) + if err == nil { + c.AppendToChains(s) + } + return c, err } -func (s *Selector) DialUDP(metadata *C.Metadata) (net.PacketConn, net.Addr, error) { - return s.selected.DialUDP(metadata) +func (s *Selector) DialUDP(metadata *C.Metadata) (C.PacketConn, net.Addr, error) { + pc, addr, err := s.selected.DialUDP(metadata) + if err == nil { + pc.AppendToChains(s) + } + return pc, addr, err } func (s *Selector) SupportUDP() bool { diff --git a/adapters/outbound/shadowsocks.go b/adapters/outbound/shadowsocks.go index a77c1a9a..a958191f 100644 --- a/adapters/outbound/shadowsocks.go +++ b/adapters/outbound/shadowsocks.go @@ -57,7 +57,7 @@ type v2rayObfsOption struct { SkipCertVerify bool `obfs:"skip-cert-verify,omitempty"` } -func (ss *ShadowSocks) Dial(metadata *C.Metadata) (net.Conn, error) { +func (ss *ShadowSocks) Dial(metadata *C.Metadata) (C.Conn, error) { c, err := dialTimeout("tcp", ss.server, tcpTimeout) if err != nil { return nil, fmt.Errorf("%s connect error: %s", ss.server, err.Error()) @@ -78,10 +78,10 @@ func (ss *ShadowSocks) Dial(metadata *C.Metadata) (net.Conn, error) { } c = ss.cipher.StreamConn(c) _, err = c.Write(serializesSocksAddr(metadata)) - return c, err + return newConn(c, ss), err } -func (ss *ShadowSocks) DialUDP(metadata *C.Metadata) (net.PacketConn, net.Addr, error) { +func (ss *ShadowSocks) DialUDP(metadata *C.Metadata) (C.PacketConn, net.Addr, error) { pc, err := net.ListenPacket("udp", "") if err != nil { return nil, nil, err @@ -98,7 +98,7 @@ func (ss *ShadowSocks) DialUDP(metadata *C.Metadata) (net.PacketConn, net.Addr, } pc = ss.cipher.PacketConn(pc) - return &ssUDPConn{PacketConn: pc, rAddr: targetAddr}, addr, nil + return newPacketConn(&ssUDPConn{PacketConn: pc, rAddr: targetAddr}, ss), addr, nil } func (ss *ShadowSocks) MarshalJSON() ([]byte, error) { diff --git a/adapters/outbound/socks5.go b/adapters/outbound/socks5.go index 1b08bf87..d3f7a3a9 100644 --- a/adapters/outbound/socks5.go +++ b/adapters/outbound/socks5.go @@ -33,7 +33,7 @@ type Socks5Option struct { SkipCertVerify bool `proxy:"skip-cert-verify,omitempty"` } -func (ss *Socks5) Dial(metadata *C.Metadata) (net.Conn, error) { +func (ss *Socks5) Dial(metadata *C.Metadata) (C.Conn, error) { c, err := dialTimeout("tcp", ss.addr, tcpTimeout) if err == nil && ss.tls { @@ -56,10 +56,10 @@ func (ss *Socks5) Dial(metadata *C.Metadata) (net.Conn, error) { if _, err := socks5.ClientHandshake(c, serializesSocksAddr(metadata), socks5.CmdConnect, user); err != nil { return nil, err } - return c, nil + return newConn(c, ss), nil } -func (ss *Socks5) DialUDP(metadata *C.Metadata) (_ net.PacketConn, _ net.Addr, err error) { +func (ss *Socks5) DialUDP(metadata *C.Metadata) (_ C.PacketConn, _ net.Addr, err error) { c, err := dialTimeout("tcp", ss.addr, tcpTimeout) if err != nil { err = fmt.Errorf("%s connect error", ss.addr) @@ -116,7 +116,7 @@ func (ss *Socks5) DialUDP(metadata *C.Metadata) (_ net.PacketConn, _ net.Addr, e pc.Close() }() - return &socksUDPConn{PacketConn: pc, rAddr: targetAddr}, addr, nil + return newPacketConn(&socksUDPConn{PacketConn: pc, rAddr: targetAddr}, ss), addr, nil } func NewSocks5(option Socks5Option) *Socks5 { diff --git a/adapters/outbound/urltest.go b/adapters/outbound/urltest.go index 351c2e4c..4e9dcc74 100644 --- a/adapters/outbound/urltest.go +++ b/adapters/outbound/urltest.go @@ -33,16 +33,22 @@ func (u *URLTest) Now() string { return u.fast.Name() } -func (u *URLTest) Dial(metadata *C.Metadata) (net.Conn, error) { +func (u *URLTest) Dial(metadata *C.Metadata) (C.Conn, error) { a, err := u.fast.Dial(metadata) if err != nil { u.fallback() + } else { + a.AppendToChains(u) } return a, err } -func (u *URLTest) DialUDP(metadata *C.Metadata) (net.PacketConn, net.Addr, error) { - return u.fast.DialUDP(metadata) +func (u *URLTest) DialUDP(metadata *C.Metadata) (C.PacketConn, net.Addr, error) { + pc, addr, err := u.fast.DialUDP(metadata) + if err == nil { + pc.AppendToChains(u) + } + return pc, addr, err } func (u *URLTest) SupportUDP() bool { diff --git a/adapters/outbound/vmess.go b/adapters/outbound/vmess.go index 2d6d69c8..0121feb4 100644 --- a/adapters/outbound/vmess.go +++ b/adapters/outbound/vmess.go @@ -31,17 +31,17 @@ type VmessOption struct { SkipCertVerify bool `proxy:"skip-cert-verify,omitempty"` } -func (v *Vmess) Dial(metadata *C.Metadata) (net.Conn, error) { +func (v *Vmess) Dial(metadata *C.Metadata) (C.Conn, error) { c, err := dialTimeout("tcp", v.server, tcpTimeout) if err != nil { return nil, fmt.Errorf("%s connect error", v.server) } tcpKeepAlive(c) c, err = v.client.New(c, parseVmessAddr(metadata)) - return c, err + return newConn(c, v), err } -func (v *Vmess) DialUDP(metadata *C.Metadata) (net.PacketConn, net.Addr, error) { +func (v *Vmess) DialUDP(metadata *C.Metadata) (C.PacketConn, net.Addr, error) { c, err := dialTimeout("tcp", v.server, tcpTimeout) if err != nil { return nil, nil, fmt.Errorf("%s connect error", v.server) @@ -51,7 +51,7 @@ func (v *Vmess) DialUDP(metadata *C.Metadata) (net.PacketConn, net.Addr, error) if err != nil { return nil, nil, fmt.Errorf("new vmess client error: %v", err) } - return &fakeUDPConn{Conn: c}, c.RemoteAddr(), nil + return newPacketConn(&fakeUDPConn{Conn: c}, v), c.RemoteAddr(), nil } func NewVmess(option VmessOption) (*Vmess, error) { diff --git a/constant/adapters.go b/constant/adapters.go index 11844bc7..30ea5012 100644 --- a/constant/adapters.go +++ b/constant/adapters.go @@ -2,6 +2,7 @@ package constant import ( "context" + "fmt" "net" "time" ) @@ -25,11 +26,39 @@ type ServerAdapter interface { Metadata() *Metadata } +type Connection interface { + Chains() Chain + AppendToChains(adapter ProxyAdapter) +} + +type Chain []string + +func (c Chain) String() string { + switch len(c) { + case 0: + return "" + case 1: + return c[0] + default: + return fmt.Sprintf("%s[%s]", c[len(c)-1], c[0]) + } +} + +type Conn interface { + net.Conn + Connection +} + +type PacketConn interface { + net.PacketConn + Connection +} + type ProxyAdapter interface { Name() string Type() AdapterType - Dial(metadata *Metadata) (net.Conn, error) - DialUDP(metadata *Metadata) (net.PacketConn, net.Addr, error) + Dial(metadata *Metadata) (Conn, error) + DialUDP(metadata *Metadata) (PacketConn, net.Addr, error) SupportUDP() bool Destroy() MarshalJSON() ([]byte, error) diff --git a/tunnel/tunnel.go b/tunnel/tunnel.go index 984af7b9..59368615 100644 --- a/tunnel/tunnel.go +++ b/tunnel/tunnel.go @@ -135,6 +135,7 @@ func (t *Tunnel) handleConn(localConn C.ServerAdapter) { } var proxy C.Proxy + var rule C.Rule switch t.mode { case Direct: proxy = t.proxies["DIRECT"] @@ -143,7 +144,7 @@ func (t *Tunnel) handleConn(localConn C.ServerAdapter) { // Rule default: var err error - proxy, err = t.match(metadata) + proxy, rule, err = t.match(metadata) if err != nil { return } @@ -151,22 +152,29 @@ func (t *Tunnel) handleConn(localConn C.ServerAdapter) { switch metadata.NetWork { case C.TCP: - t.handleTCPConn(localConn, metadata, proxy) + t.handleTCPConn(localConn, metadata, proxy, rule) case C.UDP: - t.handleUDPConn(localConn, metadata, proxy) + t.handleUDPConn(localConn, metadata, proxy, rule) } } -func (t *Tunnel) handleUDPConn(localConn C.ServerAdapter, metadata *C.Metadata, proxy C.Proxy) { +func (t *Tunnel) handleUDPConn(localConn C.ServerAdapter, metadata *C.Metadata, proxy C.Proxy, rule C.Rule) { pc, addr := natTable.Get(localConn.RemoteAddr()) if pc == nil { - var err error - pc, addr, err = proxy.DialUDP(metadata) + rawpc, naddr, err := proxy.DialUDP(metadata) + addr = naddr + pc = rawpc if err != nil { - log.Warnln("Proxy[%s] connect [%s --> %s] error: %s", proxy.Name(), metadata.SrcIP.String(), metadata.String(), err.Error()) + log.Warnln("%s --> %v match %s using %s error: %s", metadata.SrcIP.String(), metadata.String(), rule.RuleType().String(), rule.Adapter(), err.Error()) return } + if rule != nil { + log.Infoln("%s --> %v match %s using %s", metadata.SrcIP.String(), metadata.String(), rule.RuleType().String(), rawpc.Chains().String()) + } else { + log.Infoln("%s --> %v doesn't match any rule using DIRECT", metadata.SrcIP.String(), metadata.String()) + } + natTable.Set(localConn.RemoteAddr(), pc, addr) go t.handleUDPToLocal(localConn, pc) } @@ -174,14 +182,21 @@ func (t *Tunnel) handleUDPConn(localConn C.ServerAdapter, metadata *C.Metadata, t.handleUDPToRemote(localConn, pc, addr) } -func (t *Tunnel) handleTCPConn(localConn C.ServerAdapter, metadata *C.Metadata, proxy C.Proxy) { +func (t *Tunnel) handleTCPConn(localConn C.ServerAdapter, metadata *C.Metadata, proxy C.Proxy, rule C.Rule) { remoConn, err := proxy.Dial(metadata) + if err != nil { - log.Warnln("Proxy[%s] connect [%s --> %s] error: %s", proxy.Name(), metadata.SrcIP.String(), metadata.String(), err.Error()) + log.Warnln("%s --> %v match %s using %s error: %s", metadata.SrcIP.String(), metadata.String(), rule.RuleType().String(), rule.Adapter(), err.Error()) return } defer remoConn.Close() + if rule != nil { + log.Infoln("%s --> %v match %s using %s", metadata.SrcIP.String(), metadata.String(), rule.RuleType().String(), remoConn.Chains().String()) + } else { + log.Infoln("%s --> %v doesn't match any rule using DIRECT", metadata.SrcIP.String(), metadata.String()) + } + switch adapter := localConn.(type) { case *InboundAdapter.HTTPAdapter: t.handleHTTP(adapter, remoConn) @@ -194,7 +209,7 @@ func (t *Tunnel) shouldResolveIP(rule C.Rule, metadata *C.Metadata) bool { return (rule.RuleType() == C.GEOIP || rule.RuleType() == C.IPCIDR) && metadata.Host != "" && metadata.DstIP == nil } -func (t *Tunnel) match(metadata *C.Metadata) (C.Proxy, error) { +func (t *Tunnel) match(metadata *C.Metadata) (C.Proxy, C.Rule, error) { t.configMux.RLock() defer t.configMux.RUnlock() @@ -204,7 +219,7 @@ func (t *Tunnel) match(metadata *C.Metadata) (C.Proxy, error) { ip, err := t.resolveIP(metadata.Host) if err != nil { if !t.ignoreResolveFail { - return nil, fmt.Errorf("[DNS] resolve %s error: %s", metadata.Host, err.Error()) + return nil, nil, fmt.Errorf("[DNS] resolve %s error: %s", metadata.Host, err.Error()) } log.Debugln("[DNS] resolve %s error: %s", metadata.Host, err.Error()) } else { @@ -224,13 +239,10 @@ func (t *Tunnel) match(metadata *C.Metadata) (C.Proxy, error) { log.Debugln("%v UDP is not supported", adapter.Name()) continue } - - log.Infoln("%s --> %v match %s using %s", metadata.SrcIP.String(), metadata.String(), rule.RuleType().String(), rule.Adapter()) - return adapter, nil + return adapter, rule, nil } } - log.Infoln("%s --> %v doesn't match any rule using DIRECT", metadata.SrcIP.String(), metadata.String()) - return t.proxies["DIRECT"], nil + return t.proxies["DIRECT"], nil, nil } func newTunnel() *Tunnel {