Skip to content

Commit

Permalink
fix: api: fix EthSubscribe tipsets off by one (filecoin-project#11858)
Browse files Browse the repository at this point in the history
Eth subscribe tipsets API should only return tipsets that have been executed.

We do this by only returning the parent tipset of the latest tipset received by ETH Subscribe from it's TipSetFilter subscription.

Closes filecoin-project#11807
Subsumes filecoin-project#11816
  • Loading branch information
aarshkshah1992 authored and ribasushi committed May 11, 2024
1 parent afb9ac2 commit 18a6b6a
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 4 deletions.
35 changes: 35 additions & 0 deletions itests/eth_filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,41 @@ func TestEthNewPendingTransactionFilter(t *testing.T) {
}
}

func TestEthNewHeadsSubSimple(t *testing.T) {
require := require.New(t)

ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()

kit.QuietAllLogsExcept("events", "messagepool")

client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.ThroughRPC(), kit.WithEthRPC())
ens.InterconnectAll().BeginMining(10 * time.Millisecond)

// install filter
subId, err := client.EthSubscribe(ctx, res.Wrap[jsonrpc.RawParams](json.Marshal(ethtypes.EthSubscribeParams{EventType: "newHeads"})).Assert(require.NoError))
require.NoError(err)

err = client.EthSubRouter.AddSub(ctx, subId, func(ctx context.Context, resp *ethtypes.EthSubscriptionResponse) error {
rs := *resp
block, ok := rs.Result.(map[string]interface{})
require.True(ok)
blockNumber, ok := block["number"].(string)
require.True(ok)

blk, err := client.EthGetBlockByNumber(ctx, blockNumber, false)
require.NoError(err)
require.NotNil(blk)
fmt.Printf("block: %v\n", blk)
// block hashes should match
require.Equal(block["hash"], blk.Hash.String())

return nil
})
require.NoError(err)
time.Sleep(2 * time.Second)
}

func TestEthNewPendingTransactionSub(t *testing.T) {
require := require.New(t)

Expand Down
25 changes: 21 additions & 4 deletions node/impl/full/eth_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,8 @@ type ethSubscription struct {
sendQueueLen int
toSend *queue.Queue[[]byte]
sendCond chan struct{}

lastSentTipset *types.TipSetKey
}

func (e *ethSubscription) addFilter(ctx context.Context, f filter.Filter) {
Expand Down Expand Up @@ -337,12 +339,27 @@ func (e *ethSubscription) start(ctx context.Context) {
e.send(ctx, r)
}
case *types.TipSet:
ev, err := newEthBlockFromFilecoinTipSet(ctx, vt, true, e.Chain, e.StateAPI)
if err != nil {
break
// Skip processing for tipset at epoch 0 as it has no parent
if vt.Height() == 0 {
continue
}
// Check if the parent has already been processed
parentTipSetKey := vt.Parents()
if e.lastSentTipset != nil && (*e.lastSentTipset) == parentTipSetKey {
continue
}
parentTipSet, loadErr := e.Chain.LoadTipSet(ctx, parentTipSetKey)
if loadErr != nil {
log.Warnw("failed to load parent tipset", "tipset", parentTipSetKey, "error", loadErr)
continue
}
ethBlock, ethBlockErr := newEthBlockFromFilecoinTipSet(ctx, parentTipSet, true, e.Chain, e.StateAPI)
if ethBlockErr != nil {
continue
}

e.send(ctx, ev)
e.send(ctx, ethBlock)
e.lastSentTipset = &parentTipSetKey
case *types.SignedMessage: // mpool txid
evs, err := ethFilterResultFromMessages([]*types.SignedMessage{vt})
if err != nil {
Expand Down

0 comments on commit 18a6b6a

Please sign in to comment.