diff --git a/chain/events/filter/index.go b/chain/events/filter/index.go index 5ebb8fb580c..fed6c42eb31 100644 --- a/chain/events/filter/index.go +++ b/chain/events/filter/index.go @@ -7,6 +7,7 @@ import ( "fmt" "sort" "strings" + "sync" "time" "github.com/ipfs/go-cid" @@ -62,9 +63,13 @@ var ddls = []string{ value BLOB NOT NULL )`, + createTableEventsSeen, + createIndexEventEntryIndexedKey, createIndexEventEntryCodecValue, createIndexEventEntryEventId, + createIndexEventsSeenHeight, + createIndexEventsSeenTipsetKeyCid, // metadata containing version of schema `CREATE TABLE IF NOT EXISTS _meta ( @@ -76,6 +81,7 @@ var ddls = []string{ `INSERT OR IGNORE INTO _meta (version) VALUES (3)`, `INSERT OR IGNORE INTO _meta (version) VALUES (4)`, `INSERT OR IGNORE INTO _meta (version) VALUES (5)`, + `INSERT OR IGNORE INTO _meta (version) VALUES (6)`, } var ( @@ -83,13 +89,19 @@ var ( ) const ( - schemaVersion = 5 + schemaVersion = 6 eventExists = `SELECT MAX(id) FROM event WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter_addr=? AND event_index=? AND message_cid=? AND message_index=?` insertEvent = `INSERT OR IGNORE INTO event(height, tipset_key, tipset_key_cid, emitter_addr, event_index, message_cid, message_index, reverted) VALUES(?, ?, ?, ?, ?, ?, ?, ?)` insertEntry = `INSERT OR IGNORE INTO event_entry(event_id, indexed, flags, key, codec, value) VALUES(?, ?, ?, ?, ?, ?)` revertEventsInTipset = `UPDATE event SET reverted=true WHERE height=? AND tipset_key=?` restoreEvent = `UPDATE event SET reverted=false WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter_addr=? AND event_index=? AND message_cid=? AND message_index=?` + revertEventSeen = `UPDATE events_seen SET reverted=true WHERE height=? AND tipset_key_cid=?` + restoreEventSeen = `UPDATE events_seen SET reverted=false WHERE height=? AND tipset_key_cid=?` + upsertEventsSeen = `INSERT INTO events_seen(height, tipset_key_cid, reverted) VALUES(?, ?, false) ON CONFLICT(height, tipset_key_cid) DO UPDATE SET reverted=false` + isTipsetProcessed = `SELECT COUNT(*) > 0 FROM events_seen WHERE tipset_key_cid=?` + getMaxHeightInIndex = `SELECT MAX(height) FROM events_seen` + isHeightProcessed = `SELECT COUNT(*) > 0 FROM events_seen WHERE height=?` createIndexEventEmitterAddr = `CREATE INDEX IF NOT EXISTS event_emitter_addr ON event (emitter_addr)` createIndexEventTipsetKeyCid = `CREATE INDEX IF NOT EXISTS event_tipset_key_cid ON event (tipset_key_cid);` @@ -99,6 +111,17 @@ const ( createIndexEventEntryIndexedKey = `CREATE INDEX IF NOT EXISTS event_entry_indexed_key ON event_entry (indexed, key);` createIndexEventEntryCodecValue = `CREATE INDEX IF NOT EXISTS event_entry_codec_value ON event_entry (codec, value);` createIndexEventEntryEventId = `CREATE INDEX IF NOT EXISTS event_entry_event_id ON event_entry(event_id);` + + createTableEventsSeen = `CREATE TABLE IF NOT EXISTS events_seen ( + id INTEGER PRIMARY KEY, + height INTEGER NOT NULL, + tipset_key_cid BLOB NOT NULL, + reverted INTEGER NOT NULL, + UNIQUE(height, tipset_key_cid) + )` + + createIndexEventsSeenHeight = `CREATE INDEX IF NOT EXISTS events_seen_height ON events_seen (height);` + createIndexEventsSeenTipsetKeyCid = `CREATE INDEX IF NOT EXISTS events_seen_tipset_key_cid ON events_seen (tipset_key_cid);` ) type EventIndex struct { @@ -109,8 +132,27 @@ type EventIndex struct { stmtInsertEntry *sql.Stmt stmtRevertEventsInTipset *sql.Stmt stmtRestoreEvent *sql.Stmt + stmtUpsertEventsSeen *sql.Stmt + stmtRevertEventSeen *sql.Stmt + stmtRestoreEventSeen *sql.Stmt + + stmtIsTipsetProcessed *sql.Stmt + stmtGetMaxHeightInIndex *sql.Stmt + stmtIsHeightProcessed *sql.Stmt + + mu sync.Mutex + subIdCounter uint64 + updateSubs map[uint64]*updateSub +} + +type updateSub struct { + ctx context.Context + ch chan EventIndexUpdated + cancel context.CancelFunc } +type EventIndexUpdated struct{} + func (ei *EventIndex) initStatements() (err error) { ei.stmtEventExists, err = ei.db.Prepare(eventExists) if err != nil { @@ -137,6 +179,36 @@ func (ei *EventIndex) initStatements() (err error) { return xerrors.Errorf("prepare stmtRestoreEvent: %w", err) } + ei.stmtUpsertEventsSeen, err = ei.db.Prepare(upsertEventsSeen) + if err != nil { + return xerrors.Errorf("prepare stmtUpsertEventsSeen: %w", err) + } + + ei.stmtRevertEventSeen, err = ei.db.Prepare(revertEventSeen) + if err != nil { + return xerrors.Errorf("prepare stmtRevertEventSeen: %w", err) + } + + ei.stmtRestoreEventSeen, err = ei.db.Prepare(restoreEventSeen) + if err != nil { + return xerrors.Errorf("prepare stmtRestoreEventSeen: %w", err) + } + + ei.stmtIsTipsetProcessed, err = ei.db.Prepare(isTipsetProcessed) + if err != nil { + return xerrors.Errorf("prepare isTipsetProcessed: %w", err) + } + + ei.stmtGetMaxHeightInIndex, err = ei.db.Prepare(getMaxHeightInIndex) + if err != nil { + return xerrors.Errorf("prepare getMaxHeightInIndex: %w", err) + } + + ei.stmtIsHeightProcessed, err = ei.db.Prepare(isHeightProcessed) + if err != nil { + return xerrors.Errorf("prepare isHeightProcessed: %w", err) + } + return nil } @@ -402,9 +474,59 @@ func (ei *EventIndex) migrateToVersion5(ctx context.Context) error { return xerrors.Errorf("commit transaction: %w", err) } + log.Infof("Successfully migrated event index from version 4 to version 5 in %s", time.Since(now)) + return nil +} + +func (ei *EventIndex) migrateToVersion6(ctx context.Context) error { + now := time.Now() + + tx, err := ei.db.BeginTx(ctx, nil) + if err != nil { + return xerrors.Errorf("begin transaction: %w", err) + } + defer func() { _ = tx.Rollback() }() + + stmtCreateTableEventsSeen, err := tx.PrepareContext(ctx, createTableEventsSeen) + if err != nil { + return xerrors.Errorf("prepare stmtCreateTableEventsSeen: %w", err) + } + _, err = stmtCreateTableEventsSeen.ExecContext(ctx) + if err != nil { + return xerrors.Errorf("create table events_seen: %w", err) + } + + _, err = tx.ExecContext(ctx, createIndexEventsSeenHeight) + if err != nil { + return xerrors.Errorf("create index events_seen_height: %w", err) + } + _, err = tx.ExecContext(ctx, createIndexEventsSeenTipsetKeyCid) + if err != nil { + return xerrors.Errorf("create index events_seen_tipset_key_cid: %w", err) + } + + // INSERT an entry in the events_seen table for all epochs we do have events for in our DB + _, err = tx.ExecContext(ctx, ` + INSERT OR IGNORE INTO events_seen (height, tipset_key_cid, reverted) + SELECT DISTINCT height, tipset_key_cid, reverted FROM event +`) + if err != nil { + return xerrors.Errorf("insert events into events_seen: %w", err) + } + + _, err = tx.ExecContext(ctx, "INSERT OR IGNORE INTO _meta (version) VALUES (6)") + if err != nil { + return xerrors.Errorf("increment _meta version: %w", err) + } + + err = tx.Commit() + if err != nil { + return xerrors.Errorf("commit transaction: %w", err) + } + ei.vacuumDBAndCheckpointWAL(ctx) - log.Infof("Successfully migrated event index from version 4 to version 5 in %s", time.Since(now)) + log.Infof("Successfully migrated event index from version 5 to version 6 in %s", time.Since(now)) return nil } @@ -502,6 +624,16 @@ func NewEventIndex(ctx context.Context, path string, chainStore *store.ChainStor version = 5 } + if version == 5 { + log.Infof("Upgrading event index from version 5 to version 6") + err = eventIndex.migrateToVersion6(ctx) + if err != nil { + _ = db.Close() + return nil, xerrors.Errorf("could not migrate event index schema from version 5 to version 6: %w", err) + } + version = 6 + } + if version != schemaVersion { _ = db.Close() return nil, xerrors.Errorf("invalid database version: got %d, expected %d", version, schemaVersion) @@ -514,6 +646,8 @@ func NewEventIndex(ctx context.Context, path string, chainStore *store.ChainStor return nil, xerrors.Errorf("error preparing eventIndex database statements: %w", err) } + eventIndex.updateSubs = make(map[uint64]*updateSub) + return &eventIndex, nil } @@ -524,6 +658,60 @@ func (ei *EventIndex) Close() error { return ei.db.Close() } +func (ei *EventIndex) SubscribeUpdates() (chan EventIndexUpdated, func()) { + subCtx, subCancel := context.WithCancel(context.Background()) + ch := make(chan EventIndexUpdated) + + tSub := &updateSub{ + ctx: subCtx, + cancel: subCancel, + ch: ch, + } + + ei.mu.Lock() + subId := ei.subIdCounter + ei.subIdCounter++ + ei.updateSubs[subId] = tSub + ei.mu.Unlock() + + unSubscribeF := func() { + ei.mu.Lock() + tSub, ok := ei.updateSubs[subId] + if !ok { + ei.mu.Unlock() + return + } + delete(ei.updateSubs, subId) + ei.mu.Unlock() + + // cancel the subscription + tSub.cancel() + } + + return tSub.ch, unSubscribeF +} + +func (ei *EventIndex) GetMaxHeightInIndex(ctx context.Context) (uint64, error) { + row := ei.stmtGetMaxHeightInIndex.QueryRowContext(ctx) + var maxHeight uint64 + err := row.Scan(&maxHeight) + return maxHeight, err +} + +func (ei *EventIndex) IsHeightProcessed(ctx context.Context, height uint64) (bool, error) { + row := ei.stmtIsHeightProcessed.QueryRowContext(ctx, height) + var exists bool + err := row.Scan(&exists) + return exists, err +} + +func (ei *EventIndex) IsTipsetProcessed(ctx context.Context, tipsetKeyCid []byte) (bool, error) { + row := ei.stmtIsTipsetProcessed.QueryRowContext(ctx, tipsetKeyCid) + var exists bool + err := row.Scan(&exists) + return exists, err +} + func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, revert bool, resolver func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool)) error { tx, err := ei.db.BeginTx(ctx, nil) if err != nil { @@ -532,6 +720,11 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever // rollback the transaction (a no-op if the transaction was already committed) defer func() { _ = tx.Rollback() }() + tsKeyCid, err := te.msgTs.Key().Cid() + if err != nil { + return xerrors.Errorf("tipset key cid: %w", err) + } + // lets handle the revert case first, since its simpler and we can simply mark all events in this tipset as reverted and return if revert { _, err = tx.Stmt(ei.stmtRevertEventsInTipset).Exec(te.msgTs.Height(), te.msgTs.Key().Bytes()) @@ -539,11 +732,34 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever return xerrors.Errorf("revert event: %w", err) } + _, err = tx.Stmt(ei.stmtRevertEventSeen).Exec(te.msgTs.Height(), tsKeyCid.Bytes()) + if err != nil { + return xerrors.Errorf("revert event seen: %w", err) + } + err = tx.Commit() if err != nil { return xerrors.Errorf("commit transaction: %w", err) } + ei.mu.Lock() + tSubs := make([]*updateSub, 0, len(ei.updateSubs)) + for _, tSub := range ei.updateSubs { + tSubs = append(tSubs, tSub) + } + ei.mu.Unlock() + + for _, tSub := range tSubs { + tSub := tSub + select { + case tSub.ch <- EventIndexUpdated{}: + case <-tSub.ctx.Done(): + // subscription was cancelled, ignore + case <-ctx.Done(): + return ctx.Err() + } + } + return nil } @@ -571,11 +787,6 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever addressLookups[ev.Emitter] = addr } - tsKeyCid, err := te.msgTs.Key().Cid() - if err != nil { - return xerrors.Errorf("tipset key cid: %w", err) - } - // check if this event already exists in the database var entryID sql.NullInt64 err = tx.Stmt(ei.stmtEventExists).QueryRow( @@ -655,11 +866,39 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever } } + // this statement will mark the tipset as processed and will insert a new row if it doesn't exist + // or update the reverted field to false if it does + _, err = tx.Stmt(ei.stmtUpsertEventsSeen).Exec( + te.msgTs.Height(), + tsKeyCid.Bytes(), + ) + if err != nil { + return xerrors.Errorf("exec upsert events seen: %w", err) + } + err = tx.Commit() if err != nil { return xerrors.Errorf("commit transaction: %w", err) } + ei.mu.Lock() + tSubs := make([]*updateSub, 0, len(ei.updateSubs)) + for _, tSub := range ei.updateSubs { + tSubs = append(tSubs, tSub) + } + ei.mu.Unlock() + + for _, tSub := range tSubs { + tSub := tSub + select { + case tSub.ch <- EventIndexUpdated{}: + case <-tSub.ctx.Done(): + // subscription was cancelled, ignore + case <-ctx.Done(): + return ctx.Err() + } + } + return nil } diff --git a/chain/events/filter/index_test.go b/chain/events/filter/index_test.go index ce3f7b78a03..10b3eb57779 100644 --- a/chain/events/filter/index_test.go +++ b/chain/events/filter/index_test.go @@ -76,10 +76,50 @@ func TestEventIndexPrefillFilter(t *testing.T) { ei, err := NewEventIndex(context.Background(), dbPath, nil) require.NoError(t, err, "create event index") + + subCh, unSubscribe := ei.SubscribeUpdates() + defer unSubscribe() + + out := make(chan EventIndexUpdated, 1) + go func() { + tu := <-subCh + out <- tu + }() + if err := ei.CollectEvents(context.Background(), events14000, false, addrMap.ResolveAddress); err != nil { require.NoError(t, err, "collect events") } + mh, err := ei.GetMaxHeightInIndex(context.Background()) + require.NoError(t, err) + require.Equal(t, uint64(14000), mh) + + b, err := ei.IsHeightProcessed(context.Background(), 14000) + require.NoError(t, err) + require.True(t, b) + + b, err = ei.IsHeightProcessed(context.Background(), 14001) + require.NoError(t, err) + require.False(t, b) + + b, err = ei.IsHeightProcessed(context.Background(), 13000) + require.NoError(t, err) + require.False(t, b) + + tsKey := events14000.msgTs.Key() + tsKeyCid, err := tsKey.Cid() + require.NoError(t, err, "tipset key cid") + + seen, err := ei.IsTipsetProcessed(context.Background(), tsKeyCid.Bytes()) + require.NoError(t, err) + require.True(t, seen, "tipset key should be seen") + + seen, err = ei.IsTipsetProcessed(context.Background(), []byte{1}) + require.NoError(t, err) + require.False(t, seen, "tipset key should not be seen") + + _ = <-out + testCases := []struct { name string filter *eventFilter @@ -397,6 +437,22 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { ei, err := NewEventIndex(context.Background(), dbPath, nil) require.NoError(t, err, "create event index") + + tCh := make(chan EventIndexUpdated, 3) + subCh, unSubscribe := ei.SubscribeUpdates() + defer unSubscribe() + go func() { + cnt := 0 + for tu := range subCh { + tCh <- tu + cnt++ + if cnt == 3 { + close(tCh) + return + } + } + }() + if err := ei.CollectEvents(context.Background(), revertedEvents14000, false, addrMap.ResolveAddress); err != nil { require.NoError(t, err, "collect reverted events") } @@ -407,6 +463,10 @@ func TestEventIndexPrefillFilterExcludeReverted(t *testing.T) { require.NoError(t, err, "collect events") } + _ = <-tCh + _ = <-tCh + _ = <-tCh + inclusiveTestCases := []struct { name string filter *eventFilter diff --git a/node/impl/full/eth.go b/node/impl/full/eth.go index 82f272c6cff..27d7002e440 100644 --- a/node/impl/full/eth.go +++ b/node/impl/full/eth.go @@ -44,6 +44,11 @@ var ErrUnsupported = errors.New("unsupported method") const maxEthFeeHistoryRewardPercentiles = 100 +var ( + // wait for 3 epochs + eventReadTimeout = 90 * time.Second +) + type EthModuleAPI interface { EthBlockNumber(ctx context.Context) (ethtypes.EthUint64, error) EthAccounts(ctx context.Context) ([]ethtypes.EthAddress, error) @@ -1258,10 +1263,58 @@ func (e *EthEventHandler) EthGetLogs(ctx context.Context, filterSpec *ethtypes.E return nil, api.ErrNotSupported } + if e.EventFilterManager.EventIndex == nil { + return nil, xerrors.Errorf("cannot use eth_get_logs if historical event index is disabled") + } + + pf, err := e.parseEthFilterSpec(ctx, filterSpec) + if err != nil { + return nil, xerrors.Errorf("failed to parse eth filter spec: %w", err) + } + + if pf.tipsetCid == cid.Undef { + maxHeight := pf.maxHeight + if maxHeight == -1 { + maxHeight = e.Chain.GetHeaviestTipSet().Height() + } + if maxHeight > e.Chain.GetHeaviestTipSet().Height() { + return nil, xerrors.Errorf("maxHeight requested is greater than the heaviest tipset") + } + + err := e.waitForHeightProcessed(ctx, maxHeight) + if err != nil { + return nil, err + } + + // should also have the minHeight in the filter indexed + if b, err := e.EventFilterManager.EventIndex.IsHeightProcessed(ctx, uint64(pf.minHeight)); err != nil { + return nil, xerrors.Errorf("failed to check if event index has events for the minHeight: %w", err) + } else if !b { + return nil, xerrors.Errorf("event index does not have event for epoch %d", pf.minHeight) + } + } else { + ts, err := e.Chain.GetTipSetByCid(ctx, pf.tipsetCid) + if err != nil { + return nil, xerrors.Errorf("failed to get tipset by cid: %w", err) + } + err = e.waitForHeightProcessed(ctx, ts.Height()) + if err != nil { + return nil, err + } + + b, err := e.EventFilterManager.EventIndex.IsTipsetProcessed(ctx, pf.tipsetCid.Bytes()) + if err != nil { + return nil, xerrors.Errorf("failed to check if tipset events have been indexed: %w", err) + } + if !b { + return nil, xerrors.Errorf("event index failed to index tipset %s", pf.tipsetCid.String()) + } + } + // Create a temporary filter - f, err := e.installEthFilterSpec(ctx, filterSpec) + f, err := e.EventFilterManager.Install(ctx, pf.minHeight, pf.maxHeight, pf.tipsetCid, pf.addresses, pf.keys, true) if err != nil { - return nil, err + return nil, xerrors.Errorf("failed to install event filter: %w", err) } ces := f.TakeCollectedEvents(ctx) @@ -1270,6 +1323,47 @@ func (e *EthEventHandler) EthGetLogs(ctx context.Context, filterSpec *ethtypes.E return ethFilterResultFromEvents(ctx, ces, e.SubManager.StateAPI) } +func (e *EthEventHandler) waitForHeightProcessed(ctx context.Context, height abi.ChainEpoch) error { + ei := e.EventFilterManager.EventIndex + if height > e.Chain.GetHeaviestTipSet().Height() { + return xerrors.New("height is in the future") + } + + ctx, cancel := context.WithTimeout(ctx, eventReadTimeout) + defer cancel() + + // if the height we're interested in has already been indexed -> there's nothing to do here + if b, err := ei.IsHeightProcessed(ctx, uint64(height)); err != nil { + return xerrors.Errorf("failed to check if event index has events for given height: %w", err) + } else if b { + return nil + } + + // subscribe for updates to the event index + subCh, unSubscribeF := ei.SubscribeUpdates() + defer unSubscribeF() + + // it could be that the event index was update while the subscription was being processed -> check if index has what we need now + if b, err := ei.IsHeightProcessed(ctx, uint64(height)); err != nil { + return xerrors.Errorf("failed to check if event index has events for given height: %w", err) + } else if b { + return nil + } + + for { + select { + case <-subCh: + if b, err := ei.IsHeightProcessed(ctx, uint64(height)); err != nil { + return xerrors.Errorf("failed to check if event index has events for given height: %w", err) + } else if b { + return nil + } + case <-ctx.Done(): + return ctx.Err() + } + } +} + func (e *EthEventHandler) EthGetFilterChanges(ctx context.Context, id ethtypes.EthFilterID) (*ethtypes.EthFilterResult, error) { if e.FilterStore == nil { return nil, api.ErrNotSupported @@ -1368,7 +1462,15 @@ func parseBlockRange(heaviest abi.ChainEpoch, fromBlock, toBlock *string, maxRan return minHeight, maxHeight, nil } -func (e *EthEventHandler) installEthFilterSpec(ctx context.Context, filterSpec *ethtypes.EthFilterSpec) (filter.EventFilter, error) { +type parsedFilter struct { + minHeight abi.ChainEpoch + maxHeight abi.ChainEpoch + tipsetCid cid.Cid + addresses []address.Address + keys map[string][]types.ActorEventBlock +} + +func (e *EthEventHandler) parseEthFilterSpec(ctx context.Context, filterSpec *ethtypes.EthFilterSpec) (*parsedFilter, error) { var ( minHeight abi.ChainEpoch maxHeight abi.ChainEpoch @@ -1405,7 +1507,13 @@ func (e *EthEventHandler) installEthFilterSpec(ctx context.Context, filterSpec * return nil, err } - return e.EventFilterManager.Install(ctx, minHeight, maxHeight, tipsetCid, addresses, keysToKeysWithCodec(keys), true) + return &parsedFilter{ + minHeight: minHeight, + maxHeight: maxHeight, + tipsetCid: tipsetCid, + addresses: addresses, + keys: keysToKeysWithCodec(keys), + }, nil } func keysToKeysWithCodec(keys map[string][][]byte) map[string][]types.ActorEventBlock { @@ -1426,11 +1534,16 @@ func (e *EthEventHandler) EthNewFilter(ctx context.Context, filterSpec *ethtypes return ethtypes.EthFilterID{}, api.ErrNotSupported } - f, err := e.installEthFilterSpec(ctx, filterSpec) + pf, err := e.parseEthFilterSpec(ctx, filterSpec) if err != nil { return ethtypes.EthFilterID{}, err } + f, err := e.EventFilterManager.Install(ctx, pf.minHeight, pf.maxHeight, pf.tipsetCid, pf.addresses, pf.keys, true) + if err != nil { + return ethtypes.EthFilterID{}, xerrors.Errorf("failed to install event filter: %w", err) + } + if err := e.FilterStore.Add(ctx, f); err != nil { // Could not record in store, attempt to delete filter to clean up err2 := e.TipSetFilterManager.Remove(ctx, f.ID())