Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpc_util: Reuse memory buffer for receiving message #5862

Merged
merged 42 commits into from
Jun 27, 2023
Merged
Show file tree
Hide file tree
Changes from 37 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
493be5d
rpc_util: add bytes pool for the parser
hueypark Jan 25, 2023
ecdb6d2
rpc_util: handle empty buffer case for copy payInfo.uncompressedBytes
hueypark Jan 25, 2023
cb8827d
rpc_util: Use a pool of bytes with varying sizes for the parser
hueypark Jan 31, 2023
bae37e3
grpc: Add useBytesPoolForParser option
hueypark Jan 31, 2023
cbaef54
rpc_util: Fix a. vet error
hueypark Jan 31, 2023
f9730dd
rpc_util: Copy the buffer only if the pool option is enabled
hueypark Feb 25, 2023
e746250
rpc_util: Rename the option to `useSharedRecvBuffers`
hueypark Feb 25, 2023
711db78
rpc_util: Introduce user created shared recv buffer pool option
hueypark Feb 25, 2023
c05feed
benchmark: Introduce shared receive buffers(nil and simple) feature
hueypark Feb 25, 2023
9390057
Merge branch 'master' into master
hueypark Mar 4, 2023
b1c6496
rpc_util: export a SharedBufferPool
hueypark Mar 15, 2023
086dd28
rpc_util: improve comment for the option function
hueypark Mar 15, 2023
d6c1d97
rpc_util: introduce simple pool implementation for new user
hueypark Mar 15, 2023
1b11bba
rpc_util: add a copyright
hueypark Mar 15, 2023
a44c200
rpc_util: set the default value of the sharedRecvBufferPool to nil
hueypark Mar 15, 2023
6b5000f
rpc_util: add experimental notices to SharedBufferPool
hueypark Mar 15, 2023
c34da60
rpc_util: provide more detail in SharedBufferPool comment
hueypark Mar 15, 2023
acec626
rpc_util: use multiple buckets for the simpleSharedBufferPool
hueypark Mar 15, 2023
3cbb6b5
rpc_util: add pool size comments
hueypark Mar 17, 2023
57b9c67
rpc_util: add a SharedBufferPool function comments
hueypark Mar 17, 2023
76caf74
rpc_util: use preconfigured size buffers for each pool
hueypark Mar 17, 2023
8f33b9b
rpc_util: remove duplicate Put function
hueypark Mar 22, 2023
5155566
rpc_util: add default recv buffer pool with nop functionality
hueypark Apr 9, 2023
15f820e
rpc_util: use array for the simple shared buffer pool
hueypark Apr 9, 2023
539eef3
Merge remote-tracking branch 'upstream/master'
hueypark Apr 9, 2023
056b3e5
rpc_util: use fallback bytes pool for the simple shared buffer pool
hueypark Apr 9, 2023
0d07cfc
rpc_util: remove a wrong test
hueypark Apr 11, 2023
7f6fdb2
Merge remote-tracking branch 'upstream/master'
hueypark Apr 19, 2023
ca9f6de
rpc_util: add limitation comment for the shared buffer pool
hueypark Apr 19, 2023
9be7889
rpc_util: exclude shared prefix from shared recv buffer pool
hueypark Apr 19, 2023
25b60e3
rpc_util: rename NewSimpleSharedBufferPool to NewsimpleSharedBufferPool
hueypark Apr 19, 2023
8e8f683
rpc_util: add a TestRecvBufferPool
hueypark Apr 19, 2023
96f5a27
rpc_util: rename NewsimpleSharedBufferPool to NewSharedBufferPool
hueypark May 20, 2023
fe1294f
rpc_util: merge fallback bytespool functionality into bytepool
hueypark May 20, 2023
5535416
rpc_util: add capacity checks for all buffer pool sizes
hueypark May 23, 2023
2678efb
rpc_util: return unused buffers to memory pool
hueypark May 23, 2023
8199eeb
Merge remote-tracking branch 'upstream/master'
hueypark May 28, 2023
a70df17
Merge remote-tracking branch 'upstream/master'
hueypark May 31, 2023
86d999f
Merge remote-tracking branch 'upstream/master'
hueypark Jun 27, 2023
63a360e
rpc_util: remove a useless newline
hueypark Jun 27, 2023
1f4bc35
rpc_util: standardize buffer pool interface to use []byte in Put method
hueypark Jun 27, 2023
3dcd833
Revert "rpc_util: standardize buffer pool interface to use []byte in …
hueypark Jun 27, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions benchmark/benchmain/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ var (
clientWriteBufferSize = flags.IntSlice("clientWriteBufferSize", []int{-1}, "Configures the client write buffer size in bytes. If negative, use the default - may be a a comma-separated list")
serverReadBufferSize = flags.IntSlice("serverReadBufferSize", []int{-1}, "Configures the server read buffer size in bytes. If negative, use the default - may be a a comma-separated list")
serverWriteBufferSize = flags.IntSlice("serverWriteBufferSize", []int{-1}, "Configures the server write buffer size in bytes. If negative, use the default - may be a a comma-separated list")
recvBufferPool = flags.StringWithAllowedValues("recvBufferPool", recvBufferPoolNil, "Configures the shared receive buffer pool. One of: nil, simple, all", allRecvBufferPools)

logger = grpclog.Component("benchmark")
)
Expand All @@ -133,6 +134,10 @@ const (
networkModeLAN = "LAN"
networkModeWAN = "WAN"
networkLongHaul = "Longhaul"
// Shared recv buffer pool
recvBufferPoolNil = "nil"
recvBufferPoolSimple = "simple"
recvBufferPoolAll = "all"

numStatsBuckets = 10
warmupCallCount = 10
Expand All @@ -144,6 +149,7 @@ var (
allCompModes = []string{compModeOff, compModeGzip, compModeNop, compModeAll}
allToggleModes = []string{toggleModeOff, toggleModeOn, toggleModeBoth}
allNetworkModes = []string{networkModeNone, networkModeLocal, networkModeLAN, networkModeWAN, networkLongHaul}
allRecvBufferPools = []string{recvBufferPoolNil, recvBufferPoolSimple, recvBufferPoolAll}
defaultReadLatency = []time.Duration{0, 40 * time.Millisecond} // if non-positive, no delay.
defaultReadKbps = []int{0, 10240} // if non-positive, infinite
defaultReadMTU = []int{0} // if non-positive, infinite
Expand Down Expand Up @@ -321,6 +327,15 @@ func makeClient(bf stats.Features) (testgrpc.BenchmarkServiceClient, func()) {
if bf.ServerWriteBufferSize >= 0 {
sopts = append(sopts, grpc.WriteBufferSize(bf.ServerWriteBufferSize))
}
switch bf.RecvBufferPool {
case recvBufferPoolNil:
// Do nothing.
case recvBufferPoolSimple:
opts = append(opts, grpc.WithRecvBufferPool(grpc.NewSharedBufferPool()))
sopts = append(sopts, grpc.RecvBufferPool(grpc.NewSharedBufferPool()))
default:
logger.Fatalf("Unknown shared recv buffer pool type: %v", bf.RecvBufferPool)
}

sopts = append(sopts, grpc.MaxConcurrentStreams(uint32(bf.MaxConcurrentCalls+1)))
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
Expand Down Expand Up @@ -528,6 +543,7 @@ type featureOpts struct {
clientWriteBufferSize []int
serverReadBufferSize []int
serverWriteBufferSize []int
recvBufferPools []string
}

// makeFeaturesNum returns a slice of ints of size 'maxFeatureIndex' where each
Expand Down Expand Up @@ -572,6 +588,8 @@ func makeFeaturesNum(b *benchOpts) []int {
featuresNum[i] = len(b.features.serverReadBufferSize)
case stats.ServerWriteBufferSize:
featuresNum[i] = len(b.features.serverWriteBufferSize)
case stats.RecvBufferPool:
featuresNum[i] = len(b.features.recvBufferPools)
default:
log.Fatalf("Unknown feature index %v in generateFeatures. maxFeatureIndex is %v", i, stats.MaxFeatureIndex)
}
Expand Down Expand Up @@ -638,6 +656,7 @@ func (b *benchOpts) generateFeatures(featuresNum []int) []stats.Features {
ClientWriteBufferSize: b.features.clientWriteBufferSize[curPos[stats.ClientWriteBufferSize]],
ServerReadBufferSize: b.features.serverReadBufferSize[curPos[stats.ServerReadBufferSize]],
ServerWriteBufferSize: b.features.serverWriteBufferSize[curPos[stats.ServerWriteBufferSize]],
RecvBufferPool: b.features.recvBufferPools[curPos[stats.RecvBufferPool]],
}
if len(b.features.reqPayloadCurves) == 0 {
f.ReqSizeBytes = b.features.reqSizeBytes[curPos[stats.ReqSizeBytesIndex]]
Expand Down Expand Up @@ -708,6 +727,7 @@ func processFlags() *benchOpts {
clientWriteBufferSize: append([]int(nil), *clientWriteBufferSize...),
serverReadBufferSize: append([]int(nil), *serverReadBufferSize...),
serverWriteBufferSize: append([]int(nil), *serverWriteBufferSize...),
recvBufferPools: setRecvBufferPool(*recvBufferPool),
},
}

Expand Down Expand Up @@ -783,6 +803,19 @@ func setCompressorMode(val string) []string {
}
}

func setRecvBufferPool(val string) []string {
switch val {
case recvBufferPoolNil, recvBufferPoolSimple:
return []string{val}
case recvBufferPoolAll:
return []string{recvBufferPoolNil, recvBufferPoolSimple}
default:
// This should never happen because a wrong value passed to this flag would
// be caught during flag.Parse().
return []string{}
}
}

func main() {
opts := processFlags()
before(opts)
Expand Down
10 changes: 8 additions & 2 deletions benchmark/stats/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ const (
ClientWriteBufferSize
ServerReadBufferSize
ServerWriteBufferSize
RecvBufferPool

// MaxFeatureIndex is a place holder to indicate the total number of feature
// indices we have. Any new feature indices should be added above this.
Expand Down Expand Up @@ -121,6 +122,8 @@ type Features struct {
ServerReadBufferSize int
// ServerWriteBufferSize is the size of the server write buffer in bytes. If negative, use the default buffer size.
ServerWriteBufferSize int
// RecvBufferPool represents the shared recv buffer pool used.
RecvBufferPool string
}

// String returns all the feature values as a string.
Expand All @@ -139,12 +142,13 @@ func (f Features) String() string {
return fmt.Sprintf("networkMode_%v-bufConn_%v-keepalive_%v-benchTime_%v-"+
"trace_%v-latency_%v-kbps_%v-MTU_%v-maxConcurrentCalls_%v-%s-%s-"+
"compressor_%v-channelz_%v-preloader_%v-clientReadBufferSize_%v-"+
"clientWriteBufferSize_%v-serverReadBufferSize_%v-serverWriteBufferSize_%v-",
"clientWriteBufferSize_%v-serverReadBufferSize_%v-serverWriteBufferSize_%v-"+
"recvBufferPool_%v-",
f.NetworkMode, f.UseBufConn, f.EnableKeepalive, f.BenchTime, f.EnableTrace,
f.Latency, f.Kbps, f.MTU, f.MaxConcurrentCalls, reqPayloadString,
respPayloadString, f.ModeCompressor, f.EnableChannelz, f.EnablePreloader,
f.ClientReadBufferSize, f.ClientWriteBufferSize, f.ServerReadBufferSize,
f.ServerWriteBufferSize)
f.ServerWriteBufferSize, f.RecvBufferPool)
}

// SharedFeatures returns the shared features as a pretty printable string.
Expand Down Expand Up @@ -216,6 +220,8 @@ func (f Features) partialString(b *bytes.Buffer, wantFeatures []bool, sep, delim
b.WriteString(fmt.Sprintf("ServerReadBufferSize%v%v%v", sep, f.ServerReadBufferSize, delim))
case ServerWriteBufferSize:
b.WriteString(fmt.Sprintf("ServerWriteBufferSize%v%v%v", sep, f.ServerWriteBufferSize, delim))
case RecvBufferPool:
b.WriteString(fmt.Sprintf("RecvBufferPool%v%v%v", sep, f.RecvBufferPool, delim))
default:
log.Fatalf("Unknown feature index %v. maxFeatureIndex is %v", i, MaxFeatureIndex)
}
Expand Down
25 changes: 24 additions & 1 deletion dialoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ type dialOptions struct {
defaultServiceConfigRawJSON *string
resolvers []resolver.Builder
idleTimeout time.Duration
recvBufferPool SharedBufferPool
}

// DialOption configures how we set up the connection.
Expand Down Expand Up @@ -628,7 +629,8 @@ func defaultDialOptions() dialOptions {
ReadBufferSize: defaultReadBufSize,
UseProxy: true,
},
idleTimeout: 30 * time.Minute,
idleTimeout: 30 * time.Minute,
recvBufferPool: nopBufferPool{},
}
}

Expand Down Expand Up @@ -677,3 +679,24 @@ func WithIdleTimeout(d time.Duration) DialOption {
o.idleTimeout = d
})
}

