diff --git a/common/observable/observable_test.go b/common/observable/observable_test.go index 67e29341..6dd6ee42 100644 --- a/common/observable/observable_test.go +++ b/common/observable/observable_test.go @@ -113,3 +113,34 @@ func TestObservable_SubscribeGoroutineLeak(t *testing.T) { _, more := <-list[0] assert.False(t, more) } + +func Benchmark_Observable_1000(b *testing.B) { + ch := make(chan interface{}) + o := NewObservable(ch) + num := 1000 + + subs := []Subscription{} + for i := 0; i < num; i++ { + sub, _ := o.Subscribe() + subs = append(subs, sub) + } + + wg := sync.WaitGroup{} + wg.Add(num) + + b.ResetTimer() + for _, sub := range subs { + go func(s Subscription) { + for range s { + } + wg.Done() + }(sub) + } + + for i := 0; i < b.N; i++ { + ch <- i + } + + close(ch) + wg.Wait() +} diff --git a/common/observable/subscriber.go b/common/observable/subscriber.go index 3fb1e587..cb2a70f4 100644 --- a/common/observable/subscriber.go +++ b/common/observable/subscriber.go @@ -2,34 +2,32 @@ package observable import ( "sync" - - "gopkg.in/eapache/channels.v1" ) type Subscription <-chan interface{} type Subscriber struct { - buffer *channels.InfiniteChannel + buffer chan interface{} once sync.Once } func (s *Subscriber) Emit(item interface{}) { - s.buffer.In() <- item + s.buffer <- item } func (s *Subscriber) Out() Subscription { - return s.buffer.Out() + return s.buffer } func (s *Subscriber) Close() { s.once.Do(func() { - s.buffer.Close() + close(s.buffer) }) } func newSubscriber() *Subscriber { sub := &Subscriber{ - buffer: channels.NewInfiniteChannel(), + buffer: make(chan interface{}, 200), } return sub } diff --git a/go.mod b/go.mod index 3a3c67f7..0b62577d 100644 --- a/go.mod +++ b/go.mod @@ -1,23 +1,21 @@ module github.com/Dreamacro/clash -go 1.14 +go 1.15 require ( github.com/Dreamacro/go-shadowsocks2 v0.1.6 - github.com/eapache/queue v1.1.0 // indirect github.com/go-chi/chi v4.1.2+incompatible github.com/go-chi/cors v1.1.1 github.com/go-chi/render v1.0.1 github.com/gofrs/uuid v3.3.0+incompatible github.com/gorilla/websocket v1.4.2 - github.com/miekg/dns v1.1.32 + github.com/miekg/dns v1.1.34 github.com/oschwald/geoip2-golang v1.4.0 github.com/sirupsen/logrus v1.7.0 github.com/stretchr/testify v1.6.1 - golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0 - golang.org/x/net v0.0.0-20201010224723-4f7140c49acb + golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897 + golang.org/x/net v0.0.0-20201020065357-d65d470038a5 golang.org/x/sync v0.0.0-20201008141435-b3e1573b7520 - golang.org/x/sys v0.0.0-20201009025420-dfb3f7c4e634 - gopkg.in/eapache/channels.v1 v1.1.0 + golang.org/x/sys v0.0.0-20201018230417-eeed37f84f13 gopkg.in/yaml.v2 v2.3.0 ) diff --git a/go.sum b/go.sum index 5c7408a4..1495863c 100644 --- a/go.sum +++ b/go.sum @@ -3,8 +3,6 @@ github.com/Dreamacro/go-shadowsocks2 v0.1.6/go.mod h1:LSXCjyHesPY3pLjhwff1mQX72I github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= -github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/go-chi/chi v4.1.2+incompatible h1:fGFk2Gmi/YKXk0OmGfBh0WgmN3XB8lVnEyNz34tQRec= github.com/go-chi/chi v4.1.2+incompatible/go.mod h1:eB3wogJHnLi3x/kFX2A+IbTBlXxmMeXJVKy9tTv1XzQ= github.com/go-chi/cors v1.1.1 h1:eHuqxsIw89iXcWnWUN8R72JMibABJTN/4IOYI5WERvw= @@ -15,8 +13,8 @@ github.com/gofrs/uuid v3.3.0+incompatible h1:8K4tyRfvU1CYPgJsveYFQMhpFd/wXNM7iK6 github.com/gofrs/uuid v3.3.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= -github.com/miekg/dns v1.1.32 h1:MDaYYzWOYscpvDOEgPMT1c1mebCZmIdxZI/J161OdJU= -github.com/miekg/dns v1.1.32/go.mod h1:KNUDUusw/aVsxyTYZM1oqvCicbwhgbNgztCETuNZ7xM= +github.com/miekg/dns v1.1.34 h1:SgTzfkN+oLoIHF1bgUP+C71mzuDl3AhLApHzCCIAMWM= +github.com/miekg/dns v1.1.34/go.mod h1:KNUDUusw/aVsxyTYZM1oqvCicbwhgbNgztCETuNZ7xM= github.com/oschwald/geoip2-golang v1.4.0 h1:5RlrjCgRyIGDz/mBmPfnAF4h8k0IAcRv9PvrpOfz+Ug= github.com/oschwald/geoip2-golang v1.4.0/go.mod h1:8QwxJvRImBH+Zl6Aa6MaIcs5YdlZSTKtzmPGzQqi9ng= github.com/oschwald/maxminddb-golang v1.6.0 h1:KAJSjdHQ8Kv45nFIbtoLGrGWqHFajOIm7skTyz/+Dls= @@ -34,14 +32,14 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191206172530-e9b2fee46413/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0 h1:hb9wdF1z5waM+dSIICn1l0DkLVDT3hqhhQsDNUmHPRE= -golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897 h1:pLI5jrR7OSLijeIDcmRxNmw2api+jEfxLoykJVice/E= +golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20201010224723-4f7140c49acb h1:mUVeFHoDKis5nxCAzoAi7E8Ghb86EXh/RK6wtvJIqRY= -golang.org/x/net v0.0.0-20201010224723-4f7140c49acb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20201020065357-d65d470038a5 h1:KrxvpY64uUzANd9wKWr6ZAsufiii93XnvXaeikyCJ2g= +golang.org/x/net v0.0.0-20201020065357-d65d470038a5/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201008141435-b3e1573b7520 h1:Bx6FllMpG4NWDOfhMBz1VR2QYNp/SAOHPIAsaVmxfPo= golang.org/x/sync v0.0.0-20201008141435-b3e1573b7520/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -51,8 +49,8 @@ golang.org/x/sys v0.0.0-20190924154521-2837fb4f24fe/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191224085550-c709ea063b76/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20201009025420-dfb3f7c4e634 h1:bNEHhJCnrwMKNMmOx3yAynp5vs5/gRy+XWFtZFu7NBM= -golang.org/x/sys v0.0.0-20201009025420-dfb3f7c4e634/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201018230417-eeed37f84f13 h1:5jaG59Zhd+8ZXe8C+lgiAGqkOaZBruqrWclLkgAww34= +golang.org/x/sys v0.0.0-20201018230417-eeed37f84f13/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k= @@ -62,8 +60,6 @@ golang.org/x/tools v0.0.0-20191216052735-49a3e744a425/go.mod h1:TB2adYChydJhpapK golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/eapache/channels.v1 v1.1.0 h1:5bGAyKKvyCTWjSj7mhefG6Lc68VyN4MH1v8/7OoeeB4= -gopkg.in/eapache/channels.v1 v1.1.0/go.mod h1:BHIBujSvu9yMTrTYbTCjDD43gUhtmaOtTWDe7sTv1js= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/tunnel/tunnel.go b/tunnel/tunnel.go index c645ce08..19d9f8b2 100644 --- a/tunnel/tunnel.go +++ b/tunnel/tunnel.go @@ -13,13 +13,11 @@ import ( "github.com/Dreamacro/clash/component/resolver" C "github.com/Dreamacro/clash/constant" "github.com/Dreamacro/clash/log" - - channels "gopkg.in/eapache/channels.v1" ) var ( - tcpQueue = channels.NewInfiniteChannel() - udpQueue = channels.NewInfiniteChannel() + tcpQueue = make(chan C.ServerAdapter, 200) + udpQueue = make(chan *inbound.PacketAdapter, 200) natTable = nat.New() rules []C.Rule proxies = make(map[string]C.Proxy) @@ -39,12 +37,12 @@ func init() { // Add request to queue func Add(req C.ServerAdapter) { - tcpQueue.In() <- req + tcpQueue <- req } // AddPacket add udp Packet to queue func AddPacket(packet *inbound.PacketAdapter) { - udpQueue.In() <- packet + udpQueue <- packet } // Rules return all rules @@ -89,9 +87,8 @@ func SetMode(m TunnelMode) { // processUDP starts a loop to handle udp packet func processUDP() { - queue := udpQueue.Out() - for elm := range queue { - conn := elm.(*inbound.PacketAdapter) + queue := udpQueue + for conn := range queue { handleUDPConn(conn) } } @@ -105,9 +102,8 @@ func process() { go processUDP() } - queue := tcpQueue.Out() - for elm := range queue { - conn := elm.(C.ServerAdapter) + queue := tcpQueue + for conn := range queue { go handleTCPConn(conn) } }