Improve: provider can be auto GC

This commit is contained in:
gVisor bot 2020-04-26 22:38:15 +08:00
parent 6902880db9
commit 3542b3e3b1
4 changed files with 221 additions and 170 deletions

View file

@ -0,0 +1,154 @@
package provider
import (
"bytes"
"crypto/md5"
"io/ioutil"
"os"
"time"
"github.com/Dreamacro/clash/log"
)
var (
fileMode os.FileMode = 0666
)
type parser = func([]byte) (interface{}, error)
type fetcher struct {
name string
vehicle Vehicle
updatedAt *time.Time
ticker *time.Ticker
hash [16]byte
parser parser
onUpdate func(interface{})
}
func (f *fetcher) Name() string {
return f.name
}
func (f *fetcher) VehicleType() VehicleType {
return f.vehicle.Type()
}
func (f *fetcher) Initial() (interface{}, error) {
var buf []byte
var err error
var isLocal bool
if stat, err := os.Stat(f.vehicle.Path()); err == nil {
buf, err = ioutil.ReadFile(f.vehicle.Path())
modTime := stat.ModTime()
f.updatedAt = &modTime
isLocal = true
} else {
buf, err = f.vehicle.Read()
}
if err != nil {
return nil, err
}
proxies, err := f.parser(buf)
if err != nil {
if !isLocal {
return nil, err
}
// parse local file error, fallback to remote
buf, err = f.vehicle.Read()
if err != nil {
return nil, err
}
proxies, err = f.parser(buf)
if err != nil {
return nil, err
}
}
if err := ioutil.WriteFile(f.vehicle.Path(), buf, fileMode); err != nil {
return nil, err
}
f.hash = md5.Sum(buf)
// pull proxies automatically
if f.ticker != nil {
go f.pullLoop()
}
return proxies, nil
}
func (f *fetcher) Update() (interface{}, bool, error) {
buf, err := f.vehicle.Read()
if err != nil {
return nil, false, err
}
now := time.Now()
hash := md5.Sum(buf)
if bytes.Equal(f.hash[:], hash[:]) {
f.updatedAt = &now
return nil, true, nil
}
proxies, err := f.parser(buf)
if err != nil {
return nil, false, err
}
if err := ioutil.WriteFile(f.vehicle.Path(), buf, fileMode); err != nil {
return nil, false, err
}
f.updatedAt = &now
f.hash = hash
return proxies, false, nil
}
func (f *fetcher) Destroy() error {
if f.ticker != nil {
f.ticker.Stop()
}
return nil
}
func (f *fetcher) pullLoop() {
for range f.ticker.C {
elm, same, err := f.Update()
if err != nil {
log.Warnln("[Provider] %s pull error: %s", f.Name(), err.Error())
continue
}
if same {
log.Debugln("[Provider] %s's proxies doesn't change", f.Name())
continue
}
log.Infoln("[Provider] %s's proxies update", f.Name())
if f.onUpdate != nil {
f.onUpdate(elm)
}
}
}
func newFetcher(name string, interval time.Duration, vehicle Vehicle, parser parser, onUpdate func(interface{})) *fetcher {
var ticker *time.Ticker
if interval != 0 {
ticker = time.NewTicker(interval)
}
return &fetcher{
name: name,
ticker: ticker,
vehicle: vehicle,
parser: parser,
onUpdate: onUpdate,
}
}

View file

