Skip to content

Commit

Permalink
Fix PublishBlockRequest SSZ decoding (#5078)
Browse files Browse the repository at this point in the history
* Fix PublishBlockRequest SSZ decoding

* Send header for block v1 ssz client

* Delete unused function
  • Loading branch information
michaelsproul committed Jan 19, 2024
1 parent 2a161ce commit 185646a
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 50 deletions.
12 changes: 9 additions & 3 deletions beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use eth2::types::{
PublishBlockRequest, ValidatorBalancesRequestBody, ValidatorId, ValidatorStatus,
ValidatorsRequestBody,
};
use eth2::{CONTENT_TYPE_HEADER, SSZ_CONTENT_TYPE_HEADER};
use eth2::{CONSENSUS_VERSION_HEADER, CONTENT_TYPE_HEADER, SSZ_CONTENT_TYPE_HEADER};
use lighthouse_network::{types::SyncState, EnrExt, NetworkGlobals, PeerId, PubsubMessage};
use lighthouse_version::version_with_platform;
use logging::SSELoggingComponents;
Expand Down Expand Up @@ -1249,6 +1249,8 @@ pub fn serve<T: BeaconChainTypes>(
/*
* beacon/blocks
*/
let consensus_version_header_filter =
warp::header::header::<ForkName>(CONSENSUS_VERSION_HEADER);

// POST beacon/blocks
let post_beacon_blocks = eth_v1
Expand Down Expand Up @@ -1286,20 +1288,22 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("blocks"))
.and(warp::path::end())
.and(warp::body::bytes())
.and(consensus_version_header_filter)
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(network_tx_filter.clone())
.and(log_filter.clone())
.then(
move |block_bytes: Bytes,
consensus_version: ForkName,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| {
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
let block_contents = PublishBlockRequest::<T::EthSpec>::from_ssz_bytes(
&block_bytes,
&chain.spec,
consensus_version,
)
.map_err(|e| {
warp_utils::reject::custom_bad_request(format!("invalid SSZ: {e:?}"))
Expand Down Expand Up @@ -1356,21 +1360,23 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::query::<api_types::BroadcastValidationQuery>())
.and(warp::path::end())
.and(warp::body::bytes())
.and(consensus_version_header_filter)
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.and(network_tx_filter.clone())
.and(log_filter.clone())
.then(
move |validation_level: api_types::BroadcastValidationQuery,
block_bytes: Bytes,
consensus_version: ForkName,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger| {
task_spawner.spawn_async_with_rejection(Priority::P0, async move {
let block_contents = PublishBlockRequest::<T::EthSpec>::from_ssz_bytes(
&block_bytes,
&chain.spec,
consensus_version,
)
.map_err(|e| {
warp_utils::reject::custom_bad_request(format!("invalid SSZ: {e:?}"))
Expand Down
10 changes: 6 additions & 4 deletions beacon_node/http_api/tests/broadcast_validation_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,16 @@ use beacon_chain::{
test_utils::{AttestationStrategy, BlockStrategy},
GossipVerifiedBlock, IntoGossipVerifiedBlockContents,
};
use eth2::reqwest::StatusCode;
use eth2::types::{BroadcastValidation, PublishBlockRequest, SignedBeaconBlock};
use http_api::test_utils::InteractiveTester;
use http_api::{publish_blinded_block, publish_block, reconstruct_block, ProvenancedBlock};
use std::sync::Arc;
use tree_hash::TreeHash;
use types::{Hash256, MainnetEthSpec, Slot};
use types::{Epoch, EthSpec, ForkName, Hash256, MainnetEthSpec, Slot};
use warp::Rejection;
use warp_utils::reject::CustomBadRequest;

use eth2::reqwest::StatusCode;

type E = MainnetEthSpec;

/*
Expand Down Expand Up @@ -190,7 +189,10 @@ pub async fn gossip_full_pass_ssz() {
// `validator_count // 32`.
let validator_count = 64;
let num_initial: u64 = 31;
let tester = InteractiveTester::<E>::new(None, validator_count).await;
// Deneb epoch set ahead of block slot, to test fork-based decoding
let mut spec = ForkName::Capella.make_genesis_spec(MainnetEthSpec::default_spec());
spec.deneb_fork_epoch = Some(Epoch::new(4));
let tester = InteractiveTester::<E>::new(Some(spec), validator_count).await;

// Create some chain depth.
tester.harness.advance_slot();
Expand Down
31 changes: 9 additions & 22 deletions common/eth2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,25 +377,6 @@ impl BeaconNodeHttpClient {
ok_or_error(response).await
}

/// Generic POST function supporting arbitrary responses and timeouts.
async fn post_generic_with_ssz_body<T: Into<Body>, U: IntoUrl>(
&self,
url: U,
body: T,
timeout: Option<Duration>,
) -> Result<Response, Error> {
let mut builder = self.client.post(url);
if let Some(timeout) = timeout {
builder = builder.timeout(timeout);
}
let response = builder
.header("Content-Type", "application/octet-stream")
.body(body)
.send()
.await?;
ok_or_error(response).await
}

/// Generic POST function supporting arbitrary responses and timeouts.
async fn post_generic_with_consensus_version<T: Serialize, U: IntoUrl>(
&self,
Expand Down Expand Up @@ -866,10 +847,11 @@ impl BeaconNodeHttpClient {
.push("beacon")
.push("blocks");

self.post_generic_with_ssz_body(
self.post_generic_with_consensus_version_and_ssz_body(
path,
block_contents.as_ssz_bytes(),
Some(self.timeouts.proposal),
block_contents.signed_block().fork_name_unchecked(),
)
.await?;

Expand Down Expand Up @@ -910,8 +892,13 @@ impl BeaconNodeHttpClient {
.push("beacon")
.push("blinded_blocks");

self.post_generic_with_ssz_body(path, block.as_ssz_bytes(), Some(self.timeouts.proposal))
.await?;
self.post_generic_with_consensus_version_and_ssz_body(
path,
block.as_ssz_bytes(),
Some(self.timeouts.proposal),
block.fork_name_unchecked(),
)
.await?;

Ok(())
}
Expand Down
34 changes: 13 additions & 21 deletions common/eth2/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1487,7 +1487,7 @@ mod tests {
.expect("should convert into signed block contents");

let decoded: PublishBlockRequest<E> =
PublishBlockRequest::from_ssz_bytes(&block.as_ssz_bytes(), &spec)
PublishBlockRequest::from_ssz_bytes(&block.as_ssz_bytes(), ForkName::Capella)
.expect("should decode Block");
assert!(matches!(decoded, PublishBlockRequest::Block(_)));
}
Expand All @@ -1505,9 +1505,11 @@ mod tests {
let kzg_proofs = KzgProofs::<E>::from(vec![KzgProof::empty()]);
let signed_block_contents = PublishBlockRequest::new(block, Some((kzg_proofs, blobs)));

let decoded: PublishBlockRequest<E> =
PublishBlockRequest::from_ssz_bytes(&signed_block_contents.as_ssz_bytes(), &spec)
.expect("should decode BlockAndBlobSidecars");
let decoded: PublishBlockRequest<E> = PublishBlockRequest::from_ssz_bytes(
&signed_block_contents.as_ssz_bytes(),
ForkName::Deneb,
)
.expect("should decode BlockAndBlobSidecars");
assert!(matches!(decoded, PublishBlockRequest::BlockContents(_)));
}
}
Expand Down Expand Up @@ -1746,22 +1748,11 @@ impl<T: EthSpec> PublishBlockRequest<T> {
}
}

/// SSZ decode with fork variant determined by slot.
pub fn from_ssz_bytes(bytes: &[u8], spec: &ChainSpec) -> Result<Self, ssz::DecodeError> {
let slot_len = <Slot as Decode>::ssz_fixed_len();
let slot_bytes = bytes
.get(0..slot_len)
.ok_or(DecodeError::InvalidByteLength {
len: bytes.len(),
expected: slot_len,
})?;

let slot = Slot::from_ssz_bytes(slot_bytes)?;
let fork_at_slot = spec.fork_name_at_slot::<T>(slot);

match fork_at_slot {
/// SSZ decode with fork variant determined by `fork_name`.
pub fn from_ssz_bytes(bytes: &[u8], fork_name: ForkName) -> Result<Self, ssz::DecodeError> {
match fork_name {
ForkName::Base | ForkName::Altair | ForkName::Merge | ForkName::Capella => {
SignedBeaconBlock::from_ssz_bytes(bytes, spec)
SignedBeaconBlock::from_ssz_bytes_for_fork(bytes, fork_name)
.map(|block| PublishBlockRequest::Block(block))
}
ForkName::Deneb => {
Expand All @@ -1771,8 +1762,9 @@ impl<T: EthSpec> PublishBlockRequest<T> {
builder.register_type::<BlobsList<T>>()?;

let mut decoder = builder.build()?;
let block = decoder
.decode_next_with(|bytes| SignedBeaconBlock::from_ssz_bytes(bytes, spec))?;
let block = decoder.decode_next_with(|bytes| {
SignedBeaconBlock::from_ssz_bytes_for_fork(bytes, fork_name)
})?;
let kzg_proofs = decoder.decode_next()?;
let blobs = decoder.decode_next()?;
Ok(PublishBlockRequest::new(block, Some((kzg_proofs, blobs))))
Expand Down
10 changes: 10 additions & 0 deletions consensus/types/src/signed_beacon_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,16 @@ impl<E: EthSpec, Payload: AbstractExecPayload<E>> SignedBeaconBlock<E, Payload>
Self::from_ssz_bytes_with(bytes, |bytes| BeaconBlock::from_ssz_bytes(bytes, spec))
}

/// SSZ decode with explicit fork variant.
pub fn from_ssz_bytes_for_fork(
bytes: &[u8],
fork_name: ForkName,
) -> Result<Self, ssz::DecodeError> {
Self::from_ssz_bytes_with(bytes, |bytes| {
BeaconBlock::from_ssz_bytes_for_fork(bytes, fork_name)
})
}

/// SSZ decode which attempts to decode all variants (slow).
pub fn any_from_ssz_bytes(bytes: &[u8]) -> Result<Self, ssz::DecodeError> {
Self::from_ssz_bytes_with(bytes, BeaconBlock::any_from_ssz_bytes)
Expand Down

0 comments on commit 185646a

Please sign in to comment.