Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Get blobs from the EL's blob pool #5829

Closed
wants to merge 12 commits into from
695 changes: 633 additions & 62 deletions Cargo.lock

Large diffs are not rendered by default.

97 changes: 74 additions & 23 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2998,6 +2998,38 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.remove_notified(&block_root, r)
}

/// Process a set of blob sidecars obtained from the execution layer's blob pool.
///
/// Returns `BlockError::BlockIsAlreadyKnown` if the block is already in fork choice,
/// since a block known to fork choice must already have been available.
pub async fn process_engine_blobs(
    self: &Arc<Self>,
    slot: Slot,
    block_root: Hash256,
    blobs: FixedBlobSidecarList<T::EthSpec>,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
    let already_in_fork_choice = self
        .canonical_head
        .fork_choice_read_lock()
        .contains_block(&block_root);
    // A block imported to fork choice was necessarily available, so its blobs
    // do not need to be processed again.
    if already_in_fork_choice {
        return Err(BlockError::BlockIsAlreadyKnown(block_root));
    }

    // Emit an SSE event per blob, but only if someone is actually subscribed.
    if let Some(event_handler) = self
        .event_handler
        .as_ref()
        .filter(|handler| handler.has_blob_sidecar_subscribers())
    {
        for blob in blobs.iter().flatten() {
            event_handler.register(EventKind::BlobSidecar(
                SseBlobSidecar::from_blob_sidecar(blob),
            ));
        }
    }

    let result = self
        .check_engine_blob_availability_and_import(slot, block_root, blobs)
        .await;
    self.remove_notified(&block_root, result)
}

/// Remove any block components from the *processing cache* if we no longer require them. If the
/// block was imported full or erred, we no longer require them.
fn remove_notified(
Expand Down Expand Up @@ -3257,6 +3289,33 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.process_availability(slot, availability).await
}

/// Feed the (deduplicated) block headers attached to `blobs` into the slashing
/// protection machinery.
///
/// Headers whose proposer signature does not verify are silently skipped, so that
/// unsigned garbage cannot be used to poison the slashable cache or the slasher.
fn check_blobs_for_slashability(
    self: &Arc<Self>,
    block_root: Hash256,
    blobs: &FixedBlobSidecarList<T::EthSpec>,
) -> Result<(), BlockError<T::EthSpec>> {
    let mut slashable_cache = self.observed_slashable.write();
    let unique_headers = blobs
        .iter()
        .flatten()
        .map(|blob| blob.signed_block_header.clone())
        .unique();
    for header in unique_headers {
        // Ignore headers that fail signature verification.
        if verify_header_signature::<T, BlockError<T::EthSpec>>(self, &header).is_err() {
            continue;
        }
        slashable_cache
            .observe_slashable(
                header.message.slot,
                header.message.proposer_index,
                block_root,
            )
            .map_err(|e| BlockError::BeaconChainError(e.into()))?;
        if let Some(slasher) = self.slasher.as_ref() {
            slasher.accept_block_header(header);
        }
    }
    Ok(())
}

/// Checks if the provided blobs can make any cached blocks available, and imports immediately
/// if so, otherwise caches the blob in the data availability checker.
async fn check_rpc_blob_availability_and_import(
Expand All @@ -3265,36 +3324,28 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
block_root: Hash256,
blobs: FixedBlobSidecarList<T::EthSpec>,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
// Need to scope this to ensure the lock is dropped before calling `process_availability`
// Even an explicit drop is not enough to convince the borrow checker.
{
let mut slashable_cache = self.observed_slashable.write();
for header in blobs
.into_iter()
.filter_map(|b| b.as_ref().map(|b| b.signed_block_header.clone()))
.unique()
{
if verify_header_signature::<T, BlockError<T::EthSpec>>(self, &header).is_ok() {
slashable_cache
.observe_slashable(
header.message.slot,
header.message.proposer_index,
block_root,
)
.map_err(|e| BlockError::BeaconChainError(e.into()))?;
if let Some(slasher) = self.slasher.as_ref() {
slasher.accept_block_header(header);
}
}
}
}
self.check_blobs_for_slashability(block_root, &blobs)?;
let availability = self
.data_availability_checker
.put_rpc_blobs(block_root, blobs)?;

self.process_availability(slot, availability).await
}

/// Cache engine-sourced blobs in the data availability checker and import the
/// block immediately if this completes its set of components.
async fn check_engine_blob_availability_and_import(
    self: &Arc<Self>,
    slot: Slot,
    block_root: Hash256,
    blobs: FixedBlobSidecarList<T::EthSpec>,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
    // Record any slashable proposals before touching the availability cache.
    self.check_blobs_for_slashability(block_root, &blobs)?;
    match self
        .data_availability_checker
        .put_engine_blobs(block_root, blobs)
    {
        Ok(availability) => self.process_availability(slot, availability).await,
        Err(e) => Err(e.into()),
    }
}

