From ab434bc0753960f867a3dd5c9a054862d8d51b93 Mon Sep 17 00:00:00 2001 From: Pawan Dhananjay Date: Mon, 4 Apr 2022 00:26:15 +0000 Subject: [PATCH] Fix merge rpc length limits (#3133) ## Issue Addressed N/A ## Proposed Changes Fix the upper bound for blocks by root responses to be equal to the max merge block size instead of altair. Further make the rpc response limits fork aware. --- .../src/peer_manager/mod.rs | 2 +- .../lighthouse_network/src/rpc/codec/base.rs | 70 +++- .../src/rpc/codec/ssz_snappy.rs | 327 +++++++++++++++--- .../lighthouse_network/src/rpc/handler.rs | 2 +- .../lighthouse_network/src/rpc/protocol.rs | 63 ++-- .../lighthouse_network/tests/common/mod.rs | 43 ++- .../lighthouse_network/tests/rpc_tests.rs | 34 +- 7 files changed, 413 insertions(+), 128 deletions(-) diff --git a/beacon_node/lighthouse_network/src/peer_manager/mod.rs b/beacon_node/lighthouse_network/src/peer_manager/mod.rs index cf31cee02fc..9c881e0625e 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/mod.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/mod.rs @@ -459,7 +459,7 @@ impl PeerManager { // Our fault. Do nothing return; } - RPCError::InvalidData => { + RPCError::InvalidData(_) => { // Peer is not complying with the protocol. This is considered a malicious action PeerAction::Fatal } diff --git a/beacon_node/lighthouse_network/src/rpc/codec/base.rs b/beacon_node/lighthouse_network/src/rpc/codec/base.rs index eca05787853..53f85d9a7b6 100644 --- a/beacon_node/lighthouse_network/src/rpc/codec/base.rs +++ b/beacon_node/lighthouse_network/src/rpc/codec/base.rs @@ -184,13 +184,25 @@ mod tests { use crate::rpc::protocol::*; use std::sync::Arc; - use types::{ForkContext, Hash256}; + use types::{Epoch, ForkContext, ForkName, Hash256, Slot}; use unsigned_varint::codec::Uvi; type Spec = types::MainnetEthSpec; - fn fork_context() -> ForkContext { - ForkContext::new::(types::Slot::new(0), Hash256::zero(), &Spec::default_spec()) + fn fork_context(fork_name: ForkName) -> ForkContext { + let mut chain_spec = Spec::default_spec(); + let altair_fork_epoch = Epoch::new(1); + let merge_fork_epoch = Epoch::new(2); + + chain_spec.altair_fork_epoch = Some(altair_fork_epoch); + chain_spec.bellatrix_fork_epoch = Some(merge_fork_epoch); + + let current_slot = match fork_name { + ForkName::Base => Slot::new(0), + ForkName::Altair => altair_fork_epoch.start_slot(Spec::slots_per_epoch()), + ForkName::Merge => merge_fork_epoch.start_slot(Spec::slots_per_epoch()), + }; + ForkContext::new::(current_slot, Hash256::zero(), &chain_spec) } #[test] @@ -202,9 +214,12 @@ mod tests { let snappy_protocol_id = ProtocolId::new(Protocol::Status, Version::V1, Encoding::SSZSnappy); - let fork_context = Arc::new(fork_context()); - let mut snappy_outbound_codec = - SSZSnappyOutboundCodec::::new(snappy_protocol_id, 1_048_576, fork_context); + let fork_context = Arc::new(fork_context(ForkName::Base)); + let mut snappy_outbound_codec = SSZSnappyOutboundCodec::::new( + snappy_protocol_id, + max_rpc_size(&fork_context), + fork_context, + ); // remove response code let mut snappy_buf = buf.clone(); @@ -234,9 +249,12 @@ mod tests { let snappy_protocol_id = ProtocolId::new(Protocol::Status, Version::V1, Encoding::SSZSnappy); - let fork_context = Arc::new(fork_context()); - let mut snappy_outbound_codec = - SSZSnappyOutboundCodec::::new(snappy_protocol_id, 1_048_576, fork_context); + let fork_context = Arc::new(fork_context(ForkName::Base)); + let mut snappy_outbound_codec = SSZSnappyOutboundCodec::::new( + snappy_protocol_id, + max_rpc_size(&fork_context), + fork_context, + ); let snappy_decoded_message = snappy_outbound_codec.decode(&mut dst).unwrap_err(); @@ -260,36 +278,50 @@ mod tests { ProtocolId::new(Protocol::BlocksByRange, Version::V1, Encoding::SSZSnappy); // Response limits - let limit = protocol_id.rpc_response_limits::(); + let fork_context = Arc::new(fork_context(ForkName::Base)); + let max_rpc_size = max_rpc_size(&fork_context); + let limit = protocol_id.rpc_response_limits::(&fork_context); let mut max = encode_len(limit.max + 1); - let fork_context = Arc::new(fork_context()); let mut codec = SSZSnappyOutboundCodec::::new( protocol_id.clone(), - 1_048_576, + max_rpc_size, fork_context.clone(), ); - assert_eq!(codec.decode(&mut max).unwrap_err(), RPCError::InvalidData); + assert!(matches!( + codec.decode(&mut max).unwrap_err(), + RPCError::InvalidData(_) + )); let mut min = encode_len(limit.min - 1); let mut codec = SSZSnappyOutboundCodec::::new( protocol_id.clone(), - 1_048_576, + max_rpc_size, fork_context.clone(), ); - assert_eq!(codec.decode(&mut min).unwrap_err(), RPCError::InvalidData); + assert!(matches!( + codec.decode(&mut min).unwrap_err(), + RPCError::InvalidData(_) + )); // Request limits let limit = protocol_id.rpc_request_limits(); let mut max = encode_len(limit.max + 1); let mut codec = SSZSnappyOutboundCodec::::new( protocol_id.clone(), - 1_048_576, + max_rpc_size, fork_context.clone(), ); - assert_eq!(codec.decode(&mut max).unwrap_err(), RPCError::InvalidData); + assert!(matches!( + codec.decode(&mut max).unwrap_err(), + RPCError::InvalidData(_) + )); let mut min = encode_len(limit.min - 1); - let mut codec = SSZSnappyOutboundCodec::::new(protocol_id, 1_048_576, fork_context); - assert_eq!(codec.decode(&mut min).unwrap_err(), RPCError::InvalidData); + let mut codec = + SSZSnappyOutboundCodec::::new(protocol_id, max_rpc_size, fork_context); + assert!(matches!( + codec.decode(&mut min).unwrap_err(), + RPCError::InvalidData(_) + )); } } diff --git a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs index 0924dca0c08..188ae59b6f9 100644 --- a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs +++ b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs @@ -146,7 +146,10 @@ impl Decoder for SSZSnappyInboundCodec { // packet size for ssz container corresponding to `self.protocol`. let ssz_limits = self.protocol.rpc_request_limits(); if ssz_limits.is_out_of_bounds(length, self.max_packet_size) { - return Err(RPCError::InvalidData); + return Err(RPCError::InvalidData(format!( + "RPC request length is out of bounds, length {}", + length + ))); } // Calculate worst case compression length for given uncompressed length let max_compressed_len = snap::raw::max_compress_len(length) as u64; @@ -279,9 +282,14 @@ impl Decoder for SSZSnappyOutboundCodec { // Should not attempt to decode rpc chunks with `length > max_packet_size` or not within bounds of // packet size for ssz container corresponding to `self.protocol`. - let ssz_limits = self.protocol.rpc_response_limits::(); + let ssz_limits = self + .protocol + .rpc_response_limits::(&self.fork_context); if ssz_limits.is_out_of_bounds(length, self.max_packet_size) { - return Err(RPCError::InvalidData); + return Err(RPCError::InvalidData(format!( + "RPC response length is out of bounds, length {}", + length + ))); } // Calculate worst case compression length for given uncompressed length let max_compressed_len = snap::raw::max_compress_len(length) as u64; @@ -327,7 +335,10 @@ impl OutboundCodec> for SSZSnappyOutbound // Should not attempt to decode rpc chunks with `length > max_packet_size` or not within bounds of // packet size for ssz container corresponding to `ErrorType`. if length > self.max_packet_size || length > *ERROR_TYPE_MAX || length < *ERROR_TYPE_MIN { - return Err(RPCError::InvalidData); + return Err(RPCError::InvalidData(format!( + "RPC Error length is out of bounds, length {}", + length + ))); } // Calculate worst case compression length for given uncompressed length @@ -364,7 +375,10 @@ fn handle_error( // If snappy has read `max_compressed_len` from underlying stream and still can't fill buffer, we have a malicious message. // Report as `InvalidData` so that malicious peer gets banned. if num_bytes >= max_compressed_len { - Err(RPCError::InvalidData) + Err(RPCError::InvalidData(format!( + "Received malicious snappy message, num_bytes {}, max_compressed_len {}", + num_bytes, max_compressed_len + ))) } else { // Haven't received enough bytes to decode yet, wait for more Ok(None) @@ -460,7 +474,9 @@ fn handle_v1_request( // Handle this case just for completeness. Protocol::MetaData => { if !decoded_buffer.is_empty() { - Err(RPCError::InvalidData) + Err(RPCError::InternalError( + "Metadata requests shouldn't reach decoder", + )) } else { Ok(Some(InboundRequest::MetaData(PhantomData))) } @@ -486,7 +502,7 @@ fn handle_v2_request( // Handle this case just for completeness. Protocol::MetaData => { if !decoded_buffer.is_empty() { - Err(RPCError::InvalidData) + Err(RPCError::InvalidData("Metadata request".to_string())) } else { Ok(Some(InboundRequest::MetaData(PhantomData))) } @@ -510,7 +526,9 @@ fn handle_v1_response( decoded_buffer, )?))), // This case should be unreachable as `Goodbye` has no response. - Protocol::Goodbye => Err(RPCError::InvalidData), + Protocol::Goodbye => Err(RPCError::InvalidData( + "Goodbye RPC message has no valid response".to_string(), + )), Protocol::BlocksByRange => Ok(Some(RPCResponse::BlocksByRange(Box::new( SignedBeaconBlock::Base(SignedBeaconBlockBase::from_ssz_bytes(decoded_buffer)?), )))), @@ -615,8 +633,8 @@ mod tests { }; use std::sync::Arc; use types::{ - BeaconBlock, BeaconBlockAltair, BeaconBlockBase, Epoch, ForkContext, Hash256, Signature, - SignedBeaconBlock, Slot, + BeaconBlock, BeaconBlockAltair, BeaconBlockBase, BeaconBlockMerge, Epoch, ForkContext, + FullPayload, Hash256, Signature, SignedBeaconBlock, Slot, }; use snap::write::FrameEncoder; @@ -625,12 +643,20 @@ mod tests { type Spec = types::MainnetEthSpec; - fn fork_context() -> ForkContext { + fn fork_context(fork_name: ForkName) -> ForkContext { let mut chain_spec = Spec::default_spec(); - // Set fork_epoch to `Some` to ensure that the `ForkContext` object - // includes altair in the list of forks - chain_spec.altair_fork_epoch = Some(types::Epoch::new(42)); - ForkContext::new::(types::Slot::new(0), Hash256::zero(), &chain_spec) + let altair_fork_epoch = Epoch::new(1); + let merge_fork_epoch = Epoch::new(2); + + chain_spec.altair_fork_epoch = Some(altair_fork_epoch); + chain_spec.bellatrix_fork_epoch = Some(merge_fork_epoch); + + let current_slot = match fork_name { + ForkName::Base => Slot::new(0), + ForkName::Altair => altair_fork_epoch.start_slot(Spec::slots_per_epoch()), + ForkName::Merge => merge_fork_epoch.start_slot(Spec::slots_per_epoch()), + }; + ForkContext::new::(current_slot, Hash256::zero(), &chain_spec) } fn base_block() -> SignedBeaconBlock { @@ -644,6 +670,36 @@ mod tests { SignedBeaconBlock::from_block(full_block, Signature::empty()) } + /// Merge block with length < max_rpc_size. + fn merge_block_small(fork_context: &ForkContext) -> SignedBeaconBlock { + let mut block: BeaconBlockMerge<_, FullPayload> = + BeaconBlockMerge::empty(&Spec::default_spec()); + let tx = VariableList::from(vec![0; 1024]); + let txs = VariableList::from(std::iter::repeat(tx).take(5000).collect::>()); + + block.body.execution_payload.execution_payload.transactions = txs; + + let block = BeaconBlock::Merge(block); + assert!(block.ssz_bytes_len() <= max_rpc_size(fork_context)); + SignedBeaconBlock::from_block(block, Signature::empty()) + } + + /// Merge block with length > MAX_RPC_SIZE. + /// The max limit for a merge block is in the order of ~16GiB which wouldn't fit in memory. + /// Hence, we generate a merge block just greater than `MAX_RPC_SIZE` to test rejection on the rpc layer. + fn merge_block_large(fork_context: &ForkContext) -> SignedBeaconBlock { + let mut block: BeaconBlockMerge<_, FullPayload> = + BeaconBlockMerge::empty(&Spec::default_spec()); + let tx = VariableList::from(vec![0; 1024]); + let txs = VariableList::from(std::iter::repeat(tx).take(100000).collect::>()); + + block.body.execution_payload.execution_payload.transactions = txs; + + let block = BeaconBlock::Merge(block); + assert!(block.ssz_bytes_len() > max_rpc_size(fork_context)); + SignedBeaconBlock::from_block(block, Signature::empty()) + } + fn status_message() -> StatusMessage { StatusMessage { fork_digest: [0; 4], @@ -678,10 +734,11 @@ mod tests { protocol: Protocol, version: Version, message: RPCCodedResponse, + fork_name: ForkName, ) -> Result { - let max_packet_size = 1_048_576; let snappy_protocol_id = ProtocolId::new(protocol, version, Encoding::SSZSnappy); - let fork_context = Arc::new(fork_context()); + let fork_context = Arc::new(fork_context(fork_name)); + let max_packet_size = max_rpc_size(&fork_context); let mut buf = BytesMut::new(); let mut snappy_inbound_codec = @@ -691,14 +748,43 @@ mod tests { Ok(buf) } + fn encode_without_length_checks( + bytes: Vec, + fork_name: ForkName, + ) -> Result { + let fork_context = fork_context(fork_name); + let mut dst = BytesMut::new(); + + // Add context bytes if required + dst.extend_from_slice(&fork_context.to_context_bytes(fork_name).unwrap()); + + let mut uvi_codec: Uvi = Uvi::default(); + + // Inserts the length prefix of the uncompressed bytes into dst + // encoded as a unsigned varint + uvi_codec + .encode(bytes.len(), &mut dst) + .map_err(RPCError::from)?; + + let mut writer = FrameEncoder::new(Vec::new()); + writer.write_all(&bytes).map_err(RPCError::from)?; + writer.flush().map_err(RPCError::from)?; + + // Write compressed bytes to `dst` + dst.extend_from_slice(writer.get_ref()); + + Ok(dst) + } + /// Attempts to decode the given protocol bytes as an rpc response fn decode( protocol: Protocol, version: Version, message: &mut BytesMut, + fork_name: ForkName, ) -> Result>, RPCError> { let snappy_protocol_id = ProtocolId::new(protocol, version, Encoding::SSZSnappy); - let fork_context = Arc::new(fork_context()); + let fork_context = Arc::new(fork_context(fork_name)); let max_packet_size = max_rpc_size(&fork_context); let mut snappy_outbound_codec = SSZSnappyOutboundCodec::::new(snappy_protocol_id, max_packet_size, fork_context); @@ -711,9 +797,10 @@ mod tests { protocol: Protocol, version: Version, message: RPCCodedResponse, + fork_name: ForkName, ) -> Result>, RPCError> { - let mut encoded = encode(protocol, version.clone(), message)?; - decode(protocol, version, &mut encoded) + let mut encoded = encode(protocol, version.clone(), message, fork_name)?; + decode(protocol, version, &mut encoded, fork_name) } // Test RPCResponse encoding/decoding for V1 messages @@ -723,7 +810,8 @@ mod tests { encode_then_decode( Protocol::Status, Version::V1, - RPCCodedResponse::Success(RPCResponse::Status(status_message())) + RPCCodedResponse::Success(RPCResponse::Status(status_message())), + ForkName::Base, ), Ok(Some(RPCResponse::Status(status_message()))) ); @@ -732,7 +820,8 @@ mod tests { encode_then_decode( Protocol::Ping, Version::V1, - RPCCodedResponse::Success(RPCResponse::Pong(ping_message())) + RPCCodedResponse::Success(RPCResponse::Pong(ping_message())), + ForkName::Base, ), Ok(Some(RPCResponse::Pong(ping_message()))) ); @@ -741,7 +830,8 @@ mod tests { encode_then_decode( Protocol::BlocksByRange, Version::V1, - RPCCodedResponse::Success(RPCResponse::BlocksByRange(Box::new(base_block()))) + RPCCodedResponse::Success(RPCResponse::BlocksByRange(Box::new(base_block()))), + ForkName::Base, ), Ok(Some(RPCResponse::BlocksByRange(Box::new(base_block())))) ); @@ -752,6 +842,7 @@ mod tests { Protocol::BlocksByRange, Version::V1, RPCCodedResponse::Success(RPCResponse::BlocksByRange(Box::new(altair_block()))), + ForkName::Altair, ) .unwrap_err(), RPCError::SSZDecodeError(_) @@ -763,7 +854,8 @@ mod tests { encode_then_decode( Protocol::BlocksByRoot, Version::V1, - RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new(base_block()))) + RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new(base_block()))), + ForkName::Base, ), Ok(Some(RPCResponse::BlocksByRoot(Box::new(base_block())))) ); @@ -774,6 +866,7 @@ mod tests { Protocol::BlocksByRoot, Version::V1, RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new(altair_block()))), + ForkName::Altair, ) .unwrap_err(), RPCError::SSZDecodeError(_) @@ -786,6 +879,7 @@ mod tests { Protocol::MetaData, Version::V1, RPCCodedResponse::Success(RPCResponse::MetaData(metadata())), + ForkName::Base, ), Ok(Some(RPCResponse::MetaData(metadata()))), ); @@ -795,6 +889,7 @@ mod tests { Protocol::MetaData, Version::V1, RPCCodedResponse::Success(RPCResponse::MetaData(metadata())), + ForkName::Base, ), Ok(Some(RPCResponse::MetaData(metadata()))), ); @@ -805,6 +900,7 @@ mod tests { Protocol::MetaData, Version::V1, RPCCodedResponse::Success(RPCResponse::MetaData(metadata_v2())), + ForkName::Base, ), Ok(Some(RPCResponse::MetaData(metadata()))), ); @@ -819,6 +915,7 @@ mod tests { Protocol::Status, Version::V2, RPCCodedResponse::Success(RPCResponse::Status(status_message())), + ForkName::Base, ) .unwrap_err(), RPCError::ErrorResponse(RPCResponseErrorCode::InvalidRequest, _), @@ -832,6 +929,7 @@ mod tests { Protocol::Ping, Version::V2, RPCCodedResponse::Success(RPCResponse::Pong(ping_message())), + ForkName::Base, ) .unwrap_err(), RPCError::ErrorResponse(RPCResponseErrorCode::InvalidRequest, _), @@ -843,7 +941,8 @@ mod tests { encode_then_decode( Protocol::BlocksByRange, Version::V2, - RPCCodedResponse::Success(RPCResponse::BlocksByRange(Box::new(base_block()))) + RPCCodedResponse::Success(RPCResponse::BlocksByRange(Box::new(base_block()))), + ForkName::Base, ), Ok(Some(RPCResponse::BlocksByRange(Box::new(base_block())))) ); @@ -852,35 +951,104 @@ mod tests { encode_then_decode( Protocol::BlocksByRange, Version::V2, - RPCCodedResponse::Success(RPCResponse::BlocksByRange(Box::new(altair_block()))) + RPCCodedResponse::Success(RPCResponse::BlocksByRange(Box::new(altair_block()))), + ForkName::Altair, ), Ok(Some(RPCResponse::BlocksByRange(Box::new(altair_block())))) ); + let merge_block_small = merge_block_small(&fork_context(ForkName::Merge)); + let merge_block_large = merge_block_large(&fork_context(ForkName::Merge)); + + assert_eq!( + encode_then_decode( + Protocol::BlocksByRange, + Version::V2, + RPCCodedResponse::Success(RPCResponse::BlocksByRange(Box::new( + merge_block_small.clone() + ))), + ForkName::Merge, + ), + Ok(Some(RPCResponse::BlocksByRange(Box::new( + merge_block_small.clone() + )))) + ); + + let mut encoded = + encode_without_length_checks(merge_block_large.as_ssz_bytes(), ForkName::Merge) + .unwrap(); + + assert!( + matches!( + decode( + Protocol::BlocksByRange, + Version::V2, + &mut encoded, + ForkName::Merge, + ) + .unwrap_err(), + RPCError::InvalidData(_) + ), + "Decoding a block larger than max_rpc_size should fail" + ); + assert_eq!( encode_then_decode( Protocol::BlocksByRoot, Version::V2, - RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new(base_block()))) + RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new(base_block()))), + ForkName::Base, ), - Ok(Some(RPCResponse::BlocksByRoot(Box::new(base_block())))) + Ok(Some(RPCResponse::BlocksByRoot(Box::new(base_block())))), ); assert_eq!( encode_then_decode( Protocol::BlocksByRoot, Version::V2, - RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new(altair_block()))) + RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new(altair_block()))), + ForkName::Altair, ), Ok(Some(RPCResponse::BlocksByRoot(Box::new(altair_block())))) ); + assert_eq!( + encode_then_decode( + Protocol::BlocksByRoot, + Version::V2, + RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new( + merge_block_small.clone() + ))), + ForkName::Merge, + ), + Ok(Some(RPCResponse::BlocksByRoot(Box::new(merge_block_small)))) + ); + + let mut encoded = + encode_without_length_checks(merge_block_large.as_ssz_bytes(), ForkName::Merge) + .unwrap(); + + assert!( + matches!( + decode( + Protocol::BlocksByRoot, + Version::V2, + &mut encoded, + ForkName::Merge, + ) + .unwrap_err(), + RPCError::InvalidData(_) + ), + "Decoding a block larger than max_rpc_size should fail" + ); + // A MetaDataV1 still encodes as a MetaDataV2 since version is Version::V2 assert_eq!( encode_then_decode( Protocol::MetaData, Version::V2, - RPCCodedResponse::Success(RPCResponse::MetaData(metadata())) + RPCCodedResponse::Success(RPCResponse::MetaData(metadata())), + ForkName::Base, ), Ok(Some(RPCResponse::MetaData(metadata_v2()))) ); @@ -889,7 +1057,8 @@ mod tests { encode_then_decode( Protocol::MetaData, Version::V2, - RPCCodedResponse::Success(RPCResponse::MetaData(metadata_v2())) + RPCCodedResponse::Success(RPCResponse::MetaData(metadata_v2())), + ForkName::Altair, ), Ok(Some(RPCResponse::MetaData(metadata_v2()))) ); @@ -898,20 +1067,27 @@ mod tests { // Test RPCResponse encoding/decoding for V2 messages #[test] fn test_context_bytes_v2() { - let fork_context = fork_context(); + let fork_context = fork_context(ForkName::Altair); // Removing context bytes for v2 messages should error let mut encoded_bytes = encode( Protocol::BlocksByRange, Version::V2, RPCCodedResponse::Success(RPCResponse::BlocksByRange(Box::new(base_block()))), + ForkName::Base, ) .unwrap(); let _ = encoded_bytes.split_to(4); assert!(matches!( - decode(Protocol::BlocksByRange, Version::V2, &mut encoded_bytes).unwrap_err(), + decode( + Protocol::BlocksByRange, + Version::V2, + &mut encoded_bytes, + ForkName::Base + ) + .unwrap_err(), RPCError::ErrorResponse(RPCResponseErrorCode::InvalidRequest, _), )); @@ -919,13 +1095,20 @@ mod tests { Protocol::BlocksByRoot, Version::V2, RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new(base_block()))), + ForkName::Base, ) .unwrap(); let _ = encoded_bytes.split_to(4); assert!(matches!( - decode(Protocol::BlocksByRange, Version::V2, &mut encoded_bytes).unwrap_err(), + decode( + Protocol::BlocksByRange, + Version::V2, + &mut encoded_bytes, + ForkName::Base + ) + .unwrap_err(), RPCError::ErrorResponse(RPCResponseErrorCode::InvalidRequest, _), )); @@ -934,6 +1117,7 @@ mod tests { Protocol::BlocksByRange, Version::V2, RPCCodedResponse::Success(RPCResponse::BlocksByRange(Box::new(base_block()))), + ForkName::Altair, ) .unwrap(); @@ -943,7 +1127,13 @@ mod tests { wrong_fork_bytes.extend_from_slice(&encoded_bytes.split_off(4)); assert!(matches!( - decode(Protocol::BlocksByRange, Version::V2, &mut wrong_fork_bytes).unwrap_err(), + decode( + Protocol::BlocksByRange, + Version::V2, + &mut wrong_fork_bytes, + ForkName::Altair + ) + .unwrap_err(), RPCError::SSZDecodeError(_), )); @@ -952,6 +1142,7 @@ mod tests { Protocol::BlocksByRoot, Version::V2, RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new(altair_block()))), + ForkName::Altair, ) .unwrap(); @@ -960,7 +1151,13 @@ mod tests { wrong_fork_bytes.extend_from_slice(&encoded_bytes.split_off(4)); assert!(matches!( - decode(Protocol::BlocksByRange, Version::V2, &mut wrong_fork_bytes).unwrap_err(), + decode( + Protocol::BlocksByRange, + Version::V2, + &mut wrong_fork_bytes, + ForkName::Altair + ) + .unwrap_err(), RPCError::SSZDecodeError(_), )); @@ -972,17 +1169,25 @@ mod tests { Protocol::MetaData, Version::V2, RPCCodedResponse::Success(RPCResponse::MetaData(metadata())), + ForkName::Altair, ) .unwrap(), ); - assert!(decode(Protocol::MetaData, Version::V2, &mut encoded_bytes).is_err()); + assert!(decode( + Protocol::MetaData, + Version::V2, + &mut encoded_bytes, + ForkName::Altair + ) + .is_err()); // Sending context bytes which do not correspond to any fork should return an error let mut encoded_bytes = encode( Protocol::BlocksByRoot, Version::V2, RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new(base_block()))), + ForkName::Altair, ) .unwrap(); @@ -991,7 +1196,13 @@ mod tests { wrong_fork_bytes.extend_from_slice(&encoded_bytes.split_off(4)); assert!(matches!( - decode(Protocol::BlocksByRange, Version::V2, &mut wrong_fork_bytes).unwrap_err(), + decode( + Protocol::BlocksByRange, + Version::V2, + &mut wrong_fork_bytes, + ForkName::Altair + ) + .unwrap_err(), RPCError::ErrorResponse(RPCResponseErrorCode::InvalidRequest, _), )); @@ -1000,13 +1211,19 @@ mod tests { Protocol::BlocksByRoot, Version::V2, RPCCodedResponse::Success(RPCResponse::BlocksByRoot(Box::new(base_block()))), + ForkName::Altair, ) .unwrap(); let mut part = encoded_bytes.split_to(3); assert_eq!( - decode(Protocol::BlocksByRange, Version::V2, &mut part), + decode( + Protocol::BlocksByRange, + Version::V2, + &mut part, + ForkName::Altair + ), Ok(None) ) } @@ -1061,17 +1278,17 @@ mod tests { dst.extend_from_slice(writer.get_ref()); // 10 (for stream identifier) + 80 + 42 = 132 > `max_compressed_len`. Hence, decoding should fail with `InvalidData`. - assert_eq!( - decode(Protocol::Status, Version::V1, &mut dst).unwrap_err(), - RPCError::InvalidData - ); + assert!(matches!( + decode(Protocol::Status, Version::V1, &mut dst, ForkName::Base).unwrap_err(), + RPCError::InvalidData(_) + )); } /// Test a malicious snappy encoding for a V2 `BlocksByRange` message where the attacker /// sends a valid message filled with a stream of useless padding before the actual message. #[test] fn test_decode_malicious_v2_message() { - let fork_context = Arc::new(fork_context()); + let fork_context = Arc::new(fork_context(ForkName::Altair)); // 10 byte snappy stream identifier let stream_identifier: &'static [u8] = b"\xFF\x06\x00\x00sNaPpY"; @@ -1118,10 +1335,16 @@ mod tests { dst.extend_from_slice(writer.get_ref()); // 10 (for stream identifier) + 176156 + 8103 = 184269 > `max_compressed_len`. Hence, decoding should fail with `InvalidData`. - assert_eq!( - decode(Protocol::BlocksByRange, Version::V2, &mut dst).unwrap_err(), - RPCError::InvalidData - ); + assert!(matches!( + decode( + Protocol::BlocksByRange, + Version::V2, + &mut dst, + ForkName::Altair + ) + .unwrap_err(), + RPCError::InvalidData(_) + )); } /// Test sending a message with encoded length prefix > max_rpc_size. @@ -1157,9 +1380,9 @@ mod tests { writer.flush().unwrap(); dst.extend_from_slice(writer.get_ref()); - assert_eq!( - decode(Protocol::Status, Version::V1, &mut dst).unwrap_err(), - RPCError::InvalidData - ); + assert!(matches!( + decode(Protocol::Status, Version::V1, &mut dst, ForkName::Base).unwrap_err(), + RPCError::InvalidData(_) + )); } } diff --git a/beacon_node/lighthouse_network/src/rpc/handler.rs b/beacon_node/lighthouse_network/src/rpc/handler.rs index 2b9e7c49020..b685c43348a 100644 --- a/beacon_node/lighthouse_network/src/rpc/handler.rs +++ b/beacon_node/lighthouse_network/src/rpc/handler.rs @@ -477,7 +477,7 @@ where ProtocolError::InvalidMessage | ProtocolError::TooManyProtocols => { // Peer is sending invalid data during the negotiation phase, not // participating in the protocol - RPCError::InvalidData + RPCError::InvalidData("Invalid message during negotiation".to_string()) } }, }; diff --git a/beacon_node/lighthouse_network/src/rpc/protocol.rs b/beacon_node/lighthouse_network/src/rpc/protocol.rs index e3ad6a803e5..d88f93de49f 100644 --- a/beacon_node/lighthouse_network/src/rpc/protocol.rs +++ b/beacon_node/lighthouse_network/src/rpc/protocol.rs @@ -63,7 +63,13 @@ lazy_static! { /// The `BeaconBlockMerge` block has an `ExecutionPayload` field which has a max size ~16 GiB for future proofing. /// We calculate the value from its fields instead of constructing the block and checking the length. - pub static ref SIGNED_BEACON_BLOCK_MERGE_MAX: usize = types::ExecutionPayload::::max_execution_payload_size(); + /// Note: This is only the theoretical upper bound. We further bound the max size we receive over the network + /// with `MAX_RPC_SIZE_POST_MERGE`. + pub static ref SIGNED_BEACON_BLOCK_MERGE_MAX: usize = + // Size of a full altair block + *SIGNED_BEACON_BLOCK_ALTAIR_MAX + + types::ExecutionPayload::::max_execution_payload_size() // adding max size of execution payload (~16gb) + + ssz::BYTES_PER_LENGTH_OFFSET; // Adding the additional ssz offset for the `ExecutionPayload` field pub static ref BLOCKS_BY_ROOT_REQUEST_MIN: usize = VariableList::::from(Vec::::new()) @@ -106,10 +112,9 @@ const REQUEST_TIMEOUT: u64 = 15; /// Returns the maximum bytes that can be sent across the RPC. pub fn max_rpc_size(fork_context: &ForkContext) -> usize { - if fork_context.fork_exists(ForkName::Merge) { - MAX_RPC_SIZE_POST_MERGE - } else { - MAX_RPC_SIZE + match fork_context.current_fork() { + ForkName::Merge => MAX_RPC_SIZE_POST_MERGE, + ForkName::Altair | ForkName::Base => MAX_RPC_SIZE, } } @@ -269,39 +274,39 @@ impl ProtocolId { } /// Returns min and max size for messages of given protocol id responses. - pub fn rpc_response_limits(&self) -> RpcLimits { + pub fn rpc_response_limits(&self, fork_context: &ForkContext) -> RpcLimits { match self.message_name { Protocol::Status => RpcLimits::new( ::ssz_fixed_len(), ::ssz_fixed_len(), ), Protocol::Goodbye => RpcLimits::new(0, 0), // Goodbye request has no response - Protocol::BlocksByRange => RpcLimits::new( - std::cmp::min( - std::cmp::min( - *SIGNED_BEACON_BLOCK_ALTAIR_MIN, - *SIGNED_BEACON_BLOCK_BASE_MIN, - ), - *SIGNED_BEACON_BLOCK_MERGE_MIN, + Protocol::BlocksByRange => match fork_context.current_fork() { + ForkName::Base => { + RpcLimits::new(*SIGNED_BEACON_BLOCK_BASE_MIN, *SIGNED_BEACON_BLOCK_BASE_MAX) + } + ForkName::Altair => RpcLimits::new( + *SIGNED_BEACON_BLOCK_ALTAIR_MIN, + *SIGNED_BEACON_BLOCK_ALTAIR_MAX, ), - std::cmp::max( - std::cmp::max( - *SIGNED_BEACON_BLOCK_ALTAIR_MAX, - *SIGNED_BEACON_BLOCK_BASE_MAX, - ), + ForkName::Merge => RpcLimits::new( + *SIGNED_BEACON_BLOCK_MERGE_MIN, *SIGNED_BEACON_BLOCK_MERGE_MAX, ), - ), - Protocol::BlocksByRoot => RpcLimits::new( - std::cmp::min( + }, + Protocol::BlocksByRoot => match fork_context.current_fork() { + ForkName::Base => { + RpcLimits::new(*SIGNED_BEACON_BLOCK_BASE_MIN, *SIGNED_BEACON_BLOCK_BASE_MAX) + } + ForkName::Altair => RpcLimits::new( *SIGNED_BEACON_BLOCK_ALTAIR_MIN, - *SIGNED_BEACON_BLOCK_BASE_MIN, - ), - std::cmp::max( *SIGNED_BEACON_BLOCK_ALTAIR_MAX, - *SIGNED_BEACON_BLOCK_BASE_MAX, ), - ), + ForkName::Merge => RpcLimits::new( + *SIGNED_BEACON_BLOCK_MERGE_MIN, + *SIGNED_BEACON_BLOCK_MERGE_MAX, + ), + }, Protocol::Ping => RpcLimits::new( ::ssz_fixed_len(), @@ -528,7 +533,7 @@ pub enum RPCError { /// Stream ended unexpectedly. IncompleteStream, /// Peer sent invalid data. - InvalidData, + InvalidData(String), /// An error occurred due to internal reasons. Ex: timer failure. InternalError(&'static str), /// Negotiation with this peer timed out. @@ -562,7 +567,7 @@ impl std::fmt::Display for RPCError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match *self { RPCError::SSZDecodeError(ref err) => write!(f, "Error while decoding ssz: {:?}", err), - RPCError::InvalidData => write!(f, "Peer sent unexpected data"), + RPCError::InvalidData(ref err) => write!(f, "Peer sent unexpected data: {}", err), RPCError::IoError(ref err) => write!(f, "IO Error: {}", err), RPCError::ErrorResponse(ref code, ref reason) => write!( f, @@ -589,7 +594,7 @@ impl std::error::Error for RPCError { RPCError::StreamTimeout => None, RPCError::UnsupportedProtocol => None, RPCError::IncompleteStream => None, - RPCError::InvalidData => None, + RPCError::InvalidData(_) => None, RPCError::InternalError(_) => None, RPCError::ErrorResponse(_, _) => None, RPCError::NegotiationTimeout => None, diff --git a/beacon_node/lighthouse_network/tests/common/mod.rs b/beacon_node/lighthouse_network/tests/common/mod.rs index e79fdf464dd..ea770de6c23 100644 --- a/beacon_node/lighthouse_network/tests/common/mod.rs +++ b/beacon_node/lighthouse_network/tests/common/mod.rs @@ -10,7 +10,9 @@ use std::sync::Arc; use std::sync::Weak; use std::time::Duration; use tokio::runtime::Runtime; -use types::{ChainSpec, EnrForkId, EthSpec, ForkContext, Hash256, MinimalEthSpec}; +use types::{ + ChainSpec, EnrForkId, Epoch, EthSpec, ForkContext, ForkName, Hash256, MinimalEthSpec, Slot, +}; use unused_port::unused_tcp_port; #[allow(clippy::type_complexity)] @@ -26,13 +28,20 @@ type ReqId = usize; use tempfile::Builder as TempBuilder; /// Returns a dummy fork context -pub fn fork_context() -> ForkContext { +pub fn fork_context(fork_name: ForkName) -> ForkContext { let mut chain_spec = E::default_spec(); - // Set fork_epoch to `Some` to ensure that the `ForkContext` object - // includes altair in the list of forks - chain_spec.altair_fork_epoch = Some(types::Epoch::new(42)); - chain_spec.bellatrix_fork_epoch = Some(types::Epoch::new(84)); - ForkContext::new::(types::Slot::new(0), Hash256::zero(), &chain_spec) + let altair_fork_epoch = Epoch::new(1); + let merge_fork_epoch = Epoch::new(2); + + chain_spec.altair_fork_epoch = Some(altair_fork_epoch); + chain_spec.bellatrix_fork_epoch = Some(merge_fork_epoch); + + let current_slot = match fork_name { + ForkName::Base => Slot::new(0), + ForkName::Altair => altair_fork_epoch.start_slot(E::slots_per_epoch()), + ForkName::Merge => merge_fork_epoch.start_slot(E::slots_per_epoch()), + }; + ForkContext::new::(current_slot, Hash256::zero(), &chain_spec) } pub struct Libp2pInstance(LibP2PService, exit_future::Signal); @@ -90,6 +99,7 @@ pub async fn build_libp2p_instance( rt: Weak, boot_nodes: Vec, log: slog::Logger, + fork_name: ForkName, ) -> Libp2pInstance { let port = unused_tcp_port().unwrap(); let config = build_config(port, boot_nodes); @@ -101,7 +111,7 @@ pub async fn build_libp2p_instance( let libp2p_context = lighthouse_network::Context { config: &config, enr_fork_id: EnrForkId::default(), - fork_context: Arc::new(fork_context()), + fork_context: Arc::new(fork_context(fork_name)), chain_spec: &ChainSpec::minimal(), gossipsub_registry: None, }; @@ -125,10 +135,11 @@ pub async fn build_full_mesh( rt: Weak, log: slog::Logger, n: usize, + fork_name: ForkName, ) -> Vec { let mut nodes = Vec::with_capacity(n); for _ in 0..n { - nodes.push(build_libp2p_instance(rt.clone(), vec![], log.clone()).await); + nodes.push(build_libp2p_instance(rt.clone(), vec![], log.clone(), fork_name).await); } let multiaddrs: Vec = nodes .iter() @@ -154,12 +165,13 @@ pub async fn build_full_mesh( pub async fn build_node_pair( rt: Weak, log: &slog::Logger, + fork_name: ForkName, ) -> (Libp2pInstance, Libp2pInstance) { let sender_log = log.new(o!("who" => "sender")); let receiver_log = log.new(o!("who" => "receiver")); - let mut sender = build_libp2p_instance(rt.clone(), vec![], sender_log).await; - let mut receiver = build_libp2p_instance(rt, vec![], receiver_log).await; + let mut sender = build_libp2p_instance(rt.clone(), vec![], sender_log, fork_name).await; + let mut receiver = build_libp2p_instance(rt, vec![], receiver_log, fork_name).await; let receiver_multiaddr = receiver.swarm.behaviour_mut().local_enr().multiaddr()[1].clone(); @@ -198,10 +210,15 @@ pub async fn build_node_pair( // Returns `n` peers in a linear topology #[allow(dead_code)] -pub async fn build_linear(rt: Weak, log: slog::Logger, n: usize) -> Vec { +pub async fn build_linear( + rt: Weak, + log: slog::Logger, + n: usize, + fork_name: ForkName, +) -> Vec { let mut nodes = Vec::with_capacity(n); for _ in 0..n { - nodes.push(build_libp2p_instance(rt.clone(), vec![], log.clone()).await); + nodes.push(build_libp2p_instance(rt.clone(), vec![], log.clone(), fork_name).await); } let multiaddrs: Vec = nodes diff --git a/beacon_node/lighthouse_network/tests/rpc_tests.rs b/beacon_node/lighthouse_network/tests/rpc_tests.rs index a270e404478..5895d32d5dc 100644 --- a/beacon_node/lighthouse_network/tests/rpc_tests.rs +++ b/beacon_node/lighthouse_network/tests/rpc_tests.rs @@ -12,7 +12,7 @@ use tokio::runtime::Runtime; use tokio::time::sleep; use types::{ BeaconBlock, BeaconBlockAltair, BeaconBlockBase, BeaconBlockMerge, Epoch, EthSpec, ForkContext, - Hash256, MinimalEthSpec, Signature, SignedBeaconBlock, Slot, + ForkName, Hash256, MinimalEthSpec, Signature, SignedBeaconBlock, Slot, }; mod common; @@ -23,7 +23,7 @@ type E = MinimalEthSpec; fn merge_block_small(fork_context: &ForkContext) -> BeaconBlock { let mut block = BeaconBlockMerge::::empty(&E::default_spec()); let tx = VariableList::from(vec![0; 1024]); - let txs = VariableList::from(std::iter::repeat(tx).take(100).collect::>()); + let txs = VariableList::from(std::iter::repeat(tx).take(5000).collect::>()); block.body.execution_payload.execution_payload.transactions = txs; @@ -61,7 +61,8 @@ fn test_status_rpc() { rt.block_on(async { // get sender/receiver - let (mut sender, mut receiver) = common::build_node_pair(Arc::downgrade(&rt), &log).await; + let (mut sender, mut receiver) = + common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Base).await; // Dummy STATUS RPC message let rpc_request = Request::Status(StatusMessage { @@ -159,7 +160,8 @@ fn test_blocks_by_range_chunked_rpc() { rt.block_on(async { // get sender/receiver - let (mut sender, mut receiver) = common::build_node_pair(Arc::downgrade(&rt), &log).await; + let (mut sender, mut receiver) = + common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Merge).await; // BlocksByRange Request let rpc_request = Request::BlocksByRange(BlocksByRangeRequest { @@ -179,7 +181,7 @@ fn test_blocks_by_range_chunked_rpc() { let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty()); let rpc_response_altair = Response::BlocksByRange(Some(Box::new(signed_full_block))); - let full_block = merge_block_small(&common::fork_context()); + let full_block = merge_block_small(&common::fork_context(ForkName::Merge)); let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty()); let rpc_response_merge_small = Response::BlocksByRange(Some(Box::new(signed_full_block))); @@ -298,7 +300,8 @@ fn test_blocks_by_range_over_limit() { rt.block_on(async { // get sender/receiver - let (mut sender, mut receiver) = common::build_node_pair(Arc::downgrade(&rt), &log).await; + let (mut sender, mut receiver) = + common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Merge).await; // BlocksByRange Request let rpc_request = Request::BlocksByRange(BlocksByRangeRequest { @@ -308,7 +311,7 @@ fn test_blocks_by_range_over_limit() { }); // BlocksByRange Response - let full_block = merge_block_large(&common::fork_context()); + let full_block = merge_block_large(&common::fork_context(ForkName::Merge)); let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty()); let rpc_response_merge_large = Response::BlocksByRange(Some(Box::new(signed_full_block))); @@ -395,7 +398,8 @@ fn test_blocks_by_range_chunked_rpc_terminates_correctly() { rt.block_on(async { // get sender/receiver - let (mut sender, mut receiver) = common::build_node_pair(Arc::downgrade(&rt), &log).await; + let (mut sender, mut receiver) = + common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Base).await; // BlocksByRange Request let rpc_request = Request::BlocksByRange(BlocksByRangeRequest { @@ -526,7 +530,8 @@ fn test_blocks_by_range_single_empty_rpc() { rt.block_on(async { // get sender/receiver - let (mut sender, mut receiver) = common::build_node_pair(Arc::downgrade(&rt), &log).await; + let (mut sender, mut receiver) = + common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Base).await; // BlocksByRange Request let rpc_request = Request::BlocksByRange(BlocksByRangeRequest { @@ -641,7 +646,8 @@ fn test_blocks_by_root_chunked_rpc() { let rt = Arc::new(Runtime::new().unwrap()); // get sender/receiver rt.block_on(async { - let (mut sender, mut receiver) = common::build_node_pair(Arc::downgrade(&rt), &log).await; + let (mut sender, mut receiver) = + common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Merge).await; // BlocksByRoot Request let rpc_request = Request::BlocksByRoot(BlocksByRootRequest { @@ -664,7 +670,7 @@ fn test_blocks_by_root_chunked_rpc() { let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty()); let rpc_response_altair = Response::BlocksByRoot(Some(Box::new(signed_full_block))); - let full_block = merge_block_small(&common::fork_context()); + let full_block = merge_block_small(&common::fork_context(ForkName::Merge)); let signed_full_block = SignedBeaconBlock::from_block(full_block, Signature::empty()); let rpc_response_merge_small = Response::BlocksByRoot(Some(Box::new(signed_full_block))); @@ -779,7 +785,8 @@ fn test_blocks_by_root_chunked_rpc_terminates_correctly() { let rt = Arc::new(Runtime::new().unwrap()); // get sender/receiver rt.block_on(async { - let (mut sender, mut receiver) = common::build_node_pair(Arc::downgrade(&rt), &log).await; + let (mut sender, mut receiver) = + common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Base).await; // BlocksByRoot Request let rpc_request = Request::BlocksByRoot(BlocksByRootRequest { @@ -916,7 +923,8 @@ fn test_goodbye_rpc() { let rt = Arc::new(Runtime::new().unwrap()); // get sender/receiver rt.block_on(async { - let (mut sender, mut receiver) = common::build_node_pair(Arc::downgrade(&rt), &log).await; + let (mut sender, mut receiver) = + common::build_node_pair(Arc::downgrade(&rt), &log, ForkName::Base).await; // build the sender future let sender_future = async {