Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adding an option for outbound concurrency control #36

Merged
merged 7 commits into from
May 15, 2023
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 32 additions & 3 deletions pkg/rpcbackend/backend.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2023 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -52,9 +52,27 @@ func NewRPCClient(client *resty.Client) Backend {
}
}

// NewRPCClientWithOption Constructor
func NewRPCClientWithOption(client *resty.Client, options RPCClientOptions) Backend {
rpcClient := &RPCClient{
client: client,
}

if options.MaxConcurrentRequest > 0 {
rpcClient.concurrencySlots = make(chan bool, options.MaxConcurrentRequest)
}

return rpcClient
}

type RPCClient struct {
client *resty.Client
requestCounter int64
client *resty.Client
concurrencySlots chan bool
requestCounter int64
}

type RPCClientOptions struct {
MaxConcurrentRequest int64
}

type RPCRequest struct {
Expand Down Expand Up @@ -127,6 +145,17 @@ func (rc *RPCClient) CallRPC(ctx context.Context, result interface{}, method str
// In all return paths *including error paths* the RPCResponse is populated
// so the caller has an RPC structure to send back to the front-end caller.
func (rc *RPCClient) SyncRequest(ctx context.Context, rpcReq *RPCRequest) (rpcRes *RPCResponse, err error) {
if rc.concurrencySlots != nil {
select {
case rc.concurrencySlots <- true:
// wait for the concurrency slot and continue
case <-ctx.Done():
return nil, fmt.Errorf("request with id %s failed due to canceled context", rpcReq.ID)
Chengxuan marked this conversation as resolved.
Show resolved Hide resolved
}
defer func() {
<-rc.concurrencySlots
}()
}

// We always set the back-end request ID - as we need to support requests coming in from
// multiple concurrent clients on our front-end that might use clashing IDs.
Expand Down