refactor: 重构StickySessions
This commit is contained in:
parent
125f35e426
commit
c260affd46
2 changed files with 27 additions and 42 deletions
|
@ -5,7 +5,7 @@ import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math/rand"
|
"github.com/Dreamacro/clash/common/cache"
|
||||||
"net"
|
"net"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -60,6 +60,16 @@ func getKey(metadata *C.Metadata) string {
|
||||||
return metadata.DstIP.String()
|
return metadata.DstIP.String()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getKeyWithSrcAndDst(metadata *C.Metadata) string {
|
||||||
|
dst := getKey(metadata)
|
||||||
|
src := ""
|
||||||
|
if metadata != nil {
|
||||||
|
src = metadata.SrcIP.String()
|
||||||
|
}
|
||||||
|
|
||||||
|
return fmt.Sprintf("%s%s", src, dst)
|
||||||
|
}
|
||||||
|
|
||||||
func jumpHash(key uint64, buckets int32) int32 {
|
func jumpHash(key uint64, buckets int32) int32 {
|
||||||
var b, j int64
|
var b, j int64
|
||||||
|
|
||||||
|
@ -140,53 +150,31 @@ func strategyConsistentHashing() strategyFn {
|
||||||
}
|
}
|
||||||
|
|
||||||
func strategyStickySessions() strategyFn {
|
func strategyStickySessions() strategyFn {
|
||||||
timeout := int64(600)
|
ttl := time.Minute * 10
|
||||||
type Session struct {
|
|
||||||
idx int
|
c := cache.New[uint64, int](1 * time.Second)
|
||||||
time time.Time
|
|
||||||
}
|
|
||||||
Sessions := make(map[string]map[string]Session)
|
|
||||||
go func() {
|
|
||||||
for {
|
|
||||||
time.Sleep(time.Second * 60)
|
|
||||||
now := time.Now().Unix()
|
|
||||||
for _, subMap := range Sessions {
|
|
||||||
for dest, session := range subMap {
|
|
||||||
if now-session.time.Unix() > timeout {
|
|
||||||
delete(subMap, dest)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
return func(proxies []C.Proxy, metadata *C.Metadata) C.Proxy {
|
return func(proxies []C.Proxy, metadata *C.Metadata) C.Proxy {
|
||||||
src := metadata.SrcIP.String()
|
key := uint64(murmur3.Sum32([]byte(getKeyWithSrcAndDst(metadata))))
|
||||||
dest := getKey(metadata)
|
|
||||||
now := time.Now()
|
|
||||||
length := len(proxies)
|
length := len(proxies)
|
||||||
if Sessions[src] == nil {
|
idx, expireTime := c.GetWithExpire(key)
|
||||||
Sessions[src] = make(map[string]Session)
|
if expireTime.IsZero() {
|
||||||
|
idx = int(jumpHash(key+uint64(time.Now().UnixMilli()), int32(length)))
|
||||||
}
|
}
|
||||||
session, ok := Sessions[src][dest]
|
|
||||||
if !ok || now.Unix()-session.time.Unix() > timeout {
|
|
||||||
session.idx = rand.Intn(length)
|
|
||||||
}
|
|
||||||
session.time = now
|
|
||||||
|
|
||||||
session.idx = 0
|
|
||||||
var res = proxies[0]
|
|
||||||
for i := 0; i < length; i++ {
|
for i := 0; i < length; i++ {
|
||||||
idx := (session.idx + i) % length
|
nowIdx := (idx + 1) % length
|
||||||
proxy := proxies[idx]
|
proxy := proxies[nowIdx]
|
||||||
if proxy.Alive() {
|
if proxy.Alive() {
|
||||||
session.idx = idx
|
if nowIdx != idx {
|
||||||
res = proxy
|
c.Put(key, idx, -1)
|
||||||
break
|
c.Put(key, nowIdx, ttl)
|
||||||
|
}
|
||||||
|
|
||||||
|
return proxy
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Sessions[src][dest] = session
|
return proxies[0]
|
||||||
return res
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -89,9 +89,6 @@ type Metadata struct {
|
||||||
RemoteDst string `json:"remoteDestination"`
|
RemoteDst string `json:"remoteDestination"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// avoid stack overflow
|
|
||||||
type jsonMetadata Metadata
|
|
||||||
|
|
||||||
func (m *Metadata) RemoteAddress() string {
|
func (m *Metadata) RemoteAddress() string {
|
||||||
return net.JoinHostPort(m.String(), m.DstPort)
|
return net.JoinHostPort(m.String(), m.DstPort)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue