Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: allow non-ccc pipelines #781

Merged
merged 3 commits into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions miner/scroll_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,11 @@ func (w *worker) startNewPipeline(timestamp int64) {
}

w.currentPipelineStart = time.Now()
w.currentPipeline = pipeline.NewPipeline(w.chain, w.chain.GetVMConfig(), parentState, header, nextL1MsgIndex, w.getCCC()).WithBeforeTxHook(w.beforeTxHook)
pipelineCCC := w.getCCC()
if !w.isRunning() {
Thegaram marked this conversation as resolved.
Show resolved Hide resolved
pipelineCCC = nil
}
w.currentPipeline = pipeline.NewPipeline(w.chain, w.chain.GetVMConfig(), parentState, header, nextL1MsgIndex, pipelineCCC).WithBeforeTxHook(w.beforeTxHook)

deadline := time.Unix(int64(header.Time), 0)
if w.chainConfig.Clique != nil && w.chainConfig.Clique.RelaxedPeriod {
Expand Down Expand Up @@ -579,11 +583,19 @@ func (w *worker) startNewPipeline(timestamp int64) {
return
}
}

// pipelineCCC was nil, so the block was built for RPC purposes only. Stop the pipeline immediately
// and update the pending block.
if pipelineCCC == nil {
Thegaram marked this conversation as resolved.
Show resolved Hide resolved
w.currentPipeline.Stop()
colinlyguo marked this conversation as resolved.
Show resolved Hide resolved
}
}

