chore: using xsync.MapOf replace sync.Map
This commit is contained in:
parent
aab21720b5
commit
d0d576bdee
14 changed files with 110 additions and 86 deletions
|
@ -10,7 +10,6 @@ import (
|
||||||
"net/netip"
|
"net/netip"
|
||||||
"net/url"
|
"net/url"
|
||||||
"strconv"
|
"strconv"
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/Dreamacro/clash/common/atomic"
|
"github.com/Dreamacro/clash/common/atomic"
|
||||||
|
@ -19,6 +18,8 @@ import (
|
||||||
"github.com/Dreamacro/clash/component/dialer"
|
"github.com/Dreamacro/clash/component/dialer"
|
||||||
C "github.com/Dreamacro/clash/constant"
|
C "github.com/Dreamacro/clash/constant"
|
||||||
"github.com/Dreamacro/clash/log"
|
"github.com/Dreamacro/clash/log"
|
||||||
|
|
||||||
|
"github.com/puzpuzpuz/xsync/v2"
|
||||||
)
|
)
|
||||||
|
|
||||||
var UnifiedDelay = atomic.NewBool(false)
|
var UnifiedDelay = atomic.NewBool(false)
|
||||||
|
@ -37,7 +38,7 @@ type Proxy struct {
|
||||||
history *queue.Queue[C.DelayHistory]
|
history *queue.Queue[C.DelayHistory]
|
||||||
alive *atomic.Bool
|
alive *atomic.Bool
|
||||||
url string
|
url string
|
||||||
extra sync.Map
|
extra *xsync.MapOf[string, *extraProxyState]
|
||||||
}
|
}
|
||||||
|
|
||||||
// Alive implements C.Proxy
|
// Alive implements C.Proxy
|
||||||
|
@ -48,7 +49,7 @@ func (p *Proxy) Alive() bool {
|
||||||
// AliveForTestUrl implements C.Proxy
|
// AliveForTestUrl implements C.Proxy
|
||||||
func (p *Proxy) AliveForTestUrl(url string) bool {
|
func (p *Proxy) AliveForTestUrl(url string) bool {
|
||||||
if state, ok := p.extra.Load(url); ok {
|
if state, ok := p.extra.Load(url); ok {
|
||||||
return state.(*extraProxyState).alive.Load()
|
return state.alive.Load()
|
||||||
}
|
}
|
||||||
|
|
||||||
return p.alive.Load()
|
return p.alive.Load()
|
||||||
|
@ -96,7 +97,7 @@ func (p *Proxy) DelayHistoryForTestUrl(url string) []C.DelayHistory {
|
||||||
var queueM []C.DelayHistory
|
var queueM []C.DelayHistory
|
||||||
|
|
||||||
if state, ok := p.extra.Load(url); ok {
|
if state, ok := p.extra.Load(url); ok {
|
||||||
queueM = state.(*extraProxyState).history.Copy()
|
queueM = state.history.Copy()
|
||||||
}
|
}
|
||||||
|
|
||||||
if queueM == nil {
|
if queueM == nil {
|
||||||
|
@ -113,10 +114,10 @@ func (p *Proxy) DelayHistoryForTestUrl(url string) []C.DelayHistory {
|
||||||
func (p *Proxy) ExtraDelayHistory() map[string][]C.DelayHistory {
|
func (p *Proxy) ExtraDelayHistory() map[string][]C.DelayHistory {
|
||||||
extraHistory := map[string][]C.DelayHistory{}
|
extraHistory := map[string][]C.DelayHistory{}
|
||||||
|
|
||||||
p.extra.Range(func(k, v interface{}) bool {
|
p.extra.Range(func(k string, v *extraProxyState) bool {
|
||||||
|
|
||||||
testUrl := k.(string)
|
testUrl := k
|
||||||
state := v.(*extraProxyState)
|
state := v
|
||||||
|
|
||||||
histories := []C.DelayHistory{}
|
histories := []C.DelayHistory{}
|
||||||
queueM := state.history.Copy()
|
queueM := state.history.Copy()
|
||||||
|
@ -155,8 +156,8 @@ func (p *Proxy) LastDelayForTestUrl(url string) (delay uint16) {
|
||||||
history := p.history.Last()
|
history := p.history.Last()
|
||||||
|
|
||||||
if state, ok := p.extra.Load(url); ok {
|
if state, ok := p.extra.Load(url); ok {
|
||||||
alive = state.(*extraProxyState).alive.Load()
|
alive = state.alive.Load()
|
||||||
history = state.(*extraProxyState).history.Last()
|
history = state.history.Last()
|
||||||
}
|
}
|
||||||
|
|
||||||
if !alive {
|
if !alive {
|
||||||
|
@ -226,10 +227,10 @@ func (p *Proxy) URLTest(ctx context.Context, url string, expectedStatus utils.In
|
||||||
p.extra.Store(url, state)
|
p.extra.Store(url, state)
|
||||||
}
|
}
|
||||||
|
|
||||||
state.(*extraProxyState).alive.Store(alive)
|
state.alive.Store(alive)
|
||||||
state.(*extraProxyState).history.Put(record)
|
state.history.Put(record)
|
||||||
if state.(*extraProxyState).history.Len() > defaultHistoriesNum {
|
if state.history.Len() > defaultHistoriesNum {
|
||||||
state.(*extraProxyState).history.Pop()
|
state.history.Pop()
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
log.Debugln("health check result will be discarded, url: %s alive: %t, delay: %d", url, alive, t)
|
log.Debugln("health check result will be discarded, url: %s alive: %t, delay: %d", url, alive, t)
|
||||||
|
@ -311,7 +312,7 @@ func NewProxy(adapter C.ProxyAdapter) *Proxy {
|
||||||
history: queue.New[C.DelayHistory](defaultHistoriesNum),
|
history: queue.New[C.DelayHistory](defaultHistoriesNum),
|
||||||
alive: atomic.NewBool(true),
|
alive: atomic.NewBool(true),
|
||||||
url: "",
|
url: "",
|
||||||
extra: sync.Map{}}
|
extra: xsync.NewMapOf[*extraProxyState]()}
|
||||||
}
|
}
|
||||||
|
|
||||||
func urlToMetadata(rawURL string) (addr C.Metadata, err error) {
|
func urlToMetadata(rawURL string) (addr C.Metadata, err error) {
|
||||||
|
@ -355,7 +356,7 @@ func (p *Proxy) determineFinalStoreType(store C.DelayHistoryStoreType, url strin
|
||||||
}
|
}
|
||||||
|
|
||||||
length := 0
|
length := 0
|
||||||
p.extra.Range(func(_, _ interface{}) bool {
|
p.extra.Range(func(_ string, _ *extraProxyState) bool {
|
||||||
length++
|
length++
|
||||||
return length < 2*C.DefaultMaxHealthCheckUrlNum
|
return length < 2*C.DefaultMaxHealthCheckUrlNum
|
||||||
})
|
})
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
package auth
|
package auth
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"sync"
|
"github.com/puzpuzpuz/xsync/v2"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Authenticator interface {
|
type Authenticator interface {
|
||||||
|
@ -15,7 +15,7 @@ type AuthUser struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type inMemoryAuthenticator struct {
|
type inMemoryAuthenticator struct {
|
||||||
storage *sync.Map
|
storage *xsync.MapOf[string, string]
|
||||||
usernames []string
|
usernames []string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -31,13 +31,13 @@ func NewAuthenticator(users []AuthUser) Authenticator {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
au := &inMemoryAuthenticator{storage: &sync.Map{}}
|
au := &inMemoryAuthenticator{storage: xsync.NewMapOf[string]()}
|
||||||
for _, user := range users {
|
for _, user := range users {
|
||||||
au.storage.Store(user.User, user.Pass)
|
au.storage.Store(user.User, user.Pass)
|
||||||
}
|
}
|
||||||
usernames := make([]string, 0, len(users))
|
usernames := make([]string, 0, len(users))
|
||||||
au.storage.Range(func(key, value any) bool {
|
au.storage.Range(func(key string, value string) bool {
|
||||||
usernames = append(usernames, key.(string))
|
usernames = append(usernames, key)
|
||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
au.usernames = usernames
|
au.usernames = usernames
|
||||||
|
|
|
@ -5,23 +5,28 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
C "github.com/Dreamacro/clash/constant"
|
C "github.com/Dreamacro/clash/constant"
|
||||||
|
|
||||||
|
"github.com/puzpuzpuz/xsync/v2"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Table struct {
|
type Table struct {
|
||||||
mapping sync.Map
|
mapping *xsync.MapOf[string, *Entry]
|
||||||
|
lockMap *xsync.MapOf[string, *sync.Cond]
|
||||||
}
|
}
|
||||||
|
|
||||||
type Entry struct {
|
type Entry struct {
|
||||||
PacketConn C.PacketConn
|
PacketConn C.PacketConn
|
||||||
WriteBackProxy C.WriteBackProxy
|
WriteBackProxy C.WriteBackProxy
|
||||||
LocalUDPConnMap sync.Map
|
LocalUDPConnMap *xsync.MapOf[string, *net.UDPConn]
|
||||||
|
LocalLockMap *xsync.MapOf[string, *sync.Cond]
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Table) Set(key string, e C.PacketConn, w C.WriteBackProxy) {
|
func (t *Table) Set(key string, e C.PacketConn, w C.WriteBackProxy) {
|
||||||
t.mapping.Store(key, &Entry{
|
t.mapping.Store(key, &Entry{
|
||||||
PacketConn: e,
|
PacketConn: e,
|
||||||
WriteBackProxy: w,
|
WriteBackProxy: w,
|
||||||
LocalUDPConnMap: sync.Map{},
|
LocalUDPConnMap: xsync.NewMapOf[*net.UDPConn](),
|
||||||
|
LocalLockMap: xsync.NewMapOf[*sync.Cond](),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -34,15 +39,19 @@ func (t *Table) Get(key string) (C.PacketConn, C.WriteBackProxy) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Table) GetOrCreateLock(key string) (*sync.Cond, bool) {
|
func (t *Table) GetOrCreateLock(key string) (*sync.Cond, bool) {
|
||||||
item, loaded := t.mapping.LoadOrStore(key, sync.NewCond(&sync.Mutex{}))
|
item, loaded := t.lockMap.LoadOrCompute(key, makeLock)
|
||||||
return item.(*sync.Cond), loaded
|
return item, loaded
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Table) Delete(key string) {
|
func (t *Table) Delete(key string) {
|
||||||
t.mapping.Delete(key)
|
t.mapping.Delete(key)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Table) GetLocalConn(lAddr, rAddr string) *net.UDPConn {
|
func (t *Table) DeleteLock(lockKey string) {
|
||||||
|
t.lockMap.Delete(lockKey)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *Table) GetForLocalConn(lAddr, rAddr string) *net.UDPConn {
|
||||||
entry, exist := t.getEntry(lAddr)
|
entry, exist := t.getEntry(lAddr)
|
||||||
if !exist {
|
if !exist {
|
||||||
return nil
|
return nil
|
||||||
|
@ -51,10 +60,10 @@ func (t *Table) GetLocalConn(lAddr, rAddr string) *net.UDPConn {
|
||||||
if !exist {
|
if !exist {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return item.(*net.UDPConn)
|
return item
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Table) AddLocalConn(lAddr, rAddr string, conn *net.UDPConn) bool {
|
func (t *Table) AddForLocalConn(lAddr, rAddr string, conn *net.UDPConn) bool {
|
||||||
entry, exist := t.getEntry(lAddr)
|
entry, exist := t.getEntry(lAddr)
|
||||||
if !exist {
|
if !exist {
|
||||||
return false
|
return false
|
||||||
|
@ -63,7 +72,7 @@ func (t *Table) AddLocalConn(lAddr, rAddr string, conn *net.UDPConn) bool {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Table) RangeLocalConn(lAddr string, f func(key, value any) bool) {
|
func (t *Table) RangeForLocalConn(lAddr string, f func(key string, value *net.UDPConn) bool) {
|
||||||
entry, exist := t.getEntry(lAddr)
|
entry, exist := t.getEntry(lAddr)
|
||||||
if !exist {
|
if !exist {
|
||||||
return
|
return
|
||||||
|
@ -76,11 +85,11 @@ func (t *Table) GetOrCreateLockForLocalConn(lAddr, key string) (*sync.Cond, bool
|
||||||
if !loaded {
|
if !loaded {
|
||||||
return nil, false
|
return nil, false
|
||||||
}
|
}
|
||||||
item, loaded := entry.LocalUDPConnMap.LoadOrStore(key, sync.NewCond(&sync.Mutex{}))
|
item, loaded := entry.LocalLockMap.LoadOrCompute(key, makeLock)
|
||||||
return item.(*sync.Cond), loaded
|
return item, loaded
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Table) DeleteLocalConnMap(lAddr, key string) {
|
func (t *Table) DeleteForLocalConn(lAddr, key string) {
|
||||||
entry, loaded := t.getEntry(lAddr)
|
entry, loaded := t.getEntry(lAddr)
|
||||||
if !loaded {
|
if !loaded {
|
||||||
return
|
return
|
||||||
|
@ -88,17 +97,26 @@ func (t *Table) DeleteLocalConnMap(lAddr, key string) {
|
||||||
entry.LocalUDPConnMap.Delete(key)
|
entry.LocalUDPConnMap.Delete(key)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *Table) getEntry(key string) (*Entry, bool) {
|
func (t *Table) DeleteLockForLocalConn(lAddr, key string) {
|
||||||
item, ok := t.mapping.Load(key)
|
entry, loaded := t.getEntry(lAddr)
|
||||||
// This should not happen usually since this function called after PacketConn created
|
if !loaded {
|
||||||
if !ok {
|
return
|
||||||
return nil, false
|
|
||||||
}
|
}
|
||||||
entry, ok := item.(*Entry)
|
entry.LocalLockMap.Delete(key)
|
||||||
return entry, ok
|
}
|
||||||
|
|
||||||
|
func (t *Table) getEntry(key string) (*Entry, bool) {
|
||||||
|
return t.mapping.Load(key)
|
||||||
|
}
|
||||||
|
|
||||||
|
func makeLock() *sync.Cond {
|
||||||
|
return sync.NewCond(&sync.Mutex{})
|
||||||
}
|
}
|
||||||
|
|
||||||
// New return *Cache
|
// New return *Cache
|
||||||
func New() *Table {
|
func New() *Table {
|
||||||
return &Table{}
|
return &Table{
|
||||||
|
mapping: xsync.NewMapOf[*Entry](),
|
||||||
|
lockMap: xsync.NewMapOf[*sync.Cond](),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -267,13 +267,17 @@ type NatTable interface {
|
||||||
|
|
||||||
Delete(key string)
|
Delete(key string)
|
||||||
|
|
||||||
GetLocalConn(lAddr, rAddr string) *net.UDPConn
|
DeleteLock(key string)
|
||||||
|
|
||||||
AddLocalConn(lAddr, rAddr string, conn *net.UDPConn) bool
|
GetForLocalConn(lAddr, rAddr string) *net.UDPConn
|
||||||
|
|
||||||
RangeLocalConn(lAddr string, f func(key, value any) bool)
|
AddForLocalConn(lAddr, rAddr string, conn *net.UDPConn) bool
|
||||||
|
|
||||||
GetOrCreateLockForLocalConn(lAddr, key string) (*sync.Cond, bool)
|
RangeForLocalConn(lAddr string, f func(key string, value *net.UDPConn) bool)
|
||||||
|
|
||||||
DeleteLocalConnMap(lAddr, key string)
|
GetOrCreateLockForLocalConn(lAddr string, key string) (*sync.Cond, bool)
|
||||||
|
|
||||||
|
DeleteForLocalConn(lAddr, key string)
|
||||||
|
|
||||||
|
DeleteLockForLocalConn(lAddr, key string)
|
||||||
}
|
}
|
||||||
|
|
1
go.mod
1
go.mod
|
@ -30,6 +30,7 @@ require (
|
||||||
github.com/mroth/weightedrand/v2 v2.1.0
|
github.com/mroth/weightedrand/v2 v2.1.0
|
||||||
github.com/openacid/low v0.1.21
|
github.com/openacid/low v0.1.21
|
||||||
github.com/oschwald/maxminddb-golang v1.12.0
|
github.com/oschwald/maxminddb-golang v1.12.0
|
||||||
|
github.com/puzpuzpuz/xsync/v2 v2.5.0
|
||||||
github.com/sagernet/netlink v0.0.0-20220905062125-8043b4a9aa97
|
github.com/sagernet/netlink v0.0.0-20220905062125-8043b4a9aa97
|
||||||
github.com/sagernet/sing v0.2.10-0.20230807080248-4db0062caa0a
|
github.com/sagernet/sing v0.2.10-0.20230807080248-4db0062caa0a
|
||||||
github.com/sagernet/sing-mux v0.1.3-0.20230811111955-dc1639b5204c
|
github.com/sagernet/sing-mux v0.1.3-0.20230811111955-dc1639b5204c
|
||||||
|
|
2
go.sum
2
go.sum
|
@ -134,6 +134,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
|
||||||
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c h1:ncq/mPwQF4JjgDlrVEn3C11VoGHZN7m8qihwgMEtzYw=
|
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c h1:ncq/mPwQF4JjgDlrVEn3C11VoGHZN7m8qihwgMEtzYw=
|
||||||
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE=
|
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE=
|
||||||
github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g=
|
github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g=
|
||||||
|
github.com/puzpuzpuz/xsync/v2 v2.5.0 h1:2k4qrO/orvmEXZ3hmtHqIy9XaQtPTwzMZk1+iErpE8c=
|
||||||
|
github.com/puzpuzpuz/xsync/v2 v2.5.0/go.mod h1:gD2H2krq/w52MfPLE+Uy64TzJDVY7lP2znR9qmR35kU=
|
||||||
github.com/quic-go/qpack v0.4.0 h1:Cr9BXA1sQS2SmDUWjSofMPNKmvF6IiIfDRmgU0w1ZCo=
|
github.com/quic-go/qpack v0.4.0 h1:Cr9BXA1sQS2SmDUWjSofMPNKmvF6IiIfDRmgU0w1ZCo=
|
||||||
github.com/quic-go/qpack v0.4.0/go.mod h1:UZVnYIfi5GRk+zI9UMaCPsmZ2xKJP7XBUvVyT1Knj9A=
|
github.com/quic-go/qpack v0.4.0/go.mod h1:UZVnYIfi5GRk+zI9UMaCPsmZ2xKJP7XBUvVyT1Knj9A=
|
||||||
github.com/quic-go/qtls-go1-20 v0.3.2 h1:rRgN3WfnKbyik4dBV8A6girlJVxGand/d+jVKbQq5GI=
|
github.com/quic-go/qtls-go1-20 v0.3.2 h1:rRgN3WfnKbyik4dBV8A6girlJVxGand/d+jVKbQq5GI=
|
||||||
|
|
|
@ -55,16 +55,15 @@ func (c *packet) InAddr() net.Addr {
|
||||||
func createOrGetLocalConn(rAddr, lAddr net.Addr, in chan<- C.PacketAdapter, natTable C.NatTable) (*net.UDPConn, error) {
|
func createOrGetLocalConn(rAddr, lAddr net.Addr, in chan<- C.PacketAdapter, natTable C.NatTable) (*net.UDPConn, error) {
|
||||||
remote := rAddr.String()
|
remote := rAddr.String()
|
||||||
local := lAddr.String()
|
local := lAddr.String()
|
||||||
localConn := natTable.GetLocalConn(local, remote)
|
localConn := natTable.GetForLocalConn(local, remote)
|
||||||
// localConn not exist
|
// localConn not exist
|
||||||
if localConn == nil {
|
if localConn == nil {
|
||||||
lockKey := remote + "-lock"
|
cond, loaded := natTable.GetOrCreateLockForLocalConn(local, remote)
|
||||||
cond, loaded := natTable.GetOrCreateLockForLocalConn(local, lockKey)
|
|
||||||
if loaded {
|
if loaded {
|
||||||
cond.L.Lock()
|
cond.L.Lock()
|
||||||
cond.Wait()
|
cond.Wait()
|
||||||
// we should get localConn here
|
// we should get localConn here
|
||||||
localConn = natTable.GetLocalConn(local, remote)
|
localConn = natTable.GetForLocalConn(local, remote)
|
||||||
if localConn == nil {
|
if localConn == nil {
|
||||||
return nil, fmt.Errorf("localConn is nil, nat entry not exist")
|
return nil, fmt.Errorf("localConn is nil, nat entry not exist")
|
||||||
}
|
}
|
||||||
|
@ -74,7 +73,7 @@ func createOrGetLocalConn(rAddr, lAddr net.Addr, in chan<- C.PacketAdapter, natT
|
||||||
return nil, fmt.Errorf("cond is nil, nat entry not exist")
|
return nil, fmt.Errorf("cond is nil, nat entry not exist")
|
||||||
}
|
}
|
||||||
defer func() {
|
defer func() {
|
||||||
natTable.DeleteLocalConnMap(local, lockKey)
|
natTable.DeleteLockForLocalConn(local, remote)
|
||||||
cond.Broadcast()
|
cond.Broadcast()
|
||||||
}()
|
}()
|
||||||
conn, err := listenLocalConn(rAddr, lAddr, in, natTable)
|
conn, err := listenLocalConn(rAddr, lAddr, in, natTable)
|
||||||
|
@ -82,7 +81,7 @@ func createOrGetLocalConn(rAddr, lAddr net.Addr, in chan<- C.PacketAdapter, natT
|
||||||
log.Errorln("listenLocalConn failed with error: %s, packet loss (rAddr[%T]=%s lAddr[%T]=%s)", err.Error(), rAddr, remote, lAddr, local)
|
log.Errorln("listenLocalConn failed with error: %s, packet loss (rAddr[%T]=%s lAddr[%T]=%s)", err.Error(), rAddr, remote, lAddr, local)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
natTable.AddLocalConn(local, remote, conn)
|
natTable.AddForLocalConn(local, remote, conn)
|
||||||
localConn = conn
|
localConn = conn
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,6 +22,7 @@ import (
|
||||||
"github.com/Dreamacro/clash/transport/tuic/common"
|
"github.com/Dreamacro/clash/transport/tuic/common"
|
||||||
|
|
||||||
"github.com/metacubex/quic-go"
|
"github.com/metacubex/quic-go"
|
||||||
|
"github.com/puzpuzpuz/xsync/v2"
|
||||||
"github.com/zhangyunhao116/fastrand"
|
"github.com/zhangyunhao116/fastrand"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -49,7 +50,7 @@ type clientImpl struct {
|
||||||
openStreams atomic.Int64
|
openStreams atomic.Int64
|
||||||
closed atomic.Bool
|
closed atomic.Bool
|
||||||
|
|
||||||
udpInputMap sync.Map
|
udpInputMap *xsync.MapOf[uint32, net.Conn]
|
||||||
|
|
||||||
// only ready for PoolClient
|
// only ready for PoolClient
|
||||||
dialerRef C.Dialer
|
dialerRef C.Dialer
|
||||||
|
@ -263,11 +264,10 @@ func (t *clientImpl) forceClose(quicConn quic.Connection, err error) {
|
||||||
if quicConn != nil {
|
if quicConn != nil {
|
||||||
_ = quicConn.CloseWithError(ProtocolError, errStr)
|
_ = quicConn.CloseWithError(ProtocolError, errStr)
|
||||||
}
|
}
|
||||||
udpInputMap := &t.udpInputMap
|
udpInputMap := t.udpInputMap
|
||||||
udpInputMap.Range(func(key, value any) bool {
|
udpInputMap.Range(func(key uint32, value net.Conn) bool {
|
||||||
if conn, ok := value.(net.Conn); ok {
|
conn := value
|
||||||
_ = conn.Close()
|
_ = conn.Close()
|
||||||
}
|
|
||||||
udpInputMap.Delete(key)
|
udpInputMap.Delete(key)
|
||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
|
@ -469,6 +469,7 @@ func NewClient(clientOption *ClientOption, udp bool, dialerRef C.Dialer) *Client
|
||||||
ClientOption: clientOption,
|
ClientOption: clientOption,
|
||||||
udp: udp,
|
udp: udp,
|
||||||
dialerRef: dialerRef,
|
dialerRef: dialerRef,
|
||||||
|
udpInputMap: xsync.NewIntegerMapOf[uint32, net.Conn](),
|
||||||
}
|
}
|
||||||
c := &Client{ci}
|
c := &Client{ci}
|
||||||
runtime.SetFinalizer(c, closeClient)
|
runtime.SetFinalizer(c, closeClient)
|
||||||
|
|
|
@ -17,6 +17,7 @@ import (
|
||||||
|
|
||||||
"github.com/gofrs/uuid/v5"
|
"github.com/gofrs/uuid/v5"
|
||||||
"github.com/metacubex/quic-go"
|
"github.com/metacubex/quic-go"
|
||||||
|
"github.com/puzpuzpuz/xsync/v2"
|
||||||
)
|
)
|
||||||
|
|
||||||
type ServerOption struct {
|
type ServerOption struct {
|
||||||
|
@ -33,6 +34,7 @@ func NewServerHandler(option *ServerOption, quicConn quic.EarlyConnection, uuid
|
||||||
quicConn: quicConn,
|
quicConn: quicConn,
|
||||||
uuid: uuid,
|
uuid: uuid,
|
||||||
authCh: make(chan struct{}),
|
authCh: make(chan struct{}),
|
||||||
|
udpInputMap: xsync.NewIntegerMapOf[uint32, *atomic.Bool](),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -45,7 +47,7 @@ type serverHandler struct {
|
||||||
authOk atomic.Bool
|
authOk atomic.Bool
|
||||||
authOnce sync.Once
|
authOnce sync.Once
|
||||||
|
|
||||||
udpInputMap sync.Map
|
udpInputMap *xsync.MapOf[uint32, *atomic.Bool]
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *serverHandler) AuthOk() bool {
|
func (s *serverHandler) AuthOk() bool {
|
||||||
|
@ -78,8 +80,7 @@ func (s *serverHandler) parsePacket(packet *Packet, udpRelayMode common.UdpRelay
|
||||||
|
|
||||||
assocId = packet.ASSOC_ID
|
assocId = packet.ASSOC_ID
|
||||||
|
|
||||||
v, _ := s.udpInputMap.LoadOrStore(assocId, &atomic.Bool{})
|
writeClosed, _ := s.udpInputMap.LoadOrCompute(assocId, func() *atomic.Bool { return &atomic.Bool{} })
|
||||||
writeClosed := v.(*atomic.Bool)
|
|
||||||
if writeClosed.Load() {
|
if writeClosed.Load() {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -173,8 +174,7 @@ func (s *serverHandler) HandleUniStream(reader *bufio.Reader) (err error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if v, loaded := s.udpInputMap.LoadAndDelete(disassociate.ASSOC_ID); loaded {
|
if writeClosed, loaded := s.udpInputMap.LoadAndDelete(disassociate.ASSOC_ID); loaded {
|
||||||
writeClosed := v.(*atomic.Bool)
|
|
||||||
writeClosed.Store(true)
|
writeClosed.Store(true)
|
||||||
}
|
}
|
||||||
case HeartbeatType:
|
case HeartbeatType:
|
||||||
|
|
|
@ -20,6 +20,7 @@ import (
|
||||||
"github.com/Dreamacro/clash/transport/tuic/common"
|
"github.com/Dreamacro/clash/transport/tuic/common"
|
||||||
|
|
||||||
"github.com/metacubex/quic-go"
|
"github.com/metacubex/quic-go"
|
||||||
|
"github.com/puzpuzpuz/xsync/v2"
|
||||||
"github.com/zhangyunhao116/fastrand"
|
"github.com/zhangyunhao116/fastrand"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -46,7 +47,7 @@ type clientImpl struct {
|
||||||
openStreams atomic.Int64
|
openStreams atomic.Int64
|
||||||
closed atomic.Bool
|
closed atomic.Bool
|
||||||
|
|
||||||
udpInputMap sync.Map
|
udpInputMap xsync.MapOf[uint16, net.Conn]
|
||||||
|
|
||||||
// only ready for PoolClient
|
// only ready for PoolClient
|
||||||
dialerRef C.Dialer
|
dialerRef C.Dialer
|
||||||
|
@ -270,10 +271,9 @@ func (t *clientImpl) forceClose(quicConn quic.Connection, err error) {
|
||||||
_ = quicConn.CloseWithError(ProtocolError, errStr)
|
_ = quicConn.CloseWithError(ProtocolError, errStr)
|
||||||
}
|
}
|
||||||
udpInputMap := &t.udpInputMap
|
udpInputMap := &t.udpInputMap
|
||||||
udpInputMap.Range(func(key, value any) bool {
|
udpInputMap.Range(func(key uint16, value net.Conn) bool {
|
||||||
if conn, ok := value.(net.Conn); ok {
|
conn := value
|
||||||
_ = conn.Close()
|
_ = conn.Close()
|
||||||
}
|
|
||||||
udpInputMap.Delete(key)
|
udpInputMap.Delete(key)
|
||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
|
@ -406,6 +406,7 @@ func NewClient(clientOption *ClientOption, udp bool, dialerRef C.Dialer) *Client
|
||||||
ClientOption: clientOption,
|
ClientOption: clientOption,
|
||||||
udp: udp,
|
udp: udp,
|
||||||
dialerRef: dialerRef,
|
dialerRef: dialerRef,
|
||||||
|
udpInputMap: *xsync.NewIntegerMapOf[uint16, net.Conn](),
|
||||||
}
|
}
|
||||||
c := &Client{ci}
|
c := &Client{ci}
|
||||||
runtime.SetFinalizer(c, closeClient)
|
runtime.SetFinalizer(c, closeClient)
|
||||||
|
|
|
@ -16,6 +16,7 @@ import (
|
||||||
|
|
||||||
"github.com/gofrs/uuid/v5"
|
"github.com/gofrs/uuid/v5"
|
||||||
"github.com/metacubex/quic-go"
|
"github.com/metacubex/quic-go"
|
||||||
|
"github.com/puzpuzpuz/xsync/v2"
|
||||||
)
|
)
|
||||||
|
|
||||||
type ServerOption struct {
|
type ServerOption struct {
|
||||||
|
@ -32,6 +33,7 @@ func NewServerHandler(option *ServerOption, quicConn quic.EarlyConnection, uuid
|
||||||
quicConn: quicConn,
|
quicConn: quicConn,
|
||||||
uuid: uuid,
|
uuid: uuid,
|
||||||
authCh: make(chan struct{}),
|
authCh: make(chan struct{}),
|
||||||
|
udpInputMap: xsync.NewIntegerMapOf[uint16, *serverUDPInput](),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -45,7 +47,7 @@ type serverHandler struct {
|
||||||
authUUID atomic.TypedValue[string]
|
authUUID atomic.TypedValue[string]
|
||||||
authOnce sync.Once
|
authOnce sync.Once
|
||||||
|
|
||||||
udpInputMap sync.Map
|
udpInputMap *xsync.MapOf[uint16, *serverUDPInput]
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *serverHandler) AuthOk() bool {
|
func (s *serverHandler) AuthOk() bool {
|
||||||
|
@ -94,8 +96,7 @@ func (s *serverHandler) parsePacket(packet *Packet, udpRelayMode common.UdpRelay
|
||||||
|
|
||||||
assocId = packet.ASSOC_ID
|
assocId = packet.ASSOC_ID
|
||||||
|
|
||||||
v, _ := s.udpInputMap.LoadOrStore(assocId, &serverUDPInput{})
|
input, _ := s.udpInputMap.LoadOrCompute(assocId, func() *serverUDPInput { return &serverUDPInput{} })
|
||||||
input := v.(*serverUDPInput)
|
|
||||||
if input.writeClosed.Load() {
|
if input.writeClosed.Load() {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -186,8 +187,7 @@ func (s *serverHandler) HandleUniStream(reader *bufio.Reader) (err error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if v, loaded := s.udpInputMap.LoadAndDelete(disassociate.ASSOC_ID); loaded {
|
if input, loaded := s.udpInputMap.LoadAndDelete(disassociate.ASSOC_ID); loaded {
|
||||||
input := v.(*serverUDPInput)
|
|
||||||
input.writeClosed.Store(true)
|
input.writeClosed.Store(true)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -73,12 +73,9 @@ func handleUDPToLocal(writeBack C.WriteBack, pc N.EnhancePacketConn, key string,
|
||||||
}
|
}
|
||||||
|
|
||||||
func closeAllLocalCoon(lAddr string) {
|
func closeAllLocalCoon(lAddr string) {
|
||||||
natTable.RangeLocalConn(lAddr, func(key, value any) bool {
|
natTable.RangeForLocalConn(lAddr, func(key string, value *net.UDPConn) bool {
|
||||||
conn, ok := value.(*net.UDPConn)
|
conn := value
|
||||||
if !ok || conn == nil {
|
|
||||||
log.Debugln("Value %#v unknown value when closing TProxy local conn...", conn)
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
conn.Close()
|
conn.Close()
|
||||||
log.Debugln("Closing TProxy local conn... lAddr=%s rAddr=%s", lAddr, key)
|
log.Debugln("Closing TProxy local conn... lAddr=%s rAddr=%s", lAddr, key)
|
||||||
return true
|
return true
|
||||||
|
|
|
@ -2,11 +2,11 @@ package statistic
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"os"
|
"os"
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/Dreamacro/clash/common/atomic"
|
"github.com/Dreamacro/clash/common/atomic"
|
||||||
|
|
||||||
|
"github.com/puzpuzpuz/xsync/v2"
|
||||||
"github.com/shirou/gopsutil/v3/process"
|
"github.com/shirou/gopsutil/v3/process"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -14,6 +14,7 @@ var DefaultManager *Manager
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
DefaultManager = &Manager{
|
DefaultManager = &Manager{
|
||||||
|
connections: xsync.NewMapOf[Tracker](),
|
||||||
uploadTemp: atomic.NewInt64(0),
|
uploadTemp: atomic.NewInt64(0),
|
||||||
downloadTemp: atomic.NewInt64(0),
|
downloadTemp: atomic.NewInt64(0),
|
||||||
uploadBlip: atomic.NewInt64(0),
|
uploadBlip: atomic.NewInt64(0),
|
||||||
|
@ -27,7 +28,7 @@ func init() {
|
||||||
}
|
}
|
||||||
|
|
||||||
type Manager struct {
|
type Manager struct {
|
||||||
connections sync.Map
|
connections *xsync.MapOf[string, Tracker]
|
||||||
uploadTemp *atomic.Int64
|
uploadTemp *atomic.Int64
|
||||||
downloadTemp *atomic.Int64
|
downloadTemp *atomic.Int64
|
||||||
uploadBlip *atomic.Int64
|
uploadBlip *atomic.Int64
|
||||||
|
@ -48,14 +49,14 @@ func (m *Manager) Leave(c Tracker) {
|
||||||
|
|
||||||
func (m *Manager) Get(id string) (c Tracker) {
|
func (m *Manager) Get(id string) (c Tracker) {
|
||||||
if value, ok := m.connections.Load(id); ok {
|
if value, ok := m.connections.Load(id); ok {
|
||||||
c = value.(Tracker)
|
c = value
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Manager) Range(f func(c Tracker) bool) {
|
func (m *Manager) Range(f func(c Tracker) bool) {
|
||||||
m.connections.Range(func(key, value any) bool {
|
m.connections.Range(func(key string, value Tracker) bool {
|
||||||
return f(value.(Tracker))
|
return f(value)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -318,8 +318,7 @@ func handleUDPConn(packet C.PacketAdapter) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
lockKey := key + "-lock"
|
cond, loaded := natTable.GetOrCreateLock(key)
|
||||||
cond, loaded := natTable.GetOrCreateLock(lockKey)
|
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
defer packet.Drop()
|
defer packet.Drop()
|
||||||
|
@ -333,7 +332,7 @@ func handleUDPConn(packet C.PacketAdapter) {
|
||||||
}
|
}
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
natTable.Delete(lockKey)
|
natTable.DeleteLock(key)
|
||||||
cond.Broadcast()
|
cond.Broadcast()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue