Prototyped experimental engine reverse downloader (#7936)
Erigon lib now good
Giulio2002 committed Jul 27, 2023
1 parent dc47f43 commit a0865b4
Showing 14 changed files with 714 additions and 18 deletions.
2 changes: 1 addition & 1 deletion eth/backend.go
@@ -569,7 +569,7 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere
// initialize engine backend
var engine *execution_client.ExecutionClientDirect

executionRpc := direct.NewExecutionClientDirect(eth1.NewEthereumExecutionModule(blockReader, chainKv, nil, backend.forkValidator, chainConfig, assembleBlockPOS, logger))
executionRpc := direct.NewExecutionClientDirect(eth1.NewEthereumExecutionModule(blockReader, chainKv, nil, backend.forkValidator, chainConfig, assembleBlockPOS, logger, config.HistoryV3))
if config.ExperimentalConsensusSeparation {
log.Info("Using experimental Engine API")
engineBackendRPC := engineapi.NewEngineServerExperimental(ctx, logger, chainConfig, executionRpc, backend.chainDB, blockReader, backend.sentriesClient.Hd, config.Miner.EnabledPOS)
2 changes: 1 addition & 1 deletion go.mod
@@ -3,7 +3,7 @@ module github.com/ledgerwatch/erigon
go 1.19

require (
github.com/ledgerwatch/erigon-lib v0.0.0-20230726204453-940c82dc2341
github.com/ledgerwatch/erigon-lib v0.0.0-20230727220941-7a00f70fb35d
github.com/ledgerwatch/erigon-snapshot v1.2.1-0.20230622075030-1d69651854c2
github.com/ledgerwatch/log/v3 v3.8.0
github.com/ledgerwatch/secp256k1 v1.0.0
4 changes: 2 additions & 2 deletions go.sum
@@ -499,8 +499,8 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758 h1:0D5M2HQSGD3PYPwICLl+/9oulQauOuETfgFvhBDffs0=
github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c=
github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8=
github.com/ledgerwatch/erigon-lib v0.0.0-20230726204453-940c82dc2341 h1:uKE4wYKbBrzBdrEV7gB2hQZ+snGkMobooU6on1/MRpo=
github.com/ledgerwatch/erigon-lib v0.0.0-20230726204453-940c82dc2341/go.mod h1:k8pDfuQxOA2IJvgJVbw0iEmro2ri3jLUyDANMhPIbWk=
github.com/ledgerwatch/erigon-lib v0.0.0-20230727220941-7a00f70fb35d h1:EjFyTSy8ymmZY3IFBRoJyMekXTO9o8vCGZWjA5I05ZM=
github.com/ledgerwatch/erigon-lib v0.0.0-20230727220941-7a00f70fb35d/go.mod h1:k8pDfuQxOA2IJvgJVbw0iEmro2ri3jLUyDANMhPIbWk=
github.com/ledgerwatch/erigon-snapshot v1.2.1-0.20230622075030-1d69651854c2 h1:Ls2itRGHMOr2PbHRDA4g1HH8HQdwfJhRVfMPEaLQe94=
github.com/ledgerwatch/erigon-snapshot v1.2.1-0.20230622075030-1d69651854c2/go.mod h1:3AuPxZc85jkehh/HA9h8gabv5MSi3kb/ddtzBsTVJFo=
github.com/ledgerwatch/log/v3 v3.8.0 h1:gCpp7uGtIerEz1jKVPeDnbIopFPud9ZnCpBLlLBGqPU=
2 changes: 1 addition & 1 deletion turbo/app/snapshots_cmd.go
@@ -208,7 +208,7 @@ func doDecompressSpeed(cliCtx *cli.Context) error {
t := time.Now()
g := decompressor.MakeGetter()
for g.HasNext() {
_ = g.Skip()
_, _ = g.Skip()
}
log.Info("decompress skip speed", "took", time.Since(t))
}()
283 changes: 283 additions & 0 deletions turbo/engineapi/engine_block_downloader/block_downloader.go
@@ -0,0 +1,283 @@
package engine_block_downloader

import (
"bytes"
"context"
"encoding/binary"
"fmt"
"math/big"
"sync"
"sync/atomic"
"time"

"github.com/ledgerwatch/log/v3"

libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/etl"
"github.com/ledgerwatch/erigon-lib/gointerfaces/execution"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/common/dbutils"
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/rlp"
"github.com/ledgerwatch/erigon/turbo/adapter"
"github.com/ledgerwatch/erigon/turbo/engineapi/engine_helpers"
"github.com/ledgerwatch/erigon/turbo/execution/eth1/eth1_utils"
"github.com/ledgerwatch/erigon/turbo/services"
"github.com/ledgerwatch/erigon/turbo/stages/bodydownload"
"github.com/ledgerwatch/erigon/turbo/stages/headerdownload"
)

const (
logInterval = 30 * time.Second
requestLoopCutOff int = 1
)

type RequestBodyFunction func(context.Context, *bodydownload.BodyRequest) ([64]byte, bool)

// EngineBlockDownloader is responsible for downloading blocks in reverse order and then inserting them into the database.
type EngineBlockDownloader struct {
ctx context.Context
// downloaders
hd *headerdownload.HeaderDownload
bd *bodydownload.BodyDownload
bodyReqSend RequestBodyFunction

// current status of the downloading process, aka: is it doing anything?
status atomic.Value // it is a headerdownload.SyncStatus
startDownloadCh chan downloadRequest

// data reader
blockPropagator adapter.BlockPropagator
blockReader services.FullBlockReader
db kv.RoDB

// Execution module
executionModule execution.ExecutionClient

// Misc
tmpdir string
timeout int

// lock
lock sync.Mutex

// logs
logger log.Logger
}

func NewEngineBlockDownloader(ctx context.Context, logger log.Logger, executionModule execution.ExecutionClient,
hd *headerdownload.HeaderDownload, bd *bodydownload.BodyDownload, blockPropagator adapter.BlockPropagator,
blockReader services.FullBlockReader, bodyReqSend RequestBodyFunction, tmpdir string, timeout int) *EngineBlockDownloader {
var s atomic.Value
s.Store(headerdownload.Idle)
return &EngineBlockDownloader{
ctx: ctx,
hd: hd,
bd: bd,
status: s,
tmpdir: tmpdir,
logger: logger,
blockPropagator: blockPropagator,
timeout: timeout,
blockReader: blockReader,
startDownloadCh: make(chan downloadRequest),
bodyReqSend: bodyReqSend,
executionModule: executionModule,
}
}

func (e *EngineBlockDownloader) scheduleHeadersDownload(
requestId int,
hashToDownload libcommon.Hash,
heightToDownload uint64,
downloaderTip libcommon.Hash,
) bool {
e.hd.BeaconRequestList.SetStatus(requestId, engine_helpers.DataWasMissing)

if e.hd.PosStatus() != headerdownload.Idle {
e.logger.Info("[EngineBlockDownloader] Postponing PoS download since another one is in progress", "height", heightToDownload, "hash", hashToDownload)
return false
}

if heightToDownload == 0 {
e.logger.Info("[EngineBlockDownloader] Downloading PoS headers...", "height", "unknown", "hash", hashToDownload, "requestId", requestId)
} else {
e.logger.Info("[EngineBlockDownloader] Downloading PoS headers...", "height", heightToDownload, "hash", hashToDownload, "requestId", requestId)
}

e.hd.SetRequestId(requestId)
e.hd.SetPoSDownloaderTip(downloaderTip)
e.hd.SetHeaderToDownloadPoS(hashToDownload, heightToDownload)
e.hd.SetPOSSync(true) // This needs to be called after SetHeaderToDownloadPoS because SetHeaderToDownloadPoS sets the `posAnchor` member field, which is used by ProcessHeadersPOS

// headerCollector is closed in saveDownloadedPoSHeaders, thus nolint

//nolint
e.hd.SetHeadersCollector(etl.NewCollector("EngineBlockDownloader", e.tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize), e.logger))

e.hd.SetPosStatus(headerdownload.Syncing)

return true
}

// waitForEndOfHeadersDownload waits until the download of headers ends and returns the outcome.
func (e *EngineBlockDownloader) waitForEndOfHeadersDownload() headerdownload.SyncStatus {
for e.hd.PosStatus() == headerdownload.Syncing {
time.Sleep(10 * time.Millisecond)
}

return e.hd.PosStatus()
}

// loadDownloadedHeaders loads the headers gathered by the header download into the database transaction and returns the range of blocks written together with the hash of the first one.
func (e *EngineBlockDownloader) loadDownloadedHeaders(tx kv.RwTx) (fromBlock uint64, toBlock uint64, fromHash libcommon.Hash, err error) {
var lastValidHash libcommon.Hash
var badChainError error
var foundPow bool

headerLoadFunc := func(key, value []byte, _ etl.CurrentTableReader, _ etl.LoadNextFunc) error {
var h types.Header
// no header to process
if value == nil {
return nil
}
if err := rlp.DecodeBytes(value, &h); err != nil {
return err
}
if badChainError != nil {
e.hd.ReportBadHeaderPoS(h.Hash(), lastValidHash)
return nil
}
lastValidHash = h.ParentHash
if err := e.hd.VerifyHeader(&h); err != nil {
e.logger.Warn("Verification failed for header", "hash", h.Hash(), "height", h.Number.Uint64(), "err", err)
badChainError = err
e.hd.ReportBadHeaderPoS(h.Hash(), lastValidHash)
return nil
}
// If we are in PoW range then block validation is not required anymore.
if foundPow {
if (fromHash == libcommon.Hash{}) {
fromHash = h.Hash()
fromBlock = h.Number.Uint64()
}
toBlock = h.Number.Uint64()
return saveHeader(tx, &h, h.Hash())
}

foundPow = h.Difficulty.Cmp(libcommon.Big0) != 0
if foundPow {
if (fromHash == libcommon.Hash{}) {
fromHash = h.Hash()
fromBlock = h.Number.Uint64()
}
toBlock = h.Number.Uint64()
return saveHeader(tx, &h, h.Hash())
}
if (fromHash == libcommon.Hash{}) {
fromHash = h.Hash()
fromBlock = h.Number.Uint64()
}
toBlock = h.Number.Uint64()
// Validate state if possible (bodies will be retrieved through body download)
return saveHeader(tx, &h, h.Hash())
}

err = e.hd.HeadersCollector().Load(tx, kv.Headers, headerLoadFunc, etl.TransformArgs{
LogDetailsLoad: func(k, v []byte) (additionalLogArguments []interface{}) {
return []interface{}{"block", binary.BigEndian.Uint64(k)}
},
})
return
}

func saveHeader(db kv.RwTx, header *types.Header, hash libcommon.Hash) error {
blockHeight := header.Number.Uint64()
// TODO(yperbasis): do we need to check if the header is already inserted (oldH)?

parentTd, err := rawdb.ReadTd(db, header.ParentHash, blockHeight-1)
if err != nil || parentTd == nil {
return fmt.Errorf("[saveHeader] parent's total difficulty not found with hash %x and height %d for header %x %d: %v", header.ParentHash, blockHeight-1, hash, blockHeight, err)
}
td := new(big.Int).Add(parentTd, header.Difficulty)
if err = rawdb.WriteHeader(db, header); err != nil {
return fmt.Errorf("[saveHeader] failed to WriteHeader: %w", err)
}
if err = rawdb.WriteTd(db, hash, blockHeight, td); err != nil {
return fmt.Errorf("[saveHeader] failed to WriteTd: %w", err)
}
if err = rawdb.WriteCanonicalHash(db, hash, blockHeight); err != nil {
return fmt.Errorf("[saveHeader] failed to canonical hash: %w", err)
}
return nil
}

func (e *EngineBlockDownloader) insertHeadersAndBodies(tx kv.Tx, fromBlock uint64, fromHash libcommon.Hash) error {
blockBatchSize := 10_000
// We divide them in batches
headersBatch := []*types.Header{}
// For bodies we need to batch block numbers and block hashes
bodiesBatch := []*types.RawBody{}
blockNumbersBatch := []uint64{}
blockHashesBatch := []libcommon.Hash{}

headersCursors, err := tx.Cursor(kv.Headers)
if err != nil {
return err
}
bodiesCursors, err := tx.Cursor(kv.BlockBody)
if err != nil {
return err
}
// Start by seeking headers
for k, v, err := headersCursors.Seek(dbutils.HeaderKey(fromBlock, fromHash)); k != nil; k, v, err = headersCursors.Next() {
if err != nil {
return err
}
if len(headersBatch) == blockBatchSize {
if err := eth1_utils.InsertHeadersAndWait(e.ctx, e.executionModule, headersBatch); err != nil {
return err
}
headersBatch = headersBatch[:0]
}
header := new(types.Header)
if err := rlp.Decode(bytes.NewReader(v), header); err != nil {
e.logger.Error("Invalid block header RLP", "err", err)
return nil
}
headersBatch = append(headersBatch, header)
}
if err := eth1_utils.InsertHeadersAndWait(e.ctx, e.executionModule, headersBatch); err != nil {
return err
}
// then seek bodies
for k, v, err := bodiesCursors.Seek(dbutils.BlockBodyKey(fromBlock, fromHash)); k != nil; k, v, err = bodiesCursors.Next() {
if err != nil {
return err
}
if len(bodiesBatch) == blockBatchSize {
if err := eth1_utils.InsertBodiesAndWait(e.ctx, e.executionModule, bodiesBatch, blockNumbersBatch, blockHashesBatch); err != nil {
return err
}
bodiesBatch = bodiesBatch[:0]
blockNumbersBatch = blockNumbersBatch[:0]
blockHashesBatch = blockHashesBatch[:0]
}
if len(v) != 40 {
continue
}
blockNumber := binary.BigEndian.Uint64(v[:8])
var blockHash libcommon.Hash
copy(blockHash[:], v[8:])
blockBody, err := rawdb.ReadBodyWithTransactions(tx, blockHash, blockNumber)
if err != nil {
return err
}
bodiesBatch = append(bodiesBatch, blockBody.RawBody())
blockNumbersBatch = append(blockNumbersBatch, blockNumber)
blockHashesBatch = append(blockHashesBatch, blockHash)
}
return eth1_utils.InsertBodiesAndWait(e.ctx, e.executionModule, bodiesBatch, blockNumbersBatch, blockHashesBatch)
}
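
For context, a minimal sketch (not part of this commit) of how the constructor added in this file might be wired from backend setup code. The function name wireDownloader and all of its parameters are placeholders assumed to come from the existing Erigon backend; only the call shape follows the NewEngineBlockDownloader signature and import paths shown above.

package backendwiring // hypothetical package, sketch only

import (
	"context"

	"github.com/ledgerwatch/log/v3"

	"github.com/ledgerwatch/erigon-lib/gointerfaces/execution"
	"github.com/ledgerwatch/erigon/turbo/adapter"
	"github.com/ledgerwatch/erigon/turbo/engineapi/engine_block_downloader"
	"github.com/ledgerwatch/erigon/turbo/services"
	"github.com/ledgerwatch/erigon/turbo/stages/bodydownload"
	"github.com/ledgerwatch/erigon/turbo/stages/headerdownload"
)

// wireDownloader shows the call shape of the new constructor; every argument is
// assumed to already exist in the caller (e.g. the Ethereum backend setup).
func wireDownloader(
	ctx context.Context,
	logger log.Logger,
	executionRpc execution.ExecutionClient, // client backed by the execution module
	hd *headerdownload.HeaderDownload,
	bd *bodydownload.BodyDownload,
	blockPropagator adapter.BlockPropagator,
	blockReader services.FullBlockReader,
	sendBodyRequest engine_block_downloader.RequestBodyFunction, // used to request block bodies from peers
	tmpdir string, // scratch directory for the ETL header collector
	timeoutSeconds int,
) *engine_block_downloader.EngineBlockDownloader {
	return engine_block_downloader.NewEngineBlockDownloader(
		ctx, logger, executionRpc, hd, bd, blockPropagator,
		blockReader, sendBodyRequest, tmpdir, timeoutSeconds)
}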