Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trigger new block on new l1 messages #343

Merged
merged 4 commits into from
May 23, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions core/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,6 @@ type ChainSideEvent struct {
}

type ChainHeadEvent struct{ Block *types.Block }

// NewL1MsgsEvent is posted when we receive some new messages from L1.
type NewL1MsgsEvent struct{ Count int }
23 changes: 12 additions & 11 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,17 +515,18 @@ func (s *Ethereum) StopMining() {
func (s *Ethereum) IsMining() bool { return s.miner.Mining() }
func (s *Ethereum) Miner() *miner.Miner { return s.miner }

func (s *Ethereum) AccountManager() *accounts.Manager { return s.accountManager }
func (s *Ethereum) BlockChain() *core.BlockChain { return s.blockchain }
func (s *Ethereum) TxPool() *core.TxPool { return s.txPool }
func (s *Ethereum) EventMux() *event.TypeMux { return s.eventMux }
func (s *Ethereum) Engine() consensus.Engine { return s.engine }
func (s *Ethereum) ChainDb() ethdb.Database { return s.chainDb }
func (s *Ethereum) IsListening() bool { return true } // Always listening
func (s *Ethereum) Downloader() *downloader.Downloader { return s.handler.downloader }
func (s *Ethereum) Synced() bool { return atomic.LoadUint32(&s.handler.acceptTxs) == 1 }
func (s *Ethereum) ArchiveMode() bool { return s.config.NoPruning }
func (s *Ethereum) BloomIndexer() *core.ChainIndexer { return s.bloomIndexer }
func (s *Ethereum) AccountManager() *accounts.Manager { return s.accountManager }
func (s *Ethereum) BlockChain() *core.BlockChain { return s.blockchain }
func (s *Ethereum) TxPool() *core.TxPool { return s.txPool }
func (s *Ethereum) EventMux() *event.TypeMux { return s.eventMux }
func (s *Ethereum) Engine() consensus.Engine { return s.engine }
func (s *Ethereum) ChainDb() ethdb.Database { return s.chainDb }
func (s *Ethereum) IsListening() bool { return true } // Always listening
func (s *Ethereum) Downloader() *downloader.Downloader { return s.handler.downloader }
func (s *Ethereum) Synced() bool { return atomic.LoadUint32(&s.handler.acceptTxs) == 1 }
func (s *Ethereum) ArchiveMode() bool { return s.config.NoPruning }
func (s *Ethereum) BloomIndexer() *core.ChainIndexer { return s.bloomIndexer }
func (s *Ethereum) SyncService() *sync_service.SyncService { return s.syncService }

// Protocols returns all the currently configured
// network protocols to start.
Expand Down
4 changes: 4 additions & 0 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,10 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) {
)
// Broadcast transactions to a batch of peers not knowing about it
for _, tx := range txs {
// L1 messages are not broadcast to peers
if tx.IsL1MessageTx() {
continue
}
peers := h.peers.peersWithoutTransaction(tx.Hash())
// Send the tx unconditionally to a subset of our peers
numDirect := int(math.Sqrt(float64(len(peers))))
Expand Down
2 changes: 2 additions & 0 deletions miner/miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,15 @@ import (
"github.com/scroll-tech/go-ethereum/event"
"github.com/scroll-tech/go-ethereum/log"
"github.com/scroll-tech/go-ethereum/params"
"github.com/scroll-tech/go-ethereum/rollup/sync_service"
)

// Backend wraps all methods required for mining.
type Backend interface {
BlockChain() *core.BlockChain
TxPool() *core.TxPool
ChainDb() ethdb.Database
SyncService() *sync_service.SyncService
}

// Config is the configuration parameters of mining.
Expand Down
5 changes: 5 additions & 0 deletions miner/miner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/scroll-tech/go-ethereum/ethdb"
"github.com/scroll-tech/go-ethereum/ethdb/memorydb"
"github.com/scroll-tech/go-ethereum/event"
"github.com/scroll-tech/go-ethereum/rollup/sync_service"
"github.com/scroll-tech/go-ethereum/trie"
)

Expand All @@ -57,6 +58,10 @@ func (m *mockBackend) TxPool() *core.TxPool {
return m.txPool
}

func (m *mockBackend) SyncService() *sync_service.SyncService {
return nil
}

