Skip to content

Commit

Permalink
defer txpool reorg until worker fetches txns for the next block (#944)
Browse files Browse the repository at this point in the history
* feat: defer txpool reorg until worker fetches txns for the next block (#905)

* fix

---------

Co-authored-by: Ömer Faruk Irmak <omerfirmak@gmail.com>
  • Loading branch information
HAOYUatHZ and omerfirmak committed Aug 1, 2024
1 parent c37f493 commit 17acae8
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 1 deletion.
7 changes: 7 additions & 0 deletions core/txpool/blobpool/blobpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -1576,3 +1576,10 @@ func (p *BlobPool) Status(hash common.Hash) txpool.TxStatus {
func (p *BlobPool) RemoveTx(hash common.Hash, outofbound bool, unreserve bool) int {
return 0
}

func (p *BlobPool) PauseReorgs() {
log.Debug("skip BlobPool `PauseReorgs`")
}
func (p *BlobPool) ResumeReorgs() {
log.Debug("skip BlobPool `ResumeReorgs`")
}
24 changes: 23 additions & 1 deletion core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ type LegacyPool struct {
queueTxEventCh chan *types.Transaction
reorgDoneCh chan chan struct{}
reorgShutdownCh chan struct{} // requests shutdown of scheduleReorgLoop
reorgPauseCh chan bool // requests to pause scheduleReorgLoop
wg sync.WaitGroup // tracks loop, scheduleReorgLoop
initDoneCh chan struct{} // is closed once the pool is initialized (for tests)

Expand Down Expand Up @@ -258,6 +259,7 @@ func New(config Config, chain BlockChain) *LegacyPool {
queueTxEventCh: make(chan *types.Transaction),
reorgDoneCh: make(chan chan struct{}),
reorgShutdownCh: make(chan struct{}),
reorgPauseCh: make(chan bool),
initDoneCh: make(chan struct{}),
}
pool.locals = newAccountSet(pool.signer)
Expand Down Expand Up @@ -1198,13 +1200,14 @@ func (pool *LegacyPool) scheduleReorgLoop() {
curDone chan struct{} // non-nil while runReorg is active
nextDone = make(chan struct{})
launchNextRun bool
reorgsPaused bool
reset *txpoolResetRequest
dirtyAccounts *accountSet
queuedEvents = make(map[common.Address]*sortedMap)
)
for {
// Launch next background reorg if needed
if curDone == nil && launchNextRun {
if curDone == nil && launchNextRun && !reorgsPaused {
// Run the background reorg and announcements
go pool.runReorg(nextDone, reset, dirtyAccounts, queuedEvents)

Expand Down Expand Up @@ -1256,6 +1259,7 @@ func (pool *LegacyPool) scheduleReorgLoop() {
}
close(nextDone)
return
case reorgsPaused = <-pool.reorgPauseCh:
}
}
}
Expand Down Expand Up @@ -1705,6 +1709,24 @@ func (pool *LegacyPool) demoteUnexecutables() {
}
}

// PauseReorgs stops any new reorg jobs to be started but doesn't interrupt any existing ones that are in flight
// Keep in mind this function might block, although it is not expected to block for any significant amount of time
func (pool *LegacyPool) PauseReorgs() {
select {
case pool.reorgPauseCh <- true:
case <-pool.reorgShutdownCh:
}
}

// ResumeReorgs allows new reorg jobs to be started.
// Keep in mind this function might block, although it is not expected to block for any significant amount of time
func (pool *LegacyPool) ResumeReorgs() {
select {
case pool.reorgPauseCh <- false:
case <-pool.reorgShutdownCh:
}
}

// addressByHeartbeat is an account address tagged with its last activity timestamp.
type addressByHeartbeat struct {
address common.Address
Expand Down
3 changes: 3 additions & 0 deletions core/txpool/subpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,4 +142,7 @@ type SubPool interface {

// RemoveTx removes a transaction from the pool, returning the number of transactions removed.
RemoveTx(hash common.Hash, outofbound bool, unreserve bool) int

PauseReorgs()
ResumeReorgs()
}
12 changes: 12 additions & 0 deletions core/txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,3 +434,15 @@ func (p *TxPool) RemoveTx(hash common.Hash, outofbound bool, unreserve bool) int
}
return ret
}

func (pool *TxPool) PauseReorgs() {
for _, subpool := range pool.subpools {
subpool.PauseReorgs()
}
}

func (pool *TxPool) ResumeReorgs() {
for _, subpool := range pool.subpools {
subpool.ResumeReorgs()
}
}
7 changes: 7 additions & 0 deletions miner/scroll_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,9 @@ func (w *worker) startNewPipeline(timestamp int64) {
}
collectL2Timer.UpdateSince(tidyPendingStart)

// Allow txpool to be reorged as we build current block
w.eth.TxPool().ResumeReorgs()

var nextL1MsgIndex uint64
if dbIndex := rawdb.ReadFirstQueueIndexNotInL2Block(w.chain.Database(), parent.Hash()); dbIndex != nil {
nextL1MsgIndex = *dbIndex
Expand Down Expand Up @@ -668,6 +671,10 @@ func (w *worker) commit(res *pipeline.Result) error {
"accRows", res.Rows,
)

// A new block event will trigger a reorg in the txpool, pause reorgs to defer this until we fetch txns for next block.
// We may end up trying to process txns that we already included in the previous block, but they will all fail the nonce check
w.eth.TxPool().PauseReorgs()

rawdb.WriteBlockRowConsumption(w.eth.ChainDb(), blockHash, res.Rows)
// Commit block and state to database.
_, err = w.chain.WriteBlockAndSetHead(block, res.FinalBlock.Receipts, logs, res.FinalBlock.State, true)
Expand Down

0 comments on commit 17acae8

Please sign in to comment.