chore: add tuic-server listener
This commit is contained in:
parent
797e54c92f
commit
dbe9c4ba47
13 changed files with 1063 additions and 330 deletions
|
@ -212,7 +212,7 @@ func NewTuic(option TuicOption) (*Tuic, error) {
|
|||
UdpRelayMode: option.UdpRelayMode,
|
||||
CongestionController: option.CongestionController,
|
||||
ReduceRtt: option.ReduceRtt,
|
||||
RequestTimeout: option.RequestTimeout,
|
||||
RequestTimeout: time.Duration(option.RequestTimeout) * time.Millisecond,
|
||||
MaxUdpRelayPacketSize: option.MaxUdpRelayPacketSize,
|
||||
FastOpen: option.FastOpen,
|
||||
MaxOpenStreams: clientMaxOpenStreams,
|
||||
|
|
|
@ -54,6 +54,7 @@ type General struct {
|
|||
TCPConcurrent bool `json:"tcp-concurrent"`
|
||||
EnableProcess bool `json:"enable-process"`
|
||||
Tun Tun `json:"tun"`
|
||||
TuicServer TuicServer `json:"tuic-server"`
|
||||
Sniffing bool `json:"sniffing"`
|
||||
EBpf EBpf `json:"-"`
|
||||
}
|
||||
|
@ -114,6 +115,24 @@ type Profile struct {
|
|||
StoreFakeIP bool `yaml:"store-fake-ip"`
|
||||
}
|
||||
|
||||
type TuicServer struct {
|
||||
Enable bool `yaml:"enable" json:"enable"`
|
||||
Listen string `yaml:"listen" json:"listen"`
|
||||
Token []string `yaml:"token" json:"token"`
|
||||
Certificate string `yaml:"certificate" json:"certificate"`
|
||||
PrivateKey string `yaml:"private-key" json:"private-key"`
|
||||
CongestionController string `yaml:"congestion-controller" json:"congestion-controller,omitempty"`
|
||||
MaxIdleTime int `yaml:"max-idle-time" json:"max-idle-time,omitempty"`
|
||||
AuthenticationTimeout int `yaml:"authentication-timeout" json:"authentication-timeout,omitempty"`
|
||||
ALPN []string `yaml:"alpn" json:"alpn,omitempty"`
|
||||
MaxUdpRelayPacketSize int `yaml:"max-udp-relay-packet-size" json:"max-udp-relay-packet-size,omitempty"`
|
||||
}
|
||||
|
||||
func (t TuicServer) String() string {
|
||||
b, _ := json.Marshal(t)
|
||||
return string(b)
|
||||
}
|
||||
|
||||
// Tun config
|
||||
type Tun struct {
|
||||
Enable bool `yaml:"enable" json:"enable"`
|
||||
|
@ -282,6 +301,19 @@ type RawTun struct {
|
|||
UDPTimeout int64 `yaml:"udp-timeout" json:"udp_timeout,omitempty"`
|
||||
}
|
||||
|
||||
type RawTuicServer struct {
|
||||
Enable bool `yaml:"enable" json:"enable"`
|
||||
Listen string `yaml:"listen" json:"listen"`
|
||||
Token []string `yaml:"token" json:"token"`
|
||||
Certificate string `yaml:"certificate" json:"certificate"`
|
||||
PrivateKey string `yaml:"private-key" json:"private-key"`
|
||||
CongestionController string `yaml:"congestion-controller" json:"congestion-controller,omitempty"`
|
||||
MaxIdleTime int `yaml:"max-idle-time" json:"max-idle-time,omitempty"`
|
||||
AuthenticationTimeout int `yaml:"authentication-timeout" json:"authentication-timeout,omitempty"`
|
||||
ALPN []string `yaml:"alpn" json:"alpn,omitempty"`
|
||||
MaxUdpRelayPacketSize int `yaml:"max-udp-relay-packet-size" json:"max-udp-relay-packet-size,omitempty"`
|
||||
}
|
||||
|
||||
type RawConfig struct {
|
||||
Port int `yaml:"port"`
|
||||
SocksPort int `yaml:"socks-port"`
|
||||
|
@ -316,6 +348,7 @@ type RawConfig struct {
|
|||
Hosts map[string]string `yaml:"hosts"`
|
||||
DNS RawDNS `yaml:"dns"`
|
||||
Tun RawTun `yaml:"tun"`
|
||||
TuicServer RawTuicServer `yaml:"tuic-server"`
|
||||
EBpf EBpf `yaml:"ebpf"`
|
||||
IPTables IPTables `yaml:"iptables"`
|
||||
Experimental Experimental `yaml:"experimental"`
|
||||
|
@ -392,6 +425,18 @@ func UnmarshalRawConfig(buf []byte) (*RawConfig, error) {
|
|||
AutoDetectInterface: true,
|
||||
Inet6Address: []ListenPrefix{ListenPrefix(netip.MustParsePrefix("fdfe:dcba:9876::1/126"))},
|
||||
},
|
||||
TuicServer: RawTuicServer{
|
||||
Enable: false,
|
||||
Token: nil,
|
||||
Certificate: "",
|
||||
PrivateKey: "",
|
||||
Listen: "",
|
||||
CongestionController: "",
|
||||
MaxIdleTime: 15000,
|
||||
AuthenticationTimeout: 1000,
|
||||
ALPN: []string{"h3"},
|
||||
MaxUdpRelayPacketSize: 1500,
|
||||
},
|
||||
EBpf: EBpf{
|
||||
RedirectToTun: []string{},
|
||||
AutoRedir: []string{},
|
||||
|
@ -508,6 +553,11 @@ func ParseRawConfig(rawCfg *RawConfig) (*Config, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
err = parseTuicServer(rawCfg.TuicServer, config.General)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
config.Users = parseAuthentication(rawCfg.Authentication)
|
||||
|
||||
config.Sniffer, err = parseSniffer(rawCfg.Sniffer)
|
||||
|
@ -905,8 +955,8 @@ func parseNameServer(servers []string, preferH3 bool) ([]dns.NameServer, error)
|
|||
if _, _, err := net.SplitHostPort(host); err != nil && strings.Contains(err.Error(), "missing port in address") {
|
||||
host = net.JoinHostPort(host, "443")
|
||||
} else {
|
||||
if err!=nil{
|
||||
return nil,err
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
clearURL := url.URL{Scheme: "https", Host: host, Path: u.Path}
|
||||
|
@ -1197,6 +1247,22 @@ func parseTun(rawTun RawTun, general *General) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func parseTuicServer(rawTuic RawTuicServer, general *General) error {
|
||||
general.TuicServer = TuicServer{
|
||||
Enable: rawTuic.Enable,
|
||||
Listen: rawTuic.Listen,
|
||||
Token: rawTuic.Token,
|
||||
Certificate: rawTuic.Certificate,
|
||||
PrivateKey: rawTuic.PrivateKey,
|
||||
CongestionController: rawTuic.CongestionController,
|
||||
MaxIdleTime: rawTuic.MaxIdleTime,
|
||||
AuthenticationTimeout: rawTuic.AuthenticationTimeout,
|
||||
ALPN: rawTuic.ALPN,
|
||||
MaxUdpRelayPacketSize: rawTuic.MaxUdpRelayPacketSize,
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func parseSniffer(snifferRaw RawSniffer) (*Sniffer, error) {
|
||||
sniffer := &Sniffer{
|
||||
Enable: snifferRaw.Enable,
|
||||
|
|
|
@ -27,6 +27,7 @@ const (
|
|||
TCPTUN
|
||||
UDPTUN
|
||||
TUN
|
||||
TUIC
|
||||
INNER
|
||||
)
|
||||
|
||||
|
@ -71,6 +72,8 @@ func (t Type) String() string {
|
|||
return "UdpTun"
|
||||
case TUN:
|
||||
return "Tun"
|
||||
case TUIC:
|
||||
return "Tuic"
|
||||
case INNER:
|
||||
return "Inner"
|
||||
default:
|
||||
|
@ -89,12 +92,22 @@ func ParseType(t string) (*Type, error) {
|
|||
res = SOCKS4
|
||||
case "SOCKS5":
|
||||
res = SOCKS5
|
||||
case "SHADOWSOCKS":
|
||||
res = SHADOWSOCKS
|
||||
case "VMESS":
|
||||
res = VMESS
|
||||
case "REDIR":
|
||||
res = REDIR
|
||||
case "TPROXY":
|
||||
res = TPROXY
|
||||
case "TCPTUN":
|
||||
res = TCPTUN
|
||||
case "UDPTUN":
|
||||
res = UDPTUN
|
||||
case "TUN":
|
||||
res = TUN
|
||||
case "TUIC":
|
||||
res = TUIC
|
||||
case "INNER":
|
||||
res = INNER
|
||||
default:
|
||||
|
|
|
@ -103,6 +103,21 @@ sniffer:
|
|||
# tcptun-config: 127.0.0.1:801=www.example.com:80,127.0.0.1:4431=www.example.com:443
|
||||
# udptun-config: 127.0.0.1:531=8.8.8.8:53,127.0.0.1:532=1.1.1.1:53
|
||||
|
||||
# tuic服务器入口(传入流量将和socks,mixed等入口一样按照mode所指定的方式进行匹配处理)
|
||||
#tuic-server:
|
||||
# enable: true
|
||||
# listen: 127.0.0.1:10443
|
||||
# token:
|
||||
# - TOKEN
|
||||
# certificate: ./server.crt
|
||||
# private-key: ./server.key
|
||||
# congestion-controller: bbr
|
||||
# max-idle-time: 15000
|
||||
# authentication-timeout: 1000
|
||||
# alpn:
|
||||
# - h3
|
||||
# max-udp-relay-packet-size: 1500
|
||||
|
||||
profile:
|
||||
# 存储select选择记录
|
||||
store-selected: false
|
||||
|
|
|
@ -351,6 +351,7 @@ func updateGeneral(general *config.General, force bool) {
|
|||
P.ReCreateVmess(general.VmessConfig, tcpIn, udpIn)
|
||||
P.ReCreateTcpTun(general.TcpTunConfig, tcpIn, udpIn)
|
||||
P.ReCreateUdpTun(general.UdpTunConfig, tcpIn, udpIn)
|
||||
P.ReCreateTuic(general.TuicServer, tcpIn, udpIn)
|
||||
}
|
||||
|
||||
func updateUsers(users []auth.AuthUser) {
|
||||
|
|
|
@ -41,6 +41,7 @@ type configSchema struct {
|
|||
TProxyPort *int `json:"tproxy-port"`
|
||||
MixedPort *int `json:"mixed-port"`
|
||||
Tun *tunSchema `json:"tun"`
|
||||
TuicServer *config.TuicServer `json:"tuic-server"`
|
||||
ShadowSocksConfig *string `json:"ss-config"`
|
||||
VmessConfig *string `json:"vmess-config"`
|
||||
TcptunConfig *string `json:"tcptun-config"`
|
||||
|
@ -203,6 +204,9 @@ func patchConfigs(w http.ResponseWriter, r *http.Request) {
|
|||
P.ReCreateVmess(pointerOrDefaultString(general.VmessConfig, ports.VmessConfig), tcpIn, udpIn)
|
||||
P.ReCreateTcpTun(pointerOrDefaultString(general.TcptunConfig, ports.TcpTunConfig), tcpIn, udpIn)
|
||||
P.ReCreateUdpTun(pointerOrDefaultString(general.UdptunConfig, ports.UdpTunConfig), tcpIn, udpIn)
|
||||
if general.TuicServer != nil {
|
||||
P.ReCreateTuic(*general.TuicServer, tcpIn, udpIn)
|
||||
}
|
||||
|
||||
if general.Mode != nil {
|
||||
tunnel.SetMode(*general.Mode)
|
||||
|
|
|
@ -21,6 +21,7 @@ import (
|
|||
"github.com/Dreamacro/clash/listener/sing_vmess"
|
||||
"github.com/Dreamacro/clash/listener/socks"
|
||||
"github.com/Dreamacro/clash/listener/tproxy"
|
||||
"github.com/Dreamacro/clash/listener/tuic"
|
||||
"github.com/Dreamacro/clash/listener/tunnel"
|
||||
"github.com/Dreamacro/clash/log"
|
||||
)
|
||||
|
@ -43,6 +44,7 @@ var (
|
|||
vmessListener *sing_vmess.Listener
|
||||
tcpTunListener *tunnel.Listener
|
||||
udpTunListener *tunnel.UdpListener
|
||||
tuicListener *tuic.Listener
|
||||
autoRedirListener *autoredir.Listener
|
||||
autoRedirProgram *ebpf.TcEBpfProgram
|
||||
tcProgram *ebpf.TcEBpfProgram
|
||||
|
@ -58,6 +60,7 @@ var (
|
|||
vmessMux sync.Mutex
|
||||
tcpTunMux sync.Mutex
|
||||
udpTunMux sync.Mutex
|
||||
tuicMux sync.Mutex
|
||||
autoRedirMux sync.Mutex
|
||||
tcMux sync.Mutex
|
||||
|
||||
|
@ -390,6 +393,45 @@ func ReCreateUdpTun(config string, tcpIn chan<- C.ConnContext, udpIn chan<- *inb
|
|||
return
|
||||
}
|
||||
|
||||
func ReCreateTuic(config config.TuicServer, tcpIn chan<- C.ConnContext, udpIn chan<- *inbound.PacketAdapter) {
|
||||
tuicMux.Lock()
|
||||
defer tuicMux.Unlock()
|
||||
shouldIgnore := false
|
||||
|
||||
var err error
|
||||
defer func() {
|
||||
if err != nil {
|
||||
log.Errorln("Start Tuic server error: %s", err.Error())
|
||||
}
|
||||
}()
|
||||
|
||||
if tuicListener != nil {
|
||||
if tuicListener.Config().String() != config.String() {
|
||||
tuicListener.Close()
|
||||
tuicListener = nil
|
||||
} else {
|
||||
shouldIgnore = true
|
||||
}
|
||||
}
|
||||
|
||||
if shouldIgnore {
|
||||
return
|
||||
}
|
||||
|
||||
if !config.Enable {
|
||||
return
|
||||
}
|
||||
|
||||
listener, err := tuic.New(config, tcpIn, udpIn)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
tuicListener = listener
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func ReCreateTProxy(port int, tcpIn chan<- C.ConnContext, udpIn chan<- *inbound.PacketAdapter) {
|
||||
tproxyMux.Lock()
|
||||
defer tproxyMux.Unlock()
|
||||
|
|
132
listener/tuic/server.go
Normal file
132
listener/tuic/server.go
Normal file
|
@ -0,0 +1,132 @@
|
|||
package tuic
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"net"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/metacubex/quic-go"
|
||||
|
||||
"github.com/Dreamacro/clash/adapter/inbound"
|
||||
"github.com/Dreamacro/clash/common/sockopt"
|
||||
"github.com/Dreamacro/clash/config"
|
||||
C "github.com/Dreamacro/clash/constant"
|
||||
"github.com/Dreamacro/clash/log"
|
||||
"github.com/Dreamacro/clash/transport/socks5"
|
||||
"github.com/Dreamacro/clash/transport/tuic"
|
||||
)
|
||||
|
||||
const (
|
||||
DefaultStreamReceiveWindow = 15728640 // 15 MB/s
|
||||
DefaultConnectionReceiveWindow = 67108864 // 64 MB/s
|
||||
)
|
||||
|
||||
type Listener struct {
|
||||
closed bool
|
||||
config config.TuicServer
|
||||
udpListeners []net.PacketConn
|
||||
servers []*tuic.Server
|
||||
}
|
||||
|
||||
func New(config config.TuicServer, tcpIn chan<- C.ConnContext, udpIn chan<- *inbound.PacketAdapter) (*Listener, error) {
|
||||
cert, err := tls.LoadX509KeyPair(config.Certificate, config.PrivateKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
tlsConfig := &tls.Config{
|
||||
MinVersion: tls.VersionTLS13,
|
||||
Certificates: []tls.Certificate{cert},
|
||||
}
|
||||
if len(config.ALPN) > 0 {
|
||||
tlsConfig.NextProtos = config.ALPN
|
||||
} else {
|
||||
tlsConfig.NextProtos = []string{"h3"}
|
||||
}
|
||||
quicConfig := &quic.Config{
|
||||
MaxIdleTimeout: time.Duration(config.MaxIdleTime) * time.Millisecond,
|
||||
MaxIncomingStreams: 1 >> 32,
|
||||
MaxIncomingUniStreams: 1 >> 32,
|
||||
EnableDatagrams: true,
|
||||
}
|
||||
quicConfig.InitialStreamReceiveWindow = DefaultStreamReceiveWindow / 10
|
||||
quicConfig.MaxStreamReceiveWindow = DefaultStreamReceiveWindow
|
||||
quicConfig.InitialConnectionReceiveWindow = DefaultConnectionReceiveWindow / 10
|
||||
quicConfig.MaxConnectionReceiveWindow = DefaultConnectionReceiveWindow
|
||||
|
||||
tokens := make([][32]byte, len(config.Token))
|
||||
for i, token := range config.Token {
|
||||
tokens[i] = tuic.GenTKN(token)
|
||||
}
|
||||
|
||||
option := &tuic.ServerOption{
|
||||
HandleTcpFn: func(conn net.Conn, addr string) error {
|
||||
tcpIn <- inbound.NewSocket(socks5.ParseAddr(addr), conn, C.TUIC)
|
||||
return nil
|
||||
},
|
||||
HandleUdpFn: func(addr *net.UDPAddr, packet C.UDPPacket) error {
|
||||
select {
|
||||
case udpIn <- inbound.NewPacket(socks5.ParseAddrToSocksAddr(addr), packet, C.TUIC):
|
||||
default:
|
||||
}
|
||||
return nil
|
||||
},
|
||||
TlsConfig: tlsConfig,
|
||||
QuicConfig: quicConfig,
|
||||
Tokens: tokens,
|
||||
CongestionController: config.CongestionController,
|
||||
AuthenticationTimeout: time.Duration(config.AuthenticationTimeout) * time.Millisecond,
|
||||
MaxUdpRelayPacketSize: config.MaxUdpRelayPacketSize,
|
||||
}
|
||||
|
||||
sl := &Listener{false, config, nil, nil}
|
||||
|
||||
for _, addr := range strings.Split(config.Listen, ",") {
|
||||
addr := addr
|
||||
|
||||
ul, err := net.ListenPacket("udp", addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = sockopt.UDPReuseaddr(ul.(*net.UDPConn))
|
||||
if err != nil {
|
||||
log.Warnln("Failed to Reuse UDP Address: %s", err)
|
||||
}
|
||||
|
||||
sl.udpListeners = append(sl.udpListeners, ul)
|
||||
|
||||
server, err := tuic.NewServer(option, ul)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
sl.servers = append(sl.servers, server)
|
||||
|
||||
go func() {
|
||||
log.Infoln("Tuic proxy listening at: %s", ul.LocalAddr().String())
|
||||
err := server.Serve()
|
||||
if err != nil {
|
||||
if sl.closed {
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
return sl, nil
|
||||
}
|
||||
|
||||
func (l *Listener) Close() {
|
||||
l.closed = true
|
||||
for _, lis := range l.servers {
|
||||
_ = lis.Close()
|
||||
}
|
||||
for _, lis := range l.udpListeners {
|
||||
_ = lis.Close()
|
||||
}
|
||||
}
|
||||
|
||||
func (l *Listener) Config() config.TuicServer {
|
||||
return l.config
|
||||
}
|
|
@ -6,10 +6,8 @@ import (
|
|||
"context"
|
||||
"crypto/tls"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"net"
|
||||
"net/netip"
|
||||
"runtime"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
@ -21,7 +19,6 @@ import (
|
|||
"github.com/Dreamacro/clash/common/pool"
|
||||
"github.com/Dreamacro/clash/component/dialer"
|
||||
C "github.com/Dreamacro/clash/constant"
|
||||
"github.com/Dreamacro/clash/transport/tuic/congestion"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -39,7 +36,7 @@ type ClientOption struct {
|
|||
UdpRelayMode string
|
||||
CongestionController string
|
||||
ReduceRtt bool
|
||||
RequestTimeout int
|
||||
RequestTimeout time.Duration
|
||||
MaxUdpRelayPacketSize int
|
||||
FastOpen bool
|
||||
MaxOpenStreams int64
|
||||
|
@ -53,6 +50,7 @@ type Client struct {
|
|||
connMutex sync.Mutex
|
||||
|
||||
openStreams atomic.Int64
|
||||
closed atomic.Bool
|
||||
|
||||
udpInputMap sync.Map
|
||||
|
||||
|
@ -82,160 +80,141 @@ func (t *Client) getQuicConn(ctx context.Context) (quic.Connection, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
switch t.CongestionController {
|
||||
case "cubic":
|
||||
quicConn.SetCongestionControl(
|
||||
congestion.NewCubicSender(
|
||||
congestion.DefaultClock{},
|
||||
congestion.GetMaxPacketSize(quicConn.RemoteAddr()),
|
||||
false,
|
||||
nil,
|
||||
),
|
||||
)
|
||||
case "new_reno":
|
||||
quicConn.SetCongestionControl(
|
||||
congestion.NewCubicSender(
|
||||
congestion.DefaultClock{},
|
||||
congestion.GetMaxPacketSize(quicConn.RemoteAddr()),
|
||||
true,
|
||||
nil,
|
||||
),
|
||||
)
|
||||
case "bbr":
|
||||
quicConn.SetCongestionControl(
|
||||
congestion.NewBBRSender(
|
||||
congestion.DefaultClock{},
|
||||
congestion.GetMaxPacketSize(quicConn.RemoteAddr()),
|
||||
congestion.InitialCongestionWindow,
|
||||
congestion.DefaultBBRMaxCongestionWindow,
|
||||
),
|
||||
)
|
||||
}
|
||||
SetCongestionController(quicConn, t.CongestionController)
|
||||
|
||||
sendAuthentication := func(quicConn quic.Connection) (err error) {
|
||||
defer func() {
|
||||
t.deferQuicConn(quicConn, err)
|
||||
}()
|
||||
stream, err := quicConn.OpenUniStream()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
buf := pool.GetBuffer()
|
||||
defer pool.PutBuffer(buf)
|
||||
err = NewAuthenticate(t.Token).WriteTo(buf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = buf.WriteTo(stream)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = stream.Close()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
parseUDP := func(quicConn quic.Connection) (err error) {
|
||||
defer func() {
|
||||
t.deferQuicConn(quicConn, err)
|
||||
}()
|
||||
switch t.UdpRelayMode {
|
||||
case "quic":
|
||||
for {
|
||||
var stream quic.ReceiveStream
|
||||
stream, err = quicConn.AcceptUniStream(context.Background())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
go func() (err error) {
|
||||
var assocId uint32
|
||||
defer func() {
|
||||
t.deferQuicConn(quicConn, err)
|
||||
if err != nil && assocId != 0 {
|
||||
if val, ok := t.udpInputMap.LoadAndDelete(assocId); ok {
|
||||
if conn, ok := val.(net.Conn); ok {
|
||||
_ = conn.Close()
|
||||
}
|
||||
}
|
||||
}
|
||||
stream.CancelRead(0)
|
||||
}()
|
||||
reader := bufio.NewReader(stream)
|
||||
packet, err := ReadPacket(reader)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
assocId = packet.ASSOC_ID
|
||||
if val, ok := t.udpInputMap.Load(assocId); ok {
|
||||
if conn, ok := val.(net.Conn); ok {
|
||||
writer := bufio.NewWriterSize(conn, packet.BytesLen())
|
||||
_ = packet.WriteTo(writer)
|
||||
_ = writer.Flush()
|
||||
}
|
||||
}
|
||||
return
|
||||
}()
|
||||
}
|
||||
default: // native
|
||||
for {
|
||||
var message []byte
|
||||
message, err = quicConn.ReceiveMessage()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
go func() (err error) {
|
||||
var assocId uint32
|
||||
defer func() {
|
||||
t.deferQuicConn(quicConn, err)
|
||||
if err != nil && assocId != 0 {
|
||||
if val, ok := t.udpInputMap.LoadAndDelete(assocId); ok {
|
||||
if conn, ok := val.(net.Conn); ok {
|
||||
_ = conn.Close()
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
buffer := bytes.NewBuffer(message)
|
||||
packet, err := ReadPacket(buffer)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
assocId = packet.ASSOC_ID
|
||||
if val, ok := t.udpInputMap.Load(assocId); ok {
|
||||
if conn, ok := val.(net.Conn); ok {
|
||||
_, _ = conn.Write(message)
|
||||
}
|
||||
}
|
||||
return
|
||||
}()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
go sendAuthentication(quicConn)
|
||||
go func() {
|
||||
_ = t.sendAuthentication(quicConn)
|
||||
}()
|
||||
|
||||
if t.udp {
|
||||
go parseUDP(quicConn)
|
||||
go func() {
|
||||
_ = t.parseUDP(quicConn)
|
||||
}()
|
||||
}
|
||||
|
||||
t.quicConn = quicConn
|
||||
t.openStreams.Store(0)
|
||||
return quicConn, nil
|
||||
}
|
||||
|
||||
func (t *Client) sendAuthentication(quicConn quic.Connection) (err error) {
|
||||
defer func() {
|
||||
t.deferQuicConn(quicConn, err)
|
||||
}()
|
||||
stream, err := quicConn.OpenUniStream()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
buf := pool.GetBuffer()
|
||||
defer pool.PutBuffer(buf)
|
||||
err = NewAuthenticate(t.Token).WriteTo(buf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = buf.WriteTo(stream)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = stream.Close()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *Client) parseUDP(quicConn quic.Connection) (err error) {
|
||||
defer func() {
|
||||
t.deferQuicConn(quicConn, err)
|
||||
}()
|
||||
switch t.UdpRelayMode {
|
||||
case "quic":
|
||||
for {
|
||||
var stream quic.ReceiveStream
|
||||
stream, err = quicConn.AcceptUniStream(context.Background())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
go func() (err error) {
|
||||
var assocId uint32
|
||||
defer func() {
|
||||
t.deferQuicConn(quicConn, err)
|
||||
if err != nil && assocId != 0 {
|
||||
if val, ok := t.udpInputMap.LoadAndDelete(assocId); ok {
|
||||
if conn, ok := val.(net.Conn); ok {
|
||||
_ = conn.Close()
|
||||
}
|
||||
}
|
||||
}
|
||||
stream.CancelRead(0)
|
||||
}()
|
||||
reader := bufio.NewReader(stream)
|
||||
packet, err := ReadPacket(reader)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
assocId = packet.ASSOC_ID
|
||||
if val, ok := t.udpInputMap.Load(assocId); ok {
|
||||
if conn, ok := val.(net.Conn); ok {
|
||||
writer := bufio.NewWriterSize(conn, packet.BytesLen())
|
||||
_ = packet.WriteTo(writer)
|
||||
_ = writer.Flush()
|
||||
}
|
||||
}
|
||||
return
|
||||
}()
|
||||
}
|
||||
default: // native
|
||||
for {
|
||||
var message []byte
|
||||
message, err = quicConn.ReceiveMessage()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
go func() (err error) {
|
||||
var assocId uint32
|
||||
defer func() {
|
||||
t.deferQuicConn(quicConn, err)
|
||||
if err != nil && assocId != 0 {
|
||||
if val, ok := t.udpInputMap.LoadAndDelete(assocId); ok {
|
||||
if conn, ok := val.(net.Conn); ok {
|
||||
_ = conn.Close()
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
buffer := bytes.NewBuffer(message)
|
||||
packet, err := ReadPacket(buffer)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
assocId = packet.ASSOC_ID
|
||||
if val, ok := t.udpInputMap.Load(assocId); ok {
|
||||
if conn, ok := val.(net.Conn); ok {
|
||||
_, _ = conn.Write(message)
|
||||
}
|
||||
}
|
||||
return
|
||||
}()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (t *Client) deferQuicConn(quicConn quic.Connection, err error) {
|
||||
var netError net.Error
|
||||
if err != nil && errors.As(err, &netError) {
|
||||
t.connMutex.Lock()
|
||||
defer t.connMutex.Unlock()
|
||||
if t.quicConn == quicConn {
|
||||
t.Close(err)
|
||||
t.forceClose(err, true)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (t *Client) Close(err error) {
|
||||
func (t *Client) forceClose(err error, locked bool) {
|
||||
if !locked {
|
||||
t.connMutex.Lock()
|
||||
defer t.connMutex.Unlock()
|
||||
}
|
||||
quicConn := t.quicConn
|
||||
if quicConn != nil {
|
||||
_ = quicConn.CloseWithError(ProtocolError, err.Error())
|
||||
|
@ -250,6 +229,13 @@ func (t *Client) Close(err error) {
|
|||
}
|
||||
}
|
||||
|
||||
func (t *Client) Close() {
|
||||
t.closed.Store(true)
|
||||
if t.openStreams.Load() == 0 {
|
||||
t.forceClose(ClientClosed, false)
|
||||
}
|
||||
}
|
||||
|
||||
func (t *Client) DialContext(ctx context.Context, metadata *C.Metadata) (net.Conn, error) {
|
||||
quicConn, err := t.getQuicConn(ctx)
|
||||
if err != nil {
|
||||
|
@ -278,7 +264,15 @@ func (t *Client) DialContext(ctx context.Context, metadata *C.Metadata) (net.Con
|
|||
Stream: quicStream,
|
||||
lAddr: quicConn.LocalAddr(),
|
||||
rAddr: quicConn.RemoteAddr(),
|
||||
client: t,
|
||||
ref: t,
|
||||
closeDeferFn: func() {
|
||||
time.AfterFunc(C.DefaultTCPTimeout, func() {
|
||||
openStreams := t.openStreams.Add(-1)
|
||||
if openStreams == 0 && t.closed.Load() {
|
||||
t.forceClose(ClientClosed, false)
|
||||
}
|
||||
})
|
||||
},
|
||||
}
|
||||
_, err = buf.WriteTo(stream)
|
||||
if err != nil {
|
||||
|
@ -306,12 +300,12 @@ type earlyConn struct {
|
|||
resOnce sync.Once
|
||||
resErr error
|
||||
|
||||
RequestTimeout int
|
||||
RequestTimeout time.Duration
|
||||
}
|
||||
|
||||
func (conn *earlyConn) response() error {
|
||||
if conn.RequestTimeout > 0 {
|
||||
_ = conn.SetReadDeadline(time.Now().Add(time.Duration(conn.RequestTimeout) * time.Millisecond))
|
||||
_ = conn.SetReadDeadline(time.Now().Add(conn.RequestTimeout))
|
||||
}
|
||||
response, err := ReadResponse(conn)
|
||||
if err != nil {
|
||||
|
@ -341,59 +335,6 @@ func (conn *earlyConn) Read(b []byte) (n int, err error) {
|
|||
return conn.BufferedConn.Read(b)
|
||||
}
|
||||
|
||||
type quicStreamConn struct {
|
||||
quic.Stream
|
||||
lock sync.Mutex
|
||||
lAddr net.Addr
|
||||
rAddr net.Addr
|
||||
client *Client
|
||||
|
||||
closeOnce sync.Once
|
||||
closeErr error
|
||||
}
|
||||
|
||||
func (q *quicStreamConn) Write(p []byte) (n int, err error) {
|
||||
q.lock.Lock()
|
||||
defer q.lock.Unlock()
|
||||
return q.Stream.Write(p)
|
||||
}
|
||||
|
||||
func (q *quicStreamConn) Close() error {
|
||||
q.closeOnce.Do(func() {
|
||||
q.closeErr = q.close()
|
||||
})
|
||||
return q.closeErr
|
||||
}
|
||||
|
||||
func (q *quicStreamConn) close() error {
|
||||
defer time.AfterFunc(C.DefaultTCPTimeout, func() {
|
||||
q.client.openStreams.Add(-1)
|
||||
})
|
||||
|
||||
// https://github.com/cloudflare/cloudflared/commit/ed2bac026db46b239699ac5ce4fcf122d7cab2cd
|
||||
// Make sure a possible writer does not block the lock forever. We need it, so we can close the writer
|
||||
// side of the stream safely.
|
||||
_ = q.Stream.SetWriteDeadline(time.Now())
|
||||
|
||||
// This lock is eventually acquired despite Write also acquiring it, because we set a deadline to writes.
|
||||
q.lock.Lock()
|
||||
defer q.lock.Unlock()
|
||||
|
||||
// We have to clean up the receiving stream ourselves since the Close in the bottom does not handle that.
|
||||
q.Stream.CancelRead(0)
|
||||
return q.Stream.Close()
|
||||
}
|
||||
|
||||
func (q *quicStreamConn) LocalAddr() net.Addr {
|
||||
return q.lAddr
|
||||
}
|
||||
|
||||
func (q *quicStreamConn) RemoteAddr() net.Addr {
|
||||
return q.rAddr
|
||||
}
|
||||
|
||||
var _ net.Conn = &quicStreamConn{}
|
||||
|
||||
func (t *Client) ListenPacketContext(ctx context.Context, metadata *C.Metadata) (net.PacketConn, error) {
|
||||
quicConn, err := t.getQuicConn(ctx)
|
||||
if err != nil {
|
||||
|
@ -415,139 +356,27 @@ func (t *Client) ListenPacketContext(ctx context.Context, metadata *C.Metadata)
|
|||
}
|
||||
}
|
||||
pc := &quicStreamPacketConn{
|
||||
connId: connId,
|
||||
quicConn: quicConn,
|
||||
lAddr: quicConn.LocalAddr(),
|
||||
client: t,
|
||||
inputConn: N.NewBufferedConn(pipe2),
|
||||
connId: connId,
|
||||
quicConn: quicConn,
|
||||
lAddr: quicConn.LocalAddr(),
|
||||
inputConn: N.NewBufferedConn(pipe2),
|
||||
udpRelayMode: t.UdpRelayMode,
|
||||
maxUdpRelayPacketSize: t.MaxUdpRelayPacketSize,
|
||||
ref: t,
|
||||
deferQuicConnFn: t.deferQuicConn,
|
||||
closeDeferFn: func() {
|
||||
t.udpInputMap.Delete(connId)
|
||||
time.AfterFunc(C.DefaultUDPTimeout, func() {
|
||||
openStreams := t.openStreams.Add(-1)
|
||||
if openStreams == 0 && t.closed.Load() {
|
||||
t.forceClose(ClientClosed, false)
|
||||
}
|
||||
})
|
||||
},
|
||||
}
|
||||
return pc, nil
|
||||
}
|
||||
|
||||
type quicStreamPacketConn struct {
|
||||
connId uint32
|
||||
quicConn quic.Connection
|
||||
lAddr net.Addr
|
||||
client *Client
|
||||
inputConn *N.BufferedConn
|
||||
|
||||
closeOnce sync.Once
|
||||
closeErr error
|
||||
closed bool
|
||||
}
|
||||
|
||||
func (q *quicStreamPacketConn) Close() error {
|
||||
q.closeOnce.Do(func() {
|
||||
q.closed = true
|
||||
q.closeErr = q.close()
|
||||
})
|
||||
return q.closeErr
|
||||
}
|
||||
|
||||
func (q *quicStreamPacketConn) close() (err error) {
|
||||
defer time.AfterFunc(C.DefaultTCPTimeout, func() {
|
||||
q.client.openStreams.Add(-1)
|
||||
})
|
||||
defer func() {
|
||||
q.client.deferQuicConn(q.quicConn, err)
|
||||
}()
|
||||
q.client.udpInputMap.Delete(q.connId)
|
||||
_ = q.inputConn.Close()
|
||||
buf := pool.GetBuffer()
|
||||
defer pool.PutBuffer(buf)
|
||||
err = NewDissociate(q.connId).WriteTo(buf)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
stream, err := q.quicConn.OpenUniStream()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
_, err = buf.WriteTo(stream)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
err = stream.Close()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (q *quicStreamPacketConn) SetDeadline(t time.Time) error {
|
||||
//TODO implement me
|
||||
return nil
|
||||
}
|
||||
|
||||
func (q *quicStreamPacketConn) SetReadDeadline(t time.Time) error {
|
||||
return q.inputConn.SetReadDeadline(t)
|
||||
}
|
||||
|
||||
func (q *quicStreamPacketConn) SetWriteDeadline(t time.Time) error {
|
||||
//TODO implement me
|
||||
return nil
|
||||
}
|
||||
|
||||
func (q *quicStreamPacketConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) {
|
||||
packet, err := ReadPacket(q.inputConn)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
n = copy(p, packet.DATA)
|
||||
addr = packet.ADDR.UDPAddr()
|
||||
return
|
||||
}
|
||||
|
||||
func (q *quicStreamPacketConn) WriteTo(p []byte, addr net.Addr) (n int, err error) {
|
||||
if len(p) > q.client.MaxUdpRelayPacketSize {
|
||||
return 0, fmt.Errorf("udp packet too large(%d > %d)", len(p), q.client.MaxUdpRelayPacketSize)
|
||||
}
|
||||
if q.closed {
|
||||
return 0, net.ErrClosed
|
||||
}
|
||||
defer func() {
|
||||
q.client.deferQuicConn(q.quicConn, err)
|
||||
}()
|
||||
addr.String()
|
||||
buf := pool.GetBuffer()
|
||||
defer pool.PutBuffer(buf)
|
||||
addrPort, err := netip.ParseAddrPort(addr.String())
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
err = NewPacket(q.connId, uint16(len(p)), NewAddressAddrPort(addrPort), p).WriteTo(buf)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
switch q.client.UdpRelayMode {
|
||||
case "quic":
|
||||
var stream quic.SendStream
|
||||
stream, err = q.quicConn.OpenUniStream()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer stream.Close()
|
||||
_, err = buf.WriteTo(stream)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
default: // native
|
||||
err = q.quicConn.SendMessage(buf.Bytes())
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
n = len(p)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (q *quicStreamPacketConn) LocalAddr() net.Addr {
|
||||
return q.lAddr
|
||||
}
|
||||
|
||||
var _ net.PacketConn = &quicStreamPacketConn{}
|
||||
|
||||
func NewClient(clientOption *ClientOption, udp bool) *Client {
|
||||
c := &Client{
|
||||
ClientOption: clientOption,
|
||||
|
@ -558,5 +387,5 @@ func NewClient(clientOption *ClientOption, udp bool) *Client {
|
|||
}
|
||||
|
||||
func closeClient(client *Client) {
|
||||
client.Close(ClientClosed)
|
||||
client.forceClose(ClientClosed, false)
|
||||
}
|
||||
|
|
250
transport/tuic/conn.go
Normal file
250
transport/tuic/conn.go
Normal file
|
@ -0,0 +1,250 @@
|
|||
package tuic
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"net/netip"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/metacubex/quic-go"
|
||||
|
||||
N "github.com/Dreamacro/clash/common/net"
|
||||
"github.com/Dreamacro/clash/common/pool"
|
||||
"github.com/Dreamacro/clash/transport/tuic/congestion"
|
||||
)
|
||||
|
||||
func SetCongestionController(quicConn quic.Connection, cc string) {
|
||||
switch cc {
|
||||
case "cubic":
|
||||
quicConn.SetCongestionControl(
|
||||
congestion.NewCubicSender(
|
||||
congestion.DefaultClock{},
|
||||
congestion.GetMaxPacketSize(quicConn.RemoteAddr()),
|
||||
false,
|
||||
nil,
|
||||
),
|
||||
)
|
||||
case "new_reno":
|
||||
quicConn.SetCongestionControl(
|
||||
congestion.NewCubicSender(
|
||||
congestion.DefaultClock{},
|
||||
congestion.GetMaxPacketSize(quicConn.RemoteAddr()),
|
||||
true,
|
||||
nil,
|
||||
),
|
||||
)
|
||||
case "bbr":
|
||||
quicConn.SetCongestionControl(
|
||||
congestion.NewBBRSender(
|
||||
congestion.DefaultClock{},
|
||||
congestion.GetMaxPacketSize(quicConn.RemoteAddr()),
|
||||
congestion.InitialCongestionWindow,
|
||||
congestion.DefaultBBRMaxCongestionWindow,
|
||||
),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
type quicStreamConn struct {
|
||||
quic.Stream
|
||||
lock sync.Mutex
|
||||
lAddr net.Addr
|
||||
rAddr net.Addr
|
||||
|
||||
ref any
|
||||
|
||||
closeDeferFn func()
|
||||
|
||||
closeOnce sync.Once
|
||||
closeErr error
|
||||
}
|
||||
|
||||
func (q *quicStreamConn) Write(p []byte) (n int, err error) {
|
||||
q.lock.Lock()
|
||||
defer q.lock.Unlock()
|
||||
return q.Stream.Write(p)
|
||||
}
|
||||
|
||||
func (q *quicStreamConn) Close() error {
|
||||
q.closeOnce.Do(func() {
|
||||
q.closeErr = q.close()
|
||||
})
|
||||
return q.closeErr
|
||||
}
|
||||
|
||||
func (q *quicStreamConn) close() error {
|
||||
if q.closeDeferFn != nil {
|
||||
defer q.closeDeferFn()
|
||||
}
|
||||
|
||||
// https://github.com/cloudflare/cloudflared/commit/ed2bac026db46b239699ac5ce4fcf122d7cab2cd
|
||||
// Make sure a possible writer does not block the lock forever. We need it, so we can close the writer
|
||||
// side of the stream safely.
|
||||
_ = q.Stream.SetWriteDeadline(time.Now())
|
||||
|
||||
// This lock is eventually acquired despite Write also acquiring it, because we set a deadline to writes.
|
||||
q.lock.Lock()
|
||||
defer q.lock.Unlock()
|
||||
|
||||
// We have to clean up the receiving stream ourselves since the Close in the bottom does not handle that.
|
||||
q.Stream.CancelRead(0)
|
||||
return q.Stream.Close()
|
||||
}
|
||||
|
||||
func (q *quicStreamConn) LocalAddr() net.Addr {
|
||||
return q.lAddr
|
||||
}
|
||||
|
||||
func (q *quicStreamConn) RemoteAddr() net.Addr {
|
||||
return q.rAddr
|
||||
}
|
||||
|
||||
var _ net.Conn = &quicStreamConn{}
|
||||
|
||||
type quicStreamPacketConn struct {
|
||||
connId uint32
|
||||
quicConn quic.Connection
|
||||
lAddr net.Addr
|
||||
inputConn *N.BufferedConn
|
||||
|
||||
udpRelayMode string
|
||||
maxUdpRelayPacketSize int
|
||||
|
||||
ref any
|
||||
|
||||
deferQuicConnFn func(quicConn quic.Connection, err error)
|
||||
closeDeferFn func()
|
||||
|
||||
closeOnce sync.Once
|
||||
closeErr error
|
||||
closed bool
|
||||
}
|
||||
|
||||
func (q *quicStreamPacketConn) Close() error {
|
||||
q.closeOnce.Do(func() {
|
||||
q.closed = true
|
||||
q.closeErr = q.close()
|
||||
})
|
||||
return q.closeErr
|
||||
}
|
||||
|
||||
func (q *quicStreamPacketConn) close() (err error) {
|
||||
if q.closeDeferFn != nil {
|
||||
defer q.closeDeferFn()
|
||||
}
|
||||
defer func() {
|
||||
if q.deferQuicConnFn != nil {
|
||||
q.deferQuicConnFn(q.quicConn, err)
|
||||
}
|
||||
}()
|
||||
if q.inputConn != nil {
|
||||
_ = q.inputConn.Close()
|
||||
q.inputConn = nil
|
||||
|
||||
buf := pool.GetBuffer()
|
||||
defer pool.PutBuffer(buf)
|
||||
err = NewDissociate(q.connId).WriteTo(buf)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
var stream quic.SendStream
|
||||
stream, err = q.quicConn.OpenUniStream()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
_, err = buf.WriteTo(stream)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
err = stream.Close()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (q *quicStreamPacketConn) SetDeadline(t time.Time) error {
|
||||
//TODO implement me
|
||||
return nil
|
||||
}
|
||||
|
||||
func (q *quicStreamPacketConn) SetReadDeadline(t time.Time) error {
|
||||
if q.inputConn != nil {
|
||||
return q.inputConn.SetReadDeadline(t)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (q *quicStreamPacketConn) SetWriteDeadline(t time.Time) error {
|
||||
//TODO implement me
|
||||
return nil
|
||||
}
|
||||
|
||||
func (q *quicStreamPacketConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) {
|
||||
if q.inputConn != nil {
|
||||
var packet Packet
|
||||
packet, err = ReadPacket(q.inputConn)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
n = copy(p, packet.DATA)
|
||||
addr = packet.ADDR.UDPAddr()
|
||||
} else {
|
||||
err = net.ErrClosed
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (q *quicStreamPacketConn) WriteTo(p []byte, addr net.Addr) (n int, err error) {
|
||||
if len(p) > q.maxUdpRelayPacketSize {
|
||||
return 0, fmt.Errorf("udp packet too large(%d > %d)", len(p), q.maxUdpRelayPacketSize)
|
||||
}
|
||||
if q.closed {
|
||||
return 0, net.ErrClosed
|
||||
}
|
||||
defer func() {
|
||||
if q.deferQuicConnFn != nil {
|
||||
q.deferQuicConnFn(q.quicConn, err)
|
||||
}
|
||||
}()
|
||||
addr.String()
|
||||
buf := pool.GetBuffer()
|
||||
defer pool.PutBuffer(buf)
|
||||
addrPort, err := netip.ParseAddrPort(addr.String())
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
err = NewPacket(q.connId, uint16(len(p)), NewAddressAddrPort(addrPort), p).WriteTo(buf)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
switch q.udpRelayMode {
|
||||
case "quic":
|
||||
var stream quic.SendStream
|
||||
stream, err = q.quicConn.OpenUniStream()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer stream.Close()
|
||||
_, err = buf.WriteTo(stream)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
default: // native
|
||||
err = q.quicConn.SendMessage(buf.Bytes())
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
n = len(p)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (q *quicStreamPacketConn) LocalAddr() net.Addr {
|
||||
return q.lAddr
|
||||
}
|
||||
|
||||
var _ net.PacketConn = &quicStreamPacketConn{}
|
|
@ -75,7 +75,7 @@ func (t *PoolClient) dial(ctx context.Context, opts ...dialer.Option) (pc net.Pa
|
|||
return pc, addr, err
|
||||
}
|
||||
|
||||
func (t *PoolClient) Close() {
|
||||
func (t *PoolClient) forceClose() {
|
||||
t.dialResultMutex.Lock()
|
||||
defer t.dialResultMutex.Unlock()
|
||||
for key := range t.dialResultMap {
|
||||
|
@ -141,6 +141,7 @@ func (t *PoolClient) getClient(udp bool, opts ...dialer.Option) *Client {
|
|||
}
|
||||
}
|
||||
if client.openStreams.Load() == 0 && time.Now().Sub(client.lastVisited) > 30*time.Minute {
|
||||
client.Close()
|
||||
next := it.Next()
|
||||
clients.Remove(it)
|
||||
it = next
|
||||
|
@ -173,5 +174,5 @@ func NewClientPool(clientOption *ClientOption) *PoolClient {
|
|||
}
|
||||
|
||||
func closeClientPool(client *PoolClient) {
|
||||
client.Close()
|
||||
client.forceClose()
|
||||
}
|
||||
|
|
|
@ -112,6 +112,29 @@ func NewAuthenticate(TKN [32]byte) Authenticate {
|
|||
}
|
||||
}
|
||||
|
||||
func ReadAuthenticateWithHead(head CommandHead, reader BufferedReader) (c Authenticate, err error) {
|
||||
c.CommandHead = head
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if c.CommandHead.TYPE != AuthenticateType {
|
||||
err = fmt.Errorf("error command type: %s", c.CommandHead.TYPE)
|
||||
}
|
||||
_, err = io.ReadFull(reader, c.TKN[:])
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func ReadAuthenticate(reader BufferedReader) (c Authenticate, err error) {
|
||||
head, err := ReadCommandHead(reader)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
return ReadAuthenticateWithHead(head, reader)
|
||||
}
|
||||
|
||||
func GenTKN(token string) [32]byte {
|
||||
return blake3.Sum256([]byte(token))
|
||||
}
|
||||
|
@ -144,6 +167,29 @@ func NewConnect(ADDR Address) Connect {
|
|||
}
|
||||
}
|
||||
|
||||
func ReadConnectWithHead(head CommandHead, reader BufferedReader) (c Connect, err error) {
|
||||
c.CommandHead = head
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if c.CommandHead.TYPE != ConnectType {
|
||||
err = fmt.Errorf("error command type: %s", c.CommandHead.TYPE)
|
||||
}
|
||||
c.ADDR, err = ReadAddress(reader)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func ReadConnect(reader BufferedReader) (c Connect, err error) {
|
||||
head, err := ReadCommandHead(reader)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
return ReadConnectWithHead(head, reader)
|
||||
}
|
||||
|
||||
func (c Connect) WriteTo(writer BufferedWriter) (err error) {
|
||||
err = c.CommandHead.WriteTo(writer)
|
||||
if err != nil {
|
||||
|
@ -254,6 +300,29 @@ func NewDissociate(ASSOC_ID uint32) Dissociate {
|
|||
}
|
||||
}
|
||||
|
||||
func ReadDissociateWithHead(head CommandHead, reader BufferedReader) (c Dissociate, err error) {
|
||||
c.CommandHead = head
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if c.CommandHead.TYPE != PacketType {
|
||||
err = fmt.Errorf("error command type: %s", c.CommandHead.TYPE)
|
||||
}
|
||||
err = binary.Read(reader, binary.BigEndian, &c.ASSOC_ID)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func ReadDissociate(reader BufferedReader) (c Dissociate, err error) {
|
||||
head, err := ReadCommandHead(reader)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
return ReadDissociateWithHead(head, reader)
|
||||
}
|
||||
|
||||
func (c Dissociate) WriteTo(writer BufferedWriter) (err error) {
|
||||
err = c.CommandHead.WriteTo(writer)
|
||||
if err != nil {
|
||||
|
@ -308,6 +377,14 @@ func NewResponse(REP byte) Response {
|
|||
}
|
||||
}
|
||||
|
||||
func NewResponseSucceed() Response {
|
||||
return NewResponse(0x00)
|
||||
}
|
||||
|
||||
func NewResponseFailed() Response {
|
||||
return NewResponse(0xff)
|
||||
}
|
||||
|
||||
func ReadResponseWithHead(head CommandHead, reader BufferedReader) (c Response, err error) {
|
||||
c.CommandHead = head
|
||||
if c.CommandHead.TYPE != ResponseType {
|
||||
|
@ -466,6 +543,17 @@ func (c Address) WriteTo(writer BufferedWriter) (err error) {
|
|||
return
|
||||
}
|
||||
|
||||
func (c Address) String() string {
|
||||
switch c.TYPE {
|
||||
case AtypDomainName:
|
||||
return net.JoinHostPort(string(c.ADDR[1:]), strconv.Itoa(int(c.PORT)))
|
||||
default:
|
||||
addr, _ := netip.AddrFromSlice(c.ADDR)
|
||||
addrPort := netip.AddrPortFrom(addr, c.PORT)
|
||||
return addrPort.String()
|
||||
}
|
||||
}
|
||||
|
||||
func (c Address) UDPAddr() *net.UDPAddr {
|
||||
return &net.UDPAddr{
|
||||
IP: c.ADDR,
|
||||
|
|
292
transport/tuic/server.go
Normal file
292
transport/tuic/server.go
Normal file
|
@ -0,0 +1,292 @@
|
|||
package tuic
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"net"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/gofrs/uuid"
|
||||
"github.com/metacubex/quic-go"
|
||||
|
||||
N "github.com/Dreamacro/clash/common/net"
|
||||
"github.com/Dreamacro/clash/common/pool"
|
||||
C "github.com/Dreamacro/clash/constant"
|
||||
)
|
||||
|
||||
type ServerOption struct {
|
||||
HandleTcpFn func(conn net.Conn, addr string) error
|
||||
HandleUdpFn func(addr *net.UDPAddr, packet C.UDPPacket) error
|
||||
|
||||
TlsConfig *tls.Config
|
||||
QuicConfig *quic.Config
|
||||
Tokens [][32]byte
|
||||
CongestionController string
|
||||
AuthenticationTimeout time.Duration
|
||||
MaxUdpRelayPacketSize int
|
||||
}
|
||||
|
||||
type Server struct {
|
||||
*ServerOption
|
||||
listener quic.EarlyListener
|
||||
}
|
||||
|
||||
func NewServer(option *ServerOption, pc net.PacketConn) (*Server, error) {
|
||||
listener, err := quic.ListenEarly(pc, option.TlsConfig, option.QuicConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &Server{
|
||||
ServerOption: option,
|
||||
listener: listener,
|
||||
}, err
|
||||
}
|
||||
|
||||
func (s *Server) Serve() error {
|
||||
for {
|
||||
conn, err := s.listener.Accept(context.Background())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
uuid, err := uuid.NewV4()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
h := &serverHandler{
|
||||
Server: s,
|
||||
quicConn: conn,
|
||||
uuid: uuid,
|
||||
authCh: make(chan struct{}),
|
||||
}
|
||||
go h.handle()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) Close() error {
|
||||
return s.listener.Close()
|
||||
}
|
||||
|
||||
type serverHandler struct {
|
||||
*Server
|
||||
quicConn quic.Connection
|
||||
uuid uuid.UUID
|
||||
|
||||
authCh chan struct{}
|
||||
authOk bool
|
||||
authOnce sync.Once
|
||||
}
|
||||
|
||||
func (s *serverHandler) handle() {
|
||||
time.AfterFunc(s.AuthenticationTimeout, func() {
|
||||
s.authOnce.Do(func() {
|
||||
_ = s.quicConn.CloseWithError(AuthenticationTimeout, "")
|
||||
s.authOk = false
|
||||
close(s.authCh)
|
||||
})
|
||||
})
|
||||
go func() {
|
||||
_ = s.handleUniStream()
|
||||
}()
|
||||
go func() {
|
||||
_ = s.handleStream()
|
||||
}()
|
||||
go func() {
|
||||
_ = s.handleMessage()
|
||||
}()
|
||||
}
|
||||
|
||||
func (s *serverHandler) handleMessage() (err error) {
|
||||
for {
|
||||
var message []byte
|
||||
message, err = s.quicConn.ReceiveMessage()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
go func() (err error) {
|
||||
buffer := bytes.NewBuffer(message)
|
||||
packet, err := ReadPacket(buffer)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
return s.parsePacket(packet, "native")
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *serverHandler) parsePacket(packet Packet, udpRelayMode string) (err error) {
|
||||
<-s.authCh
|
||||
if !s.authOk {
|
||||
return
|
||||
}
|
||||
var assocId uint32
|
||||
|
||||
assocId = packet.ASSOC_ID
|
||||
pc := &quicStreamPacketConn{
|
||||
connId: assocId,
|
||||
quicConn: s.quicConn,
|
||||
lAddr: s.quicConn.LocalAddr(),
|
||||
inputConn: nil,
|
||||
udpRelayMode: udpRelayMode,
|
||||
maxUdpRelayPacketSize: s.MaxUdpRelayPacketSize,
|
||||
ref: s,
|
||||
deferQuicConnFn: nil,
|
||||
closeDeferFn: nil,
|
||||
}
|
||||
|
||||
return s.HandleUdpFn(packet.ADDR.UDPAddr(), &serverUDPPacket{
|
||||
pc: pc,
|
||||
packet: &packet,
|
||||
rAddr: s.genServerAssocIdAddr(assocId),
|
||||
})
|
||||
}
|
||||
|
||||
func (s *serverHandler) genServerAssocIdAddr(assocId uint32) net.Addr {
|
||||
return ServerAssocIdAddr(fmt.Sprintf("tuic-%s-%d", s.uuid.String(), assocId))
|
||||
}
|
||||
|
||||
func (s *serverHandler) handleStream() (err error) {
|
||||
for {
|
||||
var quicStream quic.Stream
|
||||
quicStream, err = s.quicConn.AcceptStream(context.Background())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
go func() (err error) {
|
||||
stream := &quicStreamConn{
|
||||
Stream: quicStream,
|
||||
lAddr: s.quicConn.LocalAddr(),
|
||||
rAddr: s.quicConn.RemoteAddr(),
|
||||
ref: s,
|
||||
}
|
||||
conn := N.NewBufferedConn(stream)
|
||||
connect, err := ReadConnect(conn)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
<-s.authCh
|
||||
if !s.authOk {
|
||||
return conn.Close()
|
||||
}
|
||||
|
||||
buf := pool.GetBuffer()
|
||||
defer pool.PutBuffer(buf)
|
||||
err = s.HandleTcpFn(conn, connect.ADDR.String())
|
||||
if err != nil {
|
||||
err = NewResponseFailed().WriteTo(buf)
|
||||
}
|
||||
err = NewResponseSucceed().WriteTo(buf)
|
||||
if err != nil {
|
||||
_ = conn.Close()
|
||||
return err
|
||||
}
|
||||
_, err = buf.WriteTo(stream)
|
||||
if err != nil {
|
||||
_ = conn.Close()
|
||||
return err
|
||||
}
|
||||
|
||||
return
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *serverHandler) handleUniStream() (err error) {
|
||||
for {
|
||||
var stream quic.ReceiveStream
|
||||
stream, err = s.quicConn.AcceptUniStream(context.Background())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
go func() (err error) {
|
||||
defer func() {
|
||||
stream.CancelRead(0)
|
||||
}()
|
||||
reader := bufio.NewReader(stream)
|
||||
commandHead, err := ReadCommandHead(reader)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
switch commandHead.TYPE {
|
||||
case AuthenticateType:
|
||||
var authenticate Authenticate
|
||||
authenticate, err = ReadAuthenticateWithHead(commandHead, reader)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
ok := false
|
||||
for _, tkn := range s.Tokens {
|
||||
if authenticate.TKN == tkn {
|
||||
ok = true
|
||||
break
|
||||
}
|
||||
}
|
||||
s.authOnce.Do(func() {
|
||||
if !ok {
|
||||
_ = s.quicConn.CloseWithError(AuthenticationFailed, "")
|
||||
}
|
||||
s.authOk = ok
|
||||
close(s.authCh)
|
||||
})
|
||||
case PacketType:
|
||||
var packet Packet
|
||||
packet, err = ReadPacketWithHead(commandHead, reader)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
return s.parsePacket(packet, "quic")
|
||||
case DissociateType:
|
||||
var disassociate Dissociate
|
||||
disassociate, err = ReadDissociateWithHead(commandHead, reader)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
disassociate.BytesLen()
|
||||
}
|
||||
return
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
type ServerAssocIdAddr string
|
||||
|
||||
func (a ServerAssocIdAddr) Network() string {
|
||||
return "ServerAssocIdAddr"
|
||||
}
|
||||
|
||||
func (a ServerAssocIdAddr) String() string {
|
||||
return string(a)
|
||||
}
|
||||
|
||||
type serverUDPPacket struct {
|
||||
pc *quicStreamPacketConn
|
||||
packet *Packet
|
||||
rAddr net.Addr
|
||||
}
|
||||
|
||||
func (s *serverUDPPacket) InAddr() net.Addr {
|
||||
return s.pc.LocalAddr()
|
||||
}
|
||||
|
||||
func (s *serverUDPPacket) LocalAddr() net.Addr {
|
||||
return s.rAddr
|
||||
}
|
||||
|
||||
func (s *serverUDPPacket) Data() []byte {
|
||||
return s.packet.DATA
|
||||
}
|
||||
|
||||
func (s *serverUDPPacket) WriteBack(b []byte, addr net.Addr) (n int, err error) {
|
||||
return s.pc.WriteTo(b, addr)
|
||||
}
|
||||
|
||||
func (s *serverUDPPacket) Drop() {
|
||||
s.packet.DATA = nil
|
||||
}
|
||||
|
||||
var _ C.UDPPacket = &serverUDPPacket{}
|
||||
var _ C.UDPPacketInAddr = &serverUDPPacket{}
|
Loading…
Reference in a new issue