func (m *mockBackend) ChainDb() ethdb.Database {
return m.chainDb
}
Expand Down
35 changes: 31 additions & 4 deletions miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ type worker struct {
chainHeadSub event.Subscription
chainSideCh chan core.ChainSideEvent
chainSideSub event.Subscription
l1MsgsCh chan core.NewL1MsgsEvent
l1MsgsSub event.Subscription

// Channels
newWorkCh chan *newWorkReq
Expand Down Expand Up @@ -174,8 +176,9 @@ type worker struct {
snapshotState *state.StateDB

// atomic status counters
running int32 // The indicator whether the consensus engine is running or not.
newTxs int32 // New arrival transaction count since last sealing work submitting.
running int32 // The indicator whether the consensus engine is running or not.
newTxs int32 // New arrival transaction count since last sealing work submitting.
newL1Msgs int32 // New arrival L1 message count since last sealing work submitting.

// noempty is the flag used to control whether the feature of pre-seal empty
// block is enabled. The default value is false(pre-seal is enabled by default).
Expand Down Expand Up @@ -208,6 +211,7 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus
unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth),
pendingTasks: make(map[common.Hash]*task),
txsCh: make(chan core.NewTxsEvent, txChanSize),
l1MsgsCh: make(chan core.NewL1MsgsEvent, txChanSize),
chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize),
chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize),
newWorkCh: make(chan *newWorkReq),
Expand All @@ -218,8 +222,21 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus
resubmitIntervalCh: make(chan time.Duration),
resubmitAdjustCh: make(chan *intervalAdjust, resubmitAdjustChanSize),
}

// Subscribe NewTxsEvent for tx pool
worker.txsSub = eth.TxPool().SubscribeNewTxsEvent(worker.txsCh)

// Subscribe NewL1MsgsEvent for sync service
if s := eth.SyncService(); s != nil {
worker.l1MsgsSub = s.SubscribeNewL1MsgsEvent(worker.l1MsgsCh)
} else {
// create an empty subscription so that the tests won't fail
worker.l1MsgsSub = event.NewSubscription(func(quit <-chan struct{}) error {
<-quit
return nil
})
}

