chore: sing inbound support WaitReadPacket

This commit is contained in:
gVisor bot 2023-05-10 22:35:50 +08:00
parent 50fef8e201
commit 96511d8613
4 changed files with 63 additions and 17 deletions

View file

@ -32,7 +32,7 @@ func NewUDP(addr string, pickCipher core.Cipher, in chan<- C.PacketAdapter) (*UD
conn := pickCipher.PacketConn(l) conn := pickCipher.PacketConn(l)
go func() { go func() {
for { for {
buf := pool.Get(pool.RelayBufferSize) buf := pool.Get(pool.UDPBufferSize)
n, remoteAddr, err := conn.ReadFrom(buf) n, remoteAddr, err := conn.ReadFrom(buf)
if err != nil { if err != nil {
pool.Put(buf) pool.Put(buf)

View file

@ -116,11 +116,26 @@ func (h *ListenerHandler) NewPacketConnection(ctx context.Context, conn network.
defer mutex.Unlock() defer mutex.Unlock()
conn2 = nil conn2 = nil
}() }()
readWaiter, isReadWaiter := bufio.CreatePacketReadWaiter(conn)
for { for {
buff := buf.NewPacket() // do not use stack buffer var (
dest, err := conn.ReadPacket(buff) buff *buf.Buffer
dest M.Socksaddr
err error
)
newBuffer := func() *buf.Buffer {
buff = buf.NewPacket() // do not use stack buffer
return buff
}
if isReadWaiter {
dest, err = readWaiter.WaitReadPacket(newBuffer)
} else {
dest, err = conn.ReadPacket(newBuffer())
}
if err != nil { if err != nil {
buff.Release() if buff != nil {
buff.Release()
}
if ShouldIgnorePacketError(err) { if ShouldIgnorePacketError(err) {
break break
} }

View file

@ -21,7 +21,7 @@ import (
"github.com/sagernet/sing/common" "github.com/sagernet/sing/common"
"github.com/sagernet/sing/common/buf" "github.com/sagernet/sing/common/buf"
"github.com/sagernet/sing/common/bufio" "github.com/sagernet/sing/common/bufio"
"github.com/sagernet/sing/common/metadata" M "github.com/sagernet/sing/common/metadata"
) )
type Listener struct { type Listener struct {
@ -92,19 +92,34 @@ func New(config LC.ShadowsocksServer, tcpIn chan<- C.ConnContext, udpIn chan<- C
go func() { go func() {
conn := bufio.NewPacketConn(ul) conn := bufio.NewPacketConn(ul)
readWaiter, isReadWaiter := bufio.CreatePacketReadWaiter(conn)
for { for {
buff := buf.NewPacket() var (
remoteAddr, err := conn.ReadPacket(buff) buff *buf.Buffer
dest M.Socksaddr
err error
)
newBuffer := func() *buf.Buffer {
buff = buf.NewPacket() // do not use stack buffer
return buff
}
if isReadWaiter {
dest, err = readWaiter.WaitReadPacket(newBuffer)
} else {
dest, err = conn.ReadPacket(newBuffer())
}
if err != nil { if err != nil {
buff.Release() if buff != nil {
buff.Release()
}
if sl.closed { if sl.closed {
break break
} }
continue continue
} }
_ = sl.service.NewPacket(context.TODO(), conn, buff, metadata.Metadata{ _ = sl.service.NewPacket(context.TODO(), conn, buff, M.Metadata{
Protocol: "shadowsocks", Protocol: "shadowsocks",
Source: remoteAddr, Source: dest,
}) })
} }
}() }()
@ -170,9 +185,9 @@ func (l *Listener) AddrList() (addrList []net.Addr) {
func (l *Listener) HandleConn(conn net.Conn, in chan<- C.ConnContext, additions ...inbound.Addition) { func (l *Listener) HandleConn(conn net.Conn, in chan<- C.ConnContext, additions ...inbound.Addition) {
ctx := sing.WithAdditions(context.TODO(), additions...) ctx := sing.WithAdditions(context.TODO(), additions...)
err := l.service.NewConnection(ctx, conn, metadata.Metadata{ err := l.service.NewConnection(ctx, conn, M.Metadata{
Protocol: "shadowsocks", Protocol: "shadowsocks",
Source: metadata.ParseSocksaddr(conn.RemoteAddr().String()), Source: M.ParseSocksaddr(conn.RemoteAddr().String()),
}) })
if err != nil { if err != nil {
_ = conn.Close() _ = conn.Close()

View file

@ -17,6 +17,7 @@ import (
D "github.com/miekg/dns" D "github.com/miekg/dns"
"github.com/sagernet/sing/common/buf" "github.com/sagernet/sing/common/buf"
"github.com/sagernet/sing/common/bufio"
M "github.com/sagernet/sing/common/metadata" M "github.com/sagernet/sing/common/metadata"
"github.com/sagernet/sing/common/network" "github.com/sagernet/sing/common/network"
) )
@ -108,14 +109,29 @@ func (h *ListenerHandler) NewPacketConnection(ctx context.Context, conn network.
defer mutex.Unlock() defer mutex.Unlock()
conn2 = nil conn2 = nil
}() }()
readWaiter, isReadWaiter := bufio.CreatePacketReadWaiter(conn)
for { for {
// safe size which is 1232 from https://dnsflagday.net/2020/. var (
// so 2048 is enough buff *buf.Buffer
buff := buf.NewSize(2 * 1024) dest M.Socksaddr
err error
)
newBuffer := func() *buf.Buffer {
// safe size which is 1232 from https://dnsflagday.net/2020/.
// so 2048 is enough
buff = buf.NewSize(2 * 1024)
return buff
}
_ = conn.SetReadDeadline(time.Now().Add(DefaultDnsReadTimeout)) _ = conn.SetReadDeadline(time.Now().Add(DefaultDnsReadTimeout))
dest, err := conn.ReadPacket(buff) if isReadWaiter {
dest, err = readWaiter.WaitReadPacket(newBuffer)
} else {
dest, err = conn.ReadPacket(newBuffer())
}
if err != nil { if err != nil {
buff.Release() if buff != nil {
buff.Release()
}
if sing.ShouldIgnorePacketError(err) { if sing.ShouldIgnorePacketError(err) {
break break
} }