diff --git a/adapters/direct.go b/adapters/direct.go index bb380880..f561ee9c 100644 --- a/adapters/direct.go +++ b/adapters/direct.go @@ -22,12 +22,13 @@ func (d *DirectAdapter) Close() { d.conn.Close() } -// Close is used to close connection +// Conn is used to http request func (d *DirectAdapter) Conn() net.Conn { return d.conn } type Direct struct { + traffic *C.Traffic } func (d *Direct) Name() string { @@ -40,9 +41,9 @@ func (d *Direct) Generator(addr *C.Addr) (adapter C.ProxyAdapter, err error) { return } c.(*net.TCPConn).SetKeepAlive(true) - return &DirectAdapter{conn: c}, nil + return &DirectAdapter{conn: NewTrafficTrack(c, d.traffic)}, nil } -func NewDirect() *Direct { - return &Direct{} +func NewDirect(traffic *C.Traffic) *Direct { + return &Direct{traffic: traffic} } diff --git a/adapters/reject.go b/adapters/reject.go index 17ca08ed..897e1c60 100644 --- a/adapters/reject.go +++ b/adapters/reject.go @@ -19,7 +19,7 @@ func (r *RejectAdapter) ReadWriter() io.ReadWriter { // Close is used to close connection func (r *RejectAdapter) Close() {} -// Close is used to close connection +// Conn is used to http request func (r *RejectAdapter) Conn() net.Conn { return nil } diff --git a/adapters/shadowsocks.go b/adapters/shadowsocks.go index bef3ae05..1d4a79c2 100644 --- a/adapters/shadowsocks.go +++ b/adapters/shadowsocks.go @@ -34,9 +34,10 @@ func (ss *ShadowsocksAdapter) Conn() net.Conn { } type ShadowSocks struct { - server string - name string - cipher core.Cipher + server string + name string + cipher core.Cipher + traffic *C.Traffic } func (ss *ShadowSocks) Name() string { @@ -51,10 +52,10 @@ func (ss *ShadowSocks) Generator(addr *C.Addr) (adapter C.ProxyAdapter, err erro c.(*net.TCPConn).SetKeepAlive(true) c = ss.cipher.StreamConn(c) _, err = c.Write(serializesSocksAddr(addr)) - return &ShadowsocksAdapter{conn: c}, err + return &ShadowsocksAdapter{conn: NewTrafficTrack(c, ss.traffic)}, err } -func NewShadowSocks(name string, ssURL string) (*ShadowSocks, error) { +func NewShadowSocks(name string, ssURL string, traffic *C.Traffic) (*ShadowSocks, error) { var key []byte server, cipher, password, _ := parseURL(ssURL) ciph, err := core.PickCipher(cipher, key, password) @@ -62,9 +63,10 @@ func NewShadowSocks(name string, ssURL string) (*ShadowSocks, error) { return nil, fmt.Errorf("ss %s initialize error: %s", server, err.Error()) } return &ShadowSocks{ - server: server, - name: name, - cipher: ciph, + server: server, + name: name, + cipher: ciph, + traffic: traffic, }, nil } diff --git a/adapters/util.go b/adapters/util.go new file mode 100644 index 00000000..1f41c548 --- /dev/null +++ b/adapters/util.go @@ -0,0 +1,28 @@ +package adapters + +import ( + "net" + + C "github.com/Dreamacro/clash/constant" +) + +type TrafficTrack struct { + net.Conn + traffic *C.Traffic +} + +func (tt *TrafficTrack) Read(b []byte) (int, error) { + n, err := tt.Conn.Read(b) + tt.traffic.Down() <- int64(n) + return n, err +} + +func (tt *TrafficTrack) Write(b []byte) (int, error) { + n, err := tt.Conn.Write(b) + tt.traffic.Up() <- int64(n) + return n, err +} + +func NewTrafficTrack(conn net.Conn, traffic *C.Traffic) *TrafficTrack { + return &TrafficTrack{traffic: traffic, Conn: conn} +} diff --git a/constant/traffic.go b/constant/traffic.go new file mode 100644 index 00000000..edf67368 --- /dev/null +++ b/constant/traffic.go @@ -0,0 +1,55 @@ +package constant + +import ( + "time" +) + +type Traffic struct { + up chan int64 + down chan int64 + upCount int64 + downCount int64 + upTotal int64 + downTotal int64 + interval time.Duration +} + +func (t *Traffic) Up() chan<- int64 { + return t.up +} + +func (t *Traffic) Down() chan<- int64 { + return t.down +} + +func (t *Traffic) Now() (up int64, down int64) { + return t.upTotal, t.downTotal +} + +func (t *Traffic) handle() { + go t.handleCh(t.up, &t.upCount, &t.upTotal) + go t.handleCh(t.down, &t.downCount, &t.downTotal) +} + +func (t *Traffic) handleCh(ch <-chan int64, count *int64, total *int64) { + ticker := time.NewTicker(t.interval) + for { + select { + case n := <-ch: + *count += n + case <-ticker.C: + *total = *count + *count = 0 + } + } +} + +func NewTraffic(interval time.Duration) *Traffic { + t := &Traffic{ + up: make(chan int64), + down: make(chan int64), + interval: interval, + } + go t.handle() + return t +} diff --git a/tunnel/tunnel.go b/tunnel/tunnel.go index df073e82..57615cef 100644 --- a/tunnel/tunnel.go +++ b/tunnel/tunnel.go @@ -27,6 +27,7 @@ type Tunnel struct { observable *observable.Observable logCh chan interface{} configLock *sync.RWMutex + traffic *C.Traffic } func (t *Tunnel) Add(req C.ServerAdapter) { @@ -61,7 +62,7 @@ func (t *Tunnel) UpdateConfig() (err error) { continue } ssURL := fmt.Sprintf("ss://%s:%s@%s:%s", proxy[3], proxy[4], proxy[1], proxy[2]) - ss, err := adapters.NewShadowSocks(key.Name(), ssURL) + ss, err := adapters.NewShadowSocks(key.Name(), ssURL, t.traffic) if err != nil { return err } @@ -70,7 +71,7 @@ func (t *Tunnel) UpdateConfig() (err error) { } // init proxy - proxys["DIRECT"] = adapters.NewDirect() + proxys["DIRECT"] = adapters.NewDirect(t.traffic) proxys["REJECT"] = adapters.NewReject() // parse rules @@ -167,7 +168,7 @@ func (t *Tunnel) match(addr *C.Addr) C.Proxy { return a } } - t.logCh <- newLog(INFO, "don't find, direct") + t.logCh <- newLog(INFO, "%v doesn't match any rule using DIRECT", addr.String()) return t.proxys["DIRECT"] } @@ -179,6 +180,7 @@ func newTunnel() *Tunnel { observable: observable.NewObservable(logCh), logCh: logCh, configLock: &sync.RWMutex{}, + traffic: C.NewTraffic(time.Second), } go tunnel.process() go tunnel.subscribeLogs()