Skip to content

Commit

Permalink
core: rework tx indexer (ethereum#25723)
Browse files Browse the repository at this point in the history
This PR reworks the tx indexer a bit. Compared to the original version, one scenario is no longer handled: upgrading from a legacy Geth without indexer support.

The tx indexer was introduced in 2020 and has been present through hardforks, so it can be assumed that all Geth nodes already have the tx indexer. We can therefore simplify the tx indexer logic a bit:

- If the tail flag is not present, it means the node has just been initialized, with or without an ancient store attached. In this case all blocks are regarded as unindexed.
- If the tail flag is present, it means blocks below the tail are unindexed and blocks above the tail are indexed.

This change also addresses some weird corner cases that could make the indexer not work after a crash.
  • Loading branch information
rjl493456442 authored and shekhirin committed Jun 6, 2023
1 parent b3a23f6 commit 4c981d9
Show file tree
Hide file tree
Showing 3 changed files with 280 additions and 121 deletions.
131 changes: 55 additions & 76 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,22 +292,16 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
bc.currentFinalizedBlock.Store(nilBlock)
bc.currentSafeBlock.Store(nilBlock)

// Initialize the chain with ancient data if it isn't empty.
var txIndexBlock uint64

// If Geth is initialized with an external ancient store, re-initialize the
// missing chain indexes and chain flags. This procedure can survive crash
// and can be resumed in next restart since chain flags are updated in last step.
if bc.empty() {
rawdb.InitDatabaseFromFreezer(bc.db)
// If ancient database is not empty, reconstruct all missing
// indices in the background.
frozen, _ := bc.db.Ancients()
if frozen > 0 {
txIndexBlock = frozen
}
}
// Load blockchain states from disk
if err := bc.loadLastState(); err != nil {
return nil, err
}

// Make sure the state associated with the block is available
head := bc.CurrentBlock()
if _, err := state.New(head.Root(), bc.stateCache, bc.snaps); err != nil {
Expand Down Expand Up @@ -415,14 +409,6 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
bc.wg.Add(1)
go bc.updateFutureBlocks()

// Start tx indexer/unindexer.
if txLookupLimit != nil {
bc.txLookupLimit = *txLookupLimit

bc.wg.Add(1)
go bc.maintainTxIndex(txIndexBlock)
}

// If periodic cache journal is required, spin it up.
if bc.cacheConfig.TrieCleanRejournal > 0 {
if bc.cacheConfig.TrieCleanRejournal < time.Minute {
Expand All @@ -442,6 +428,13 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
bc.SetHead(compat.RewindTo)
rawdb.WriteChainConfig(db, genesisHash, chainConfig)
}
// Start tx indexer/unindexer if required.
if txLookupLimit != nil {
bc.txLookupLimit = *txLookupLimit

bc.wg.Add(1)
go bc.maintainTxIndex()
}
return bc, nil
}

Expand Down Expand Up @@ -2289,72 +2282,58 @@ func (bc *BlockChain) skipBlock(err error, it *insertIterator) bool {
return false
}

// indexBlocks aligns the transaction index with the configured txLookupLimit
// for the given chain head, either by indexing blocks whose entries are
// missing or by unindexing blocks that fell out of the retention window.
//
// tail is the stored index tail (nil if the flag has never been written),
// head is the number of the current head block, and done is closed once the
// routine returns so the caller can tell that the run has finished.
func (bc *BlockChain) indexBlocks(tail *uint64, head uint64, done chan struct{}) {
	defer close(done)

	// keepAll reports whether every block up to head has to stay indexed:
	// either the limit is disabled (zero) or the chain is still shorter than
	// the limit. It also guards the unsigned subtractions further down.
	limit := bc.txLookupLimit
	keepAll := limit == 0 || head < limit

	switch {
	case tail == nil:
		// No tail flag yet: the node is freshly initialized and none of the
		// blocks (possibly coming from an ancient store) are indexed. Index
		// the retained window, or the whole chain if nothing is pruned.
		var first uint64
		if !keepAll {
			first = head - limit + 1
		}
		rawdb.IndexTransactions(bc.db, first, head+1, bc.quit)

	case keepAll:
		// A tail flag exists but the entire chain is required to be indexed;
		// only the range below the current tail can be missing.
		if *tail == 0 {
			return
		}
		// The chain may have been rewound to a point below the index tail.
		// Cap the upper bound at the new head so that no non-existent block
		// bodies are read.
		upper := head + 1
		if *tail <= upper {
			upper = *tail
		}
		rawdb.IndexTransactions(bc.db, 0, upper, bc.quit)

	default:
		// Move the index tail to HEAD-limit+1, in whichever direction that is.
		newTail := head - limit + 1
		if newTail < *tail {
			// The window grew downwards: index the gap and rewind the tail.
			rawdb.IndexTransactions(bc.db, newTail, *tail, bc.quit)
		} else {
			// The window moved forward: drop stale entries and advance the tail.
			rawdb.UnindexTransactions(bc.db, *tail, newTail, bc.quit)
		}
	}
}

// maintainTxIndex is responsible for the construction and deletion of the
// transaction index.
//
// User can use flag `txlookuplimit` to specify a "recentness" block, below
// which ancient tx indices get deleted. If `txlookuplimit` is 0, it means
// all tx indices will be reserved.
//
// The user can adjust the txlookuplimit value for each launch after fast
// sync, Geth will automatically construct the missing indices and delete
// the extra indices.
func (bc *BlockChain) maintainTxIndex(ancients uint64) {
// The user can adjust the txlookuplimit value for each launch after sync,
// Geth will automatically construct the missing indices or delete the extra
// indices.
func (bc *BlockChain) maintainTxIndex() {
defer bc.wg.Done()

// Before starting the actual maintenance, we need to handle a special case,
// where user might init Geth with an external ancient database. If so, we
// need to reindex all necessary transactions before starting to process any
// pruning requests.
if ancients > 0 {
var from = uint64(0)
if bc.txLookupLimit != 0 && ancients > bc.txLookupLimit {
from = ancients - bc.txLookupLimit
}
rawdb.IndexTransactions(bc.db, from, ancients, bc.quit)
}

// indexBlocks reindexes or unindexes transactions depending on user configuration
indexBlocks := func(tail *uint64, head uint64, done chan struct{}) {
defer func() { done <- struct{}{} }()

// If the user just upgraded Geth to a new version which supports transaction
// index pruning, write the new tail and remove anything older.
if tail == nil {
if bc.txLookupLimit == 0 || head < bc.txLookupLimit {
// Nothing to delete, write the tail and return
rawdb.WriteTxIndexTail(bc.db, 0)
} else {
// Prune all stale tx indices and record the tx index tail
rawdb.UnindexTransactions(bc.db, 0, head-bc.txLookupLimit+1, bc.quit)
}
return
}
// If a previous indexing existed, make sure that we fill in any missing entries
if bc.txLookupLimit == 0 || head < bc.txLookupLimit {
if *tail > 0 {
// It can happen when chain is rewound to a historical point which
// is even lower than the indexes tail, recap the indexing target
// to new head to avoid reading non-existent block bodies.
end := *tail
if end > head+1 {
end = head + 1
}
rawdb.IndexTransactions(bc.db, 0, end, bc.quit)
}
return
}
// Update the transaction index to the new chain state
if head-bc.txLookupLimit+1 < *tail {
// Reindex a part of missing indices and rewind index tail to HEAD-limit
rawdb.IndexTransactions(bc.db, head-bc.txLookupLimit+1, *tail, bc.quit)
} else {
// Unindex a part of stale indices and forward index tail to HEAD-limit
rawdb.UnindexTransactions(bc.db, *tail, head-bc.txLookupLimit+1, bc.quit)
}
}

// Any reindexing done, start listening to chain events and moving the index window
// Listening to chain events and manipulate the transaction indexes.
var (
done chan struct{} // Non-nil if background unindexing or reindexing routine is active.
headCh = make(chan ChainHeadEvent, 1) // Buffered to avoid locking up the event feed
Expand All @@ -2370,7 +2349,7 @@ func (bc *BlockChain) maintainTxIndex(ancients uint64) {
case head := <-headCh:
if done == nil {
done = make(chan struct{})
go indexBlocks(rawdb.ReadTxIndexTail(bc.db), head.Block.NumberU64(), done)
go bc.indexBlocks(rawdb.ReadTxIndexTail(bc.db), head.Block.NumberU64(), done)
}
case <-done:
done = nil
Expand Down
Loading

0 comments on commit 4c981d9

Please sign in to comment.