From d496c993565bdf56bbee3bcc7ffa10253e0b6df4 Mon Sep 17 00:00:00 2001 From: Dmytro Haidashenko <34754799+dhaidashenko@users.noreply.github.com> Date: Wed, 31 Jul 2024 20:02:38 +0200 Subject: [PATCH] BCI-3842 custom timeouts for Hedera (#13876) * allow to configure RPCTimeouts * Custom (30s) timeout for Hedera RPC requests with large payloads (SendTransaction, CallContext, etc.) * fix linter --- .changeset/mean-brooms-agree.md | 5 ++ core/chains/evm/client/chain_client.go | 3 +- core/chains/evm/client/evm_client.go | 14 ++- core/chains/evm/client/helpers_test.go | 4 +- core/chains/evm/client/rpc_client.go | 81 +++++++++-------- core/chains/evm/client/rpc_client_test.go | 88 +++++++++++++++++-- core/chains/evm/config/chaintype/chaintype.go | 6 +- core/services/chainlink/config_test.go | 4 +- core/services/ocr/contract_tracker.go | 2 +- 9 files changed, 152 insertions(+), 55 deletions(-) create mode 100644 .changeset/mean-brooms-agree.md diff --git a/.changeset/mean-brooms-agree.md b/.changeset/mean-brooms-agree.md new file mode 100644 index 00000000000..0dd2ba7bd33 --- /dev/null +++ b/.changeset/mean-brooms-agree.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +Custom (30s) timeout for Hedera RPC requests with large payloads (SendTransaction, CallContext, etc.) #internal diff --git a/core/chains/evm/client/chain_client.go b/core/chains/evm/client/chain_client.go index 75973469140..c39214471ce 100644 --- a/core/chains/evm/client/chain_client.go +++ b/core/chains/evm/client/chain_client.go @@ -20,7 +20,6 @@ import ( evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" ) -const queryTimeout = 10 * time.Second const BALANCE_OF_ADDRESS_FUNCTION_SELECTOR = "0x70a08231" var _ Client = (*chainClient)(nil) @@ -99,7 +98,7 @@ type Client interface { } func ContextWithDefaultTimeout() (ctx context.Context, cancel context.CancelFunc) { - return context.WithTimeout(context.Background(), queryTimeout) + return context.WithTimeout(context.Background(), commonclient.QueryTimeout) } type chainClient struct { diff --git a/core/chains/evm/client/evm_client.go b/core/chains/evm/client/evm_client.go index c2373ee775f..36768086833 100644 --- a/core/chains/evm/client/evm_client.go +++ b/core/chains/evm/client/evm_client.go @@ -3,6 +3,7 @@ package client import ( "math/big" "net/url" + "time" "github.com/smartcontractkit/chainlink-common/pkg/logger" @@ -17,16 +18,17 @@ func NewEvmClient(cfg evmconfig.NodePool, chainCfg commonclient.ChainConfig, cli var empty url.URL var primaries []commonclient.Node[*big.Int, *evmtypes.Head, RPCClient] var sendonlys []commonclient.SendOnlyNode[*big.Int, RPCClient] + largePayloadRPCTimeout, defaultRPCTimeout := getRPCTimeouts(chainType) for i, node := range nodes { if node.SendOnly != nil && *node.SendOnly { rpc := NewRPCClient(lggr, empty, (*url.URL)(node.HTTPURL), *node.Name, int32(i), chainID, - commonclient.Secondary, cfg.FinalizedBlockPollInterval()) + commonclient.Secondary, cfg.FinalizedBlockPollInterval(), largePayloadRPCTimeout, defaultRPCTimeout) sendonly := commonclient.NewSendOnlyNode(lggr, (url.URL)(*node.HTTPURL), *node.Name, chainID, rpc) sendonlys = append(sendonlys, sendonly) } else { rpc := NewRPCClient(lggr, (url.URL)(*node.WSURL), (*url.URL)(node.HTTPURL), *node.Name, int32(i), - chainID, commonclient.Primary, cfg.FinalizedBlockPollInterval()) + chainID, commonclient.Primary, cfg.FinalizedBlockPollInterval(), largePayloadRPCTimeout, defaultRPCTimeout) primaryNode := commonclient.NewNode(cfg, chainCfg, lggr, (url.URL)(*node.WSURL), (*url.URL)(node.HTTPURL), *node.Name, int32(i), chainID, *node.Order, rpc, "EVM") @@ -37,3 +39,11 @@ func NewEvmClient(cfg evmconfig.NodePool, chainCfg commonclient.ChainConfig, cli return NewChainClient(lggr, cfg.SelectionMode(), cfg.LeaseDuration(), chainCfg.NodeNoNewHeadsThreshold(), primaries, sendonlys, chainID, chainType, clientErrors, cfg.DeathDeclarationDelay()) } + +func getRPCTimeouts(chainType chaintype.ChainType) (largePayload, defaultTimeout time.Duration) { + if chaintype.ChainHedera == chainType { + return 30 * time.Second, commonclient.QueryTimeout + } + + return commonclient.QueryTimeout, commonclient.QueryTimeout +} diff --git a/core/chains/evm/client/helpers_test.go b/core/chains/evm/client/helpers_test.go index a2a55e17918..8caacb4190a 100644 --- a/core/chains/evm/client/helpers_test.go +++ b/core/chains/evm/client/helpers_test.go @@ -140,7 +140,7 @@ func NewChainClientWithTestNode( } lggr := logger.Test(t) - rpc := NewRPCClient(lggr, *parsed, rpcHTTPURL, "eth-primary-rpc-0", id, chainID, commonclient.Primary, 0) + rpc := NewRPCClient(lggr, *parsed, rpcHTTPURL, "eth-primary-rpc-0", id, chainID, commonclient.Primary, 0, commonclient.QueryTimeout, commonclient.QueryTimeout) n := commonclient.NewNode[*big.Int, *evmtypes.Head, RPCClient]( nodeCfg, clientMocks.ChainConfig{NoNewHeadsThresholdVal: noNewHeadsThreshold}, lggr, *parsed, rpcHTTPURL, "eth-primary-node-0", id, chainID, 1, rpc, "EVM") @@ -152,7 +152,7 @@ func NewChainClientWithTestNode( return nil, pkgerrors.Errorf("sendonly ethereum rpc url scheme must be http(s): %s", u.String()) } var empty url.URL - rpc := NewRPCClient(lggr, empty, &sendonlyRPCURLs[i], fmt.Sprintf("eth-sendonly-rpc-%d", i), id, chainID, commonclient.Secondary, 0) + rpc := NewRPCClient(lggr, empty, &sendonlyRPCURLs[i], fmt.Sprintf("eth-sendonly-rpc-%d", i), id, chainID, commonclient.Secondary, 0, commonclient.QueryTimeout, commonclient.QueryTimeout) s := commonclient.NewSendOnlyNode[*big.Int, RPCClient]( lggr, u, fmt.Sprintf("eth-sendonly-%d", i), chainID, rpc) sendonlys = append(sendonlys, s) diff --git a/core/chains/evm/client/rpc_client.go b/core/chains/evm/client/rpc_client.go index 1a0023227af..200703dd42f 100644 --- a/core/chains/evm/client/rpc_client.go +++ b/core/chains/evm/client/rpc_client.go @@ -117,6 +117,8 @@ type rpcClient struct { id int32 chainID *big.Int tier commonclient.NodeTier + largePayloadRpcTimeout time.Duration + rpcTimeout time.Duration finalizedBlockPollInterval time.Duration ws rawclient @@ -152,8 +154,13 @@ func NewRPCClient( chainID *big.Int, tier commonclient.NodeTier, finalizedBlockPollInterval time.Duration, + largePayloadRpcTimeout time.Duration, + rpcTimeout time.Duration, ) RPCClient { - r := new(rpcClient) + r := &rpcClient{ + largePayloadRpcTimeout: largePayloadRpcTimeout, + rpcTimeout: rpcTimeout, + } r.name = name r.id = id r.chainID = chainID @@ -178,7 +185,7 @@ func NewRPCClient( // Not thread-safe, pure dial. func (r *rpcClient) Dial(callerCtx context.Context) error { - ctx, cancel := r.makeQueryCtx(callerCtx) + ctx, cancel := r.makeQueryCtx(callerCtx, r.rpcTimeout) defer cancel() promEVMPoolRPCNodeDials.WithLabelValues(r.chainID.String(), r.name).Inc() @@ -367,7 +374,7 @@ func (r *rpcClient) UnsubscribeAllExceptAliveLoop() { // CallContext implementation func (r *rpcClient) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx, r.largePayloadRpcTimeout) defer cancel() lggr := r.newRqLggr().With( "method", method, @@ -390,7 +397,7 @@ func (r *rpcClient) CallContext(ctx context.Context, result interface{}, method } func (r *rpcClient) BatchCallContext(ctx context.Context, b []rpc.BatchElem) error { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx, r.largePayloadRpcTimeout) defer cancel() lggr := r.newRqLggr().With("nBatchElems", len(b), "batchElems", b) @@ -411,7 +418,7 @@ func (r *rpcClient) BatchCallContext(ctx context.Context, b []rpc.BatchElem) err // TODO: Full transition from SubscribeNewHead to SubscribeToHeads is done in BCI-2875 func (r *rpcClient) SubscribeNewHead(ctx context.Context, channel chan<- *evmtypes.Head) (_ commontypes.Subscription, err error) { - ctx, cancel, chStopInFlight, ws, _ := r.acquireQueryCtx(ctx) + ctx, cancel, chStopInFlight, ws, _ := r.acquireQueryCtx(ctx, r.rpcTimeout) defer cancel() args := []interface{}{"newHeads"} lggr := r.newRqLggr().With("args", args) @@ -442,7 +449,7 @@ func (r *rpcClient) SubscribeNewHead(ctx context.Context, channel chan<- *evmtyp } func (r *rpcClient) SubscribeToHeads(ctx context.Context) (ch <-chan *evmtypes.Head, sub commontypes.Subscription, err error) { - ctx, cancel, chStopInFlight, ws, _ := r.acquireQueryCtx(ctx) + ctx, cancel, chStopInFlight, ws, _ := r.acquireQueryCtx(ctx, r.rpcTimeout) defer cancel() args := []interface{}{rpcSubscriptionMethodNewHeads} @@ -504,7 +511,7 @@ func (r *rpcClient) TransactionReceipt(ctx context.Context, txHash common.Hash) } func (r *rpcClient) TransactionReceiptGeth(ctx context.Context, txHash common.Hash) (receipt *types.Receipt, err error) { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx, r.rpcTimeout) defer cancel() lggr := r.newRqLggr().With("txHash", txHash) @@ -527,7 +534,7 @@ func (r *rpcClient) TransactionReceiptGeth(ctx context.Context, txHash common.Ha return } func (r *rpcClient) TransactionByHash(ctx context.Context, txHash common.Hash) (tx *types.Transaction, err error) { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx, r.rpcTimeout) defer cancel() lggr := r.newRqLggr().With("txHash", txHash) @@ -551,7 +558,7 @@ func (r *rpcClient) TransactionByHash(ctx context.Context, txHash common.Hash) ( } func (r *rpcClient) HeaderByNumber(ctx context.Context, number *big.Int) (header *types.Header, err error) { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx, r.rpcTimeout) defer cancel() lggr := r.newRqLggr().With("number", number) @@ -572,7 +579,7 @@ func (r *rpcClient) HeaderByNumber(ctx context.Context, number *big.Int) (header } func (r *rpcClient) HeaderByHash(ctx context.Context, hash common.Hash) (header *types.Header, err error) { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx, r.rpcTimeout) defer cancel() lggr := r.newRqLggr().With("hash", hash) @@ -604,7 +611,7 @@ func (r *rpcClient) BlockByNumber(ctx context.Context, number *big.Int) (head *e } func (r *rpcClient) blockByNumber(ctx context.Context, number string) (head *evmtypes.Head, err error) { - ctx, cancel, chStopInFlight, ws, http := r.acquireQueryCtx(ctx) + ctx, cancel, chStopInFlight, ws, http := r.acquireQueryCtx(ctx, r.rpcTimeout) defer cancel() const method = "eth_getBlockByNumber" args := []interface{}{number, false} @@ -656,7 +663,7 @@ func (r *rpcClient) BlockByHash(ctx context.Context, hash common.Hash) (head *ev } func (r *rpcClient) BlockByHashGeth(ctx context.Context, hash common.Hash) (block *types.Block, err error) { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx, r.rpcTimeout) defer cancel() lggr := r.newRqLggr().With("hash", hash) @@ -679,7 +686,7 @@ func (r *rpcClient) BlockByHashGeth(ctx context.Context, hash common.Hash) (bloc } func (r *rpcClient) BlockByNumberGeth(ctx context.Context, number *big.Int) (block *types.Block, err error) { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx, r.rpcTimeout) defer cancel() lggr := r.newRqLggr().With("number", number) @@ -702,7 +709,7 @@ func (r *rpcClient) BlockByNumberGeth(ctx context.Context, number *big.Int) (blo } func (r *rpcClient) SendTransaction(ctx context.Context, tx *types.Transaction) error { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx, r.largePayloadRpcTimeout) defer cancel() lggr := r.newRqLggr().With("tx", tx) @@ -740,7 +747,7 @@ func (r *rpcClient) SendEmptyTransaction( // PendingSequenceAt returns one higher than the highest nonce from both mempool and mined transactions func (r *rpcClient) PendingSequenceAt(ctx context.Context, account common.Address) (nonce evmtypes.Nonce, err error) { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx, r.rpcTimeout) defer cancel() lggr := r.newRqLggr().With("account", account) @@ -769,7 +776,7 @@ func (r *rpcClient) PendingSequenceAt(ctx context.Context, account common.Addres // mined nonce at the given block number, but it actually returns the total // transaction count which is the highest mined nonce + 1 func (r *rpcClient) SequenceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (nonce evmtypes.Nonce, err error) { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx, r.rpcTimeout) defer cancel() lggr := r.newRqLggr().With("account", account, "blockNumber", blockNumber) @@ -795,7 +802,7 @@ func (r *rpcClient) SequenceAt(ctx context.Context, account common.Address, bloc } func (r *rpcClient) PendingCodeAt(ctx context.Context, account common.Address) (code []byte, err error) { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx, r.rpcTimeout) defer cancel() lggr := r.newRqLggr().With("account", account) @@ -818,7 +825,7 @@ func (r *rpcClient) PendingCodeAt(ctx context.Context, account common.Address) ( } func (r *rpcClient) CodeAt(ctx context.Context, account common.Address, blockNumber *big.Int) (code []byte, err error) { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx, r.rpcTimeout) defer cancel() lggr := r.newRqLggr().With("account", account, "blockNumber", blockNumber) @@ -841,7 +848,7 @@ func (r *rpcClient) CodeAt(ctx context.Context, account common.Address, blockNum } func (r *rpcClient) EstimateGas(ctx context.Context, c interface{}) (gas uint64, err error) { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx, r.largePayloadRpcTimeout) defer cancel() call := c.(ethereum.CallMsg) lggr := r.newRqLggr().With("call", call) @@ -865,7 +872,7 @@ func (r *rpcClient) EstimateGas(ctx context.Context, c interface{}) (gas uint64, } func (r *rpcClient) SuggestGasPrice(ctx context.Context) (price *big.Int, err error) { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx, r.rpcTimeout) defer cancel() lggr := r.newRqLggr() @@ -888,7 +895,7 @@ func (r *rpcClient) SuggestGasPrice(ctx context.Context) (price *big.Int, err er } func (r *rpcClient) CallContract(ctx context.Context, msg interface{}, blockNumber *big.Int) (val []byte, err error) { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx, r.largePayloadRpcTimeout) defer cancel() lggr := r.newRqLggr().With("callMsg", msg, "blockNumber", blockNumber) message := msg.(ethereum.CallMsg) @@ -916,7 +923,7 @@ func (r *rpcClient) CallContract(ctx context.Context, msg interface{}, blockNumb } func (r *rpcClient) PendingCallContract(ctx context.Context, msg interface{}) (val []byte, err error) { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx, r.largePayloadRpcTimeout) defer cancel() lggr := r.newRqLggr().With("callMsg", msg) message := msg.(ethereum.CallMsg) @@ -950,7 +957,7 @@ func (r *rpcClient) LatestBlockHeight(ctx context.Context) (*big.Int, error) { } func (r *rpcClient) BlockNumber(ctx context.Context) (height uint64, err error) { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx, r.rpcTimeout) defer cancel() lggr := r.newRqLggr() @@ -973,7 +980,7 @@ func (r *rpcClient) BlockNumber(ctx context.Context) (height uint64, err error) } func (r *rpcClient) BalanceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (balance *big.Int, err error) { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx, r.rpcTimeout) defer cancel() lggr := r.newRqLggr().With("account", account.Hex(), "blockNumber", blockNumber) @@ -1038,7 +1045,7 @@ func (r *rpcClient) FilterEvents(ctx context.Context, q ethereum.FilterQuery) ([ } func (r *rpcClient) FilterLogs(ctx context.Context, q ethereum.FilterQuery) (l []types.Log, err error) { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx, r.rpcTimeout) defer cancel() lggr := r.newRqLggr().With("q", q) @@ -1066,7 +1073,7 @@ func (r *rpcClient) ClientVersion(ctx context.Context) (version string, err erro } func (r *rpcClient) SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQuery, ch chan<- types.Log) (_ ethereum.Subscription, err error) { - ctx, cancel, chStopInFlight, ws, _ := r.acquireQueryCtx(ctx) + ctx, cancel, chStopInFlight, ws, _ := r.acquireQueryCtx(ctx, r.rpcTimeout) defer cancel() lggr := r.newRqLggr().With("q", q) @@ -1092,7 +1099,7 @@ func (r *rpcClient) SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQu } func (r *rpcClient) SuggestGasTipCap(ctx context.Context) (tipCap *big.Int, err error) { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx, r.rpcTimeout) defer cancel() lggr := r.newRqLggr() @@ -1117,7 +1124,7 @@ func (r *rpcClient) SuggestGasTipCap(ctx context.Context) (tipCap *big.Int, err // Returns the ChainID according to the geth client. This is useful for functions like verify() // the common node. func (r *rpcClient) ChainID(ctx context.Context) (chainID *big.Int, err error) { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx, r.rpcTimeout) defer cancel() @@ -1172,12 +1179,12 @@ func (r *rpcClient) wrapHTTP(err error) error { } // makeLiveQueryCtxAndSafeGetClients wraps makeQueryCtx -func (r *rpcClient) makeLiveQueryCtxAndSafeGetClients(parentCtx context.Context) (ctx context.Context, cancel context.CancelFunc, ws rawclient, http *rawclient) { - ctx, cancel, _, ws, http = r.acquireQueryCtx(parentCtx) +func (r *rpcClient) makeLiveQueryCtxAndSafeGetClients(parentCtx context.Context, timeout time.Duration) (ctx context.Context, cancel context.CancelFunc, ws rawclient, http *rawclient) { + ctx, cancel, _, ws, http = r.acquireQueryCtx(parentCtx, timeout) return } -func (r *rpcClient) acquireQueryCtx(parentCtx context.Context) (ctx context.Context, cancel context.CancelFunc, +func (r *rpcClient) acquireQueryCtx(parentCtx context.Context, timeout time.Duration) (ctx context.Context, cancel context.CancelFunc, chStopInFlight chan struct{}, ws rawclient, http *rawclient) { // Need to wrap in mutex because state transition can cancel and replace the // context @@ -1189,7 +1196,7 @@ func (r *rpcClient) acquireQueryCtx(parentCtx context.Context) (ctx context.Cont http = &cp } r.stateMu.RUnlock() - ctx, cancel = makeQueryCtx(parentCtx, chStopInFlight) + ctx, cancel = makeQueryCtx(parentCtx, chStopInFlight, timeout) return } @@ -1197,10 +1204,10 @@ func (r *rpcClient) acquireQueryCtx(parentCtx context.Context) (ctx context.Cont // 1. Passed in ctx cancels // 2. Passed in channel is closed // 3. Default timeout is reached (queryTimeout) -func makeQueryCtx(ctx context.Context, ch services.StopChan) (context.Context, context.CancelFunc) { +func makeQueryCtx(ctx context.Context, ch services.StopChan, timeout time.Duration) (context.Context, context.CancelFunc) { var chCancel, timeoutCancel context.CancelFunc ctx, chCancel = ch.Ctx(ctx) - ctx, timeoutCancel = context.WithTimeout(ctx, queryTimeout) + ctx, timeoutCancel = context.WithTimeout(ctx, timeout) cancel := func() { chCancel() timeoutCancel() @@ -1208,12 +1215,12 @@ func makeQueryCtx(ctx context.Context, ch services.StopChan) (context.Context, c return ctx, cancel } -func (r *rpcClient) makeQueryCtx(ctx context.Context) (context.Context, context.CancelFunc) { - return makeQueryCtx(ctx, r.getChStopInflight()) +func (r *rpcClient) makeQueryCtx(ctx context.Context, timeout time.Duration) (context.Context, context.CancelFunc) { + return makeQueryCtx(ctx, r.getChStopInflight(), timeout) } func (r *rpcClient) IsSyncing(ctx context.Context) (bool, error) { - ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx) + ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx, r.rpcTimeout) defer cancel() lggr := r.newRqLggr() diff --git a/core/chains/evm/client/rpc_client_test.go b/core/chains/evm/client/rpc_client_test.go index 9b21aedbea8..d6a11e0d013 100644 --- a/core/chains/evm/client/rpc_client_test.go +++ b/core/chains/evm/client/rpc_client_test.go @@ -3,10 +3,12 @@ package client_test import ( "context" "encoding/json" + "errors" "fmt" "math/big" "net/url" "testing" + "time" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/core/types" @@ -56,7 +58,7 @@ func TestRPCClient_SubscribeNewHead(t *testing.T) { server := testutils.NewWSServer(t, chainId, serverCallBack) wsURL := server.WSURL() - rpc := client.NewRPCClient(lggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0) + rpc := client.NewRPCClient(lggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, commonclient.QueryTimeout, commonclient.QueryTimeout) defer rpc.Close() require.NoError(t, rpc.Dial(ctx)) // set to default values @@ -106,7 +108,7 @@ func TestRPCClient_SubscribeNewHead(t *testing.T) { server := testutils.NewWSServer(t, chainId, serverCallBack) wsURL := server.WSURL() - rpc := client.NewRPCClient(lggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0) + rpc := client.NewRPCClient(lggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, commonclient.QueryTimeout, commonclient.QueryTimeout) defer rpc.Close() require.NoError(t, rpc.Dial(ctx)) ch := make(chan *evmtypes.Head) @@ -129,7 +131,7 @@ func TestRPCClient_SubscribeNewHead(t *testing.T) { t.Run("Block's chain ID matched configured", func(t *testing.T) { server := testutils.NewWSServer(t, chainId, serverCallBack) wsURL := server.WSURL() - rpc := client.NewRPCClient(lggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0) + rpc := client.NewRPCClient(lggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, commonclient.QueryTimeout, commonclient.QueryTimeout) defer rpc.Close() require.NoError(t, rpc.Dial(ctx)) ch := make(chan *evmtypes.Head) @@ -146,7 +148,7 @@ func TestRPCClient_SubscribeNewHead(t *testing.T) { }) wsURL := server.WSURL() observedLggr, observed := logger.TestObserved(t, zap.DebugLevel) - rpc := client.NewRPCClient(observedLggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0) + rpc := client.NewRPCClient(observedLggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, commonclient.QueryTimeout, commonclient.QueryTimeout) require.NoError(t, rpc.Dial(ctx)) server.Close() _, err := rpc.SubscribeNewHead(ctx, make(chan *evmtypes.Head)) @@ -156,7 +158,7 @@ func TestRPCClient_SubscribeNewHead(t *testing.T) { t.Run("Subscription error is properly wrapper", func(t *testing.T) { server := testutils.NewWSServer(t, chainId, serverCallBack) wsURL := server.WSURL() - rpc := client.NewRPCClient(lggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0) + rpc := client.NewRPCClient(lggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, commonclient.QueryTimeout, commonclient.QueryTimeout) defer rpc.Close() require.NoError(t, rpc.Dial(ctx)) sub, err := rpc.SubscribeNewHead(ctx, make(chan *evmtypes.Head)) @@ -184,7 +186,7 @@ func TestRPCClient_SubscribeFilterLogs(t *testing.T) { }) wsURL := server.WSURL() observedLggr, observed := logger.TestObserved(t, zap.DebugLevel) - rpc := client.NewRPCClient(observedLggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0) + rpc := client.NewRPCClient(observedLggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, commonclient.QueryTimeout, commonclient.QueryTimeout) require.NoError(t, rpc.Dial(ctx)) server.Close() _, err := rpc.SubscribeFilterLogs(ctx, ethereum.FilterQuery{}, make(chan types.Log)) @@ -201,7 +203,7 @@ func TestRPCClient_SubscribeFilterLogs(t *testing.T) { return resp }) wsURL := server.WSURL() - rpc := client.NewRPCClient(lggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0) + rpc := client.NewRPCClient(lggr, *wsURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, commonclient.QueryTimeout, commonclient.QueryTimeout) defer rpc.Close() require.NoError(t, rpc.Dial(ctx)) sub, err := rpc.SubscribeFilterLogs(ctx, ethereum.FilterQuery{}, make(chan types.Log)) @@ -250,7 +252,7 @@ func TestRPCClient_LatestFinalizedBlock(t *testing.T) { } server := createRPCServer() - rpc := client.NewRPCClient(lggr, *server.URL, nil, "rpc", 1, chainId, commonclient.Primary, 0) + rpc := client.NewRPCClient(lggr, *server.URL, nil, "rpc", 1, chainId, commonclient.Primary, 0, commonclient.QueryTimeout, commonclient.QueryTimeout) require.NoError(t, rpc.Dial(ctx)) defer rpc.Close() server.Head = &evmtypes.Head{Number: 128} @@ -298,3 +300,73 @@ func TestRPCClient_LatestFinalizedBlock(t *testing.T) { assert.Equal(t, int64(0), latest.BlockNumber) assert.Equal(t, int64(0), latest.FinalizedBlockNumber) } + +func TestRpcClientLargePayloadTimeout(t *testing.T) { + t.Parallel() + + testCases := []struct { + Name string + Fn func(ctx context.Context, rpc client.RPCClient) error + }{ + { + Name: "SendTransaction", + Fn: func(ctx context.Context, rpc client.RPCClient) error { + return rpc.SendTransaction(ctx, types.NewTx(&types.LegacyTx{})) + }, + }, + { + Name: "EstimateGas", + Fn: func(ctx context.Context, rpc client.RPCClient) error { + _, err := rpc.EstimateGas(ctx, ethereum.CallMsg{}) + return err + }, + }, + { + Name: "CallContract", + Fn: func(ctx context.Context, rpc client.RPCClient) error { + _, err := rpc.CallContract(ctx, ethereum.CallMsg{}, nil) + return err + }, + }, + { + Name: "CallContext", + Fn: func(ctx context.Context, rpc client.RPCClient) error { + err := rpc.CallContext(ctx, nil, "rpc_call", nil) + return err + }, + }, + { + Name: "BatchCallContext", + Fn: func(ctx context.Context, rpc client.RPCClient) error { + err := rpc.BatchCallContext(ctx, nil) + return err + }, + }, + } + for _, testCase := range testCases { + testCase := testCase + t.Run(testCase.Name, func(t *testing.T) { + t.Parallel() + // use background context to ensure that the DeadlineExceeded is caused by timeout we've set on request + // level, instead of one that was set on test level. + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + chainId := big.NewInt(123456) + rpcURL := testutils.NewWSServer(t, chainId, func(method string, params gjson.Result) (resp testutils.JSONRPCResponse) { + // block until test is done + <-ctx.Done() + return + }).WSURL() + + // use something unreasonably large for RPC timeout to ensure that we use largePayloadRPCTimeout + const rpcTimeout = time.Hour + const largePayloadRPCTimeout = tests.TestInterval + rpc := client.NewRPCClient(logger.Test(t), *rpcURL, nil, "rpc", 1, chainId, commonclient.Primary, 0, largePayloadRPCTimeout, rpcTimeout) + require.NoError(t, rpc.Dial(ctx)) + defer rpc.Close() + err := testCase.Fn(ctx, rpc) + assert.True(t, errors.Is(err, context.DeadlineExceeded), fmt.Sprintf("Expected DedlineExceeded error, but got: %v", err)) + }) + } +} diff --git a/core/chains/evm/config/chaintype/chaintype.go b/core/chains/evm/config/chaintype/chaintype.go index 9b845969e4b..e8abfc5abb2 100644 --- a/core/chains/evm/config/chaintype/chaintype.go +++ b/core/chains/evm/config/chaintype/chaintype.go @@ -19,6 +19,7 @@ const ( ChainXLayer ChainType = "xlayer" ChainZkEvm ChainType = "zkevm" ChainZkSync ChainType = "zksync" + ChainHedera ChainType = "hedera" ) // IsL2 returns true if this chain is a Layer 2 chain. Notably: @@ -35,7 +36,7 @@ func (c ChainType) IsL2() bool { func (c ChainType) IsValid() bool { switch c { - case "", ChainArbitrum, ChainCelo, ChainGnosis, ChainKroma, ChainMetis, ChainOptimismBedrock, ChainScroll, ChainWeMix, ChainXLayer, ChainZkEvm, ChainZkSync: + case "", ChainArbitrum, ChainCelo, ChainGnosis, ChainKroma, ChainMetis, ChainOptimismBedrock, ChainScroll, ChainWeMix, ChainXLayer, ChainZkEvm, ChainZkSync, ChainHedera: return true } return false @@ -65,6 +66,8 @@ func ChainTypeFromSlug(slug string) ChainType { return ChainZkEvm case "zksync": return ChainZkSync + case "hedera": + return ChainHedera default: return ChainType(slug) } @@ -128,4 +131,5 @@ var ErrInvalidChainType = fmt.Errorf("must be one of %s or omitted", strings.Joi string(ChainXLayer), string(ChainZkEvm), string(ChainZkSync), + string(ChainHedera), }, ", ")) diff --git a/core/services/chainlink/config_test.go b/core/services/chainlink/config_test.go index 121012b5dd3..001dd450659 100644 --- a/core/services/chainlink/config_test.go +++ b/core/services/chainlink/config_test.go @@ -1304,7 +1304,7 @@ func TestConfig_Validate(t *testing.T) { - 1: 10 errors: - ChainType: invalid value (Foo): must not be set with this chain id - Nodes: missing: must have at least one node - - ChainType: invalid value (Foo): must be one of arbitrum, celo, gnosis, kroma, metis, optimismBedrock, scroll, wemix, xlayer, zkevm, zksync or omitted + - ChainType: invalid value (Foo): must be one of arbitrum, celo, gnosis, kroma, metis, optimismBedrock, scroll, wemix, xlayer, zkevm, zksync, hedera or omitted - HeadTracker.HistoryDepth: invalid value (30): must be greater than or equal to FinalizedBlockOffset - GasEstimator.BumpThreshold: invalid value (0): cannot be 0 if auto-purge feature is enabled for Foo - Transactions.AutoPurge.Threshold: missing: needs to be set if auto-purge feature is enabled for Foo @@ -1317,7 +1317,7 @@ func TestConfig_Validate(t *testing.T) { - 2: 5 errors: - ChainType: invalid value (Arbitrum): only "optimismBedrock" can be used with this chain id - Nodes: missing: must have at least one node - - ChainType: invalid value (Arbitrum): must be one of arbitrum, celo, gnosis, kroma, metis, optimismBedrock, scroll, wemix, xlayer, zkevm, zksync or omitted + - ChainType: invalid value (Arbitrum): must be one of arbitrum, celo, gnosis, kroma, metis, optimismBedrock, scroll, wemix, xlayer, zkevm, zksync, hedera or omitted - FinalityDepth: invalid value (0): must be greater than or equal to 1 - MinIncomingConfirmations: invalid value (0): must be greater than or equal to 1 - 3.Nodes: 5 errors: diff --git a/core/services/ocr/contract_tracker.go b/core/services/ocr/contract_tracker.go index 11dfacd1d1a..9546c522a2f 100644 --- a/core/services/ocr/contract_tracker.go +++ b/core/services/ocr/contract_tracker.go @@ -399,7 +399,7 @@ func (t *OCRContractTracker) LatestBlockHeight(ctx context.Context) (blockheight // care about the block height; we have no way of getting the L1 block // height anyway return 0, nil - case "", chaintype.ChainArbitrum, chaintype.ChainCelo, chaintype.ChainGnosis, chaintype.ChainKroma, chaintype.ChainOptimismBedrock, chaintype.ChainScroll, chaintype.ChainWeMix, chaintype.ChainXLayer, chaintype.ChainZkEvm, chaintype.ChainZkSync: + case "", chaintype.ChainArbitrum, chaintype.ChainCelo, chaintype.ChainGnosis, chaintype.ChainKroma, chaintype.ChainOptimismBedrock, chaintype.ChainScroll, chaintype.ChainWeMix, chaintype.ChainXLayer, chaintype.ChainZkEvm, chaintype.ChainZkSync, chaintype.ChainHedera: // continue } latestBlockHeight := t.getLatestBlockHeight()