// WithRecvBufferPool returns a DialOption that configures the ClientConn
// to use the provided shared buffer pool for parsing incoming messages. Depending
// on the application's workload, this could result in reduced memory allocation.
dfawley marked this conversation as resolved.
Show resolved Hide resolved
//
// If you are unsure about how to implement a memory pool but want to utilize one,
// begin with grpc.NewSharedBufferPool.
//
// Note: The shared buffer pool feature will not be active if any of the following
// options are used: WithStatsHandler, EnableTracing, or binary logging. In such
// cases, the shared buffer pool will be ignored.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func WithRecvBufferPool(bufferPool SharedBufferPool) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.recvBufferPool = bufferPool
})
}
27 changes: 15 additions & 12 deletions rpc_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,9 @@ type parser struct {
// The header of a gRPC message. Find more detail at
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
header [5]byte

// recvBufferPool is the pool of shared receive buffers.
recvBufferPool SharedBufferPool
}

// recvMsg reads a complete gRPC message from the stream.
Expand Down Expand Up @@ -610,9 +613,7 @@ func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byt
if int(length) > maxReceiveMessageSize {
return 0, nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", length, maxReceiveMessageSize)
}
// TODO(bradfitz,zhaoq): garbage. reuse buffer after proto decoding instead
// of making it for each message:
msg = make([]byte, int(length))
msg = p.recvBufferPool.Get(int(length))
if _, err := p.r.Read(msg); err != nil {
if err == io.EOF {
err = io.ErrUnexpectedEOF
Expand Down Expand Up @@ -726,12 +727,12 @@ type payloadInfo struct {
}

func recvAndDecompress(p *parser, s *transport.Stream, dc Decompressor, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor) ([]byte, error) {
pf, d, err := p.recvMsg(maxReceiveMessageSize)
pf, buf, err := p.recvMsg(maxReceiveMessageSize)
if err != nil {
return nil, err
}
if payInfo != nil {
payInfo.compressedLength = len(d)
payInfo.compressedLength = len(buf)
}

if st := checkRecvPayload(pf, s.RecvCompress(), compressor != nil || dc != nil); st != nil {
Expand All @@ -743,10 +744,10 @@ func recvAndDecompress(p *parser, s *transport.Stream, dc Decompressor, maxRecei
// To match legacy behavior, if the decompressor is set by WithDecompressor or RPCDecompressor,
// use this decompressor as the default.
if dc != nil {
d, err = dc.Do(bytes.NewReader(d))
size = len(d)
buf, err = dc.Do(bytes.NewReader(buf))
size = len(buf)
} else {
d, size, err = decompress(compressor, d, maxReceiveMessageSize)
buf, size, err = decompress(compressor, buf, maxReceiveMessageSize)
}
if err != nil {
return nil, status.Errorf(codes.Internal, "grpc: failed to decompress the received message: %v", err)
Expand All @@ -757,7 +758,7 @@ func recvAndDecompress(p *parser, s *transport.Stream, dc Decompressor, maxRecei
return nil, status.Errorf(codes.ResourceExhausted, "grpc: received message after decompression larger than max (%d vs. %d)", size, maxReceiveMessageSize)
}
}
return d, nil
return buf, nil
}

// Using compressor, decompress d, returning data and size.
Expand Down Expand Up @@ -792,15 +793,17 @@ func decompress(compressor encoding.Compressor, d []byte, maxReceiveMessageSize
// dc takes precedence over compressor.
// TODO(dfawley): wrap the old compressor/decompressor using the new API?
func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interface{}, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor) error {
d, err := recvAndDecompress(p, s, dc, maxReceiveMessageSize, payInfo, compressor)
buf, err := recvAndDecompress(p, s, dc, maxReceiveMessageSize, payInfo, compressor)
if err != nil {
return err
}
if err := c.Unmarshal(d, m); err != nil {
if err := c.Unmarshal(buf, m); err != nil {
return status.Errorf(codes.Internal, "grpc: failed to unmarshal the received message: %v", err)
}
if payInfo != nil {
payInfo.uncompressedBytes = d
payInfo.uncompressedBytes = buf
} else {
p.recvBufferPool.Put(&buf)
}
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions rpc_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (s) TestSimpleParsing(t *testing.T) {
{append([]byte{0, 1, 0, 0, 0}, bigMsg...), nil, bigMsg, compressionNone},
} {
buf := fullReader{bytes.NewReader(test.p)}
parser := &parser{r: buf}
parser := &parser{r: buf, recvBufferPool: nopBufferPool{}}
pt, b, err := parser.recvMsg(math.MaxInt32)
if err != test.err || !bytes.Equal(b, test.b) || pt != test.pt {
t.Fatalf("parser{%v}.recvMsg(_) = %v, %v, %v\nwant %v, %v, %v", test.p, pt, b, err, test.pt, test.b, test.err)
Expand All @@ -77,7 +77,7 @@ func (s) TestMultipleParsing(t *testing.T) {
// Set a byte stream consists of 3 messages with their headers.
p := []byte{0, 0, 0, 0, 1, 'a', 0, 0, 0, 0, 2, 'b', 'c', 0, 0, 0, 0, 1, 'd'}
b := fullReader{bytes.NewReader(p)}
parser := &parser{r: b}
parser := &parser{r: b, recvBufferPool: nopBufferPool{}}

wantRecvs := []struct {
pt payloadFormat
Expand Down
27 changes: 25 additions & 2 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ type serverOptions struct {
maxHeaderListSize *uint32
headerTableSize *uint32
numServerWorkers uint32
recvBufferPool SharedBufferPool
}

var defaultServerOptions = serverOptions{
Expand All @@ -182,6 +183,7 @@ var defaultServerOptions = serverOptions{
connectionTimeout: 120 * time.Second,
writeBufferSize: defaultWriteBufSize,
readBufferSize: defaultReadBufSize,
recvBufferPool: nopBufferPool{},
}
var globalServerOptions []ServerOption

Expand Down Expand Up @@ -552,6 +554,27 @@ func NumStreamWorkers(numServerWorkers uint32) ServerOption {
})
}

// RecvBufferPool returns a ServerOption that configures the server
// to use the provided shared buffer pool for parsing incoming messages. Depending
// on the application's workload, this could result in reduced memory allocation.
dfawley marked this conversation as resolved.
Show resolved Hide resolved
//
// If you are unsure about how to implement a memory pool but want to utilize one,
// begin with grpc.NewSharedBufferPool.
//
// Note: The shared buffer pool feature will not be active if any of the following
// options are used: StatsHandler, EnableTracing, or binary logging. In such
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder why this optimization is not going to work with StatsHandler? I'm not very familiar with the codebase, but I couldn't see anything that'd explain this in the MR. I'm using StatsHandler for metrics and for some other things, but I also want my app to benefit from this optimization because currently most memory allocations in my app come from protobuf (golang/protobuf#1495) and gRPC.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, the current StatsHandler relies on uncompressedBytes, making it difficult to determine when to return memory to the buffer pool.

grpc-go/stream.go

Line 1096 in 2cd95c7

Data: payInfo.uncompressedBytes,

A possible solution I've thought of is to allow users to control the payloadInfo configuration, but that change may be too extensive to include in this PR. If you're interested in pursuing this, please create a new issue, and I'll follow it after this.

type payloadInfo struct {

Any thoughts on this? @dfawley @easwars

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think there's any way around this, given our current API, unfortunately.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hueypark Opened #6660 to track this as a feature request. Thank you.

// cases, the shared buffer pool will be ignored.
//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func RecvBufferPool(bufferPool SharedBufferPool) ServerOption {
return newFuncServerOption(func(o *serverOptions) {
o.recvBufferPool = bufferPool
})
}

// serverWorkerResetThreshold defines how often the stack must be reset. Every
// N requests, by spawning a new goroutine in its place, a worker can reset its
// stack so that large stacks don't live in memory forever. 2^16 should allow
Expand Down Expand Up @@ -1296,7 +1319,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
if len(shs) != 0 || len(binlogs) != 0 {
payInfo = &payloadInfo{}
}
d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
d, err := recvAndDecompress(&parser{r: stream, recvBufferPool: s.opts.recvBufferPool}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
if err != nil {
if e := t.WriteStatus(stream, status.Convert(err)); e != nil {
channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
Expand Down Expand Up @@ -1506,7 +1529,7 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
ctx: ctx,
t: t,
s: stream,
p: &parser{r: stream},
p: &parser{r: stream, recvBufferPool: s.opts.recvBufferPool},
codec: s.getCodec(stream.ContentSubtype()),
maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
maxSendMessageSize: s.opts.maxSendMessageSize,
Expand Down
Loading