From 83891503186098418ddca005eda337534eb929c6 Mon Sep 17 00:00:00 2001 From: Dreamacro <305009791@qq.com> Date: Thu, 26 Jul 2018 00:04:59 +0800 Subject: [PATCH] Improve: config convergent and add log-level --- adapters/local/http.go | 32 +++ {proxy/http => adapters/local}/https.go | 8 +- adapters/local/socks.go | 32 +++ adapters/local/util.go | 66 ++++++ adapters/{ => remote}/direct.go | 16 +- adapters/{ => remote}/reject.go | 5 - adapters/{ => remote}/selector.go | 0 adapters/{ => remote}/shadowsocks.go | 24 +- adapters/{ => remote}/urltest.go | 0 config/config.go | 296 ++++++++++++++++++++++++ config/general.go | 17 ++ {tunnel => config}/mode.go | 11 +- {tunnel => config}/utils.go | 2 +- constant/adapters.go | 3 - constant/addr.go | 8 +- constant/config.go | 11 - constant/log.go | 35 +++ hub/common.go | 4 +- hub/configs.go | 34 ++- hub/proxies.go | 8 +- hub/rules.go | 6 +- hub/server.go | 34 ++- main.go | 22 +- proxy/http/http.go | 77 ------ proxy/http/server.go | 36 +-- proxy/listener.go | 58 ++--- proxy/socks/tcp.go | 61 +---- tunnel/connection.go | 67 ++++++ tunnel/log.go | 40 +--- tunnel/tunnel.go | 223 ++++++------------ {adapters => tunnel}/util.go | 5 +- 31 files changed, 757 insertions(+), 484 deletions(-) create mode 100644 adapters/local/http.go rename {proxy/http => adapters/local}/https.go (72%) create mode 100644 adapters/local/socks.go create mode 100644 adapters/local/util.go rename adapters/{ => remote}/direct.go (68%) rename adapters/{ => remote}/reject.go (86%) rename adapters/{ => remote}/selector.go (100%) rename adapters/{ => remote}/shadowsocks.go (81%) rename adapters/{ => remote}/urltest.go (100%) create mode 100644 config/config.go create mode 100644 config/general.go rename {tunnel => config}/mode.go (58%) rename {tunnel => config}/utils.go (90%) create mode 100644 constant/log.go delete mode 100644 proxy/http/http.go create mode 100644 tunnel/connection.go rename {adapters => tunnel}/util.go (78%) diff --git a/adapters/local/http.go b/adapters/local/http.go new file mode 100644 index 00000000..69712aa9 --- /dev/null +++ b/adapters/local/http.go @@ -0,0 +1,32 @@ +package adapters + +import ( + "net/http" + + C "github.com/Dreamacro/clash/constant" +) + +type HttpAdapter struct { + addr *C.Addr + R *http.Request + W http.ResponseWriter + done chan struct{} +} + +func (h *HttpAdapter) Close() { + h.done <- struct{}{} +} + +func (h *HttpAdapter) Addr() *C.Addr { + return h.addr +} + +func NewHttp(host string, w http.ResponseWriter, r *http.Request) (*HttpAdapter, chan struct{}) { + done := make(chan struct{}) + return &HttpAdapter{ + addr: parseHttpAddr(host), + R: r, + W: w, + done: done, + }, done +} diff --git a/proxy/http/https.go b/adapters/local/https.go similarity index 72% rename from proxy/http/https.go rename to adapters/local/https.go index 33eed177..da766609 100644 --- a/proxy/http/https.go +++ b/adapters/local/https.go @@ -1,8 +1,7 @@ -package http +package adapters import ( "bufio" - "io" "net" C "github.com/Dreamacro/clash/constant" @@ -22,9 +21,8 @@ func (h *HttpsAdapter) Addr() *C.Addr { return h.addr } -func (h *HttpsAdapter) Connect(proxy C.ProxyAdapter) { - go io.Copy(h.conn, proxy.ReadWriter()) - io.Copy(proxy.ReadWriter(), h.conn) +func (h *HttpsAdapter) Conn() net.Conn { + return h.conn } func NewHttps(host string, conn net.Conn) *HttpsAdapter { diff --git a/adapters/local/socks.go b/adapters/local/socks.go new file mode 100644 index 00000000..dd1c4623 --- /dev/null +++ b/adapters/local/socks.go @@ -0,0 +1,32 @@ +package adapters + +import ( + "net" + + C "github.com/Dreamacro/clash/constant" + "github.com/riobard/go-shadowsocks2/socks" +) + +type SocksAdapter struct { + conn net.Conn + addr *C.Addr +} + +func (s *SocksAdapter) Close() { + s.conn.Close() +} + +func (s *SocksAdapter) Addr() *C.Addr { + return s.addr +} + +func (s *SocksAdapter) Conn() net.Conn { + return s.conn +} + +func NewSocks(target socks.Addr, conn net.Conn) *SocksAdapter { + return &SocksAdapter{ + conn: conn, + addr: parseSocksAddr(target), + } +} diff --git a/adapters/local/util.go b/adapters/local/util.go new file mode 100644 index 00000000..0fb20430 --- /dev/null +++ b/adapters/local/util.go @@ -0,0 +1,66 @@ +package adapters + +import ( + "net" + "strconv" + + C "github.com/Dreamacro/clash/constant" + "github.com/riobard/go-shadowsocks2/socks" +) + +func parseSocksAddr(target socks.Addr) *C.Addr { + var host, port string + var ip net.IP + + switch target[0] { + case socks.AtypDomainName: + host = string(target[2 : 2+target[1]]) + port = strconv.Itoa((int(target[2+target[1]]) << 8) | int(target[2+target[1]+1])) + ipAddr, err := net.ResolveIPAddr("ip", host) + if err == nil { + ip = ipAddr.IP + } + case socks.AtypIPv4: + ip = net.IP(target[1 : 1+net.IPv4len]) + port = strconv.Itoa((int(target[1+net.IPv4len]) << 8) | int(target[1+net.IPv4len+1])) + case socks.AtypIPv6: + ip = net.IP(target[1 : 1+net.IPv6len]) + port = strconv.Itoa((int(target[1+net.IPv6len]) << 8) | int(target[1+net.IPv6len+1])) + } + + return &C.Addr{ + NetWork: C.TCP, + AddrType: int(target[0]), + Host: host, + IP: &ip, + Port: port, + } +} + +func parseHttpAddr(target string) *C.Addr { + host, port, _ := net.SplitHostPort(target) + ipAddr, err := net.ResolveIPAddr("ip", host) + var resolveIP *net.IP + if err == nil { + resolveIP = &ipAddr.IP + } + + var addType int + ip := net.ParseIP(host) + switch { + case ip == nil: + addType = socks.AtypDomainName + case ip.To4() == nil: + addType = socks.AtypIPv6 + default: + addType = socks.AtypIPv4 + } + + return &C.Addr{ + NetWork: C.TCP, + AddrType: addType, + Host: host, + IP: resolveIP, + Port: port, + } +} diff --git a/adapters/direct.go b/adapters/remote/direct.go similarity index 68% rename from adapters/direct.go rename to adapters/remote/direct.go index 3df410da..ac46c4a7 100644 --- a/adapters/direct.go +++ b/adapters/remote/direct.go @@ -1,7 +1,6 @@ package adapters import ( - "io" "net" C "github.com/Dreamacro/clash/constant" @@ -12,11 +11,6 @@ type DirectAdapter struct { conn net.Conn } -// ReadWriter is used to handle network traffic -func (d *DirectAdapter) ReadWriter() io.ReadWriter { - return d.conn -} - // Close is used to close connection func (d *DirectAdapter) Close() { d.conn.Close() @@ -27,9 +21,7 @@ func (d *DirectAdapter) Conn() net.Conn { return d.conn } -type Direct struct { - traffic *C.Traffic -} +type Direct struct{} func (d *Direct) Name() string { return "Direct" @@ -45,9 +37,9 @@ func (d *Direct) Generator(addr *C.Addr) (adapter C.ProxyAdapter, err error) { return } c.(*net.TCPConn).SetKeepAlive(true) - return &DirectAdapter{conn: NewTrafficTrack(c, d.traffic)}, nil + return &DirectAdapter{conn: c}, nil } -func NewDirect(traffic *C.Traffic) *Direct { - return &Direct{traffic: traffic} +func NewDirect() *Direct { + return &Direct{} } diff --git a/adapters/reject.go b/adapters/remote/reject.go similarity index 86% rename from adapters/reject.go rename to adapters/remote/reject.go index 91bdfd1e..7833e004 100644 --- a/adapters/reject.go +++ b/adapters/remote/reject.go @@ -11,11 +11,6 @@ import ( type RejectAdapter struct { } -// ReadWriter is used to handle network traffic -func (r *RejectAdapter) ReadWriter() io.ReadWriter { - return &NopRW{} -} - // Close is used to close connection func (r *RejectAdapter) Close() {} diff --git a/adapters/selector.go b/adapters/remote/selector.go similarity index 100% rename from adapters/selector.go rename to adapters/remote/selector.go diff --git a/adapters/shadowsocks.go b/adapters/remote/shadowsocks.go similarity index 81% rename from adapters/shadowsocks.go rename to adapters/remote/shadowsocks.go index 6884aacc..6323a897 100644 --- a/adapters/shadowsocks.go +++ b/adapters/remote/shadowsocks.go @@ -3,7 +3,6 @@ package adapters import ( "bytes" "fmt" - "io" "net" "net/url" "strconv" @@ -19,11 +18,6 @@ type ShadowsocksAdapter struct { conn net.Conn } -// ReadWriter is used to handle network traffic -func (ss *ShadowsocksAdapter) ReadWriter() io.ReadWriter { - return ss.conn -} - // Close is used to close connection func (ss *ShadowsocksAdapter) Close() { ss.conn.Close() @@ -34,10 +28,9 @@ func (ss *ShadowsocksAdapter) Conn() net.Conn { } type ShadowSocks struct { - server string - name string - cipher core.Cipher - traffic *C.Traffic + server string + name string + cipher core.Cipher } func (ss *ShadowSocks) Name() string { @@ -56,10 +49,10 @@ func (ss *ShadowSocks) Generator(addr *C.Addr) (adapter C.ProxyAdapter, err erro c.(*net.TCPConn).SetKeepAlive(true) c = ss.cipher.StreamConn(c) _, err = c.Write(serializesSocksAddr(addr)) - return &ShadowsocksAdapter{conn: NewTrafficTrack(c, ss.traffic)}, err + return &ShadowsocksAdapter{conn: c}, err } -func NewShadowSocks(name string, ssURL string, traffic *C.Traffic) (*ShadowSocks, error) { +func NewShadowSocks(name string, ssURL string) (*ShadowSocks, error) { var key []byte server, cipher, password, _ := parseURL(ssURL) ciph, err := core.PickCipher(cipher, key, password) @@ -67,10 +60,9 @@ func NewShadowSocks(name string, ssURL string, traffic *C.Traffic) (*ShadowSocks return nil, fmt.Errorf("ss %s initialize error: %s", server, err.Error()) } return &ShadowSocks{ - server: server, - name: name, - cipher: ciph, - traffic: traffic, + server: server, + name: name, + cipher: ciph, }, nil } diff --git a/adapters/urltest.go b/adapters/remote/urltest.go similarity index 100% rename from adapters/urltest.go rename to adapters/remote/urltest.go diff --git a/config/config.go b/config/config.go new file mode 100644 index 00000000..c6bb4e94 --- /dev/null +++ b/config/config.go @@ -0,0 +1,296 @@ +package config + +import ( + "fmt" + "os" + "strconv" + "strings" + "sync" + "time" + + "github.com/Dreamacro/clash/adapters/remote" + C "github.com/Dreamacro/clash/constant" + "github.com/Dreamacro/clash/observable" + R "github.com/Dreamacro/clash/rules" + + "gopkg.in/ini.v1" +) + +var ( + config *Config + once sync.Once +) + +// Config is clash config manager +type Config struct { + general *General + rules []C.Rule + proxies map[string]C.Proxy + lastUpdate time.Time + + event chan<- interface{} + errCh chan interface{} + observable *observable.Observable +} + +// Event is event of clash config +type Event struct { + Type string + Payload interface{} +} + +// Subscribe config stream +func (c *Config) Subscribe() observable.Subscription { + sub, _ := c.observable.Subscribe() + return sub +} + +// Report return a channel for collecting error message +func (c *Config) Report() chan<- interface{} { + return c.errCh +} + +func (c *Config) readConfig() (*ini.File, error) { + if _, err := os.Stat(C.ConfigPath); os.IsNotExist(err) { + return nil, err + } + return ini.LoadSources( + ini.LoadOptions{AllowBooleanKeys: true}, + C.ConfigPath, + ) +} + +// Parse config +func (c *Config) Parse() error { + cfg, err := c.readConfig() + if err != nil { + return err + } + + if err := c.parseGeneral(cfg); err != nil { + return err + } + + if err := c.parseProxies(cfg); err != nil { + return err + } + + return c.parseRules(cfg) +} + +// Proxies return proxies of clash +func (c *Config) Proxies() map[string]C.Proxy { + return c.proxies +} + +// Rules return rules of clash +func (c *Config) Rules() []C.Rule { + return c.rules +} + +// SetMode change mode of clash +func (c *Config) SetMode(mode Mode) { + c.general.Mode = mode + c.event <- &Event{Type: "mode", Payload: mode} +} + +// General return clash general config +func (c *Config) General() General { + return *c.general +} + +// UpdateRules is a function for hot reload rules +func (c *Config) UpdateRules() error { + cfg, err := c.readConfig() + if err != nil { + return err + } + + return c.parseRules(cfg) +} + +func (c *Config) parseGeneral(cfg *ini.File) error { + general := cfg.Section("General") + + port := general.Key("port").RangeInt(C.DefalutHTTPPort, 1, 65535) + socksPort := general.Key("socks-port").RangeInt(C.DefalutSOCKSPort, 1, 65535) + allowLan := general.Key("allow-lan").MustBool() + logLevelString := general.Key("log-level").MustString(C.INFO.String()) + modeString := general.Key("mode").MustString(Rule.String()) + + mode, exist := ModeMapping[modeString] + if !exist { + return fmt.Errorf("General.mode value invalid") + } + + logLevel, exist := C.LogLevelMapping[logLevelString] + if !exist { + return fmt.Errorf("General.log-level value invalid") + } + + c.general = &General{ + Base: &Base{ + Port: &port, + SocketPort: &socksPort, + AllowLan: &allowLan, + }, + Mode: mode, + LogLevel: logLevel, + } + + if restAddr := general.Key("external-controller").String(); restAddr != "" { + c.event <- &Event{Type: "external-controller", Payload: restAddr} + } + + c.UpdateGeneral(*c.general) + return nil +} + +// UpdateGeneral dispatch update event +func (c *Config) UpdateGeneral(general General) { + c.event <- &Event{Type: "base", Payload: *general.Base} + c.event <- &Event{Type: "mode", Payload: general.Mode} + c.event <- &Event{Type: "log-level", Payload: general.LogLevel} +} + +func (c *Config) parseProxies(cfg *ini.File) error { + proxies := make(map[string]C.Proxy) + proxiesConfig := cfg.Section("Proxy") + groupsConfig := cfg.Section("Proxy Group") + + // parse proxy + for _, key := range proxiesConfig.Keys() { + proxy := key.Strings(",") + if len(proxy) == 0 { + continue + } + switch proxy[0] { + // ss, server, port, cipter, password + case "ss": + if len(proxy) < 5 { + continue + } + ssURL := fmt.Sprintf("ss://%s:%s@%s:%s", proxy[3], proxy[4], proxy[1], proxy[2]) + ss, err := adapters.NewShadowSocks(key.Name(), ssURL) + if err != nil { + return err + } + proxies[key.Name()] = ss + } + } + + // parse proxy group + for _, key := range groupsConfig.Keys() { + rule := strings.Split(key.Value(), ",") + rule = trimArr(rule) + switch rule[0] { + case "url-test": + if len(rule) < 4 { + return fmt.Errorf("URLTest need more than 4 param") + } + proxyNames := rule[1 : len(rule)-2] + delay, _ := strconv.Atoi(rule[len(rule)-1]) + url := rule[len(rule)-2] + var ps []C.Proxy + for _, name := range proxyNames { + if p, ok := proxies[name]; ok { + ps = append(ps, p) + } + } + + adapter, err := adapters.NewURLTest(key.Name(), ps, url, time.Duration(delay)*time.Second) + if err != nil { + return fmt.Errorf("Config error: %s", err.Error()) + } + proxies[key.Name()] = adapter + case "select": + if len(rule) < 3 { + return fmt.Errorf("Selector need more than 3 param") + } + proxyNames := rule[1:] + selectProxy := make(map[string]C.Proxy) + for _, name := range proxyNames { + proxy, exist := proxies[name] + if !exist { + return fmt.Errorf("Proxy %s not exist", name) + } + selectProxy[name] = proxy + } + selector, err := adapters.NewSelector(key.Name(), selectProxy) + if err != nil { + return fmt.Errorf("Selector create error: %s", err.Error()) + } + proxies[key.Name()] = selector + } + } + + // init proxy + proxies["DIRECT"] = adapters.NewDirect() + proxies["REJECT"] = adapters.NewReject() + + c.proxies = proxies + c.event <- &Event{Type: "proxies", Payload: proxies} + return nil +} + +func (c *Config) parseRules(cfg *ini.File) error { + rules := []C.Rule{} + + rulesConfig := cfg.Section("Rule") + // parse rules + for _, key := range rulesConfig.Keys() { + rule := strings.Split(key.Name(), ",") + if len(rule) < 3 { + continue + } + rule = trimArr(rule) + switch rule[0] { + case "DOMAIN-SUFFIX": + rules = append(rules, R.NewDomainSuffix(rule[1], rule[2])) + case "DOMAIN-KEYWORD": + rules = append(rules, R.NewDomainKeyword(rule[1], rule[2])) + case "GEOIP": + rules = append(rules, R.NewGEOIP(rule[1], rule[2])) + case "IP-CIDR", "IP-CIDR6": + rules = append(rules, R.NewIPCIDR(rule[1], rule[2])) + case "FINAL": + rules = append(rules, R.NewFinal(rule[2])) + } + } + + c.rules = rules + c.event <- &Event{Type: "rules", Payload: rules} + return nil +} + +func (c *Config) handleErrorMessage() { + for elm := range c.errCh { + event := elm.(Event) + switch event.Type { + case "base": + c.general.Base = event.Payload.(*Base) + } + } +} + +func newConfig() *Config { + event := make(chan interface{}) + config := &Config{ + general: &General{}, + proxies: make(map[string]C.Proxy), + rules: []C.Rule{}, + lastUpdate: time.Now(), + + event: event, + observable: observable.NewObservable(event), + } + go config.handleErrorMessage() + return config +} + +func Instance() *Config { + once.Do(func() { + config = newConfig() + }) + return config +} diff --git a/config/general.go b/config/general.go new file mode 100644 index 00000000..12141447 --- /dev/null +++ b/config/general.go @@ -0,0 +1,17 @@ +package config + +import ( + C "github.com/Dreamacro/clash/constant" +) + +type General struct { + *Base + Mode Mode + LogLevel C.LogLevel +} + +type Base struct { + Port *int + SocketPort *int + AllowLan *bool +} diff --git a/tunnel/mode.go b/config/mode.go similarity index 58% rename from tunnel/mode.go rename to config/mode.go index d82a0aee..b910eba1 100644 --- a/tunnel/mode.go +++ b/config/mode.go @@ -1,7 +1,16 @@ -package tunnel +package config type Mode int +var ( + // ModeMapping is a mapping for Mode enum + ModeMapping = map[string]Mode{ + "Global": Global, + "Rule": Rule, + "Direct": Direct, + } +) + const ( Global Mode = iota Rule diff --git a/tunnel/utils.go b/config/utils.go similarity index 90% rename from tunnel/utils.go rename to config/utils.go index 4e72f18d..cc2b99d4 100644 --- a/tunnel/utils.go +++ b/config/utils.go @@ -1,4 +1,4 @@ -package tunnel +package config import ( "strings" diff --git a/constant/adapters.go b/constant/adapters.go index df265bdc..1509570a 100644 --- a/constant/adapters.go +++ b/constant/adapters.go @@ -1,7 +1,6 @@ package constant import ( - "io" "net" ) @@ -15,14 +14,12 @@ const ( ) type ProxyAdapter interface { - ReadWriter() io.ReadWriter Conn() net.Conn Close() } type ServerAdapter interface { Addr() *Addr - Connect(ProxyAdapter) Close() } diff --git a/constant/addr.go b/constant/addr.go index 6f498f98..e404b489 100644 --- a/constant/addr.go +++ b/constant/addr.go @@ -10,8 +10,11 @@ const ( AtypDomainName = 3 AtypIPv6 = 4 - TCP = iota + TCP NetWork = iota UDP + + HTTP SourceType = iota + SOCKS ) type NetWork int @@ -23,9 +26,12 @@ func (n *NetWork) String() string { return "udp" } +type SourceType int + // Addr is used to store connection address type Addr struct { NetWork NetWork + Source SourceType AddrType int Host string IP *net.IP diff --git a/constant/config.go b/constant/config.go index c915ae6c..c12c208b 100644 --- a/constant/config.go +++ b/constant/config.go @@ -11,7 +11,6 @@ import ( "strings" log "github.com/sirupsen/logrus" - "gopkg.in/ini.v1" ) const ( @@ -107,13 +106,3 @@ func downloadMMDB(path string) (err error) { return nil } - -func GetConfig() (*ini.File, error) { - if _, err := os.Stat(ConfigPath); os.IsNotExist(err) { - return nil, err - } - return ini.LoadSources( - ini.LoadOptions{AllowBooleanKeys: true}, - ConfigPath, - ) -} diff --git a/constant/log.go b/constant/log.go new file mode 100644 index 00000000..4423322e --- /dev/null +++ b/constant/log.go @@ -0,0 +1,35 @@ +package constant + +var ( + // LogLevelMapping is a mapping for LogLevel enum + LogLevelMapping = map[string]LogLevel{ + "error": ERROR, + "warning": WARNING, + "info": INFO, + "debug": DEBUG, + } +) + +const ( + ERROR LogLevel = iota + WARNING + INFO + DEBUG +) + +type LogLevel int + +func (l LogLevel) String() string { + switch l { + case INFO: + return "info" + case WARNING: + return "warning" + case ERROR: + return "error" + case DEBUG: + return "debug" + default: + return "unknow" + } +} diff --git a/hub/common.go b/hub/common.go index bd900c28..4898d226 100644 --- a/hub/common.go +++ b/hub/common.go @@ -1,12 +1,14 @@ package hub import ( + "github.com/Dreamacro/clash/config" "github.com/Dreamacro/clash/proxy" T "github.com/Dreamacro/clash/tunnel" ) var ( - tunnel = T.GetInstance() + tunnel = T.Instance() + cfg = config.Instance() listener = proxy.Instance() ) diff --git a/hub/configs.go b/hub/configs.go index 63d7fff4..86019093 100644 --- a/hub/configs.go +++ b/hub/configs.go @@ -4,9 +4,9 @@ import ( "fmt" "net/http" + "github.com/Dreamacro/clash/config" C "github.com/Dreamacro/clash/constant" "github.com/Dreamacro/clash/proxy" - T "github.com/Dreamacro/clash/tunnel" "github.com/go-chi/chi" "github.com/go-chi/render" @@ -19,17 +19,23 @@ func configRouter() http.Handler { return r } -var modeMapping = map[string]T.Mode{ - "Global": T.Global, - "Rule": T.Rule, - "Direct": T.Direct, +type configSchema struct { + Port int `json:"port"` + SocketPort int `json:"socket-port"` + AllowLan bool `json:"allow-lan"` + Mode string `json:"mode"` + LogLevel string `json:"log-level"` } func getConfigs(w http.ResponseWriter, r *http.Request) { - info := listener.Info() - mode := tunnel.GetMode().String() - info.Mode = &mode - render.JSON(w, r, info) + general := cfg.General() + render.JSON(w, r, configSchema{ + Port: *general.Port, + SocketPort: *general.SocketPort, + AllowLan: *general.AllowLan, + Mode: general.Mode.String(), + LogLevel: general.LogLevel.String(), + }) } func updateConfigs(w http.ResponseWriter, r *http.Request) { @@ -48,15 +54,19 @@ func updateConfigs(w http.ResponseWriter, r *http.Request) { // update proxy listener := proxy.Instance() - proxyErr = listener.Update(general.AllowLan, general.Port, general.SocksPort) + proxyErr = listener.Update(&config.Base{ + AllowLan: general.AllowLan, + Port: general.Port, + SocketPort: general.SocksPort, + }) // update mode if general.Mode != nil { - mode, ok := modeMapping[*general.Mode] + mode, ok := config.ModeMapping[*general.Mode] if !ok { modeErr = fmt.Errorf("Mode error") } else { - tunnel.SetMode(mode) + cfg.SetMode(mode) } } diff --git a/hub/proxies.go b/hub/proxies.go index b8d90e2f..42c1af4d 100644 --- a/hub/proxies.go +++ b/hub/proxies.go @@ -4,7 +4,7 @@ import ( "fmt" "net/http" - A "github.com/Dreamacro/clash/adapters" + A "github.com/Dreamacro/clash/adapters/remote" C "github.com/Dreamacro/clash/constant" "github.com/go-chi/chi" @@ -61,7 +61,7 @@ type GetProxiesResponse struct { } func getProxies(w http.ResponseWriter, r *http.Request) { - _, rawProxies := tunnel.Config() + rawProxies := cfg.Proxies() proxies := make(map[string]interface{}) for name, proxy := range rawProxies { proxies[name] = transformProxy(proxy) @@ -71,7 +71,7 @@ func getProxies(w http.ResponseWriter, r *http.Request) { func getProxy(w http.ResponseWriter, r *http.Request) { name := chi.URLParam(r, "name") - _, proxies := tunnel.Config() + proxies := cfg.Proxies() proxy, exist := proxies[name] if !exist { w.WriteHeader(http.StatusNotFound) @@ -98,7 +98,7 @@ func updateProxy(w http.ResponseWriter, r *http.Request) { } name := chi.URLParam(r, "name") - _, proxies := tunnel.Config() + proxies := cfg.Proxies() proxy, exist := proxies[name] if !exist { w.WriteHeader(http.StatusNotFound) diff --git a/hub/rules.go b/hub/rules.go index 3f6dbe14..b8780fc1 100644 --- a/hub/rules.go +++ b/hub/rules.go @@ -24,10 +24,10 @@ type GetRulesResponse struct { } func getRules(w http.ResponseWriter, r *http.Request) { - rulesCfg, _ := tunnel.Config() + rawRules := cfg.Rules() var rules []Rule - for _, rule := range rulesCfg { + for _, rule := range rawRules { rules = append(rules, Rule{ Name: rule.RuleType().String(), Payload: rule.Payload(), @@ -41,7 +41,7 @@ func getRules(w http.ResponseWriter, r *http.Request) { } func updateRules(w http.ResponseWriter, r *http.Request) { - err := tunnel.UpdateConfig() + err := cfg.UpdateRules() if err != nil { w.WriteHeader(http.StatusInternalServerError) render.JSON(w, r, Error{ diff --git a/hub/server.go b/hub/server.go index 5a6e7276..7d7a35d6 100644 --- a/hub/server.go +++ b/hub/server.go @@ -5,6 +5,8 @@ import ( "net/http" "time" + "github.com/Dreamacro/clash/config" + C "github.com/Dreamacro/clash/constant" T "github.com/Dreamacro/clash/tunnel" "github.com/go-chi/chi" @@ -18,7 +20,19 @@ type Traffic struct { Down int64 `json:"down"` } -func NewHub(addr string) { +func newHub(signal chan struct{}) { + var addr string + ch := config.Instance().Subscribe() + signal <- struct{}{} + for { + elm := <-ch + event := elm.(*config.Event) + if event.Type == "external-controller" { + addr = event.Payload.(string) + break + } + } + r := chi.NewRouter() cors := cors.New(cors.Options{ @@ -38,7 +52,7 @@ func NewHub(addr string) { err := http.ListenAndServe(addr, r) if err != nil { - log.Fatalf("External controller error: %s", err.Error()) + log.Errorf("External controller error: %s", err.Error()) } } @@ -75,14 +89,7 @@ func getLogs(w http.ResponseWriter, r *http.Request) { req.Level = "info" } - mapping := map[string]T.LogLevel{ - "info": T.INFO, - "debug": T.DEBUG, - "error": T.ERROR, - "warning": T.WARNING, - } - - level, ok := mapping[req.Level] + level, ok := C.LogLevelMapping[req.Level] if !ok { w.WriteHeader(http.StatusBadRequest) render.JSON(w, r, Error{ @@ -117,3 +124,10 @@ func getLogs(w http.ResponseWriter, r *http.Request) { w.(http.Flusher).Flush() } } + +// Run initial hub +func Run() { + signal := make(chan struct{}) + go newHub(signal) + <-signal +} diff --git a/main.go b/main.go index 309f1281..7bb6d8b6 100644 --- a/main.go +++ b/main.go @@ -5,7 +5,7 @@ import ( "os/signal" "syscall" - C "github.com/Dreamacro/clash/constant" + "github.com/Dreamacro/clash/config" "github.com/Dreamacro/clash/hub" "github.com/Dreamacro/clash/proxy" "github.com/Dreamacro/clash/tunnel" @@ -14,23 +14,13 @@ import ( ) func main() { - if err := tunnel.GetInstance().UpdateConfig(); err != nil { - log.Fatalf("Parse config error: %s", err.Error()) - } + tunnel.Instance().Run() + proxy.Instance().Run() + hub.Run() - if err := proxy.Instance().Run(); err != nil { - log.Fatalf("Proxy listen error: %s", err.Error()) - } - - // Hub - cfg, err := C.GetConfig() + err := config.Instance().Parse() if err != nil { - log.Fatalf("Read config error: %s", err.Error()) - } - - section := cfg.Section("General") - if key, err := section.GetKey("external-controller"); err == nil { - go hub.NewHub(key.Value()) + log.Fatalf("Parse config error: %s", err.Error()) } sigCh := make(chan os.Signal, 1) diff --git a/proxy/http/http.go b/proxy/http/http.go deleted file mode 100644 index 8807c928..00000000 --- a/proxy/http/http.go +++ /dev/null @@ -1,77 +0,0 @@ -package http - -import ( - "io" - "net" - "net/http" - "time" - - C "github.com/Dreamacro/clash/constant" -) - -type HttpAdapter struct { - addr *C.Addr - r *http.Request - w http.ResponseWriter - done chan struct{} -} - -func (h *HttpAdapter) Close() { - h.done <- struct{}{} -} - -func (h *HttpAdapter) Addr() *C.Addr { - return h.addr -} - -func (h *HttpAdapter) Connect(proxy C.ProxyAdapter) { - req := http.Transport{ - Dial: func(string, string) (net.Conn, error) { - return proxy.Conn(), nil - }, - // from http.DefaultTransport - MaxIdleConns: 100, - IdleConnTimeout: 90 * time.Second, - ExpectContinueTimeout: 1 * time.Second, - } - resp, err := req.RoundTrip(h.r) - if err != nil { - return - } - defer resp.Body.Close() - - header := h.w.Header() - for k, vv := range resp.Header { - for _, v := range vv { - header.Add(k, v) - } - } - h.w.WriteHeader(resp.StatusCode) - var writer io.Writer = h.w - if len(resp.TransferEncoding) > 0 && resp.TransferEncoding[0] == "chunked" { - writer = ChunkWriter{Writer: h.w} - } - io.Copy(writer, resp.Body) -} - -type ChunkWriter struct { - io.Writer -} - -func (cw ChunkWriter) Write(b []byte) (int, error) { - n, err := cw.Writer.Write(b) - if err == nil { - cw.Writer.(http.Flusher).Flush() - } - return n, err -} - -func NewHttp(host string, w http.ResponseWriter, r *http.Request) (*HttpAdapter, chan struct{}) { - done := make(chan struct{}) - return &HttpAdapter{ - addr: parseHttpAddr(host), - r: r, - w: w, - done: done, - }, done -} diff --git a/proxy/http/server.go b/proxy/http/server.go index 47dbc5bc..e7c1850c 100644 --- a/proxy/http/server.go +++ b/proxy/http/server.go @@ -6,15 +6,15 @@ import ( "net/http" "strings" + "github.com/Dreamacro/clash/adapters/local" C "github.com/Dreamacro/clash/constant" "github.com/Dreamacro/clash/tunnel" - "github.com/riobard/go-shadowsocks2/socks" log "github.com/sirupsen/logrus" ) var ( - tun = tunnel.GetInstance() + tun = tunnel.Instance() ) func NewHttpProxy(addr string) (*C.ProxySignal, error) { @@ -61,7 +61,7 @@ func handleHTTP(w http.ResponseWriter, r *http.Request) { if !strings.Contains(addr, ":") { addr += ":80" } - req, done := NewHttp(addr, w, r) + req, done := adapters.NewHttp(addr, w, r) tun.Add(req) <-done } @@ -77,33 +77,5 @@ func handleTunneling(w http.ResponseWriter, r *http.Request) { } // w.WriteHeader(http.StatusOK) doesn't works in Safari conn.Write([]byte("HTTP/1.1 200 OK\r\n\r\n")) - tun.Add(NewHttps(r.Host, conn)) -} - -func parseHttpAddr(target string) *C.Addr { - host, port, _ := net.SplitHostPort(target) - ipAddr, err := net.ResolveIPAddr("ip", host) - var resolveIP *net.IP - if err == nil { - resolveIP = &ipAddr.IP - } - - var addType int - ip := net.ParseIP(host) - switch { - case ip == nil: - addType = socks.AtypDomainName - case ip.To4() == nil: - addType = socks.AtypIPv6 - default: - addType = socks.AtypIPv4 - } - - return &C.Addr{ - NetWork: C.TCP, - AddrType: addType, - Host: host, - IP: resolveIP, - Port: port, - } + tun.Add(adapters.NewHttps(r.Host, conn)) } diff --git a/proxy/listener.go b/proxy/listener.go index 853508cf..d72ed6c6 100644 --- a/proxy/listener.go +++ b/proxy/listener.go @@ -4,11 +4,10 @@ import ( "fmt" "sync" + "github.com/Dreamacro/clash/config" C "github.com/Dreamacro/clash/constant" "github.com/Dreamacro/clash/proxy/http" "github.com/Dreamacro/clash/proxy/socks" - - log "github.com/sirupsen/logrus" ) var ( @@ -35,24 +34,24 @@ func (l *Listener) Info() (info C.General) { } } -func (l *Listener) Update(allowLan *bool, httpPort *int, socksPort *int) error { - if allowLan != nil { - l.allowLan = *allowLan +func (l *Listener) Update(base *config.Base) error { + if base.AllowLan != nil { + l.allowLan = *base.AllowLan } var socksErr, httpErr error - if allowLan != nil || httpPort != nil { + if base.AllowLan != nil || base.Port != nil { newHTTPPort := l.httpPort - if httpPort != nil { - newHTTPPort = *httpPort + if base.Port != nil { + newHTTPPort = *base.Port } httpErr = l.updateHTTP(newHTTPPort) } - if allowLan != nil || socksPort != nil { + if base.AllowLan != nil || base.SocketPort != nil { newSocksPort := l.socksPort - if socksPort != nil { - newSocksPort = *socksPort + if base.SocketPort != nil { + newSocksPort = *base.SocketPort } socksErr = l.updateSocks(newSocksPort) } @@ -112,29 +111,30 @@ func (l *Listener) genAddr(port int) string { return fmt.Sprintf("%s:%d", host, port) } -func (l *Listener) Run() error { - return l.Update(&l.allowLan, &l.httpPort, &l.socksPort) +func (l *Listener) process(signal chan<- struct{}) { + sub := config.Instance().Subscribe() + signal <- struct{}{} + for elm := range sub { + event := elm.(*config.Event) + if event.Type == "base" { + base := event.Payload.(config.Base) + l.Update(&base) + } + } +} + +// Run ensure config monitoring +func (l *Listener) Run() { + signal := make(chan struct{}) + go l.process(signal) + <-signal } func newListener() *Listener { - cfg, err := C.GetConfig() - if err != nil { - log.Fatalf("Read config error: %s", err.Error()) - } - - general := cfg.Section("General") - - port := general.Key("port").RangeInt(C.DefalutHTTPPort, 1, 65535) - socksPort := general.Key("socks-port").RangeInt(C.DefalutSOCKSPort, 1, 65535) - allowLan := general.Key("allow-lan").MustBool() - - return &Listener{ - httpPort: port, - socksPort: socksPort, - allowLan: allowLan, - } + return &Listener{} } +// Instance return singleton instance of Listener func Instance() *Listener { once.Do(func() { listener = newListener() diff --git a/proxy/socks/tcp.go b/proxy/socks/tcp.go index c6e86356..0fe78e29 100644 --- a/proxy/socks/tcp.go +++ b/proxy/socks/tcp.go @@ -1,10 +1,9 @@ package socks import ( - "io" "net" - "strconv" + "github.com/Dreamacro/clash/adapters/local" C "github.com/Dreamacro/clash/constant" "github.com/Dreamacro/clash/tunnel" @@ -13,7 +12,7 @@ import ( ) var ( - tun = tunnel.GetInstance() + tun = tunnel.Instance() ) func NewSocksProxy(addr string) (*C.ProxySignal, error) { @@ -60,59 +59,5 @@ func handleSocks(conn net.Conn) { return } conn.(*net.TCPConn).SetKeepAlive(true) - tun.Add(NewSocks(target, conn)) -} - -type SocksAdapter struct { - conn net.Conn - addr *C.Addr -} - -func (s *SocksAdapter) Close() { - s.conn.Close() -} - -func (s *SocksAdapter) Addr() *C.Addr { - return s.addr -} - -func (s *SocksAdapter) Connect(proxy C.ProxyAdapter) { - go io.Copy(s.conn, proxy.ReadWriter()) - io.Copy(proxy.ReadWriter(), s.conn) -} - -func parseSocksAddr(target socks.Addr) *C.Addr { - var host, port string - var ip net.IP - - switch target[0] { - case socks.AtypDomainName: - host = string(target[2 : 2+target[1]]) - port = strconv.Itoa((int(target[2+target[1]]) << 8) | int(target[2+target[1]+1])) - ipAddr, err := net.ResolveIPAddr("ip", host) - if err == nil { - ip = ipAddr.IP - } - case socks.AtypIPv4: - ip = net.IP(target[1 : 1+net.IPv4len]) - port = strconv.Itoa((int(target[1+net.IPv4len]) << 8) | int(target[1+net.IPv4len+1])) - case socks.AtypIPv6: - ip = net.IP(target[1 : 1+net.IPv6len]) - port = strconv.Itoa((int(target[1+net.IPv6len]) << 8) | int(target[1+net.IPv6len+1])) - } - - return &C.Addr{ - NetWork: C.TCP, - AddrType: int(target[0]), - Host: host, - IP: &ip, - Port: port, - } -} - -func NewSocks(target socks.Addr, conn net.Conn) *SocksAdapter { - return &SocksAdapter{ - conn: conn, - addr: parseSocksAddr(target), - } + tun.Add(adapters.NewSocks(target, conn)) } diff --git a/tunnel/connection.go b/tunnel/connection.go new file mode 100644 index 00000000..794e06b6 --- /dev/null +++ b/tunnel/connection.go @@ -0,0 +1,67 @@ +package tunnel + +import ( + "io" + "net" + "net/http" + "time" + + "github.com/Dreamacro/clash/adapters/local" + C "github.com/Dreamacro/clash/constant" +) + +func (t *Tunnel) handleHTTP(request *adapters.HttpAdapter, proxy C.ProxyAdapter) { + req := http.Transport{ + Dial: func(string, string) (net.Conn, error) { + conn := newTrafficTrack(proxy.Conn(), t.traffic) + return conn, nil + }, + // from http.DefaultTransport + MaxIdleConns: 100, + IdleConnTimeout: 90 * time.Second, + ExpectContinueTimeout: 1 * time.Second, + } + resp, err := req.RoundTrip(request.R) + if err != nil { + return + } + defer resp.Body.Close() + + header := request.W.Header() + for k, vv := range resp.Header { + for _, v := range vv { + header.Add(k, v) + } + } + request.W.WriteHeader(resp.StatusCode) + var writer io.Writer = request.W + if len(resp.TransferEncoding) > 0 && resp.TransferEncoding[0] == "chunked" { + writer = ChunkWriter{Writer: request.W} + } + io.Copy(writer, resp.Body) +} + +func (t *Tunnel) handleHTTPS(request *adapters.HttpsAdapter, proxy C.ProxyAdapter) { + conn := newTrafficTrack(proxy.Conn(), t.traffic) + go io.Copy(request.Conn(), conn) + io.Copy(conn, request.Conn()) +} + +func (t *Tunnel) handleSOCKS(request *adapters.SocksAdapter, proxy C.ProxyAdapter) { + conn := newTrafficTrack(proxy.Conn(), t.traffic) + go io.Copy(request.Conn(), conn) + io.Copy(conn, request.Conn()) +} + +// ChunkWriter is a writer wrapper and used when TransferEncoding is chunked +type ChunkWriter struct { + io.Writer +} + +func (cw ChunkWriter) Write(b []byte) (int, error) { + n, err := cw.Writer.Write(b) + if err == nil { + cw.Writer.(http.Flusher).Flush() + } + return n, err +} diff --git a/tunnel/log.go b/tunnel/log.go index 2021a354..f703d62c 100644 --- a/tunnel/log.go +++ b/tunnel/log.go @@ -3,47 +3,29 @@ package tunnel import ( "fmt" + C "github.com/Dreamacro/clash/constant" + log "github.com/sirupsen/logrus" ) -const ( - ERROR LogLevel = iota - WARNING - INFO - DEBUG -) - -type LogLevel int - type Log struct { - LogLevel LogLevel + LogLevel C.LogLevel Payload string } func (l *Log) Type() string { - switch l.LogLevel { - case INFO: - return "Info" - case WARNING: - return "Warning" - case ERROR: - return "Error" - case DEBUG: - return "Debug" - default: - return "Unknow" - } + return l.LogLevel.String() } func print(data Log) { switch data.LogLevel { - case INFO: + case C.INFO: log.Infoln(data.Payload) - case WARNING: + case C.WARNING: log.Warnln(data.Payload) - case ERROR: + case C.ERROR: log.Errorln(data.Payload) - case DEBUG: + case C.DEBUG: log.Debugln(data.Payload) } } @@ -55,11 +37,13 @@ func (t *Tunnel) subscribeLogs() { } for elm := range sub { data := elm.(Log) - print(data) + if data.LogLevel <= t.logLevel { + print(data) + } } } -func newLog(logLevel LogLevel, format string, v ...interface{}) Log { +func newLog(logLevel C.LogLevel, format string, v ...interface{}) Log { return Log{ LogLevel: logLevel, Payload: fmt.Sprintf(format, v...), diff --git a/tunnel/tunnel.go b/tunnel/tunnel.go index 16e43645..8d84a9d6 100644 --- a/tunnel/tunnel.go +++ b/tunnel/tunnel.go @@ -1,16 +1,14 @@ package tunnel import ( - "fmt" - "strconv" - "strings" "sync" "time" - "github.com/Dreamacro/clash/adapters" + LocalAdapter "github.com/Dreamacro/clash/adapters/local" + RemoteAdapter "github.com/Dreamacro/clash/adapters/remote" + cfg "github.com/Dreamacro/clash/config" C "github.com/Dreamacro/clash/constant" "github.com/Dreamacro/clash/observable" - R "github.com/Dreamacro/clash/rules" "gopkg.in/eapache/channels.v1" ) @@ -20,168 +18,61 @@ var ( once sync.Once ) +// Tunnel handle proxy socket and HTTP/SOCKS socket type Tunnel struct { queue *channels.InfiniteChannel rules []C.Rule proxies map[string]C.Proxy - observable *observable.Observable - logCh chan interface{} configLock *sync.RWMutex traffic *C.Traffic - mode Mode - selector *adapters.Selector + + // Outbound Rule + mode cfg.Mode + selector *RemoteAdapter.Selector + + // Log + logCh chan interface{} + observable *observable.Observable + logLevel C.LogLevel } +// Add request to queue func (t *Tunnel) Add(req C.ServerAdapter) { t.queue.In() <- req } +// Traffic return traffic of all connections func (t *Tunnel) Traffic() *C.Traffic { return t.traffic } -func (t *Tunnel) Config() ([]C.Rule, map[string]C.Proxy) { - return t.rules, t.proxies -} - +// Log return clash log stream func (t *Tunnel) Log() *observable.Observable { return t.observable } -func (t *Tunnel) SetMode(mode Mode) { - t.mode = mode -} - -func (t *Tunnel) GetMode() Mode { - return t.mode -} - -func (t *Tunnel) UpdateConfig() (err error) { - cfg, err := C.GetConfig() - if err != nil { - return - } - - // empty proxies and rules - proxies := make(map[string]C.Proxy) - rules := []C.Rule{} - - proxiesConfig := cfg.Section("Proxy") - rulesConfig := cfg.Section("Rule") - groupsConfig := cfg.Section("Proxy Group") - - // parse proxy - for _, key := range proxiesConfig.Keys() { - proxy := key.Strings(",") - if len(proxy) == 0 { - continue - } - switch proxy[0] { - // ss, server, port, cipter, password - case "ss": - if len(proxy) < 5 { - continue - } - ssURL := fmt.Sprintf("ss://%s:%s@%s:%s", proxy[3], proxy[4], proxy[1], proxy[2]) - ss, err := adapters.NewShadowSocks(key.Name(), ssURL, t.traffic) - if err != nil { - return err - } - proxies[key.Name()] = ss +func (t *Tunnel) configMonitor(signal chan<- struct{}) { + sub := cfg.Instance().Subscribe() + signal <- struct{}{} + for elm := range sub { + event := elm.(*cfg.Event) + switch event.Type { + case "proxies": + proxies := event.Payload.(map[string]C.Proxy) + t.configLock.Lock() + t.proxies = proxies + t.configLock.Unlock() + case "rules": + rules := event.Payload.([]C.Rule) + t.configLock.Lock() + t.rules = rules + t.configLock.Unlock() + case "mode": + t.mode = event.Payload.(cfg.Mode) + case "log-level": + t.logLevel = event.Payload.(C.LogLevel) } } - - // parse rules - for _, key := range rulesConfig.Keys() { - rule := strings.Split(key.Name(), ",") - if len(rule) < 3 { - continue - } - rule = trimArr(rule) - switch rule[0] { - case "DOMAIN-SUFFIX": - rules = append(rules, R.NewDomainSuffix(rule[1], rule[2])) - case "DOMAIN-KEYWORD": - rules = append(rules, R.NewDomainKeyword(rule[1], rule[2])) - case "GEOIP": - rules = append(rules, R.NewGEOIP(rule[1], rule[2])) - case "IP-CIDR", "IP-CIDR6": - rules = append(rules, R.NewIPCIDR(rule[1], rule[2])) - case "FINAL": - rules = append(rules, R.NewFinal(rule[2])) - } - } - - // parse proxy groups - for _, key := range groupsConfig.Keys() { - rule := strings.Split(key.Value(), ",") - rule = trimArr(rule) - switch rule[0] { - case "url-test": - if len(rule) < 4 { - return fmt.Errorf("URLTest need more than 4 param") - } - proxyNames := rule[1 : len(rule)-2] - delay, _ := strconv.Atoi(rule[len(rule)-1]) - url := rule[len(rule)-2] - var ps []C.Proxy - for _, name := range proxyNames { - if p, ok := proxies[name]; ok { - ps = append(ps, p) - } - } - - adapter, err := adapters.NewURLTest(key.Name(), ps, url, time.Duration(delay)*time.Second) - if err != nil { - return fmt.Errorf("Config error: %s", err.Error()) - } - proxies[key.Name()] = adapter - case "select": - if len(rule) < 3 { - return fmt.Errorf("Selector need more than 3 param") - } - proxyNames := rule[1:] - selectProxy := make(map[string]C.Proxy) - for _, name := range proxyNames { - proxy, exist := proxies[name] - if !exist { - return fmt.Errorf("Proxy %s not exist", name) - } - selectProxy[name] = proxy - } - selector, err := adapters.NewSelector(key.Name(), selectProxy) - if err != nil { - return fmt.Errorf("Selector create error: %s", err.Error()) - } - proxies[key.Name()] = selector - } - } - - // init proxy - proxies["DIRECT"] = adapters.NewDirect(t.traffic) - proxies["REJECT"] = adapters.NewReject() - - t.configLock.Lock() - defer t.configLock.Unlock() - - // stop url-test - for _, elm := range t.proxies { - urlTest, ok := elm.(*adapters.URLTest) - if ok { - urlTest.Close() - } - } - - s, err := adapters.NewSelector("Proxy", proxies) - if err != nil { - return err - } - - t.proxies = proxies - t.rules = rules - t.selector = s - - return nil } func (t *Tunnel) process() { @@ -199,9 +90,9 @@ func (t *Tunnel) handleConn(localConn C.ServerAdapter) { var proxy C.Proxy switch t.mode { - case Direct: + case cfg.Direct: proxy = t.proxies["DIRECT"] - case Global: + case cfg.Global: proxy = t.selector // Rule default: @@ -209,12 +100,22 @@ func (t *Tunnel) handleConn(localConn C.ServerAdapter) { } remoConn, err := proxy.Generator(addr) if err != nil { - t.logCh <- newLog(WARNING, "Proxy connect error: %s", err.Error()) + t.logCh <- newLog(C.WARNING, "Proxy connect error: %s", err.Error()) return } defer remoConn.Close() - localConn.Connect(remoConn) + switch adapter := localConn.(type) { + case *LocalAdapter.HttpAdapter: + t.handleHTTP(adapter, remoConn) + break + case *LocalAdapter.HttpsAdapter: + t.handleHTTPS(adapter, remoConn) + break + case *LocalAdapter.SocksAdapter: + t.handleSOCKS(adapter, remoConn) + break + } } func (t *Tunnel) match(addr *C.Addr) C.Proxy { @@ -227,31 +128,39 @@ func (t *Tunnel) match(addr *C.Addr) C.Proxy { if !ok { continue } - t.logCh <- newLog(INFO, "%v match %s using %s", addr.String(), rule.RuleType().String(), rule.Adapter()) + t.logCh <- newLog(C.INFO, "%v match %s using %s", addr.String(), rule.RuleType().String(), rule.Adapter()) return a } } - t.logCh <- newLog(INFO, "%v doesn't match any rule using DIRECT", addr.String()) + t.logCh <- newLog(C.INFO, "%v doesn't match any rule using DIRECT", addr.String()) return t.proxies["DIRECT"] } +// Run initial task +func (t *Tunnel) Run() { + go t.process() + go t.subscribeLogs() + signal := make(chan struct{}) + go t.configMonitor(signal) + <-signal +} + func newTunnel() *Tunnel { logCh := make(chan interface{}) - tunnel := &Tunnel{ + return &Tunnel{ queue: channels.NewInfiniteChannel(), proxies: make(map[string]C.Proxy), observable: observable.NewObservable(logCh), logCh: logCh, configLock: &sync.RWMutex{}, traffic: C.NewTraffic(time.Second), - mode: Rule, + mode: cfg.Rule, + logLevel: C.INFO, } - go tunnel.process() - go tunnel.subscribeLogs() - return tunnel } -func GetInstance() *Tunnel { +// Instance return singleton instance of Tunnel +func Instance() *Tunnel { once.Do(func() { tunnel = newTunnel() }) diff --git a/adapters/util.go b/tunnel/util.go similarity index 78% rename from adapters/util.go rename to tunnel/util.go index 1f41c548..15c4d3da 100644 --- a/adapters/util.go +++ b/tunnel/util.go @@ -1,4 +1,4 @@ -package adapters +package tunnel import ( "net" @@ -6,6 +6,7 @@ import ( C "github.com/Dreamacro/clash/constant" ) +// TrafficTrack record traffic of net.Conn type TrafficTrack struct { net.Conn traffic *C.Traffic @@ -23,6 +24,6 @@ func (tt *TrafficTrack) Write(b []byte) (int, error) { return n, err } -func NewTrafficTrack(conn net.Conn, traffic *C.Traffic) *TrafficTrack { +func newTrafficTrack(conn net.Conn, traffic *C.Traffic) *TrafficTrack { return &TrafficTrack{traffic: traffic, Conn: conn} }