diff --git a/eth/backend.go b/eth/backend.go index 6dff78b39ec..daf9b0aa36c 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -569,7 +569,7 @@ func New(stack *node.Node, config *ethconfig.Config, logger log.Logger) (*Ethere // intiialize engine backend var engine *execution_client.ExecutionClientDirect - executionRpc := direct.NewExecutionClientDirect(eth1.NewEthereumExecutionModule(blockReader, chainKv, nil, backend.forkValidator, chainConfig, assembleBlockPOS, logger)) + executionRpc := direct.NewExecutionClientDirect(eth1.NewEthereumExecutionModule(blockReader, chainKv, nil, backend.forkValidator, chainConfig, assembleBlockPOS, logger, config.HistoryV3)) if config.ExperimentalConsensusSeparation { log.Info("Using experimental Engine API") engineBackendRPC := engineapi.NewEngineServerExperimental(ctx, logger, chainConfig, executionRpc, backend.chainDB, blockReader, backend.sentriesClient.Hd, config.Miner.EnabledPOS) diff --git a/go.mod b/go.mod index 3d031f6e3ee..28dbec75277 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/ledgerwatch/erigon go 1.19 require ( - github.com/ledgerwatch/erigon-lib v0.0.0-20230726204453-940c82dc2341 + github.com/ledgerwatch/erigon-lib v0.0.0-20230727220941-7a00f70fb35d github.com/ledgerwatch/erigon-snapshot v1.2.1-0.20230622075030-1d69651854c2 github.com/ledgerwatch/log/v3 v3.8.0 github.com/ledgerwatch/secp256k1 v1.0.0 diff --git a/go.sum b/go.sum index fc58f921232..06f98fe513f 100644 --- a/go.sum +++ b/go.sum @@ -499,8 +499,8 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758 h1:0D5M2HQSGD3PYPwICLl+/9oulQauOuETfgFvhBDffs0= github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c= github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8= -github.com/ledgerwatch/erigon-lib v0.0.0-20230726204453-940c82dc2341 h1:uKE4wYKbBrzBdrEV7gB2hQZ+snGkMobooU6on1/MRpo= -github.com/ledgerwatch/erigon-lib v0.0.0-20230726204453-940c82dc2341/go.mod h1:k8pDfuQxOA2IJvgJVbw0iEmro2ri3jLUyDANMhPIbWk= +github.com/ledgerwatch/erigon-lib v0.0.0-20230727220941-7a00f70fb35d h1:EjFyTSy8ymmZY3IFBRoJyMekXTO9o8vCGZWjA5I05ZM= +github.com/ledgerwatch/erigon-lib v0.0.0-20230727220941-7a00f70fb35d/go.mod h1:k8pDfuQxOA2IJvgJVbw0iEmro2ri3jLUyDANMhPIbWk= github.com/ledgerwatch/erigon-snapshot v1.2.1-0.20230622075030-1d69651854c2 h1:Ls2itRGHMOr2PbHRDA4g1HH8HQdwfJhRVfMPEaLQe94= github.com/ledgerwatch/erigon-snapshot v1.2.1-0.20230622075030-1d69651854c2/go.mod h1:3AuPxZc85jkehh/HA9h8gabv5MSi3kb/ddtzBsTVJFo= github.com/ledgerwatch/log/v3 v3.8.0 h1:gCpp7uGtIerEz1jKVPeDnbIopFPud9ZnCpBLlLBGqPU= diff --git a/turbo/app/snapshots_cmd.go b/turbo/app/snapshots_cmd.go index d5cdb362aaa..88e87e30b8a 100644 --- a/turbo/app/snapshots_cmd.go +++ b/turbo/app/snapshots_cmd.go @@ -208,7 +208,7 @@ func doDecompressSpeed(cliCtx *cli.Context) error { t := time.Now() g := decompressor.MakeGetter() for g.HasNext() { - _ = g.Skip() + _, _ = g.Skip() } log.Info("decompress skip speed", "took", time.Since(t)) }() diff --git a/turbo/engineapi/engine_block_downloader/block_downloader.go b/turbo/engineapi/engine_block_downloader/block_downloader.go new file mode 100644 index 00000000000..b89688f37de --- /dev/null +++ b/turbo/engineapi/engine_block_downloader/block_downloader.go @@ -0,0 +1,283 @@ +package engine_block_downloader + +import ( + "bytes" + "context" + "encoding/binary" + "fmt" + "math/big" + "sync" + "sync/atomic" + "time" + + "github.com/ledgerwatch/log/v3" + + libcommon "github.com/ledgerwatch/erigon-lib/common" + "github.com/ledgerwatch/erigon-lib/etl" + "github.com/ledgerwatch/erigon-lib/gointerfaces/execution" + "github.com/ledgerwatch/erigon-lib/kv" + "github.com/ledgerwatch/erigon/common/dbutils" + "github.com/ledgerwatch/erigon/core/rawdb" + "github.com/ledgerwatch/erigon/core/types" + "github.com/ledgerwatch/erigon/rlp" + "github.com/ledgerwatch/erigon/turbo/adapter" + "github.com/ledgerwatch/erigon/turbo/engineapi/engine_helpers" + "github.com/ledgerwatch/erigon/turbo/execution/eth1/eth1_utils" + "github.com/ledgerwatch/erigon/turbo/services" + "github.com/ledgerwatch/erigon/turbo/stages/bodydownload" + "github.com/ledgerwatch/erigon/turbo/stages/headerdownload" +) + +const ( + logInterval = 30 * time.Second + requestLoopCutOff int = 1 +) + +type RequestBodyFunction func(context.Context, *bodydownload.BodyRequest) ([64]byte, bool) + +// EngineBlockDownloader is responsible to download blocks in reverse, and then insert them in the database. +type EngineBlockDownloader struct { + ctx context.Context + // downloaders + hd *headerdownload.HeaderDownload + bd *bodydownload.BodyDownload + bodyReqSend RequestBodyFunction + + // current status of the downloading process, aka: is it doing anything? + status atomic.Value // it is a headerdownload.SyncStatus + startDownloadCh chan downloadRequest + + // data reader + blockPropagator adapter.BlockPropagator + blockReader services.FullBlockReader + db kv.RoDB + + // Execution module + executionModule execution.ExecutionClient + + // Misc + tmpdir string + timeout int + + // lock + lock sync.Mutex + + // logs + logger log.Logger +} + +func NewEngineBlockDownloader(ctx context.Context, logger log.Logger, executionModule execution.ExecutionClient, + hd *headerdownload.HeaderDownload, bd *bodydownload.BodyDownload, blockPropagator adapter.BlockPropagator, + blockReader services.FullBlockReader, bodyReqSend RequestBodyFunction, tmpdir string, timeout int) *EngineBlockDownloader { + var s atomic.Value + s.Store(headerdownload.Idle) + return &EngineBlockDownloader{ + ctx: ctx, + hd: hd, + bd: bd, + status: s, + tmpdir: tmpdir, + logger: logger, + blockPropagator: blockPropagator, + timeout: timeout, + blockReader: blockReader, + startDownloadCh: make(chan downloadRequest), + bodyReqSend: bodyReqSend, + executionModule: executionModule, + } +} + +func (e *EngineBlockDownloader) scheduleHeadersDownload( + requestId int, + hashToDownload libcommon.Hash, + heightToDownload uint64, + downloaderTip libcommon.Hash, +) bool { + e.hd.BeaconRequestList.SetStatus(requestId, engine_helpers.DataWasMissing) + + if e.hd.PosStatus() != headerdownload.Idle { + e.logger.Info("[EngineBlockDownloader] Postponing PoS download since another one is in progress", "height", heightToDownload, "hash", hashToDownload) + return false + } + + if heightToDownload == 0 { + e.logger.Info("[EngineBlockDownloader] Downloading PoS headers...", "height", "unknown", "hash", hashToDownload, "requestId", requestId) + } else { + e.logger.Info("[EngineBlockDownloader] Downloading PoS headers...", "height", heightToDownload, "hash", hashToDownload, "requestId", requestId) + } + + e.hd.SetRequestId(requestId) + e.hd.SetPoSDownloaderTip(downloaderTip) + e.hd.SetHeaderToDownloadPoS(hashToDownload, heightToDownload) + e.hd.SetPOSSync(true) // This needs to be called after SetHeaderToDownloadPOS because SetHeaderToDownloadPOS sets `posAnchor` member field which is used by ProcessHeadersPOS + + // headerCollector is closed in saveDownloadedPoSHeaders, thus nolint + + //nolint + e.hd.SetHeadersCollector(etl.NewCollector("EngineBlockDownloader", e.tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize), e.logger)) + + e.hd.SetPosStatus(headerdownload.Syncing) + + return true +} + +// waitForEndOfHeadersDownload waits until the download of headers ends and returns the outcome. +func (e *EngineBlockDownloader) waitForEndOfHeadersDownload() headerdownload.SyncStatus { + for e.hd.PosStatus() == headerdownload.Syncing { + time.Sleep(10 * time.Millisecond) + + } + + return e.hd.PosStatus() +} + +// waitForEndOfHeadersDownload waits until the download of headers ends and returns the outcome. +func (e *EngineBlockDownloader) loadDownloadedHeaders(tx kv.RwTx) (fromBlock uint64, toBlock uint64, fromHash libcommon.Hash, err error) { + var lastValidHash libcommon.Hash + var badChainError error + var foundPow bool + + headerLoadFunc := func(key, value []byte, _ etl.CurrentTableReader, _ etl.LoadNextFunc) error { + var h types.Header + // no header to process + if value == nil { + return nil + } + if err := rlp.DecodeBytes(value, &h); err != nil { + return err + } + if badChainError != nil { + e.hd.ReportBadHeaderPoS(h.Hash(), lastValidHash) + return nil + } + lastValidHash = h.ParentHash + if err := e.hd.VerifyHeader(&h); err != nil { + e.logger.Warn("Verification failed for header", "hash", h.Hash(), "height", h.Number.Uint64(), "err", err) + badChainError = err + e.hd.ReportBadHeaderPoS(h.Hash(), lastValidHash) + return nil + } + // If we are in PoW range then block validation is not required anymore. + if foundPow { + if (fromHash == libcommon.Hash{}) { + fromHash = h.Hash() + fromBlock = h.Number.Uint64() + } + toBlock = h.Number.Uint64() + return saveHeader(tx, &h, h.Hash()) + } + + foundPow = h.Difficulty.Cmp(libcommon.Big0) != 0 + if foundPow { + if (fromHash == libcommon.Hash{}) { + fromHash = h.Hash() + fromBlock = h.Number.Uint64() + } + toBlock = h.Number.Uint64() + return saveHeader(tx, &h, h.Hash()) + } + if (fromHash == libcommon.Hash{}) { + fromHash = h.Hash() + fromBlock = h.Number.Uint64() + } + toBlock = h.Number.Uint64() + // Validate state if possible (bodies will be retrieved through body download) + return saveHeader(tx, &h, h.Hash()) + } + + err = e.hd.HeadersCollector().Load(tx, kv.Headers, headerLoadFunc, etl.TransformArgs{ + LogDetailsLoad: func(k, v []byte) (additionalLogArguments []interface{}) { + return []interface{}{"block", binary.BigEndian.Uint64(k)} + }, + }) + return +} + +func saveHeader(db kv.RwTx, header *types.Header, hash libcommon.Hash) error { + blockHeight := header.Number.Uint64() + // TODO(yperbasis): do we need to check if the header is already inserted (oldH)? + + parentTd, err := rawdb.ReadTd(db, header.ParentHash, blockHeight-1) + if err != nil || parentTd == nil { + return fmt.Errorf("[saveHeader] parent's total difficulty not found with hash %x and height %d for header %x %d: %v", header.ParentHash, blockHeight-1, hash, blockHeight, err) + } + td := new(big.Int).Add(parentTd, header.Difficulty) + if err = rawdb.WriteHeader(db, header); err != nil { + return fmt.Errorf("[saveHeader] failed to WriteHeader: %w", err) + } + if err = rawdb.WriteTd(db, hash, blockHeight, td); err != nil { + return fmt.Errorf("[saveHeader] failed to WriteTd: %w", err) + } + if err = rawdb.WriteCanonicalHash(db, hash, blockHeight); err != nil { + return fmt.Errorf("[saveHeader] failed to canonical hash: %w", err) + } + return nil +} + +func (e *EngineBlockDownloader) insertHeadersAndBodies(tx kv.Tx, fromBlock uint64, fromHash libcommon.Hash) error { + blockBatchSize := 10_000 + // We divide them in batches + headersBatch := []*types.Header{} + // For bodies we need to batch block numbers and block hashes + bodiesBatch := []*types.RawBody{} + blockNumbersBatch := []uint64{} + blockHashesBatch := []libcommon.Hash{} + + headersCursors, err := tx.Cursor(kv.Headers) + if err != nil { + return err + } + bodiesCursors, err := tx.Cursor(kv.BlockBody) + if err != nil { + return err + } + // Start by seeking headers + for k, v, err := headersCursors.Seek(dbutils.HeaderKey(fromBlock, fromHash)); k != nil; k, v, err = headersCursors.Next() { + if err != nil { + return err + } + if len(headersBatch) == blockBatchSize { + if err := eth1_utils.InsertHeadersAndWait(e.ctx, e.executionModule, headersBatch); err != nil { + return err + } + headersBatch = headersBatch[:0] + } + header := new(types.Header) + if err := rlp.Decode(bytes.NewReader(v), header); err != nil { + e.logger.Error("Invalid block header RLP", "err", err) + return nil + } + headersBatch = append(headersBatch, header) + } + if err := eth1_utils.InsertHeadersAndWait(e.ctx, e.executionModule, headersBatch); err != nil { + return err + } + // then seek bodies + for k, v, err := bodiesCursors.Seek(dbutils.BlockBodyKey(fromBlock, fromHash)); k != nil; k, v, err = headersCursors.Next() { + if err != nil { + return err + } + if len(bodiesBatch) == blockBatchSize { + if err := eth1_utils.InsertBodiesAndWait(e.ctx, e.executionModule, bodiesBatch, blockNumbersBatch, blockHashesBatch); err != nil { + return err + } + bodiesBatch = bodiesBatch[:0] + blockNumbersBatch = blockNumbersBatch[:0] + blockHashesBatch = blockHashesBatch[:0] + } + if len(v) != 40 { + continue + } + blockNumber := binary.BigEndian.Uint64(v[:8]) + var blockHash libcommon.Hash + copy(blockHash[:], v[8:]) + blockBody, err := rawdb.ReadBodyWithTransactions(tx, blockHash, blockNumber) + if err != nil { + return err + } + bodiesBatch = append(bodiesBatch, blockBody.RawBody()) + blockNumbersBatch = append(blockNumbersBatch, blockNumber) + blockHashesBatch = append(blockHashesBatch, blockHash) + } + return eth1_utils.InsertBodiesAndWait(e.ctx, e.executionModule, bodiesBatch, blockNumbersBatch, blockHashesBatch) +} diff --git a/turbo/engineapi/engine_block_downloader/body.go b/turbo/engineapi/engine_block_downloader/body.go new file mode 100644 index 00000000000..1105b102d29 --- /dev/null +++ b/turbo/engineapi/engine_block_downloader/body.go @@ -0,0 +1,228 @@ +package engine_block_downloader + +import ( + "fmt" + "runtime" + "time" + + libcommon "github.com/ledgerwatch/erigon-lib/common" + "github.com/ledgerwatch/erigon-lib/common/dbg" + "github.com/ledgerwatch/erigon-lib/kv" + "github.com/ledgerwatch/erigon/core/rawdb" + "github.com/ledgerwatch/erigon/dataflow" + "github.com/ledgerwatch/erigon/eth/stagedsync/stages" + "github.com/ledgerwatch/erigon/turbo/stages/bodydownload" + "github.com/ledgerwatch/log/v3" +) + +// downloadBodies executes bodies download. +func (e *EngineBlockDownloader) downloadAndLoadBodiesSyncronously(tx kv.RwTx, fromBlock, toBlock uint64) (err error) { + headerProgress := toBlock + bodyProgress := fromBlock + + if err := stages.SaveStageProgress(tx, stages.Bodies, bodyProgress); err != nil { + return err + } + if err := stages.SaveStageProgress(tx, stages.Headers, headerProgress); err != nil { + return err + } + + var d1, d2, d3, d4, d5, d6 time.Duration + + timeout := e.timeout + + // This will update bd.maxProgress + if _, _, _, _, err = e.bd.UpdateFromDb(tx); err != nil { + return + } + defer e.bd.ClearBodyCache() + + if bodyProgress >= headerProgress { + return nil + } + + logPrefix := "EngineBlockDownloader" + if headerProgress <= bodyProgress+16 { + // When processing small number of blocks, we can afford wasting more bandwidth but get blocks quicker + timeout = 1 + } else { + // Do not print logs for short periods + e.logger.Info(fmt.Sprintf("[%s] Processing bodies...", logPrefix), "from", bodyProgress, "to", headerProgress) + } + logEvery := time.NewTicker(logInterval) + defer logEvery.Stop() + + var prevDeliveredCount float64 = 0 + var prevWastedCount float64 = 0 + timer := time.NewTimer(1 * time.Second) // Check periodically even in the abseence of incoming messages + var req *bodydownload.BodyRequest + var peer [64]byte + var sentToPeer bool + stopped := false + prevProgress := bodyProgress + var noProgressCount uint = 0 // How many time the progress was printed without actual progress + var totalDelivered uint64 = 0 + + loopBody := func() (bool, error) { + // loopCount is used here to ensure we don't get caught in a constant loop of making requests + // having some time out so requesting again and cycling like that forever. We'll cap it + // and break the loop so we can see if there are any records to actually process further down + // then come back here again in the next cycle + for loopCount := 0; loopCount == 0 || (req != nil && sentToPeer && loopCount < requestLoopCutOff); loopCount++ { + start := time.Now() + currentTime := uint64(time.Now().Unix()) + req, err = e.bd.RequestMoreBodies(tx, e.blockReader, currentTime, e.blockPropagator) + if err != nil { + return false, fmt.Errorf("request more bodies: %w", err) + } + d1 += time.Since(start) + peer = [64]byte{} + sentToPeer = false + if req != nil { + start = time.Now() + peer, sentToPeer = e.bodyReqSend(e.ctx, req) + d2 += time.Since(start) + } + if req != nil && sentToPeer { + start = time.Now() + e.bd.RequestSent(req, currentTime+uint64(timeout), peer) + d3 += time.Since(start) + } + } + + start := time.Now() + requestedLow, delivered, err := e.bd.GetDeliveries(tx) + if err != nil { + return false, err + } + totalDelivered += delivered + d4 += time.Since(start) + start = time.Now() + + toProcess := e.bd.NextProcessingCount() + + write := true + for i := uint64(0); i < toProcess; i++ { + select { + case <-logEvery.C: + logWritingBodies(logPrefix, bodyProgress, headerProgress, e.logger) + default: + } + nextBlock := requestedLow + i + rawBody := e.bd.GetBodyFromCache(nextBlock, write /* delete */) + if rawBody == nil { + e.bd.NotDelivered(nextBlock) + write = false + } + if !write { + continue + } + e.bd.NotDelivered(nextBlock) + header, _, err := e.bd.GetHeader(nextBlock, e.blockReader, tx) + if err != nil { + return false, err + } + blockHeight := header.Number.Uint64() + if blockHeight != nextBlock { + return false, fmt.Errorf("[%s] Header block unexpected when matching body, got %v, expected %v", logPrefix, blockHeight, nextBlock) + } + + // Check existence before write - because WriteRawBody isn't idempotent (it allocates new sequence range for transactions on every call) + ok, err := rawdb.WriteRawBodyIfNotExists(tx, header.Hash(), blockHeight, rawBody) + if err != nil { + return false, fmt.Errorf("WriteRawBodyIfNotExists: %w", err) + } + if ok { + dataflow.BlockBodyDownloadStates.AddChange(blockHeight, dataflow.BlockBodyCleared) + } + + if blockHeight > bodyProgress { + bodyProgress = blockHeight + } + e.bd.AdvanceLow() + } + + d5 += time.Since(start) + start = time.Now() + if bodyProgress == headerProgress { + return true, nil + } + + timer.Stop() + timer = time.NewTimer(1 * time.Second) + select { + case <-e.ctx.Done(): + stopped = true + case <-logEvery.C: + deliveredCount, wastedCount := e.bd.DeliveryCounts() + if prevProgress == bodyProgress { + noProgressCount++ + } else { + noProgressCount = 0 // Reset, there was progress + } + logDownloadingBodies(logPrefix, bodyProgress, headerProgress-requestedLow, totalDelivered, prevDeliveredCount, deliveredCount, + prevWastedCount, wastedCount, e.bd.BodyCacheSize(), e.logger) + prevProgress = bodyProgress + prevDeliveredCount = deliveredCount + prevWastedCount = wastedCount + //logger.Info("Timings", "d1", d1, "d2", d2, "d3", d3, "d4", d4, "d5", d5, "d6", d6) + case <-timer.C: + e.logger.Trace("RequestQueueTime (bodies) ticked") + case <-e.bd.DeliveryNotify: + e.logger.Trace("bodyLoop woken up by the incoming request") + } + d6 += time.Since(start) + + return false, nil + } + + // kick off the loop and check for any reason to stop and break early + var shouldBreak bool + for !stopped && !shouldBreak { + if shouldBreak, err = loopBody(); err != nil { + return err + } + } + + if stopped { + return libcommon.ErrStopped + } + e.logger.Info(fmt.Sprintf("[%s] Processed", logPrefix), "highest", bodyProgress) + + return nil +} + +func logDownloadingBodies(logPrefix string, committed, remaining uint64, totalDelivered uint64, prevDeliveredCount, deliveredCount, + prevWastedCount, wastedCount float64, bodyCacheSize int, logger log.Logger) { + speed := (deliveredCount - prevDeliveredCount) / float64(logInterval/time.Second) + wastedSpeed := (wastedCount - prevWastedCount) / float64(logInterval/time.Second) + if speed == 0 && wastedSpeed == 0 { + logger.Info(fmt.Sprintf("[%s] No block bodies to write in this log period", logPrefix), "block number", committed) + return + } + + var m runtime.MemStats + dbg.ReadMemStats(&m) + logger.Info(fmt.Sprintf("[%s] Downloading block bodies", logPrefix), + "block_num", committed, + "delivery/sec", libcommon.ByteCount(uint64(speed)), + "wasted/sec", libcommon.ByteCount(uint64(wastedSpeed)), + "remaining", remaining, + "delivered", totalDelivered, + "cache", libcommon.ByteCount(uint64(bodyCacheSize)), + "alloc", libcommon.ByteCount(m.Alloc), + "sys", libcommon.ByteCount(m.Sys), + ) +} + +func logWritingBodies(logPrefix string, committed, headerProgress uint64, logger log.Logger) { + var m runtime.MemStats + dbg.ReadMemStats(&m) + remaining := headerProgress - committed + logger.Info(fmt.Sprintf("[%s] Writing block bodies", logPrefix), + "block_num", committed, + "remaining", remaining, + "alloc", libcommon.ByteCount(m.Alloc), + "sys", libcommon.ByteCount(m.Sys), + ) +} diff --git a/turbo/engineapi/engine_block_downloader/core.go b/turbo/engineapi/engine_block_downloader/core.go new file mode 100644 index 00000000000..86a3428ddd0 --- /dev/null +++ b/turbo/engineapi/engine_block_downloader/core.go @@ -0,0 +1,90 @@ +package engine_block_downloader + +import ( + libcommon "github.com/ledgerwatch/erigon-lib/common" + "github.com/ledgerwatch/erigon-lib/kv/memdb" + "github.com/ledgerwatch/erigon/turbo/engineapi/engine_helpers" + "github.com/ledgerwatch/erigon/turbo/stages/headerdownload" +) + +type downloadRequest struct { + hashToDownload libcommon.Hash + downloaderTip libcommon.Hash + requestId int +} + +func (e *EngineBlockDownloader) Loop() { + for { + select { + case req := <-e.startDownloadCh: + /* Start download process*/ + // First we schedule the headers download process + if !e.scheduleHeadersDownload(req.requestId, req.hashToDownload, 0, req.downloaderTip) { + e.logger.Warn("[EngineBlockDownloader] could not begin header download") + // could it be scheduled? if not nevermind. + e.status.Store(headerdownload.Idle) + continue + } + // see the outcome of header download + headersStatus := e.waitForEndOfHeadersDownload() + if headersStatus != engine_helpers.Synced { + // Could not sync. Set to idle + e.status.Store(headerdownload.Idle) + e.logger.Warn("[EngineBlockDownloader] Header download did not yield success") + continue + } + tx, err := e.db.BeginRo(e.ctx) + if err != nil { + e.logger.Warn("[EngineBlockDownloader] Could not begin tx: %s", err) + e.status.Store(headerdownload.Idle) + continue + } + + memoryMutation := memdb.NewMemoryBatch(tx, e.tmpdir) + defer memoryMutation.Rollback() + + startBlock, endBlock, startHash, err := e.loadDownloadedHeaders(memoryMutation) + if err != nil { + e.status.Store(headerdownload.Idle) + continue + } + if err := e.downloadAndLoadBodiesSyncronously(memoryMutation, startBlock, endBlock); err != nil { + e.logger.Warn("[EngineBlockDownloader] Could not download bodies: %s", err) + e.status.Store(headerdownload.Idle) + continue + } + tx.Rollback() // Discard the original db tx + if err := e.insertHeadersAndBodies(memoryMutation.MemTx(), startBlock, startHash); err != nil { + e.logger.Warn("[EngineBlockDownloader] Could not insert headers and bodies: %s", err) + e.status.Store(headerdownload.Idle) + continue + } + e.status.Store(headerdownload.Idle) + case <-e.ctx.Done(): + return + } + + } +} + +// StartDownloading triggers the download process and returns true if the process started or false if it could not. +func (e *EngineBlockDownloader) StartDownloading(requestId int, hashToDownload libcommon.Hash, downloaderTip libcommon.Hash) bool { + e.lock.Lock() + defer e.lock.Unlock() + if e.status.Load() != headerdownload.Idle { + return false + } + e.status.Store(headerdownload.Syncing) + e.startDownloadCh <- downloadRequest{ + requestId: requestId, + hashToDownload: hashToDownload, + downloaderTip: downloaderTip, + } + return true +} + +func (e *EngineBlockDownloader) Status() headerdownload.SyncStatus { + e.lock.Lock() + defer e.lock.Unlock() + return e.status.Load().(headerdownload.SyncStatus) +} diff --git a/turbo/execution/eth1/block_building.go b/turbo/execution/eth1/block_building.go index a2e4a07a9ab..6e2dde42461 100644 --- a/turbo/execution/eth1/block_building.go +++ b/turbo/execution/eth1/block_building.go @@ -15,6 +15,7 @@ import ( "github.com/ledgerwatch/erigon/rpc" "github.com/ledgerwatch/erigon/turbo/builder" "github.com/ledgerwatch/erigon/turbo/engineapi/engine_helpers" + "github.com/ledgerwatch/erigon/turbo/execution/eth1/eth1_utils" ) func (e *EthereumExecutionModule) checkWithdrawalsPresence(time uint64, withdrawals []*types.Withdrawal) error { @@ -50,7 +51,7 @@ func (e *EthereumExecutionModule) AssembleBlock(ctx context.Context, req *execut Timestamp: req.Timestamp, PrevRandao: gointerfaces.ConvertH256ToHash(req.MixDigest), SuggestedFeeRecipient: gointerfaces.ConvertH160toAddress(req.SuggestedFeeRecipent), - Withdrawals: ConvertWithdrawalsFromRpc(req.Withdrawals), + Withdrawals: eth1_utils.ConvertWithdrawalsFromRpc(req.Withdrawals), } if err := e.checkWithdrawalsPresence(param.Timestamp, param.Withdrawals); err != nil { @@ -145,7 +146,7 @@ func (e *EthereumExecutionModule) GetAssembledBlock(ctx context.Context, req *ex } if block.Withdrawals() != nil { payload.Version = 2 - payload.Withdrawals = ConvertWithdrawalsToRpc(block.Withdrawals()) + payload.Withdrawals = eth1_utils.ConvertWithdrawalsToRpc(block.Withdrawals()) } if header.DataGasUsed != nil && header.ExcessDataGas != nil { diff --git a/turbo/execution/eth1/eth1_test.go b/turbo/execution/eth1/eth1_test.go index b0d06410b58..2326effc5ba 100644 --- a/turbo/execution/eth1/eth1_test.go +++ b/turbo/execution/eth1/eth1_test.go @@ -12,6 +12,7 @@ import ( "github.com/ledgerwatch/erigon/core/rawdb" "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/erigon/turbo/execution/eth1" + "github.com/ledgerwatch/erigon/turbo/execution/eth1/eth1_utils" "github.com/stretchr/testify/require" ) @@ -25,10 +26,10 @@ func TestInsertGetterHeader(t *testing.T) { tx, _ := db.BeginRw(context.TODO()) rawdb.WriteTd(tx, libcommon.Hash{}, 1, libcommon.Big0) tx.Commit() - e := eth1.NewEthereumExecutionModule(nil, db, nil, nil, nil, nil, nil) + e := eth1.NewEthereumExecutionModule(nil, db, nil, nil, nil, nil, nil, false) _, err := e.InsertHeaders(context.TODO(), &execution.InsertHeadersRequest{ Headers: []*execution.Header{ - eth1.HeaderToHeaderRPC(header), + eth1_utils.HeaderToHeaderRPC(header), }}) require.NoError(t, err) resp, err := e.GetHeader(context.TODO(), &execution.GetSegmentRequest{ @@ -46,10 +47,10 @@ func TestInsertGetterBody(t *testing.T) { body := &types.RawBody{ Transactions: txs, } - e := eth1.NewEthereumExecutionModule(nil, memdb.NewTestDB(t), nil, nil, nil, nil, nil) + e := eth1.NewEthereumExecutionModule(nil, memdb.NewTestDB(t), nil, nil, nil, nil, nil, false) _, err := e.InsertBodies(context.TODO(), &execution.InsertBodiesRequest{ Bodies: []*execution.BlockBody{ - eth1.ConvertRawBlockBodyToRpc(body, bn, bhash), + eth1_utils.ConvertRawBlockBodyToRpc(body, bn, bhash), }}) require.NoError(t, err) resp, err := e.GetBody(context.TODO(), &execution.GetSegmentRequest{ diff --git a/turbo/execution/eth1/eth1_utils/communication.go b/turbo/execution/eth1/eth1_utils/communication.go new file mode 100644 index 00000000000..24e593cf752 --- /dev/null +++ b/turbo/execution/eth1/eth1_utils/communication.go @@ -0,0 +1,67 @@ +package eth1_utils + +import ( + "context" + "fmt" + "time" + + libcommon "github.com/ledgerwatch/erigon-lib/common" + "github.com/ledgerwatch/erigon-lib/gointerfaces/execution" + "github.com/ledgerwatch/erigon/core/types" +) + +const retryTimeout = 10 * time.Millisecond + +func InsertHeadersAndWait(ctx context.Context, executionModule execution.ExecutionClient, headers []*types.Header) error { + request := &execution.InsertHeadersRequest{ + Headers: HeadersToHeadersRPC(headers), + } + response, err := executionModule.InsertHeaders(ctx, request) + if err != nil { + return err + } + retryInterval := time.NewTicker(retryTimeout) + defer retryInterval.Stop() + for response.Result == execution.ValidationStatus_Busy { + select { + case <-retryInterval.C: + response, err = executionModule.InsertHeaders(ctx, request) + if err != nil { + return err + } + case <-ctx.Done(): + return context.Canceled + } + } + if response.Result != execution.ValidationStatus_Success { + return fmt.Errorf("insertHeadersAndWait: invalid code recieved from execution module: %s", response.Result.String()) + } + return nil +} + +func InsertBodiesAndWait(ctx context.Context, executionModule execution.ExecutionClient, bodies []*types.RawBody, blockNumbers []uint64, blockHashes []libcommon.Hash) error { + request := &execution.InsertBodiesRequest{ + Bodies: ConvertRawBlockBodiesToRpc(bodies, blockNumbers, blockHashes), + } + response, err := executionModule.InsertBodies(ctx, request) + if err != nil { + return err + } + retryInterval := time.NewTicker(retryTimeout) + defer retryInterval.Stop() + for response.Result == execution.ValidationStatus_Busy { + select { + case <-retryInterval.C: + response, err = executionModule.InsertBodies(ctx, request) + if err != nil { + return err + } + case <-ctx.Done(): + return context.Canceled + } + } + if response.Result != execution.ValidationStatus_Success { + return fmt.Errorf("InsertBodiesAndWait: invalid code recieved from execution module: %s", response.Result.String()) + } + return nil +} diff --git a/turbo/execution/eth1/grpc.go b/turbo/execution/eth1/eth1_utils/grpc.go similarity index 90% rename from turbo/execution/eth1/grpc.go rename to turbo/execution/eth1/eth1_utils/grpc.go index db2837ed644..ce511514ddf 100644 --- a/turbo/execution/eth1/grpc.go +++ b/turbo/execution/eth1/eth1_utils/grpc.go @@ -1,4 +1,4 @@ -package eth1 +package eth1_utils import ( "encoding/binary" @@ -49,7 +49,14 @@ func HeaderToHeaderRPC(header *types.Header) *execution.Header { ExcessDataGas: header.ExcessDataGas, DataGasUsed: header.DataGasUsed, } +} +func HeadersToHeadersRPC(headers []*types.Header) []*execution.Header { + ret := []*execution.Header{} + for _, header := range headers { + ret = append(ret, HeaderToHeaderRPC(header)) + } + return ret } func HeaderRpcToHeader(header *execution.Header) (*types.Header, error) { @@ -132,6 +139,15 @@ func ConvertRawBlockBodyToRpc(in *types.RawBody, blockNumber uint64, blockHash l } } +func ConvertRawBlockBodiesToRpc(in []*types.RawBody, blockNumbers []uint64, blockHashes []libcommon.Hash) []*execution.BlockBody { + ret := []*execution.BlockBody{} + + for i, body := range in { + ret = append(ret, ConvertRawBlockBodyToRpc(body, blockNumbers[i], blockHashes[i])) + } + return ret +} + func ConvertRawBlockBodyFromRpc(in *execution.BlockBody) *types.RawBody { if in == nil { return nil diff --git a/turbo/execution/eth1/ethereum_execution.go b/turbo/execution/eth1/ethereum_execution.go index bf7533a90ea..5d4bc6f477c 100644 --- a/turbo/execution/eth1/ethereum_execution.go +++ b/turbo/execution/eth1/ethereum_execution.go @@ -42,12 +42,14 @@ type EthereumExecutionModule struct { builders map[uint64]*builder.BlockBuilder // configuration - config *chain.Config + config *chain.Config + historyV3 bool execution.UnimplementedExecutionServer } -func NewEthereumExecutionModule(blockReader services.FullBlockReader, db kv.RwDB, executionPipeline *stagedsync.Sync, forkValidator *engine_helpers.ForkValidator, config *chain.Config, builderFunc builder.BlockBuilderFunc, logger log.Logger) *EthereumExecutionModule { +func NewEthereumExecutionModule(blockReader services.FullBlockReader, db kv.RwDB, executionPipeline *stagedsync.Sync, forkValidator *engine_helpers.ForkValidator, + config *chain.Config, builderFunc builder.BlockBuilderFunc, logger log.Logger, historyV3 bool) *EthereumExecutionModule { return &EthereumExecutionModule{ blockReader: blockReader, db: db, diff --git a/turbo/execution/eth1/getters.go b/turbo/execution/eth1/getters.go index ee9cbbbdca4..32ab0c761d0 100644 --- a/turbo/execution/eth1/getters.go +++ b/turbo/execution/eth1/getters.go @@ -13,6 +13,7 @@ import ( types2 "github.com/ledgerwatch/erigon-lib/gointerfaces/types" "github.com/ledgerwatch/erigon/core/rawdb" + "github.com/ledgerwatch/erigon/turbo/execution/eth1/eth1_utils" ) func (e *EthereumExecutionModule) parseSegmentRequest(ctx context.Context, tx kv.Tx, req *execution.GetSegmentRequest) (blockHash libcommon.Hash, blockNumber uint64, err error) { @@ -64,7 +65,7 @@ func (e *EthereumExecutionModule) GetBody(ctx context.Context, req *execution.Ge } rawBody := body.RawBody() - return &execution.GetBodyResponse{Body: ConvertRawBlockBodyToRpc(rawBody, blockNumber, blockHash)}, nil + return &execution.GetBodyResponse{Body: eth1_utils.ConvertRawBlockBodyToRpc(rawBody, blockNumber, blockHash)}, nil } func (e *EthereumExecutionModule) GetHeader(ctx context.Context, req *execution.GetSegmentRequest) (*execution.GetHeaderResponse, error) { @@ -90,7 +91,7 @@ func (e *EthereumExecutionModule) GetHeader(ctx context.Context, req *execution. return &execution.GetHeaderResponse{Header: nil}, nil } - return &execution.GetHeaderResponse{Header: HeaderToHeaderRPC(header)}, nil + return &execution.GetHeaderResponse{Header: eth1_utils.HeaderToHeaderRPC(header)}, nil } func (e *EthereumExecutionModule) GetHeaderHashNumber(ctx context.Context, req *types2.H256) (*execution.GetHeaderHashNumberResponse, error) { diff --git a/turbo/execution/eth1/inserters.go b/turbo/execution/eth1/inserters.go index ba89959c869..be6fb63d029 100644 --- a/turbo/execution/eth1/inserters.go +++ b/turbo/execution/eth1/inserters.go @@ -7,6 +7,7 @@ import ( "github.com/ledgerwatch/erigon-lib/gointerfaces" "github.com/ledgerwatch/erigon-lib/gointerfaces/execution" "github.com/ledgerwatch/erigon/core/rawdb" + "github.com/ledgerwatch/erigon/turbo/execution/eth1/eth1_utils" ) func (e *EthereumExecutionModule) InsertBodies(ctx context.Context, req *execution.InsertBodiesRequest) (*execution.InsertionResult, error) { @@ -22,9 +23,15 @@ func (e *EthereumExecutionModule) InsertBodies(ctx context.Context, req *executi } defer tx.Rollback() for _, grpcBody := range req.Bodies { - if _, err := rawdb.WriteRawBodyIfNotExists(tx, gointerfaces.ConvertH256ToHash(grpcBody.BlockHash), grpcBody.BlockNumber, ConvertRawBlockBodyFromRpc(grpcBody)); err != nil { + var ok bool + if ok, err = rawdb.WriteRawBodyIfNotExists(tx, gointerfaces.ConvertH256ToHash(grpcBody.BlockHash), grpcBody.BlockNumber, eth1_utils.ConvertRawBlockBodyFromRpc(grpcBody)); err != nil { return nil, fmt.Errorf("ethereumExecutionModule.InsertBodies: could not insert: %s", err) } + if e.historyV3 && ok { + if err := rawdb.AppendCanonicalTxNums(tx, grpcBody.BlockNumber); err != nil { + return nil, fmt.Errorf("ethereumExecutionModule.InsertBodies: could not insert: %s", err) + } + } } if err := tx.Commit(); err != nil { return nil, fmt.Errorf("ethereumExecutionModule.InsertBodies: could not commit: %s", err) @@ -48,7 +55,7 @@ func (e *EthereumExecutionModule) InsertHeaders(ctx context.Context, req *execut } defer tx.Rollback() for _, grpcHeader := range req.Headers { - header, err := HeaderRpcToHeader(grpcHeader) + header, err := eth1_utils.HeaderRpcToHeader(grpcHeader) if err != nil { return nil, fmt.Errorf("ethereumExecutionModule.InsertHeaders: cannot convert headers: %s", err) }