/// Imports a fully available block. Otherwise, returns `AvailabilityProcessingStatus::MissingComponents`
///
/// An error is returned if the block was unable to be imported. It may be partially imported
Expand Down
15 changes: 13 additions & 2 deletions beacon_node/beacon_chain/src/blob_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,15 +336,26 @@ impl<E: EthSpec> KzgVerifiedBlobList<E> {
) -> Result<Self, KzgError> {
let blobs = blob_list.into_iter().collect::<Vec<_>>();
verify_kzg_for_blob_list(blobs.iter(), kzg)?;
Ok(Self {
Ok(Self::from_verified(blobs, seen_timestamp))
}

/// Create a `KzgVerifiedBlobList` from `blobs` that are already KZG verified.
///
/// This should be used with caution, as used incorrectly it could result in KZG verification
/// being skipped and invalid blobs being deemed valid.
pub fn from_verified<I: IntoIterator<Item = Arc<BlobSidecar<E>>>>(
blobs: I,
seen_timestamp: Duration,
) -> Self {
Self {
verified_blobs: blobs
.into_iter()
.map(|blob| KzgVerifiedBlob {
blob,
seen_timestamp,
})
.collect(),
})
}
}
}

Expand Down
136 changes: 130 additions & 6 deletions beacon_node/beacon_chain/src/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ use slot_clock::SlotClock;
use ssz::Encode;
use ssz_derive::{Decode, Encode};
use ssz_types::VariableList;
use state_processing::per_block_processing::{errors::IntoWithIndex, is_merge_transition_block};
use state_processing::per_block_processing::{
deneb::kzg_commitment_to_versioned_hash, errors::IntoWithIndex, is_merge_transition_block,
};
use state_processing::{
block_signature_verifier::{BlockSignatureVerifier, Error as BlockSignatureVerifierError},
per_block_processing, per_slot_processing,
Expand All @@ -94,9 +96,9 @@ use std::sync::Arc;
use store::{Error as DBError, HotStateSummary, KeyValueStore, StoreOp};
use task_executor::JoinHandle;
use types::{
BeaconBlockRef, BeaconState, BeaconStateError, ChainSpec, Epoch, EthSpec, ExecutionBlockHash,
Hash256, InconsistentFork, PublicKey, PublicKeyBytes, RelativeEpoch, SignedBeaconBlock,
SignedBeaconBlockHeader, Slot,
blob_sidecar::FixedBlobSidecarList, BeaconBlockRef, BeaconState, BeaconStateError, ChainSpec,
Epoch, EthSpec, ExecutionBlockHash, Hash256, InconsistentFork, PublicKey, PublicKeyBytes,
RelativeEpoch, SignedBeaconBlock, SignedBeaconBlockHeader, Slot,
};
use types::{BlobSidecar, ExecPayload};

Expand Down Expand Up @@ -651,10 +653,14 @@ pub struct SignatureVerifiedBlock<T: BeaconChainTypes> {
consensus_context: ConsensusContext<T::EthSpec>,
}

/// Used to await the result of executing payload with a remote EE.
/// Used to await the result of executing payload with an EE.
type PayloadVerificationHandle<E> =
JoinHandle<Option<Result<PayloadVerificationOutcome, BlockError<E>>>>;

// FIXME(sproul): delete
// Used to await the result of downloading blobs from an EE.
// type BlobFetcherHandle<E> = JoinHandle<Option<Result<(), BlockError<E>>>>;

/// A wrapper around a `SignedBeaconBlock` that indicates that this block is fully verified and
/// ready to import into the `BeaconChain`. The validation includes:
///
Expand All @@ -670,6 +676,7 @@ pub struct ExecutionPendingBlock<T: BeaconChainTypes> {
pub block: MaybeAvailableBlock<T::EthSpec>,
pub import_data: BlockImportData<T::EthSpec>,
pub payload_verification_handle: PayloadVerificationHandle<T::EthSpec>,
// pub blob_fetcher_handle: BlobFetcherHandle<T::EthSpec>,
}

pub trait IntoGossipVerifiedBlockContents<T: BeaconChainTypes>: Sized {
Expand Down Expand Up @@ -1308,9 +1315,126 @@ impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {
/*
* Perform cursory checks to see if the block is even worth processing.
*/

check_block_relevancy(block.as_block(), block_root, chain)?;

// Spawn an async call to the execution node to fetch blobs.
let blob_chain = chain.clone();
let blob_block = block.as_block().clone();
let blob_fetcher_future = async move {
let block = blob_block;
let chain = blob_chain;

let versioned_hashes =
if let Ok(kzg_commitments) = block.message().body().blob_kzg_commitments() {
kzg_commitments
.iter()
.map(kzg_commitment_to_versioned_hash)
.collect()
} else {
vec![]
};
let num_blobs = versioned_hashes.len();

if versioned_hashes.is_empty() {
debug!(chain.log, "Blobs from EL - none required");
return Ok(());
}

let execution_layer = chain
.execution_layer
.as_ref()
.ok_or(BeaconChainError::ExecutionLayerMissing)?;
michaelsproul marked this conversation as resolved.
Show resolved Hide resolved

debug!(
chain.log,
"Blobs from EL - start request";
"num_blobs" => num_blobs,
);
let response = execution_layer
.get_blobs(versioned_hashes)
.await
.map_err(|e| {
BlockError::ExecutionPayloadError(ExecutionPayloadError::RequestFailed(e))
})?;
let num_fetched_blobs = response.iter().filter(|b| b.is_some()).count();
if num_fetched_blobs == 0 {
debug!(chain.log, "Blobs from EL - response with none");
return Ok(());
} else if num_fetched_blobs < num_blobs {
debug!(
chain.log,
"Blobs from EL - response with some";
"fetched" => num_fetched_blobs,
"total" => num_blobs,
);
} else {
debug!(
chain.log,
"Blobs from EL - response with all";
"num_blobs" => num_blobs
);
}
let (signed_block_header, kzg_commitments_proof) =
block.signed_block_header_and_kzg_commitments_proof()?;

let mut fixed_blob_sidecar_list = FixedBlobSidecarList::default();
for (i, blob_and_proof) in response
.into_iter()
.enumerate()
.filter_map(|(i, opt_blob)| Some((i, opt_blob?)))
{
match BlobSidecar::new_efficiently(
i,
blob_and_proof.blob,
&block,
signed_block_header.clone(),
&kzg_commitments_proof,
blob_and_proof.proof,
) {
Ok(blob) => {
if let Some(blob_mut) = fixed_blob_sidecar_list.get_mut(i) {
*blob_mut = Some(Arc::new(blob));
} else {
error!(
chain.log,
"Blobs from EL - out of bounds";
"i" => i
);
}
}
Err(e) => {
warn!(
chain.log,
"Blobs from EL - error";
"error" => ?e
);
}
}
}
debug!(
chain.log,
"Blobs from EL - start processing";
"num_blobs" => num_blobs,
);
chain
.process_engine_blobs(block.slot(), block_root, fixed_blob_sidecar_list)
.await
.map(|_| {
debug!(chain.log, "Blobs from EL - processed");
})
.map_err(|e| {
warn!(chain.log, "Blobs from EL - error"; "error" => ?e);
e
})
};
let blob_fetcher_handle = chain
.task_executor
.spawn_handle(blob_fetcher_future, "execution_blob_fetcher")
.ok_or(BeaconChainError::RuntimeShutdown)?;
// FIXME(sproul): should we wait for this handle?
// FIXME(sproul): should do blob broadcast on P2P in here somewhere
drop(blob_fetcher_handle);

// Define a future that will verify the execution payload with an execution engine.
//
// We do this as early as possible so that later parts of this function can run in parallel
Expand Down
23 changes: 22 additions & 1 deletion beacon_node/beacon_chain/src/data_availability_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,13 +155,34 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
.ok_or(AvailabilityCheckError::SlotClockError)?;

let verified_blobs =
KzgVerifiedBlobList::new(Vec::from(blobs).into_iter().flatten(), kzg, seen_timestamp)
KzgVerifiedBlobList::new(blobs.iter().flatten().cloned(), kzg, seen_timestamp)
.map_err(AvailabilityCheckError::Kzg)?;

self.availability_cache
.put_kzg_verified_blobs(block_root, verified_blobs)
}

/// Put a list of blobs received from the EL pool into the availability cache.
///
/// This DOES NOT perform KZG verification because the KZG proofs should have been constructed
/// immediately prior to calling this function so they are assumed to be valid.
/// Put a list of blobs received from the EL pool into the availability cache.
///
/// This DOES NOT perform KZG verification: the KZG proofs are assumed valid
/// because they were constructed immediately prior to this call.
pub fn put_engine_blobs(
    &self,
    block_root: Hash256,
    blobs: FixedBlobSidecarList<T::EthSpec>,
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
    let seen_timestamp = match self.slot_clock.now_duration() {
        Some(now) => now,
        None => return Err(AvailabilityCheckError::SlotClockError),
    };

    // Wrap without re-verifying KZG proofs (see safety note above).
    let verified = KzgVerifiedBlobList::from_verified(
        blobs.iter().flatten().cloned(),
        seen_timestamp,
    );

    self.availability_cache
        .put_kzg_verified_blobs(block_root, verified)
}

/// Check if we've cached other blobs for this block. If it completes a set and we also
/// have a block cached, return the `Availability` variant triggering block import.
/// Otherwise cache the blob sidecar.
Expand Down
3 changes: 2 additions & 1 deletion beacon_node/execution_layer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,6 @@ pretty_reqwest_error = { workspace = true }
arc-swap = "1.6.0"
eth2_network_config = { workspace = true }
alloy-rlp = "0.3"
alloy-consensus = { git = "https://github.com/alloy-rs/alloy.git", rev = "974d488bab5e21e9f17452a39a4bfa56677367b2" }
alloy-consensus = { git = "https://github.com/alloy-rs/alloy.git", rev = "64feb9bc51c8021ea08535694c44de84222f474e" }
reth-primitives = { git = "https://github.com/paradigmxyz/reth.git", rev = "5943c4707d3a3a7debd6bd0f4ab27d3f5cd657a5" }
lighthouse_version = { workspace = true }
Loading