Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(worker): short-circuit l1 message iteration #525

Merged
merged 2 commits into from
Sep 29, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 46 additions & 6 deletions core/rawdb/accessors_l1_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,38 @@ func ReadSyncedL1BlockNumber(db ethdb.Reader) *uint64 {
return &value
}

// WriteHighestSyncedQueueIndex writes the highest synced L1 message queue index to the database.
func WriteHighestSyncedQueueIndex(db ethdb.KeyValueWriter, queueIndex uint64) {
value := big.NewInt(0).SetUint64(queueIndex).Bytes()

if err := db.Put(highestSyncedQueueIndexKey, value); err != nil {
log.Crit("Failed to update highest synced L1 message queue index", "err", err)
}
}

// ReadHighestSyncedQueueIndex retrieves the highest synced L1 message queue index.
func ReadHighestSyncedQueueIndex(db ethdb.Reader) uint64 {
data, err := db.Get(highestSyncedQueueIndexKey)
if err != nil && isNotFoundErr(err) {
return 0
}
if err != nil {
log.Crit("Failed to read highest synced L1 message queue index from database", "err", err)
}
if len(data) == 0 {
return 0
}

number := new(big.Int).SetBytes(data)
if !number.IsUint64() {
log.Crit("Unexpected highest synced L1 block number in database", "number", number)
}

return number.Uint64()
}

// WriteL1Message writes an L1 message to the database.
// We assume that L1 messages are written to DB following their queue index order.
func WriteL1Message(db ethdb.KeyValueWriter, l1Msg types.L1MessageTx) {
bytes, err := rlp.EncodeToBytes(l1Msg)
if err != nil {
Expand All @@ -52,6 +83,8 @@ func WriteL1Message(db ethdb.KeyValueWriter, l1Msg types.L1MessageTx) {
if err := db.Put(L1MessageKey(l1Msg.QueueIndex), bytes); err != nil {
log.Crit("Failed to store L1 message", "err", err)
}

WriteHighestSyncedQueueIndex(db, l1Msg.QueueIndex)
}

// WriteL1Messages writes an array of L1 messages to the database.
Expand Down Expand Up @@ -91,20 +124,23 @@ func ReadL1Message(db ethdb.Reader, queueIndex uint64) *types.L1MessageTx {
// allows us to iterate over L1 messages in the database. It
// implements an interface similar to ethdb.Iterator.
type L1MessageIterator struct {
inner ethdb.Iterator
keyLength int
inner ethdb.Iterator
keyLength int
maxQueueIndex uint64
}

// IterateL1MessagesFrom creates an L1MessageIterator that iterates over
// all L1 message in the database starting at the provided enqueue index.
func IterateL1MessagesFrom(db ethdb.Iteratee, fromQueueIndex uint64) L1MessageIterator {
func IterateL1MessagesFrom(db ethdb.Database, fromQueueIndex uint64) L1MessageIterator {
start := encodeBigEndian(fromQueueIndex)
it := db.NewIterator(l1MessagePrefix, start)
keyLength := len(l1MessagePrefix) + 8
maxQueueIndex := ReadHighestSyncedQueueIndex(db)

return L1MessageIterator{
inner: it,
keyLength: keyLength,
inner: it,
keyLength: keyLength,
maxQueueIndex: maxQueueIndex,
}
}

Expand Down Expand Up @@ -145,7 +181,7 @@ func (it *L1MessageIterator) Release() {
}

// ReadL1MessagesFrom retrieves up to `maxCount` L1 messages starting at `startIndex`.
func ReadL1MessagesFrom(db ethdb.Iteratee, startIndex, maxCount uint64) []types.L1MessageTx {
func ReadL1MessagesFrom(db ethdb.Database, startIndex, maxCount uint64) []types.L1MessageTx {
msgs := make([]types.L1MessageTx, 0, maxCount)
it := IterateL1MessagesFrom(db, startIndex)
defer it.Release()
Expand All @@ -170,6 +206,10 @@ func ReadL1MessagesFrom(db ethdb.Iteratee, startIndex, maxCount uint64) []types.
msgs = append(msgs, msg)
index += 1
count -= 1

if msg.QueueIndex == it.maxQueueIndex {
break
}
}

return msgs
Expand Down
32 changes: 32 additions & 0 deletions core/rawdb/accessors_l1_message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ func TestReadWriteL1Message(t *testing.T) {
if got == nil || got.QueueIndex != queueIndex {
t.Fatal("L1 message mismatch", "expected", queueIndex, "got", got)
}

max := ReadHighestSyncedQueueIndex(db)
if max != 123 {
t.Fatal("max index mismatch", "expected", 123, "got", max)
}
}

func TestIterateL1Message(t *testing.T) {
Expand All @@ -62,6 +67,11 @@ func TestIterateL1Message(t *testing.T) {
db := NewMemoryDatabase()
WriteL1Messages(db, msgs)

max := ReadHighestSyncedQueueIndex(db)
if max != 1000 {
t.Fatal("max index mismatch", "expected", 1000, "got", max)
}

it := IterateL1MessagesFrom(db, 103)
defer it.Release()

Expand Down Expand Up @@ -125,3 +135,25 @@ func TestReadWriteLastL1MessageInL2Block(t *testing.T) {
}
}
}

func TestIterationStopsAtMaxQueueIndex(t *testing.T) {
msgs := []types.L1MessageTx{
newL1MessageTx(100),
newL1MessageTx(101),
newL1MessageTx(102),
newL1MessageTx(103),
}

db := NewMemoryDatabase()
WriteL1Messages(db, msgs)

// artificially change max index from 103 to 102
WriteHighestSyncedQueueIndex(db, 102)

// iteration should terminate at 102 and not read 103
got := ReadL1MessagesFrom(db, 100, 10)

if len(got) != 3 {
t.Fatal("Invalid length", "expected", 4, "got", len(got))
Thegaram marked this conversation as resolved.
Show resolved Hide resolved
}
}
1 change: 1 addition & 0 deletions core/rawdb/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ var (
syncedL1BlockNumberKey = []byte("LastSyncedL1BlockNumber")
l1MessagePrefix = []byte("l1") // l1MessagePrefix + queueIndex (uint64 big endian) -> L1MessageTx
firstQueueIndexNotInL2BlockPrefix = []byte("q") // firstQueueIndexNotInL2BlockPrefix + L2 block hash -> enqueue index
highestSyncedQueueIndexKey = []byte("HighestSyncedQueueIndex")

// Row consumption
rowConsumptionPrefix = []byte("rc") // rowConsumptionPrefix + hash -> row consumption by block
Expand Down
2 changes: 1 addition & 1 deletion params/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
const (
VersionMajor = 4 // Major version component of the current release
VersionMinor = 4 // Minor version component of the current release
VersionPatch = 13 // Patch version component of the current release
VersionPatch = 14 // Patch version component of the current release
VersionMeta = "sepolia" // Version metadata to append to the version string
)

Expand Down
Loading