// Subscribe events for blockchain
worker.chainHeadSub = eth.BlockChain().SubscribeChainHeadEvent(worker.chainHeadCh)
worker.chainSideSub = eth.BlockChain().SubscribeChainSideEvent(worker.chainSideCh)
Expand Down Expand Up @@ -378,6 +395,7 @@ func (w *worker) newWorkLoop(recommit time.Duration) {
}
timer.Reset(recommit)
atomic.StoreInt32(&w.newTxs, 0)
atomic.StoreInt32(&w.newL1Msgs, 0)
}
// clearPending cleans the stale pending tasks.
clearPending := func(number uint64) {
Expand Down Expand Up @@ -407,7 +425,7 @@ func (w *worker) newWorkLoop(recommit time.Duration) {
// higher priced transactions. Disable this overhead for pending blocks.
if w.isRunning() && (w.chainConfig.Clique == nil || w.chainConfig.Clique.Period > 0) {
// Short circuit if no new transaction arrives.
if atomic.LoadInt32(&w.newTxs) == 0 {
if atomic.LoadInt32(&w.newTxs) == 0 && atomic.LoadInt32(&w.newL1Msgs) == 0 {
timer.Reset(recommit)
continue
}
Expand Down Expand Up @@ -454,6 +472,7 @@ func (w *worker) newWorkLoop(recommit time.Duration) {
func (w *worker) mainLoop() {
defer w.wg.Done()
defer w.txsSub.Unsubscribe()
defer w.l1MsgsSub.Unsubscribe()
defer w.chainHeadSub.Unsubscribe()
defer w.chainSideSub.Unsubscribe()
defer func() {
Expand Down Expand Up @@ -545,11 +564,16 @@ func (w *worker) mainLoop() {
}
atomic.AddInt32(&w.newTxs, int32(len(ev.Txs)))

case ev := <-w.l1MsgsCh:
atomic.AddInt32(&w.newL1Msgs, int32(ev.Count))

// System stopped
case <-w.exitCh:
return
case <-w.txsSub.Err():
return
case <-w.l1MsgsSub.Err():
return
case <-w.chainHeadSub.Err():
return
case <-w.chainSideSub.Err():
Expand Down Expand Up @@ -1035,8 +1059,10 @@ func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64)
}
// fetch l1Txs
l1Txs := make(map[common.Address]types.Transactions)
pendingL1Txs := 0
if w.chainConfig.Scroll.L1MsgEnabled() {
l1Messages := w.collectPendingL1Messages()
pendingL1Txs = len(l1Messages)
for _, l1msg := range l1Messages {
tx := types.NewTx(&l1msg)
sender := l1msg.Sender
Expand All @@ -1054,7 +1080,7 @@ func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64)
// Short circuit if there is no available pending transactions.
// But if we disable empty precommit already, ignore it. Since
// empty block is necessary to keep the liveness of the network.
if len(pending) == 0 && atomic.LoadUint32(&w.noempty) == 0 {
if len(pending) == 0 && pendingL1Txs == 0 && atomic.LoadUint32(&w.noempty) == 0 {
w.updateSnapshot()
return
}
Expand All @@ -1067,6 +1093,7 @@ func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64)
}
}
if w.chainConfig.Scroll.L1MsgEnabled() && len(l1Txs) > 0 {
log.Trace("Processing L1 messages for inclusion", "count", pendingL1Txs)
txs := types.NewTransactionsByPriceAndNonce(w.current.signer, l1Txs, header.BaseFee)
if w.commitTransactions(txs, w.coinbase, interrupt) {
return
Expand Down
8 changes: 5 additions & 3 deletions miner/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/scroll-tech/go-ethereum/ethdb"
"github.com/scroll-tech/go-ethereum/event"
"github.com/scroll-tech/go-ethereum/params"
"github.com/scroll-tech/go-ethereum/rollup/sync_service"
)

const (
Expand Down Expand Up @@ -168,9 +169,10 @@ func newTestWorkerBackend(t *testing.T, chainConfig *params.ChainConfig, engine
}
}

func (b *testWorkerBackend) BlockChain() *core.BlockChain { return b.chain }
func (b *testWorkerBackend) TxPool() *core.TxPool { return b.txPool }
func (b *testWorkerBackend) ChainDb() ethdb.Database { return b.db }
func (b *testWorkerBackend) BlockChain() *core.BlockChain { return b.chain }
func (b *testWorkerBackend) TxPool() *core.TxPool { return b.txPool }
func (b *testWorkerBackend) ChainDb() ethdb.Database { return b.db }
func (b *testWorkerBackend) SyncService() *sync_service.SyncService { return nil }

func (b *testWorkerBackend) newRandomUncle() *types.Block {
var parent *types.Block
Expand Down
23 changes: 22 additions & 1 deletion rollup/sync_service/sync_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ import (
"reflect"
"time"

"github.com/scroll-tech/go-ethereum/core"
"github.com/scroll-tech/go-ethereum/core/rawdb"
"github.com/scroll-tech/go-ethereum/ethdb"
"github.com/scroll-tech/go-ethereum/event"
"github.com/scroll-tech/go-ethereum/log"
"github.com/scroll-tech/go-ethereum/node"
"github.com/scroll-tech/go-ethereum/params"
Expand Down Expand Up @@ -39,8 +41,10 @@ type SyncService struct {
cancel context.CancelFunc
client *BridgeClient
db ethdb.Database
msgCountFeed event.Feed
pollInterval time.Duration
latestProcessedBlock uint64
scope event.SubscriptionScope
}

func NewSyncService(ctx context.Context, genesisConfig *params.ChainConfig, nodeConfig *node.Config, db ethdb.Database, l1Client EthClient) (*SyncService, error) {
Expand Down Expand Up @@ -124,11 +128,20 @@ func (s *SyncService) Stop() {

log.Info("Stopping sync service")

// Unsubscribe all subscriptions registered
s.scope.Close()

if s.cancel != nil {
s.cancel()
}
}

// SubscribeNewL1MsgsEvent registers a subscription of NewL1MsgsEvent and
// starts sending event to the given channel.
func (s *SyncService) SubscribeNewL1MsgsEvent(ch chan<- core.NewL1MsgsEvent) event.Subscription {
return s.scope.Track(s.msgCountFeed.Subscribe(ch))
}

func (s *SyncService) fetchMessages() {
latestConfirmed, err := s.client.getLatestConfirmedBlockNumber(s.ctx)
if err != nil {
Expand All @@ -140,6 +153,7 @@ func (s *SyncService) fetchMessages() {

batchWriter := s.db.NewBatch()
numBlocksPendingDbWrite := uint64(0)
numMessagesPendingDbWrite := 0

// helper function to flush database writes cached in memory
flush := func(lastBlock uint64) {
Expand All @@ -153,9 +167,15 @@ func (s *SyncService) fetchMessages() {
log.Crit("failed to write L1 messages to database", "err", err)
}

s.latestProcessedBlock = lastBlock
batchWriter.Reset()
numBlocksPendingDbWrite = 0

if numMessagesPendingDbWrite > 0 {
s.msgCountFeed.Send(core.NewL1MsgsEvent{Count: numMessagesPendingDbWrite})
numMessagesPendingDbWrite = 0
}

s.latestProcessedBlock = lastBlock
}

// ticker for logging progress
Expand Down Expand Up @@ -197,6 +217,7 @@ func (s *SyncService) fetchMessages() {
}

numBlocksPendingDbWrite += to - from
numMessagesPendingDbWrite += len(msgs)

// flush new messages to database periodically
if to == latestConfirmed || batchWriter.ValueSize() >= DbWriteThresholdBytes || numBlocksPendingDbWrite >= DbWriteThresholdBlocks {
Expand Down