chore: better tuicV5 deFragger
This commit is contained in:
parent
b0fed73236
commit
191243a1d2
5 changed files with 105 additions and 50 deletions
42
common/cache/lrucache.go
vendored
42
common/cache/lrucache.go
vendored
|
@ -82,6 +82,9 @@ func New[K comparable, V any](options ...Option[K, V]) *LruCache[K, V] {
|
||||||
// Get returns the any representation of a cached response and a bool
|
// Get returns the any representation of a cached response and a bool
|
||||||
// set to true if the key was found.
|
// set to true if the key was found.
|
||||||
func (c *LruCache[K, V]) Get(key K) (V, bool) {
|
func (c *LruCache[K, V]) Get(key K) (V, bool) {
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
el := c.get(key)
|
el := c.get(key)
|
||||||
if el == nil {
|
if el == nil {
|
||||||
return getZero[V](), false
|
return getZero[V](), false
|
||||||
|
@ -91,11 +94,29 @@ func (c *LruCache[K, V]) Get(key K) (V, bool) {
|
||||||
return value, true
|
return value, true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *LruCache[K, V]) GetOrStore(key K, constructor func() V) (V, bool) {
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
|
el := c.get(key)
|
||||||
|
if el == nil {
|
||||||
|
value := constructor()
|
||||||
|
c.set(key, value)
|
||||||
|
return value, false
|
||||||
|
}
|
||||||
|
value := el.value
|
||||||
|
|
||||||
|
return value, true
|
||||||
|
}
|
||||||
|
|
||||||
// GetWithExpire returns the any representation of a cached response,
|
// GetWithExpire returns the any representation of a cached response,
|
||||||
// a time.Time Give expected expires,
|
// a time.Time Give expected expires,
|
||||||
// and a bool set to true if the key was found.
|
// and a bool set to true if the key was found.
|
||||||
// This method will NOT check the maxAge of element and will NOT update the expires.
|
// This method will NOT check the maxAge of element and will NOT update the expires.
|
||||||
func (c *LruCache[K, V]) GetWithExpire(key K) (V, time.Time, bool) {
|
func (c *LruCache[K, V]) GetWithExpire(key K) (V, time.Time, bool) {
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
el := c.get(key)
|
el := c.get(key)
|
||||||
if el == nil {
|
if el == nil {
|
||||||
return getZero[V](), time.Time{}, false
|
return getZero[V](), time.Time{}, false
|
||||||
|
@ -115,11 +136,18 @@ func (c *LruCache[K, V]) Exist(key K) bool {
|
||||||
|
|
||||||
// Set stores the any representation of a response for a given key.
|
// Set stores the any representation of a response for a given key.
|
||||||
func (c *LruCache[K, V]) Set(key K, value V) {
|
func (c *LruCache[K, V]) Set(key K, value V) {
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
|
c.set(key, value)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *LruCache[K, V]) set(key K, value V) {
|
||||||
expires := int64(0)
|
expires := int64(0)
|
||||||
if c.maxAge > 0 {
|
if c.maxAge > 0 {
|
||||||
expires = time.Now().Unix() + c.maxAge
|
expires = time.Now().Unix() + c.maxAge
|
||||||
}
|
}
|
||||||
c.SetWithExpire(key, value, time.Unix(expires, 0))
|
c.setWithExpire(key, value, time.Unix(expires, 0))
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetWithExpire stores the any representation of a response for a given key and given expires.
|
// SetWithExpire stores the any representation of a response for a given key and given expires.
|
||||||
|
@ -128,6 +156,10 @@ func (c *LruCache[K, V]) SetWithExpire(key K, value V, expires time.Time) {
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
defer c.mu.Unlock()
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
|
c.setWithExpire(key, value, expires)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *LruCache[K, V]) setWithExpire(key K, value V, expires time.Time) {
|
||||||
if le, ok := c.cache[key]; ok {
|
if le, ok := c.cache[key]; ok {
|
||||||
c.lru.MoveToBack(le)
|
c.lru.MoveToBack(le)
|
||||||
e := le.Value
|
e := le.Value
|
||||||
|
@ -165,9 +197,6 @@ func (c *LruCache[K, V]) CloneTo(n *LruCache[K, V]) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *LruCache[K, V]) get(key K) *entry[K, V] {
|
func (c *LruCache[K, V]) get(key K) *entry[K, V] {
|
||||||
c.mu.Lock()
|
|
||||||
defer c.mu.Unlock()
|
|
||||||
|
|
||||||
le, ok := c.cache[key]
|
le, ok := c.cache[key]
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil
|
return nil
|
||||||
|
@ -191,12 +220,11 @@ func (c *LruCache[K, V]) get(key K) *entry[K, V] {
|
||||||
// Delete removes the value associated with a key.
|
// Delete removes the value associated with a key.
|
||||||
func (c *LruCache[K, V]) Delete(key K) {
|
func (c *LruCache[K, V]) Delete(key K) {
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
if le, ok := c.cache[key]; ok {
|
if le, ok := c.cache[key]; ok {
|
||||||
c.deleteElement(le)
|
c.deleteElement(le)
|
||||||
}
|
}
|
||||||
|
|
||||||
c.mu.Unlock()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *LruCache[K, V]) maybeDeleteOldest() {
|
func (c *LruCache[K, V]) maybeDeleteOldest() {
|
||||||
|
@ -219,10 +247,10 @@ func (c *LruCache[K, V]) deleteElement(le *list.Element[*entry[K, V]]) {
|
||||||
|
|
||||||
func (c *LruCache[K, V]) Clear() error {
|
func (c *LruCache[K, V]) Clear() error {
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
c.cache = make(map[K]*list.Element[*entry[K, V]])
|
c.cache = make(map[K]*list.Element[*entry[K, V]])
|
||||||
|
|
||||||
c.mu.Unlock()
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -66,10 +66,10 @@ func (s *serverHandler) HandleMessage(message []byte) (err error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
return s.parsePacket(packet, common.NATIVE)
|
return s.parsePacket(&packet, common.NATIVE)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *serverHandler) parsePacket(packet Packet, udpRelayMode common.UdpRelayMode) (err error) {
|
func (s *serverHandler) parsePacket(packet *Packet, udpRelayMode common.UdpRelayMode) (err error) {
|
||||||
<-s.authCh
|
<-s.authCh
|
||||||
if !s.authOk.Load() {
|
if !s.authOk.Load() {
|
||||||
return
|
return
|
||||||
|
@ -97,7 +97,7 @@ func (s *serverHandler) parsePacket(packet Packet, udpRelayMode common.UdpRelayM
|
||||||
|
|
||||||
return s.HandleUdpFn(packet.ADDR.SocksAddr(), &serverUDPPacket{
|
return s.HandleUdpFn(packet.ADDR.SocksAddr(), &serverUDPPacket{
|
||||||
pc: pc,
|
pc: pc,
|
||||||
packet: &packet,
|
packet: packet,
|
||||||
rAddr: N.NewCustomAddr("tuic", fmt.Sprintf("tuic-%s-%d", s.uuid, assocId), s.quicConn.RemoteAddr()), // for tunnel's handleUDPConn
|
rAddr: N.NewCustomAddr("tuic", fmt.Sprintf("tuic-%s-%d", s.uuid, assocId), s.quicConn.RemoteAddr()), // for tunnel's handleUDPConn
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -166,7 +166,7 @@ func (s *serverHandler) HandleUniStream(reader *bufio.Reader) (err error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
return s.parsePacket(packet, common.QUIC)
|
return s.parsePacket(&packet, common.QUIC)
|
||||||
case DissociateType:
|
case DissociateType:
|
||||||
var disassociate Dissociate
|
var disassociate Dissociate
|
||||||
disassociate, err = ReadDissociateWithHead(commandHead, reader)
|
disassociate, err = ReadDissociateWithHead(commandHead, reader)
|
||||||
|
|
|
@ -2,6 +2,9 @@ package v5
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/Dreamacro/clash/common/cache"
|
||||||
|
|
||||||
"github.com/metacubex/quic-go"
|
"github.com/metacubex/quic-go"
|
||||||
)
|
)
|
||||||
|
@ -39,42 +42,68 @@ func fragWriteNative(quicConn quic.Connection, packet Packet, buf *bytes.Buffer,
|
||||||
}
|
}
|
||||||
|
|
||||||
type deFragger struct {
|
type deFragger struct {
|
||||||
pkgID uint16
|
lru *cache.LruCache[uint16, *packetBag]
|
||||||
frags []*Packet
|
once sync.Once
|
||||||
count uint8
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *deFragger) Feed(m Packet) *Packet {
|
type packetBag struct {
|
||||||
|
frags []*Packet
|
||||||
|
count uint8
|
||||||
|
mutex sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
func newPacketBag() *packetBag {
|
||||||
|
return new(packetBag)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *deFragger) init() {
|
||||||
|
if d.lru == nil {
|
||||||
|
d.lru = cache.New(
|
||||||
|
cache.WithAge[uint16, *packetBag](10),
|
||||||
|
cache.WithUpdateAgeOnGet[uint16, *packetBag](),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *deFragger) Feed(m *Packet) *Packet {
|
||||||
if m.FRAG_TOTAL <= 1 {
|
if m.FRAG_TOTAL <= 1 {
|
||||||
return &m
|
return m
|
||||||
}
|
}
|
||||||
if m.FRAG_ID >= m.FRAG_TOTAL {
|
if m.FRAG_ID >= m.FRAG_TOTAL {
|
||||||
// wtf is this?
|
// wtf is this?
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if d.count == 0 || m.PKT_ID != d.pkgID {
|
d.once.Do(d.init) // lazy init
|
||||||
|
bag, _ := d.lru.GetOrStore(m.PKT_ID, newPacketBag)
|
||||||
|
bag.mutex.Lock()
|
||||||
|
defer bag.mutex.Unlock()
|
||||||
|
if int(m.FRAG_TOTAL) != len(bag.frags) {
|
||||||
// new message, clear previous state
|
// new message, clear previous state
|
||||||
d.pkgID = m.PKT_ID
|
bag.frags = make([]*Packet, m.FRAG_TOTAL)
|
||||||
d.frags = make([]*Packet, m.FRAG_TOTAL)
|
bag.count = 1
|
||||||
d.count = 1
|
bag.frags[m.FRAG_ID] = m
|
||||||
d.frags[m.FRAG_ID] = &m
|
return nil
|
||||||
} else if d.frags[m.FRAG_ID] == nil {
|
|
||||||
d.frags[m.FRAG_ID] = &m
|
|
||||||
d.count++
|
|
||||||
if int(d.count) == len(d.frags) {
|
|
||||||
// all fragments received, assemble
|
|
||||||
var data []byte
|
|
||||||
for _, frag := range d.frags {
|
|
||||||
data = append(data, frag.DATA...)
|
|
||||||
}
|
|
||||||
p := d.frags[0] // recover from first fragment
|
|
||||||
p.SIZE = uint16(len(data))
|
|
||||||
p.DATA = data
|
|
||||||
p.FRAG_ID = 0
|
|
||||||
p.FRAG_TOTAL = 1
|
|
||||||
d.count = 0
|
|
||||||
return p
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return nil
|
if bag.frags[m.FRAG_ID] != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
bag.frags[m.FRAG_ID] = m
|
||||||
|
bag.count++
|
||||||
|
if int(bag.count) != len(bag.frags) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// all fragments received, assemble
|
||||||
|
var data []byte
|
||||||
|
for _, frag := range bag.frags {
|
||||||
|
data = append(data, frag.DATA...)
|
||||||
|
}
|
||||||
|
p := *bag.frags[0] // recover from first fragment
|
||||||
|
p.SIZE = uint16(len(data))
|
||||||
|
p.DATA = data
|
||||||
|
p.FRAG_ID = 0
|
||||||
|
p.FRAG_TOTAL = 1
|
||||||
|
bag.frags = nil
|
||||||
|
d.lru.Delete(m.PKT_ID)
|
||||||
|
return &p
|
||||||
}
|
}
|
||||||
|
|
|
@ -103,7 +103,7 @@ func (q *quicStreamPacketConn) ReadFrom(p []byte) (n int, addr net.Addr, err err
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if packetPtr := q.deFragger.Feed(packet); packetPtr != nil {
|
if packetPtr := q.deFragger.Feed(&packet); packetPtr != nil {
|
||||||
n = copy(p, packet.DATA)
|
n = copy(p, packet.DATA)
|
||||||
addr = packetPtr.ADDR.UDPAddr()
|
addr = packetPtr.ADDR.UDPAddr()
|
||||||
return
|
return
|
||||||
|
@ -123,7 +123,7 @@ func (q *quicStreamPacketConn) WaitReadFrom() (data []byte, put func(), addr net
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if packetPtr := q.deFragger.Feed(packet); packetPtr != nil {
|
if packetPtr := q.deFragger.Feed(&packet); packetPtr != nil {
|
||||||
data = packetPtr.DATA
|
data = packetPtr.DATA
|
||||||
addr = packetPtr.ADDR.UDPAddr()
|
addr = packetPtr.ADDR.UDPAddr()
|
||||||
return
|
return
|
||||||
|
@ -178,16 +178,14 @@ func (q *quicStreamPacketConn) WriteTo(p []byte, addr net.Addr) (n int, err erro
|
||||||
default: // native
|
default: // native
|
||||||
if len(p) > q.maxUdpRelayPacketSize {
|
if len(p) > q.maxUdpRelayPacketSize {
|
||||||
err = fragWriteNative(q.quicConn, packet, buf, q.maxUdpRelayPacketSize)
|
err = fragWriteNative(q.quicConn, packet, buf, q.maxUdpRelayPacketSize)
|
||||||
|
} else {
|
||||||
|
err = packet.WriteTo(buf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
data := buf.Bytes()
|
||||||
|
err = q.quicConn.SendMessage(data)
|
||||||
}
|
}
|
||||||
err = packet.WriteTo(buf)
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
data := buf.Bytes()
|
|
||||||
err = q.quicConn.SendMessage(data)
|
|
||||||
|
|
||||||
var tooLarge quic.ErrMessageTooLarge
|
var tooLarge quic.ErrMessageTooLarge
|
||||||
if errors.As(err, &tooLarge) {
|
if errors.As(err, &tooLarge) {
|
||||||
|
|
|
@ -73,7 +73,7 @@ func (s *serverHandler) HandleMessage(message []byte) (err error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
return s.parsePacket(packet, common.NATIVE)
|
return s.parsePacket(&packet, common.NATIVE)
|
||||||
case HeartbeatType:
|
case HeartbeatType:
|
||||||
var heartbeat Heartbeat
|
var heartbeat Heartbeat
|
||||||
heartbeat, err = ReadHeartbeatWithHead(commandHead, reader)
|
heartbeat, err = ReadHeartbeatWithHead(commandHead, reader)
|
||||||
|
@ -85,7 +85,7 @@ func (s *serverHandler) HandleMessage(message []byte) (err error) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *serverHandler) parsePacket(packet Packet, udpRelayMode common.UdpRelayMode) (err error) {
|
func (s *serverHandler) parsePacket(packet *Packet, udpRelayMode common.UdpRelayMode) (err error) {
|
||||||
<-s.authCh
|
<-s.authCh
|
||||||
if !s.authOk.Load() {
|
if !s.authOk.Load() {
|
||||||
return
|
return
|
||||||
|
@ -179,7 +179,7 @@ func (s *serverHandler) HandleUniStream(reader *bufio.Reader) (err error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
return s.parsePacket(packet, common.QUIC)
|
return s.parsePacket(&packet, common.QUIC)
|
||||||
case DissociateType:
|
case DissociateType:
|
||||||
var disassociate Dissociate
|
var disassociate Dissociate
|
||||||
disassociate, err = ReadDissociateWithHead(commandHead, reader)
|
disassociate, err = ReadDissociateWithHead(commandHead, reader)
|
||||||
|
|
Loading…
Reference in a new issue