From 917ef459474dae8426a391bf55fe9d546901789c Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Wed, 10 Apr 2024 00:32:02 +0400 Subject: [PATCH] fix: api: fix EthSubscribe tipsets off by one (#11858) Eth subscribe tipsets API should only return tipsets that have been executed. We do this by only returning the parent tipset of the latest tipset received by ETH Subscribe from it's TipSetFilter subscription. Closes #11807 Subsumes #11816 --- itests/eth_filter_test.go | 35 +++++++++++++++++++++++++++++++++++ node/impl/full/eth_events.go | 25 +++++++++++++++++++++---- 2 files changed, 56 insertions(+), 4 deletions(-) diff --git a/itests/eth_filter_test.go b/itests/eth_filter_test.go index 2bb8e618bff..114a4c2550a 100644 --- a/itests/eth_filter_test.go +++ b/itests/eth_filter_test.go @@ -137,6 +137,41 @@ func TestEthNewPendingTransactionFilter(t *testing.T) { } } +func TestEthNewHeadsSubSimple(t *testing.T) { + require := require.New(t) + + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + + kit.QuietAllLogsExcept("events", "messagepool") + + client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.ThroughRPC(), kit.WithEthRPC()) + ens.InterconnectAll().BeginMining(10 * time.Millisecond) + + // install filter + subId, err := client.EthSubscribe(ctx, res.Wrap[jsonrpc.RawParams](json.Marshal(ethtypes.EthSubscribeParams{EventType: "newHeads"})).Assert(require.NoError)) + require.NoError(err) + + err = client.EthSubRouter.AddSub(ctx, subId, func(ctx context.Context, resp *ethtypes.EthSubscriptionResponse) error { + rs := *resp + block, ok := rs.Result.(map[string]interface{}) + require.True(ok) + blockNumber, ok := block["number"].(string) + require.True(ok) + + blk, err := client.EthGetBlockByNumber(ctx, blockNumber, false) + require.NoError(err) + require.NotNil(blk) + fmt.Printf("block: %v\n", blk) + // block hashes should match + require.Equal(block["hash"], blk.Hash.String()) + + return nil + }) + require.NoError(err) + time.Sleep(2 * time.Second) +} + func TestEthNewPendingTransactionSub(t *testing.T) { require := require.New(t) diff --git a/node/impl/full/eth_events.go b/node/impl/full/eth_events.go index 81ecef64bfa..d0edc307da2 100644 --- a/node/impl/full/eth_events.go +++ b/node/impl/full/eth_events.go @@ -250,6 +250,8 @@ type ethSubscription struct { sendQueueLen int toSend *queue.Queue[[]byte] sendCond chan struct{} + + lastSentTipset *types.TipSetKey } func (e *ethSubscription) addFilter(ctx context.Context, f filter.Filter) { @@ -337,12 +339,27 @@ func (e *ethSubscription) start(ctx context.Context) { e.send(ctx, r) } case *types.TipSet: - ev, err := newEthBlockFromFilecoinTipSet(ctx, vt, true, e.Chain, e.StateAPI) - if err != nil { - break + // Skip processing for tipset at epoch 0 as it has no parent + if vt.Height() == 0 { + continue + } + // Check if the parent has already been processed + parentTipSetKey := vt.Parents() + if e.lastSentTipset != nil && (*e.lastSentTipset) == parentTipSetKey { + continue + } + parentTipSet, loadErr := e.Chain.LoadTipSet(ctx, parentTipSetKey) + if loadErr != nil { + log.Warnw("failed to load parent tipset", "tipset", parentTipSetKey, "error", loadErr) + continue + } + ethBlock, ethBlockErr := newEthBlockFromFilecoinTipSet(ctx, parentTipSet, true, e.Chain, e.StateAPI) + if ethBlockErr != nil { + continue } - e.send(ctx, ev) + e.send(ctx, ethBlock) + e.lastSentTipset = &parentTipSetKey case *types.SignedMessage: // mpool txid evs, err := ethFilterResultFromMessages([]*types.SignedMessage{vt}) if err != nil {