Fix: grpc implementation SetDeadline for udp issue

This commit is contained in:
Dreamacro 2021-04-13 23:34:24 +08:00
parent a40274e2a2
commit c9943fb857

View file

@ -35,15 +35,18 @@ var (
type DialFn = func(network, addr string) (net.Conn, error) type DialFn = func(network, addr string) (net.Conn, error)
type Conn struct { type Conn struct {
response *http.Response response *http.Response
request *http.Request request *http.Request
client *http.Client transport *http2.Transport
writer *io.PipeWriter writer *io.PipeWriter
once sync.Once once sync.Once
close *atomic.Bool close *atomic.Bool
err error err error
remain int remain int
br *bufio.Reader br *bufio.Reader
// deadlines
deadline *time.Timer
} }
type Config struct { type Config struct {
@ -52,7 +55,7 @@ type Config struct {
} }
func (g *Conn) initRequest() { func (g *Conn) initRequest() {
response, err := g.client.Do(g.request) response, err := g.transport.RoundTrip(g.request)
if err != nil { if err != nil {
g.err = err g.err = err
g.writer.Close() g.writer.Close()
@ -174,9 +177,20 @@ func (g *Conn) Close() error {
func (g *Conn) LocalAddr() net.Addr { return &net.TCPAddr{IP: net.IPv4zero, Port: 0} } func (g *Conn) LocalAddr() net.Addr { return &net.TCPAddr{IP: net.IPv4zero, Port: 0} }
func (g *Conn) RemoteAddr() net.Addr { return &net.TCPAddr{IP: net.IPv4zero, Port: 0} } func (g *Conn) RemoteAddr() net.Addr { return &net.TCPAddr{IP: net.IPv4zero, Port: 0} }
func (g *Conn) SetDeadline(t time.Time) error { return nil } func (g *Conn) SetReadDeadline(t time.Time) error { return g.SetDeadline(t) }
func (g *Conn) SetReadDeadline(t time.Time) error { return nil } func (g *Conn) SetWriteDeadline(t time.Time) error { return g.SetDeadline(t) }
func (g *Conn) SetWriteDeadline(t time.Time) error { return nil }
func (g *Conn) SetDeadline(t time.Time) error {
d := time.Until(t)
if g.deadline != nil {
g.deadline.Reset(d)
return nil
}
g.deadline = time.AfterFunc(d, func() {
g.Close()
})
return nil
}
func NewHTTP2Client(dialFn DialFn, tlsConfig *tls.Config) *http2.Transport { func NewHTTP2Client(dialFn DialFn, tlsConfig *tls.Config) *http2.Transport {
dialFunc := func(network, addr string, cfg *tls.Config) (net.Conn, error) { dialFunc := func(network, addr string, cfg *tls.Config) (net.Conn, error) {
@ -203,7 +217,6 @@ func NewHTTP2Client(dialFn DialFn, tlsConfig *tls.Config) *http2.Transport {
TLSClientConfig: tlsConfig, TLSClientConfig: tlsConfig,
AllowHTTP: false, AllowHTTP: false,
DisableCompression: true, DisableCompression: true,
ReadIdleTimeout: 0,
PingTimeout: 0, PingTimeout: 0,
} }
} }
@ -214,10 +227,6 @@ func StreamGunWithTransport(transport *http2.Transport, cfg *Config) (net.Conn,
serviceName = cfg.ServiceName serviceName = cfg.ServiceName
} }
client := &http.Client{
Transport: transport,
}
reader, writer := io.Pipe() reader, writer := io.Pipe()
request := &http.Request{ request := &http.Request{
Method: http.MethodPost, Method: http.MethodPost,
@ -234,10 +243,10 @@ func StreamGunWithTransport(transport *http2.Transport, cfg *Config) (net.Conn,
} }
conn := &Conn{ conn := &Conn{
request: request, request: request,
client: client, transport: transport,
writer: writer, writer: writer,
close: atomic.NewBool(false), close: atomic.NewBool(false),
} }
go conn.once.Do(conn.initRequest) go conn.once.Do(conn.initRequest)