func (w *worker) handlePipelineResult(res *pipeline.Result) error {
if !w.isRunning() {
if res != nil && res.FinalBlock != nil {
// Rows being nil without an OverflowingTx means that block didn't go thru CCC,
// which means that we are not the sequencer. Do not attempt to commit.
if res != nil && res.Rows == nil && res.OverflowingTx == nil {
if res.FinalBlock != nil {
w.updateSnapshot(res.FinalBlock)
}
w.currentPipeline.Release()
Expand Down
44 changes: 44 additions & 0 deletions miner/scroll_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1045,3 +1045,47 @@ func TestSealBlockAfterCliquePeriod(t *testing.T) {
t.Fatalf("timeout")
}
}

func TestPending(t *testing.T) {
var (
engine consensus.Engine
chainConfig *params.ChainConfig
db = rawdb.NewMemoryDatabase()
)
chainConfig = params.AllCliqueProtocolChanges
chainConfig.Clique = &params.CliqueConfig{Period: 1, Epoch: 30000}
chainConfig.Scroll.FeeVaultAddress = &common.Address{}
engine = clique.New(chainConfig.Clique, db)
w, b := newTestWorker(t, chainConfig, engine, db, 0)
defer w.close()

// This test chain imports the mined blocks.
b.genesis.MustCommit(db)
chain, _ := core.NewBlockChain(db, nil, b.chain.Config(), engine, vm.Config{
Debug: true,
Tracer: vm.NewStructLogger(&vm.LogConfig{EnableMemory: true, EnableReturnData: true})}, nil, nil)
defer chain.Stop()

// Define 3 transactions:
// A --> B (nonce: 0, gas: 20)
tx0, _ := types.SignTx(types.NewTransaction(b.txPool.Nonce(testBankAddress), testUserAddress, big.NewInt(100000000000000000), params.TxGas, big.NewInt(20*params.InitialBaseFee), nil), types.HomesteadSigner{}, testBankKey)
// A --> B (nonce: 1, gas: 5)
tx1, _ := types.SignTx(types.NewTransaction(b.txPool.Nonce(testBankAddress)+1, testUserAddress, big.NewInt(0), params.TxGas, big.NewInt(5*params.InitialBaseFee), nil), types.HomesteadSigner{}, testBankKey)
// B --> A (nonce: 0, gas: 20)
tx2, _ := types.SignTx(types.NewTransaction(b.txPool.Nonce(testUserAddress), testBankAddress, big.NewInt(0), params.TxGas, big.NewInt(20*params.InitialBaseFee), nil), types.HomesteadSigner{}, testUserKey)
// B --> A (nonce: 1, gas: 20)
tx3, _ := types.SignTx(types.NewTransaction(b.txPool.Nonce(testUserAddress)+1, testBankAddress, big.NewInt(0), params.TxGas, big.NewInt(20*params.InitialBaseFee), nil), types.HomesteadSigner{}, testUserKey)
// B --> A (nonce: 2, gas: 20)
tx4, _ := types.SignTx(types.NewTransaction(b.txPool.Nonce(testUserAddress)+2, testBankAddress, big.NewInt(0), params.TxGas, big.NewInt(20*params.InitialBaseFee), nil), types.HomesteadSigner{}, testUserKey)
// A --> B (nonce: 2, gas: 5)
tx5, _ := types.SignTx(types.NewTransaction(b.txPool.Nonce(testBankAddress)+2, testUserAddress, big.NewInt(0), params.TxGas, big.NewInt(5*params.InitialBaseFee), nil), types.HomesteadSigner{}, testBankKey)

b.txPool.AddRemotesSync([]*types.Transaction{tx0, tx1, tx2, tx3, tx4, tx5})
// start building pending block
w.startCh <- struct{}{}

time.Sleep(time.Second)
pending := w.pendingBlock()
assert.NotNil(t, pending)
assert.NotEmpty(t, pending.Transactions())
colinlyguo marked this conversation as resolved.
Show resolved Hide resolved
}
2 changes: 1 addition & 1 deletion params/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
const (
VersionMajor = 5 // Major version component of the current release
VersionMinor = 3 // Minor version component of the current release
VersionPatch = 30 // Patch version component of the current release
VersionPatch = 31 // Patch version component of the current release
VersionMeta = "mainnet" // Version metadata to append to the version string
)

Expand Down
74 changes: 42 additions & 32 deletions rollup/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,14 @@ func (p *Pipeline) Start(deadline time.Time) error {
return nil
}

// Stop forces pipeline to stop its operation and return whatever progress it has so far
func (p *Pipeline) Stop() {
if p.txnQueue != nil {
close(p.txnQueue)
p.txnQueue = nil
}
}

func (p *Pipeline) TryPushTxns(txs types.OrderedTransactionSet, onFailingTxn func(txnIndex int, tx *types.Transaction, err error) bool) *Result {
for {
tx := txs.Peek()
Expand All @@ -127,8 +135,7 @@ func (p *Pipeline) TryPushTxns(txs types.OrderedTransactionSet, onFailingTxn fun
txs.Shift()
default:
if errors.Is(err, ErrApplyStageDone) || onFailingTxn(p.txs.Len(), tx, err) {
close(p.txnQueue)
p.txnQueue = nil
p.Stop()
return nil
}

Expand Down Expand Up @@ -167,10 +174,7 @@ func (p *Pipeline) TryPushTxn(tx *types.Transaction) (*Result, error) {

// Release releases all resources related to the pipeline
func (p *Pipeline) Release() {
if p.txnQueue != nil {
close(p.txnQueue)
p.txnQueue = nil
}
p.Stop()

select {
case <-p.applyStageRespCh:
Expand Down Expand Up @@ -303,7 +307,9 @@ type Result struct {
}

func (p *Pipeline) cccStage(candidates <-chan *BlockCandidate, deadline time.Time) <-chan *Result {
p.ccc.Reset()
if p.ccc != nil {
p.ccc.Reset()
}
resultCh := make(chan *Result)
var lastCandidate *BlockCandidate
var lastAccRows *types.RowConsumption
Expand Down Expand Up @@ -335,7 +341,7 @@ func (p *Pipeline) cccStage(candidates <-chan *BlockCandidate, deadline time.Tim
cccStart := time.Now()
var accRows *types.RowConsumption
var err error
if candidate != nil {
if candidate != nil && p.ccc != nil {
accRows, err = p.ccc.ApplyTransaction(candidate.LastTrace)
lastTxn := candidate.Txs[candidate.Txs.Len()-1]
cccTimer.UpdateSince(cccStart)
Expand All @@ -352,6 +358,8 @@ func (p *Pipeline) cccStage(candidates <-chan *BlockCandidate, deadline time.Tim

lastCandidate = candidate
lastAccRows = accRows
} else if candidate != nil && p.ccc == nil {
lastCandidate = candidate
}

// immediately close the block if deadline reached or apply stage is done
Expand All @@ -376,34 +384,36 @@ func (p *Pipeline) traceAndApply(tx *types.Transaction) (*types.Receipt, *types.
p.beforeTxHook()
}

// do gas limit check up-front and do not run CCC if it fails
if p.gasPool.Gas() < tx.Gas() {
return nil, nil, core.ErrGasLimitReached
}

// don't commit the state during tracing for circuit capacity checker, otherwise we cannot revert.
// and even if we don't commit the state, the `refund` value will still be correct, as explained in `CommitTransaction`
commitStateAfterApply := false
snap := p.state.Snapshot()
if p.ccc != nil {
// do gas limit check up-front and do not run CCC if it fails
if p.gasPool.Gas() < tx.Gas() {
return nil, nil, core.ErrGasLimitReached
}

// 1. we have to check circuit capacity before `core.ApplyTransaction`,
// because if the tx can be successfully executed but circuit capacity overflows, it will be inconvenient to revert.
// 2. even if we don't commit to the state during the tracing (which means `clearJournalAndRefund` is not called during the tracing),
// the `refund` value will still be correct, because:
// 2.1 when starting handling the first tx, `state.refund` is 0 by default,
// 2.2 after tracing, the state is either committed in `core.ApplyTransaction`, or reverted, so the `state.refund` can be cleared,
// 2.3 when starting handling the following txs, `state.refund` comes as 0
trace, err = tracing.NewTracerWrapper().CreateTraceEnvAndGetBlockTrace(p.chain.Config(), p.chain, p.chain.Engine(), p.chain.Database(),
p.state, p.parent, types.NewBlockWithHeader(&p.Header).WithBody([]*types.Transaction{tx}, nil), commitStateAfterApply)
// `w.current.traceEnv.State` & `w.current.state` share a same pointer to the state, so only need to revert `w.current.state`
// revert to snapshot for calling `core.ApplyMessage` again, (both `traceEnv.GetBlockTrace` & `core.ApplyTransaction` will call `core.ApplyMessage`)
p.state.RevertToSnapshot(snap)
if err != nil {
return nil, nil, err
// don't commit the state during tracing for circuit capacity checker, otherwise we cannot revert.
// and even if we don't commit the state, the `refund` value will still be correct, as explained in `CommitTransaction`
commitStateAfterApply := false
snap := p.state.Snapshot()

// 1. we have to check circuit capacity before `core.ApplyTransaction`,
// because if the tx can be successfully executed but circuit capacity overflows, it will be inconvenient to revert.
// 2. even if we don't commit to the state during the tracing (which means `clearJournalAndRefund` is not called during the tracing),
// the `refund` value will still be correct, because:
// 2.1 when starting handling the first tx, `state.refund` is 0 by default,
// 2.2 after tracing, the state is either committed in `core.ApplyTransaction`, or reverted, so the `state.refund` can be cleared,
// 2.3 when starting handling the following txs, `state.refund` comes as 0
trace, err = tracing.NewTracerWrapper().CreateTraceEnvAndGetBlockTrace(p.chain.Config(), p.chain, p.chain.Engine(), p.chain.Database(),
p.state, p.parent, types.NewBlockWithHeader(&p.Header).WithBody([]*types.Transaction{tx}, nil), commitStateAfterApply)
// `w.current.traceEnv.State` & `w.current.state` share a same pointer to the state, so only need to revert `w.current.state`
// revert to snapshot for calling `core.ApplyMessage` again, (both `traceEnv.GetBlockTrace` & `core.ApplyTransaction` will call `core.ApplyMessage`)
p.state.RevertToSnapshot(snap)
if err != nil {
return nil, nil, err
}
}

// create new snapshot for `core.ApplyTransaction`
snap = p.state.Snapshot()
snap := p.state.Snapshot()

var receipt *types.Receipt
receipt, err = core.ApplyTransaction(p.chain.Config(), p.chain, nil /* coinbase will default to chainConfig.Scroll.FeeVaultAddress */, p.gasPool,
Expand Down