@ -1,26 +1,20 @@
package provider
import (
"bytes"
"crypto/md5"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"os"
"runtime"
"time"
"github.com/Dreamacro/clash/adapters/outbound"
C "github.com/Dreamacro/clash/constant"
"github.com/Dreamacro/clash/log"
"gopkg.in/yaml.v2"
)
const (
ReservedName = "default"
fileMode = 0666
)
// Provider Type
@ -49,8 +43,7 @@ type Provider interface {
VehicleType() VehicleType
Type() ProviderType
Initial() error
Reload() error
Destroy() error
Update() error
}
// ProxyProvider interface
@ -58,24 +51,24 @@ type ProxyProvider interface {
Provider
Proxies() []C.Proxy
HealthCheck()
Update() error
}
type ProxySchema struct {
Proxies []map[string]interface{} `yaml:"proxies"`
}
// for auto gc
type ProxySetProvider struct {
name string
vehicle Vehicle
hash [16]byte
proxies []C.Proxy
healthCheck *HealthCheck
ticker *time.Ticker
updatedAt *time.Time
*proxySetProvider
}
func (pp *ProxySetProvider) MarshalJSON() ([]byte, error) {
type proxySetProvider struct {
*fetcher
proxies []C.Proxy
healthCheck *HealthCheck
}
func (pp *proxySetProvider) MarshalJSON() ([]byte, error) {
return json.Marshal(map[string]interface{}{
"name": pp.Name(),
"type": pp.Type().String(),
@ -85,134 +78,41 @@ func (pp *ProxySetProvider) MarshalJSON() ([]byte, error) {
})
}
func (pp *ProxySetProvider) Name() string {
func (pp *proxySetProvider) Name() string {
return pp.name
}
func (pp *ProxySetProvider) Reload() error {
return nil
}
func (pp *ProxySetProvider) HealthCheck() {
func (pp *proxySetProvider) HealthCheck() {
pp.healthCheck.check()
}
func (pp *ProxySetProvider) Update() error {
return pp.pull()
func (pp *proxySetProvider) Update() error {
elm, same, err := pp.fetcher.Update()
if err == nil && !same {
pp.onUpdate(elm)
}
return err
}
func (pp *ProxySetProvider) Destroy() error {
pp.healthCheck.close()
if pp.ticker != nil {
pp.ticker.Stop()
}
return nil
}
func (pp *ProxySetProvider) Initial() error {
var buf []byte
var err error
var isLocal bool
if stat, err := os.Stat(pp.vehicle.Path()); err == nil {
buf, err = ioutil.ReadFile(pp.vehicle.Path())
modTime := stat.ModTime()
pp.updatedAt = &modTime
isLocal = true
} else {
buf, err = pp.vehicle.Read()
}
func (pp *proxySetProvider) Initial() error {
elm, err := pp.fetcher.Initial()
if err != nil {
return err
}
proxies, err := pp.parse(buf)
if err != nil {
if !isLocal {
return err
}
// parse local file error, fallback to remote
buf, err = pp.vehicle.Read()
if err != nil {
return err
}
proxies, err = pp.parse(buf)
if err != nil {
return err
}
}
if err := ioutil.WriteFile(pp.vehicle.Path(), buf, fileMode); err != nil {
return err
}
pp.hash = md5.Sum(buf)
pp.setProxies(proxies)
// pull proxies automatically
if pp.ticker != nil {
go pp.pullLoop()
}
pp.onUpdate(elm)
return nil
}
func (pp *ProxySetProvider) VehicleType() VehicleType {
return pp.vehicle.Type()
}
func (pp *ProxySetProvider) Type() ProviderType {
func (pp *proxySetProvider) Type() ProviderType {
return Proxy
}
func (pp *ProxySetProvider) Proxies() []C.Proxy {
func (pp *proxySetProvider) Proxies() []C.Proxy {
return pp.proxies
}
func (pp *ProxySetProvider) pullLoop() {
for range pp.ticker.C {
if err := pp.pull(); err != nil {
log.Warnln("[Provider] %s pull error: %s", pp.Name(), err.Error())
}
}
}
func (pp *ProxySetProvider) pull() error {
buf, err := pp.vehicle.Read()
if err != nil {
return err
}
now := time.Now()
hash := md5.Sum(buf)
if bytes.Equal(pp.hash[:], hash[:]) {
log.Debugln("[Provider] %s's proxies doesn't change", pp.Name())
pp.updatedAt = &now
return nil
}
proxies, err := pp.parse(buf)
if err != nil {
return err
}
log.Infoln("[Provider] %s's proxies update", pp.Name())
if err := ioutil.WriteFile(pp.vehicle.Path(), buf, fileMode); err != nil {
return err
}
pp.updatedAt = &now
pp.hash = hash
pp.setProxies(proxies)
return nil
}
func (pp *ProxySetProvider) parse(buf []byte) ([]C.Proxy, error) {
func proxiesParse(buf []byte) (interface{}, error) {
schema := &ProxySchema{}
if err := yaml.Unmarshal(buf, schema); err != nil {
@ -239,38 +139,52 @@ func (pp *ProxySetProvider) parse(buf []byte) ([]C.Proxy, error) {
return proxies, nil
}
func (pp *ProxySetProvider) setProxies(proxies []C.Proxy) {
func (pp *proxySetProvider) setProxies(proxies []C.Proxy) {
pp.proxies = proxies
pp.healthCheck.setProxy(proxies)
go pp.healthCheck.check()
}
func NewProxySetProvider(name string, interval time.Duration, vehicle Vehicle, hc *HealthCheck) *ProxySetProvider {
var ticker *time.Ticker
if interval != 0 {
ticker = time.NewTicker(interval)
}
func stopProxyProvider(pd *ProxySetProvider) {
pd.healthCheck.close()
pd.fetcher.Destroy()
}
func NewProxySetProvider(name string, interval time.Duration, vehicle Vehicle, hc *HealthCheck) *ProxySetProvider {
if hc.auto() {
go hc.process()
}
return &ProxySetProvider{
name: name,
vehicle: vehicle,
pd := &proxySetProvider{
proxies: []C.Proxy{},
healthCheck: hc,
ticker: ticker,
}
onUpdate := func(elm interface{}) {
ret := elm.([]C.Proxy)
pd.setProxies(ret)
}
fetcher := newFetcher(name, interval, vehicle, proxiesParse, onUpdate)
pd.fetcher = fetcher
wrapper := &ProxySetProvider{pd}
runtime.SetFinalizer(wrapper, stopProxyProvider)
return wrapper
}
// for auto gc
type CompatibleProvider struct {
*compatibleProvider
}
type compatibleProvider struct {
name string
healthCheck *HealthCheck
proxies []C.Proxy
}
func (cp *CompatibleProvider) MarshalJSON() ([]byte, error) {
func (cp *compatibleProvider) MarshalJSON() ([]byte, error) {
return json.Marshal(map[string]interface{}{
"name": cp.Name(),
"type": cp.Type().String(),
@ -279,43 +193,38 @@ func (cp *CompatibleProvider) MarshalJSON() ([]byte, error) {
})
}
func (cp *CompatibleProvider) Name() string {
func (cp *compatibleProvider) Name() string {
return cp.name
}
func (cp *CompatibleProvider) Reload() error {
return nil
}
func (cp *CompatibleProvider) Destroy() error {
cp.healthCheck.close()
return nil
}
func (cp *CompatibleProvider) HealthCheck() {
func (cp *compatibleProvider) HealthCheck() {
cp.healthCheck.check()
}
func (cp *CompatibleProvider) Update() error {
func (cp *compatibleProvider) Update() error {
return nil
}
func (cp *CompatibleProvider) Initial() error {
func (cp *compatibleProvider) Initial() error {
return nil
}
func (cp *CompatibleProvider) VehicleType() VehicleType {
func (cp *compatibleProvider) VehicleType() VehicleType {
return Compatible
}
func (cp *CompatibleProvider) Type() ProviderType {
func (cp *compatibleProvider) Type() ProviderType {
return Proxy
}
func (cp *CompatibleProvider) Proxies() []C.Proxy {
func (cp *compatibleProvider) Proxies() []C.Proxy {
return cp.proxies
}
func stopCompatibleProvider(pd *CompatibleProvider) {
pd.healthCheck.close()
}
func NewCompatibleProvider(name string, proxies []C.Proxy, hc *HealthCheck) (*CompatibleProvider, error) {
if len(proxies) == 0 {
return nil, errors.New("Provider need one proxy at least")
@ -325,9 +234,13 @@ func NewCompatibleProvider(name string, proxies []C.Proxy, hc *HealthCheck) (*Co
go hc.process()
}
return &CompatibleProvider{
pd := &compatibleProvider{
name: name,
proxies: proxies,
healthCheck: hc,
}, nil
}
wrapper := &CompatibleProvider{pd}
runtime.SetFinalizer(wrapper, stopCompatibleProvider)
return wrapper, nil
}

View file

@ -268,15 +268,6 @@ func parseProxies(cfg *RawConfig) (proxies map[string]C.Proxy, providersMap map[
providersConfig = cfg.ProxyProviderOld
}
defer func() {
// Destroy already created provider when err != nil
if err != nil {
for _, provider := range providersMap {
provider.Destroy()
}
}
}()
proxies["DIRECT"] = outbound.NewProxy(outbound.NewDirect())
proxies["REJECT"] = outbound.NewProxy(outbound.NewReject())
proxyList = append(proxyList, "DIRECT", "REJECT")

View file

@ -158,13 +158,6 @@ func updateHosts(tree *trie.Trie) {
}
func updateProxies(proxies map[string]C.Proxy, providers map[string]provider.ProxyProvider) {
oldProviders := tunnel.Providers()
// close providers goroutine
for _, provider := range oldProviders {
provider.Destroy()
}
tunnel.UpdateProxies(proxies, providers)
}