fix: tuic better stream close
This commit is contained in:
parent
10e194a238
commit
29df286610
1 changed files with 29 additions and 5 deletions
|
@ -26,7 +26,7 @@ var (
|
||||||
TooManyOpenStreams = errors.New("tuic: too many open streams")
|
TooManyOpenStreams = errors.New("tuic: too many open streams")
|
||||||
)
|
)
|
||||||
|
|
||||||
const MaxOpenStreams = 100 - 10
|
const MaxOpenStreams = 100 - 90
|
||||||
|
|
||||||
type Client struct {
|
type Client struct {
|
||||||
TlsConfig *tls.Config
|
TlsConfig *tls.Config
|
||||||
|
@ -230,9 +230,9 @@ func (t *Client) Close(err error) {
|
||||||
if conn, ok := value.(net.Conn); ok {
|
if conn, ok := value.(net.Conn); ok {
|
||||||
_ = conn.Close()
|
_ = conn.Close()
|
||||||
}
|
}
|
||||||
|
t.udpInputMap.Delete(key)
|
||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
t.udpInputMap = sync.Map{} // new one
|
|
||||||
t.quicConn = nil
|
t.quicConn = nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -260,7 +260,12 @@ func (t *Client) DialContext(ctx context.Context, metadata *C.Metadata, dialFn f
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
stream = &quicStreamConn{quicStream, quicConn.LocalAddr(), quicConn.RemoteAddr(), t}
|
stream = &quicStreamConn{
|
||||||
|
Stream: quicStream,
|
||||||
|
lAddr: quicConn.LocalAddr(),
|
||||||
|
rAddr: quicConn.RemoteAddr(),
|
||||||
|
client: t,
|
||||||
|
}
|
||||||
_, err = buf.WriteTo(stream)
|
_, err = buf.WriteTo(stream)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
_ = stream.Close()
|
_ = stream.Close()
|
||||||
|
@ -291,15 +296,34 @@ func (t *Client) DialContext(ctx context.Context, metadata *C.Metadata, dialFn f
|
||||||
|
|
||||||
type quicStreamConn struct {
|
type quicStreamConn struct {
|
||||||
quic.Stream
|
quic.Stream
|
||||||
|
lock sync.Mutex
|
||||||
lAddr net.Addr
|
lAddr net.Addr
|
||||||
rAddr net.Addr
|
rAddr net.Addr
|
||||||
client *Client
|
client *Client
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (q *quicStreamConn) Write(p []byte) (n int, err error) {
|
||||||
|
q.lock.Lock()
|
||||||
|
defer q.lock.Unlock()
|
||||||
|
return q.Stream.Write(p)
|
||||||
|
}
|
||||||
|
|
||||||
func (q *quicStreamConn) Close() error {
|
func (q *quicStreamConn) Close() error {
|
||||||
//defer q.client.openStreams.Add(-1)
|
defer time.AfterFunc(C.DefaultTCPTimeout, func() {
|
||||||
|
q.client.openStreams.Add(-1)
|
||||||
|
})
|
||||||
|
|
||||||
|
// https://github.com/cloudflare/cloudflared/commit/ed2bac026db46b239699ac5ce4fcf122d7cab2cd
|
||||||
|
// Make sure a possible writer does not block the lock forever. We need it, so we can close the writer
|
||||||
|
// side of the stream safely.
|
||||||
|
_ = q.Stream.SetWriteDeadline(time.Now())
|
||||||
|
|
||||||
|
// This lock is eventually acquired despite Write also acquiring it, because we set a deadline to writes.
|
||||||
|
q.lock.Lock()
|
||||||
|
defer q.lock.Unlock()
|
||||||
|
|
||||||
|
// We have to clean up the receiving stream ourselves since the Close in the bottom does not handle that.
|
||||||
q.Stream.CancelRead(0)
|
q.Stream.CancelRead(0)
|
||||||
q.Stream.CancelWrite(0)
|
|
||||||
return q.Stream.Close()
|
return q.Stream.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue