Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Get blobs from the EL's blob pool #5829

Closed
wants to merge 12 commits into from
702 changes: 640 additions & 62 deletions Cargo.lock

Large diffs are not rendered by default.

102 changes: 98 additions & 4 deletions beacon_node/beacon_chain/src/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ use crate::{
};
use derivative::Derivative;
use eth2::types::{EventKind, PublishBlockRequest};
use execution_layer::versioned_hashes::extract_blob_transaction_ids;
use execution_layer::PayloadStatus;
pub use fork_choice::{AttestationFromBlock, PayloadVerificationStatus};
use parking_lot::RwLockReadGuard;
Expand Down Expand Up @@ -95,9 +96,9 @@ use store::{Error as DBError, HotStateSummary, KeyValueStore, StoreOp};
use task_executor::JoinHandle;
use tree_hash::TreeHash;
use types::{
BeaconBlockRef, BeaconState, BeaconStateError, ChainSpec, Epoch, EthSpec, ExecutionBlockHash,
Hash256, InconsistentFork, PublicKey, PublicKeyBytes, RelativeEpoch, SignedBeaconBlock,
SignedBeaconBlockHeader, Slot,
blob_sidecar::FixedBlobSidecarList, BeaconBlockRef, BeaconState, BeaconStateError, ChainSpec,
Epoch, EthSpec, ExecutionBlockHash, Hash256, InconsistentFork, PublicKey, PublicKeyBytes,
RelativeEpoch, SignedBeaconBlock, SignedBeaconBlockHeader, Slot,
};
use types::{BlobSidecar, ExecPayload};

Expand Down Expand Up @@ -652,10 +653,14 @@ pub struct SignatureVerifiedBlock<T: BeaconChainTypes> {
consensus_context: ConsensusContext<T::EthSpec>,
}

/// Used to await the result of executing payload with a remote EE.
/// Used to await the result of executing payload with an EE.
type PayloadVerificationHandle<E> =
JoinHandle<Option<Result<PayloadVerificationOutcome, BlockError<E>>>>;

// FIXME(sproul): delete
// Used to await the result of downloading blobs from an EE.
// type BlobFetcherHandle<E> = JoinHandle<Option<Result<(), BlockError<E>>>>;

/// A wrapper around a `SignedBeaconBlock` that indicates that this block is fully verified and
/// ready to import into the `BeaconChain`. The validation includes:
///
Expand All @@ -671,6 +676,7 @@ pub struct ExecutionPendingBlock<T: BeaconChainTypes> {
pub block: MaybeAvailableBlock<T::EthSpec>,
pub import_data: BlockImportData<T::EthSpec>,
pub payload_verification_handle: PayloadVerificationHandle<T::EthSpec>,
// pub blob_fetcher_handle: BlobFetcherHandle<T::EthSpec>,
}

pub trait IntoGossipVerifiedBlockContents<T: BeaconChainTypes>: Sized {
Expand Down Expand Up @@ -1302,6 +1308,94 @@ impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {

check_block_relevancy(block.as_block(), block_root, chain)?;

// Spawn an async call to the execution node to fetch blobs.
let blob_chain = chain.clone();
let blob_block = block.as_block().clone();
let blob_fetcher_future = async move {
let block = blob_block;
let chain = blob_chain;
let execution_payload = block.message().execution_payload()?;
let blob_ids = extract_blob_transaction_ids::<T::EthSpec>(
execution_payload.transactions().unwrap(),
)
.map_err(|e| {
BlockError::ExecutionPayloadError(ExecutionPayloadError::RequestFailed(
execution_layer::Error::VerifyingVersionedHashes(e),
))
})?;
let execution_layer = chain
.execution_layer
.as_ref()
.ok_or(BeaconChainError::ExecutionLayerMissing)?;
michaelsproul marked this conversation as resolved.
Show resolved Hide resolved
debug!(
chain.log,
"Blobs from EL request";
"num_blob_tx" => blob_ids.len(),
);
let response = execution_layer.get_blobs(blob_ids).await.map_err(|e| {
BlockError::ExecutionPayloadError(ExecutionPayloadError::RequestFailed(e))
})?;
let num_success = response.blobs.iter().filter(|b| b.is_some()).count();
if num_success == 0 {
debug!(chain.log, "Blobs from EL empty");
}
debug!(
chain.log,
"Blobs from EL success";
"num_blob_tx" => num_success
);
let mut fixed_blob_sidecar_list = FixedBlobSidecarList::default();
for (i, (blob, kzg_proof)) in response
.blobs
.into_iter()
.flatten()
.flat_map(|blob| blob.blobs.into_iter().zip(blob.proofs))
.enumerate()
michaelsproul marked this conversation as resolved.
Show resolved Hide resolved
{
match BlobSidecar::new(i, blob, &block, kzg_proof) {
michaelsproul marked this conversation as resolved.
Show resolved Hide resolved
Ok(blob) => {
if let Some(blob_mut) = fixed_blob_sidecar_list.get_mut(i) {
*blob_mut = Some(Arc::new(blob));
} else {
error!(
chain.log,
"Blobs from EL out of bounds";
"i" => i
);
}
}
Err(e) => {
warn!(
chain.log,
"Blobs from EL error";
"error" => ?e
);
}
}
}
debug!(
chain.log,
"Blobs from EL processing";
"num_blobs" => fixed_blob_sidecar_list.iter().filter(|b| b.is_some()).count(),
);
chain
.process_rpc_blobs(block.slot(), block_root, fixed_blob_sidecar_list)
michaelsproul marked this conversation as resolved.
Show resolved Hide resolved
.await
.map(|_| {
debug!(chain.log, "Blobs from EL processed");
})
.map_err(|e| {
debug!(chain.log, "Blobs from EL errored"; "error" => ?e);
e
})
};
let blob_fetcher_handle = chain
.task_executor
.spawn_handle(blob_fetcher_future, "execution_blob_fetcher")
.ok_or(BeaconChainError::RuntimeShutdown)?;
// FIXME(sproul): should we wait for this handle?
drop(blob_fetcher_handle);

// Define a future that will verify the execution payload with an execution engine.
//
// We do this as early as possible so that later parts of this function can run in parallel
Expand Down
3 changes: 2 additions & 1 deletion beacon_node/execution_layer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,6 @@ pretty_reqwest_error = { workspace = true }
arc-swap = "1.6.0"
eth2_network_config = { workspace = true }
alloy-rlp = "0.3"
alloy-consensus = { git = "https://github.com/alloy-rs/alloy.git", rev = "974d488bab5e21e9f17452a39a4bfa56677367b2" }
alloy-consensus = { git = "https://github.com/alloy-rs/alloy.git", rev = "64feb9bc51c8021ea08535694c44de84222f474e" }
reth-primitives = { git = "https://github.com/paradigmxyz/reth.git", rev = "5943c4707d3a3a7debd6bd0f4ab27d3f5cd657a5" }
lighthouse_version = { workspace = true }
6 changes: 5 additions & 1 deletion beacon_node/execution_layer/src/engine_api.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::engines::ForkchoiceState;
use crate::http::{
ENGINE_FORKCHOICE_UPDATED_V1, ENGINE_FORKCHOICE_UPDATED_V2, ENGINE_FORKCHOICE_UPDATED_V3,
ENGINE_GET_CLIENT_VERSION_V1, ENGINE_GET_PAYLOAD_BODIES_BY_HASH_V1,
ENGINE_GET_BLOBS_V1, ENGINE_GET_CLIENT_VERSION_V1, ENGINE_GET_PAYLOAD_BODIES_BY_HASH_V1,
ENGINE_GET_PAYLOAD_BODIES_BY_RANGE_V1, ENGINE_GET_PAYLOAD_V1, ENGINE_GET_PAYLOAD_V2,
ENGINE_GET_PAYLOAD_V3, ENGINE_NEW_PAYLOAD_V1, ENGINE_NEW_PAYLOAD_V2, ENGINE_NEW_PAYLOAD_V3,
};
Expand Down Expand Up @@ -661,6 +661,7 @@ pub struct EngineCapabilities {
pub get_payload_v2: bool,
pub get_payload_v3: bool,
pub get_client_version_v1: bool,
pub get_blobs_v1: bool,
}

impl EngineCapabilities {
Expand Down Expand Up @@ -702,6 +703,9 @@ impl EngineCapabilities {
if self.get_client_version_v1 {
response.push(ENGINE_GET_CLIENT_VERSION_V1);
}
if self.get_blobs_v1 {
response.push(ENGINE_GET_BLOBS_V1);
}

response
}
Expand Down
20 changes: 20 additions & 0 deletions beacon_node/execution_layer/src/engine_api/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use super::*;
use crate::auth::Auth;
use crate::json_structures::*;
use crate::versioned_hashes::BlobTransactionId;
use lazy_static::lazy_static;
use lighthouse_version::{COMMIT_PREFIX, VERSION};
use reqwest::header::CONTENT_TYPE;
Expand Down Expand Up @@ -56,6 +57,9 @@ pub const ENGINE_EXCHANGE_CAPABILITIES_TIMEOUT: Duration = Duration::from_secs(1
pub const ENGINE_GET_CLIENT_VERSION_V1: &str = "engine_getClientVersionV1";
pub const ENGINE_GET_CLIENT_VERSION_TIMEOUT: Duration = Duration::from_secs(1);

pub const ENGINE_GET_BLOBS_V1: &str = "engine_getBlobsV1";
pub const ENGINE_GET_BLOBS_TIMEOUT: Duration = Duration::from_secs(1);

/// This error is returned during a `chainId` call by Geth.
pub const EIP155_ERROR_STR: &str = "chain not synced beyond EIP-155 replay-protection fork block";
/// This code is returned by all clients when a method is not supported
Expand All @@ -75,6 +79,7 @@ pub static LIGHTHOUSE_CAPABILITIES: &[&str] = &[
ENGINE_GET_PAYLOAD_BODIES_BY_HASH_V1,
ENGINE_GET_PAYLOAD_BODIES_BY_RANGE_V1,
ENGINE_GET_CLIENT_VERSION_V1,
ENGINE_GET_BLOBS_V1,
];

lazy_static! {
Expand Down Expand Up @@ -702,6 +707,20 @@ impl HttpJsonRpc {
}
}

pub async fn get_blobs<E: EthSpec>(
&self,
blob_ids: Vec<BlobTransactionId>,
) -> Result<GetBlobsResponse<E>, Error> {
let params = json!([blob_ids]);

self.rpc_request(
ENGINE_GET_BLOBS_V1,
params,
ENGINE_GET_BLOBS_TIMEOUT * self.execution_timeout_multiplier,
)
.await
}

pub async fn get_block_by_number<'a>(
&self,
query: BlockByNumberQuery<'a>,
Expand Down Expand Up @@ -1078,6 +1097,7 @@ impl HttpJsonRpc {
get_payload_v2: capabilities.contains(ENGINE_GET_PAYLOAD_V2),
get_payload_v3: capabilities.contains(ENGINE_GET_PAYLOAD_V3),
get_client_version_v1: capabilities.contains(ENGINE_GET_CLIENT_VERSION_V1),
get_blobs_v1: capabilities.contains(ENGINE_GET_BLOBS_V1),
})
}

Expand Down
6 changes: 6 additions & 0 deletions beacon_node/execution_layer/src/engine_api/json_structures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,12 @@ impl<E: EthSpec> From<JsonBlobsBundleV1<E>> for BlobsBundle<E> {
}
}

#[derive(Debug, PartialEq, Serialize, Deserialize)]
#[serde(bound = "E: EthSpec", rename_all = "camelCase")]
pub struct GetBlobsResponse<E: EthSpec> {
pub blobs: Vec<Option<JsonBlobsBundleV1<E>>>,
}

#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct JsonForkchoiceStateV1 {
Expand Down
15 changes: 14 additions & 1 deletion beacon_node/execution_layer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
//! This crate only provides useful functionality for "The Merge", it does not provide any of the
//! deposit-contract functionality that the `beacon_node/eth1` crate already provides.

use crate::json_structures::GetBlobsResponse;
use crate::payload_cache::PayloadCache;
use crate::versioned_hashes::BlobTransactionId;
use arc_swap::ArcSwapOption;
use auth::{strip_prefix, Auth, JwtKey};
pub use block_hash::calculate_execution_block_hash;
Expand Down Expand Up @@ -63,7 +65,7 @@ mod metrics;
pub mod payload_cache;
mod payload_status;
pub mod test_utils;
mod versioned_hashes;
pub mod versioned_hashes;

/// Indicates the default jwt authenticated execution endpoint.
pub const DEFAULT_EXECUTION_ENDPOINT: &str = "http://localhost:8551/";
Expand Down Expand Up @@ -1837,6 +1839,17 @@ impl<E: EthSpec> ExecutionLayer<E> {
}
}

pub async fn get_blobs(
&self,
query: Vec<BlobTransactionId>,
) -> Result<GetBlobsResponse<E>, Error> {
self.engine()
.request(|engine| async move { engine.api.get_blobs(query).await })
.await
.map_err(Box::new)
.map_err(Error::EngineError)
}

pub async fn get_block_by_number(
&self,
query: BlockByNumberQuery<'_>,
Expand Down
1 change: 1 addition & 0 deletions beacon_node/execution_layer/src/test_utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ pub const DEFAULT_ENGINE_CAPABILITIES: EngineCapabilities = EngineCapabilities {
get_payload_v2: true,
get_payload_v3: true,
get_client_version_v1: true,
get_blobs_v1: true,
};

lazy_static! {
Expand Down
62 changes: 43 additions & 19 deletions beacon_node/execution_layer/src/versioned_hashes.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
extern crate alloy_consensus;
extern crate alloy_rlp;
use alloy_consensus::TxEnvelope;
use alloy_rlp::Decodable;
use serde::{Deserialize, Serialize};
use types::{EthSpec, ExecutionPayloadRef, Hash256, Unsigned, VersionedHash};

#[derive(Debug)]
Expand Down Expand Up @@ -44,28 +43,52 @@ pub fn extract_versioned_hashes_from_transactions<E: EthSpec>(
let mut versioned_hashes = Vec::new();

for tx in transactions {
match beacon_tx_to_tx_envelope(tx)? {
TxEnvelope::Eip4844(signed_tx_eip4844) => {
versioned_hashes.extend(
signed_tx_eip4844
.tx()
.blob_versioned_hashes
.iter()
.map(|fb| Hash256::from(fb.0)),
);
}
// enumerating all variants explicitly to make pattern irrefutable
// in case new types are added in the future which also have blobs
TxEnvelope::Legacy(_)
| TxEnvelope::TaggedLegacy(_)
| TxEnvelope::Eip2930(_)
| TxEnvelope::Eip1559(_) => {}
if let TxEnvelope::Eip4844(signed_tx_eip4844) = beacon_tx_to_tx_envelope(tx)? {
versioned_hashes.extend(
signed_tx_eip4844
.tx()
.tx()
.blob_versioned_hashes
.iter()
.map(|fb| Hash256::from(fb.0)),
);
}
}

Ok(versioned_hashes)
}

#[derive(Deserialize, Serialize)]
pub struct BlobTransactionId {
tx_hash: Hash256,
versioned_hashes: Vec<VersionedHash>,
}

pub fn extract_blob_transaction_ids<E: EthSpec>(
transactions: &types::Transactions<E>,
) -> Result<Vec<BlobTransactionId>, Error> {
let mut transaction_ids = vec![];

for tx in transactions {
if let TxEnvelope::Eip4844(signed_tx_eip4844) = beacon_tx_to_tx_envelope(tx)? {
let tx_hash = signed_tx_eip4844.hash();
let versioned_hashes = signed_tx_eip4844
.tx()
.tx()
.blob_versioned_hashes
.iter()
.map(|fb| Hash256::from(fb.0))
.collect();
transaction_ids.push(BlobTransactionId {
tx_hash: Hash256::from_slice(tx_hash.as_slice()),
versioned_hashes,
});
}
}

Ok(transaction_ids)
}

pub fn beacon_tx_to_tx_envelope<N: Unsigned>(
tx: &types::Transaction<N>,
) -> Result<TxEnvelope, Error> {
Expand All @@ -78,7 +101,8 @@ pub fn beacon_tx_to_tx_envelope<N: Unsigned>(
mod test {
use super::*;
use crate::test_utils::static_valid_tx;
use alloy_consensus::{TxKind, TxLegacy};
use alloy_consensus::TxLegacy;
use reth_primitives::alloy_primitives::TxKind;

type E = types::MainnetEthSpec;

Expand Down
Loading