diff --git a/rpc_util.go b/rpc_util.go index 934fc1aa015e..3491db1bb133 100644 --- a/rpc_util.go +++ b/rpc_util.go @@ -540,6 +540,9 @@ type parser struct { // The header of a gRPC message. Find more detail at // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md header [5]byte + + // buffer is recycled memory used to read the message. + buffer []byte } // recvMsg reads a complete gRPC message from the stream. @@ -573,9 +576,10 @@ func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byt if int(length) > maxReceiveMessageSize { return 0, nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", length, maxReceiveMessageSize) } - // TODO(bradfitz,zhaoq): garbage. reuse buffer after proto decoding instead - // of making it for each message: - msg = make([]byte, int(length)) + if uint32(cap(p.buffer)) < length { + p.buffer = make([]byte, int(length)) + } + msg = p.buffer[:length] if _, err := p.r.Read(msg); err != nil { if err == io.EOF { err = io.ErrUnexpectedEOF @@ -688,12 +692,12 @@ type payloadInfo struct { } func recvAndDecompress(p *parser, s *transport.Stream, dc Decompressor, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor) ([]byte, error) { - pf, d, err := p.recvMsg(maxReceiveMessageSize) + pf, buf, err := p.recvMsg(maxReceiveMessageSize) if err != nil { return nil, err } if payInfo != nil { - payInfo.wireLength = len(d) + payInfo.wireLength = len(buf) } if st := checkRecvPayload(pf, s.RecvCompress(), compressor != nil || dc != nil); st != nil { @@ -705,10 +709,10 @@ func recvAndDecompress(p *parser, s *transport.Stream, dc Decompressor, maxRecei // To match legacy behavior, if the decompressor is set by WithDecompressor or RPCDecompressor, // use this decompressor as the default. if dc != nil { - d, err = dc.Do(bytes.NewReader(d)) - size = len(d) + buf, err = dc.Do(bytes.NewReader(buf)) + size = len(buf) } else { - d, size, err = decompress(compressor, d, maxReceiveMessageSize) + buf, size, err = decompress(compressor, buf, maxReceiveMessageSize) } if err != nil { return nil, status.Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err) @@ -719,7 +723,7 @@ func recvAndDecompress(p *parser, s *transport.Stream, dc Decompressor, maxRecei return nil, status.Errorf(codes.ResourceExhausted, "grpc: received message after decompression larger than max (%d vs. %d)", size, maxReceiveMessageSize) } } - return d, nil + return buf, nil } // Using compressor, decompress d, returning data and size. @@ -754,15 +758,16 @@ func decompress(compressor encoding.Compressor, d []byte, maxReceiveMessageSize // dc takes precedence over compressor. // TODO(dfawley): wrap the old compressor/decompressor using the new API? func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interface{}, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor) error { - d, err := recvAndDecompress(p, s, dc, maxReceiveMessageSize, payInfo, compressor) + buf, err := recvAndDecompress(p, s, dc, maxReceiveMessageSize, payInfo, compressor) if err != nil { return err } - if err := c.Unmarshal(d, m); err != nil { + if err := c.Unmarshal(buf, m); err != nil { return status.Errorf(codes.Internal, "grpc: failed to unmarshal the received message %v", err) } - if payInfo != nil { - payInfo.uncompressedBytes = d + if payInfo != nil && buf != nil { + payInfo.uncompressedBytes = make([]byte, len(buf)) + copy(payInfo.uncompressedBytes, buf) } return nil }