From c7bad89af36d755e2971ed875ca790af3a9c30e1 Mon Sep 17 00:00:00 2001 From: wwqgtxx Date: Fri, 25 Nov 2022 19:14:09 +0800 Subject: [PATCH] fix: tuic better stream close --- transport/tuic/client.go | 34 +++++++++++++++++++++++++++++----- 1 file changed, 29 insertions(+), 5 deletions(-) diff --git a/transport/tuic/client.go b/transport/tuic/client.go index a28e6ec2..d5d2664d 100644 --- a/transport/tuic/client.go +++ b/transport/tuic/client.go @@ -26,7 +26,7 @@ var ( TooManyOpenStreams = errors.New("tuic: too many open streams") ) -const MaxOpenStreams = 100 - 10 +const MaxOpenStreams = 100 - 90 type Client struct { TlsConfig *tls.Config @@ -230,9 +230,9 @@ func (t *Client) Close(err error) { if conn, ok := value.(net.Conn); ok { _ = conn.Close() } + t.udpInputMap.Delete(key) return true }) - t.udpInputMap = sync.Map{} // new one t.quicConn = nil } } @@ -260,7 +260,12 @@ func (t *Client) DialContext(ctx context.Context, metadata *C.Metadata, dialFn f if err != nil { return nil, err } - stream = &quicStreamConn{quicStream, quicConn.LocalAddr(), quicConn.RemoteAddr(), t} + stream = &quicStreamConn{ + Stream: quicStream, + lAddr: quicConn.LocalAddr(), + rAddr: quicConn.RemoteAddr(), + client: t, + } _, err = buf.WriteTo(stream) if err != nil { _ = stream.Close() @@ -291,15 +296,34 @@ func (t *Client) DialContext(ctx context.Context, metadata *C.Metadata, dialFn f type quicStreamConn struct { quic.Stream + lock sync.Mutex lAddr net.Addr rAddr net.Addr client *Client } +func (q *quicStreamConn) Write(p []byte) (n int, err error) { + q.lock.Lock() + defer q.lock.Unlock() + return q.Stream.Write(p) +} + func (q *quicStreamConn) Close() error { - //defer q.client.openStreams.Add(-1) + defer time.AfterFunc(C.DefaultTCPTimeout, func() { + q.client.openStreams.Add(-1) + }) + + // https://github.com/cloudflare/cloudflared/commit/ed2bac026db46b239699ac5ce4fcf122d7cab2cd + // Make sure a possible writer does not block the lock forever. We need it, so we can close the writer + // side of the stream safely. + _ = q.Stream.SetWriteDeadline(time.Now()) + + // This lock is eventually acquired despite Write also acquiring it, because we set a deadline to writes. + q.lock.Lock() + defer q.lock.Unlock() + + // We have to clean up the receiving stream ourselves since the Close in the bottom does not handle that. q.Stream.CancelRead(0) - q.Stream.CancelWrite(0) return q.Stream.Close() }