From c9943fb8570b80a4067926489e513eec67b41184 Mon Sep 17 00:00:00 2001 From: Dreamacro <8615343+Dreamacro@users.noreply.github.com> Date: Tue, 13 Apr 2021 23:34:24 +0800 Subject: [PATCH] Fix: grpc implementation SetDeadline for udp issue --- component/gun/gun.go | 53 ++++++++++++++++++++++++++------------------ 1 file changed, 31 insertions(+), 22 deletions(-) diff --git a/component/gun/gun.go b/component/gun/gun.go index 6136235d..9c673932 100644 --- a/component/gun/gun.go +++ b/component/gun/gun.go @@ -35,15 +35,18 @@ var ( type DialFn = func(network, addr string) (net.Conn, error) type Conn struct { - response *http.Response - request *http.Request - client *http.Client - writer *io.PipeWriter - once sync.Once - close *atomic.Bool - err error - remain int - br *bufio.Reader + response *http.Response + request *http.Request + transport *http2.Transport + writer *io.PipeWriter + once sync.Once + close *atomic.Bool + err error + remain int + br *bufio.Reader + + // deadlines + deadline *time.Timer } type Config struct { @@ -52,7 +55,7 @@ type Config struct { } func (g *Conn) initRequest() { - response, err := g.client.Do(g.request) + response, err := g.transport.RoundTrip(g.request) if err != nil { g.err = err g.writer.Close() @@ -174,9 +177,20 @@ func (g *Conn) Close() error { func (g *Conn) LocalAddr() net.Addr { return &net.TCPAddr{IP: net.IPv4zero, Port: 0} } func (g *Conn) RemoteAddr() net.Addr { return &net.TCPAddr{IP: net.IPv4zero, Port: 0} } -func (g *Conn) SetDeadline(t time.Time) error { return nil } -func (g *Conn) SetReadDeadline(t time.Time) error { return nil } -func (g *Conn) SetWriteDeadline(t time.Time) error { return nil } +func (g *Conn) SetReadDeadline(t time.Time) error { return g.SetDeadline(t) } +func (g *Conn) SetWriteDeadline(t time.Time) error { return g.SetDeadline(t) } + +func (g *Conn) SetDeadline(t time.Time) error { + d := time.Until(t) + if g.deadline != nil { + g.deadline.Reset(d) + return nil + } + g.deadline = time.AfterFunc(d, func() { + g.Close() + }) + return nil +} func NewHTTP2Client(dialFn DialFn, tlsConfig *tls.Config) *http2.Transport { dialFunc := func(network, addr string, cfg *tls.Config) (net.Conn, error) { @@ -203,7 +217,6 @@ func NewHTTP2Client(dialFn DialFn, tlsConfig *tls.Config) *http2.Transport { TLSClientConfig: tlsConfig, AllowHTTP: false, DisableCompression: true, - ReadIdleTimeout: 0, PingTimeout: 0, } } @@ -214,10 +227,6 @@ func StreamGunWithTransport(transport *http2.Transport, cfg *Config) (net.Conn, serviceName = cfg.ServiceName } - client := &http.Client{ - Transport: transport, - } - reader, writer := io.Pipe() request := &http.Request{ Method: http.MethodPost, @@ -234,10 +243,10 @@ func StreamGunWithTransport(transport *http2.Transport, cfg *Config) (net.Conn, } conn := &Conn{ - request: request, - client: client, - writer: writer, - close: atomic.NewBool(false), + request: request, + transport: transport, + writer: writer, + close: atomic.NewBool(false), } go conn.once.Do(conn.initRequest)