Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Download range of headers during syncing #1270

Merged
merged 50 commits into from
Aug 21, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
fb481a0
Add requests for blocks to p2p service with test
MitchTurner Aug 1, 2023
2ce71ea
Create stream from singlee headers request rather than multiple requests
MitchTurner Aug 2, 2023
18cad5f
WIP fix tests
MitchTurner Aug 3, 2023
47b7347
Finish tests, remove unused functions
MitchTurner Aug 3, 2023
2df5a9b
Appease Clippy-sama
MitchTurner Aug 3, 2023
964e798
Get headers range test working
MitchTurner Aug 4, 2023
341e54b
Cleanup, make test more robust
MitchTurner Aug 4, 2023
7c25646
Rename tests
MitchTurner Aug 4, 2023
e677568
Fix back-pressure tests
MitchTurner Aug 4, 2023
fa39cd5
Fix new service test
MitchTurner Aug 4, 2023
3ab1ec1
Add impl for adapter
MitchTurner Aug 4, 2023
ce4c4b6
Fix data mapping
MitchTurner Aug 4, 2023
5abf18a
Appease Clippy-sama
MitchTurner Aug 4, 2023
318afda
Merge branch 'master' into download-block-range
xgreenx Aug 6, 2023
560ff4e
Remove not used variant
xgreenx Aug 6, 2023
b61b3b7
Merge remote-tracking branch 'origin/download-block-range' into downl…
xgreenx Aug 6, 2023
9ed60e4
Improve test, use range instead of start/end
MitchTurner Aug 7, 2023
a3854e8
Merge branch 'download-block-range' of github.com:FuelLabs/fuel-core …
MitchTurner Aug 7, 2023
432d49c
Improve the header filtering, rename helper, cleanup missed comments
MitchTurner Aug 7, 2023
53136b7
Appease clippy-sama
MitchTurner Aug 7, 2023
0aa691a
Merge remote-tracking branch 'origin/download-block-range' into downl…
xgreenx Aug 7, 2023
9b6d9ee
Fix off-by-one error, rename field
MitchTurner Aug 7, 2023
2523661
Merge branch 'download-block-range' of github.com:FuelLabs/fuel-core …
MitchTurner Aug 7, 2023
2733c5d
Merge remote-tracking branch 'origin/master' into download-block-range
MitchTurner Aug 10, 2023
b1e970f
Update Changelog
MitchTurner Aug 10, 2023
ee9efdd
Get headers in batches
MitchTurner Aug 11, 2023
efb7703
Cleanup commented code
MitchTurner Aug 11, 2023
66b7d7c
Merge remote-tracking branch 'origin/master' into download-block-range
MitchTurner Aug 11, 2023
0215ed1
Add new fields to sync config
MitchTurner Aug 11, 2023
1972302
Instrument
MitchTurner Aug 11, 2023
fabb2b3
Helm lint
MitchTurner Aug 11, 2023
2b75c60
Address most of the PR comments
MitchTurner Aug 14, 2023
b5df8d4
Merge branch 'master' into download-block-range
MitchTurner Aug 14, 2023
7ec0ac0
Fix compilation errors
MitchTurner Aug 14, 2023
1d4328a
Update crates/services/p2p/src/service.rs
MitchTurner Aug 15, 2023
3283587
Update crates/services/sync/src/ports.rs
MitchTurner Aug 15, 2023
d1f809a
Update crates/services/sync/src/import.rs
MitchTurner Aug 15, 2023
94fc48b
Update crates/services/sync/src/import.rs
MitchTurner Aug 15, 2023
c6a4b3d
Make PR requested changes
MitchTurner Aug 15, 2023
9c0412e
Add ignore for RustSec advisory
MitchTurner Aug 15, 2023
5d63451
Undo bump, remove unused imports
MitchTurner Aug 15, 2023
9a174b5
Add max requests check with config
MitchTurner Aug 16, 2023
98856e6
Remove sealed header (singular) path from p2p service
MitchTurner Aug 16, 2023
91d5253
Merge remote-tracking branch 'origin/master' into download-block-range
MitchTurner Aug 16, 2023
5332d01
Update crates/services/p2p/src/p2p_service.rs
MitchTurner Aug 16, 2023
f2d37b0
Manually format macro
MitchTurner Aug 16, 2023
978b488
Merge branch 'master' into download-block-range
MitchTurner Aug 17, 2023
5970d6a
Add max headers per request to deployment env vars
MitchTurner Aug 17, 2023
6132f5d
Helm lint, add other vars
MitchTurner Aug 17, 2023
46e2c17
Wrap headers in option to represent ambiguous failed request
MitchTurner Aug 21, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions crates/fuel-core/src/service/adapters/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,14 @@ impl P2pDb for Database {
self.get_sealed_block_by_height(height)
}

fn get_sealed_blocks_inclusive(
&self,
start_height: &BlockHeight,
end_height: &BlockHeight,
) -> StorageResult<Vec<SealedBlock>> {
todo!()
}

fn get_sealed_header(
&self,
height: &BlockHeight,
Expand Down
8 changes: 8 additions & 0 deletions crates/services/p2p/src/codecs/postcard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,10 @@ impl RequestResponseConverter for PostcardCodec {

Ok(ResponseMessage::SealedBlock(response))
}
NetworkResponse::BlocksInclusive(block_bytes) => {
let response = self.deserialize(block_bytes)?;
Ok(ResponseMessage::SealedBlocksInclusive(response))
}
NetworkResponse::Header(header_bytes) => {
let response = if let Some(header_bytes) = header_bytes {
Some(self.deserialize(header_bytes)?)
Expand Down Expand Up @@ -239,6 +243,10 @@ impl RequestResponseConverter for PostcardCodec {

Ok(NetworkResponse::Block(response))
}
OutboundResponse::BlocksInclusive(sealed_blocks) => {
let bytes = self.serialize(sealed_blocks)?;
Ok(NetworkResponse::BlocksInclusive(bytes))
}
OutboundResponse::SealedHeader(sealed_header) => {
let response = if let Some(sealed_header) = sealed_header {
Some(self.serialize(sealed_header.as_ref())?)
Expand Down
70 changes: 68 additions & 2 deletions crates/services/p2p/src/p2p_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,17 @@ impl<Codec: NetworkCodec> FuelP2PService<Codec> {
);
}
}
(
Some(ResponseChannelItem::BlocksInclusive(channel)),
Ok(ResponseMessage::SealedBlocksInclusive(blocks)),
) => {
if channel.send(Some(blocks)).is_err() {
debug!(
"Failed to send through the channel for {:?}",
request_id
);
}
}
(
Some(ResponseChannelItem::Transactions(channel)),
Ok(ResponseMessage::Transactions(transactions)),
Expand Down Expand Up @@ -1514,6 +1525,31 @@ mod tests {
}
}

fn arbitrary_blocks() -> Vec<SealedBlock> {
let mut blocks = Vec::new();
for i in 2..=5 {
let block = Block::new(
PartialBlockHeader::default(),
(0..i).map(|_| Transaction::default_test_tx()).collect(),
&[],
);

let sealed_block = SealedBlock {
entity: block,
consensus: Consensus::PoA(PoAConsensus::new(Default::default())),
};
blocks.push(sealed_block);
}
blocks
}

// Metadata gets skipped during serialization, so this is the fuzzy way to compare blocks
fn eq_except_metadata(a: &SealedBlock, b: &SealedBlock) -> bool {
a.entity.header().application == b.entity.header().application
&& a.entity.header().consensus == b.entity.header().consensus
&& a.entity.transactions() == b.entity.transactions()
}

async fn request_response_works_with(request_msg: RequestMessage) {
let mut p2p_config = Config::default_initialized("request_response_works_with");

Expand All @@ -1524,15 +1560,15 @@ mod tests {
p2p_config.bootstrap_nodes = node_a.multiaddrs();
let mut node_b = build_service_from_config(p2p_config.clone()).await;

let (tx_test_end, mut rx_test_end) = mpsc::channel(1);
let (tx_test_end, mut rx_test_end) = mpsc::channel::<bool>(1);

let mut request_sent = false;

loop {
tokio::select! {
message_sent = rx_test_end.recv() => {
// we received a signal to end the test
assert_eq!(message_sent, Some(true), "Receuved incorrect or missing missing messsage");
assert!(message_sent.unwrap(), "Receuved incorrect or missing missing messsage");
break;
}
node_a_event = node_a.next_event() => {
Expand Down Expand Up @@ -1560,6 +1596,24 @@ mod tests {
});

}
RequestMessage::BlocksInclusive(_, _) => {
let (tx_orchestrator, rx_orchestrator) = oneshot::channel();
assert!(node_a.send_request_msg(None, request_msg, ResponseChannelItem::BlocksInclusive(tx_orchestrator)).is_ok());
let tx_test_end = tx_test_end.clone();

tokio::spawn(async move {
let response_message = rx_orchestrator.await;

if let Ok(Some(sealed_blocks)) = response_message {
let expected = arbitrary_blocks();
let check = sealed_blocks.iter().zip(expected.iter()).all(|(a, b)| eq_except_metadata(a, b));
let _ = tx_test_end.send(check).await;
} else {
tracing::error!("Orchestrator failed to receive a message: {:?}", response_message);
let _ = tx_test_end.send(false).await;
}
});
}
RequestMessage::SealedHeader(_) => {
let (tx_orchestrator, rx_orchestrator) = oneshot::channel();
assert!(node_a.send_request_msg(None, request_msg, ResponseChannelItem::SealedHeader(tx_orchestrator)).is_ok());
Expand Down Expand Up @@ -1613,6 +1667,11 @@ mod tests {

let _ = node_b.send_response_msg(request_id, OutboundResponse::Block(Some(Arc::new(sealed_block))));
}
RequestMessage::BlocksInclusive(_, _) => {
let blocks = arbitrary_blocks();

let _ = node_b.send_response_msg(request_id, OutboundResponse::BlocksInclusive(blocks));
}
RequestMessage::SealedHeader(_) => {
let header = Default::default();

Expand Down Expand Up @@ -1650,6 +1709,13 @@ mod tests {
request_response_works_with(RequestMessage::Block(0.into())).await
}

#[tokio::test]
#[instrument]
async fn request_response_works_with_blocks_inclusive() {
request_response_works_with(RequestMessage::BlocksInclusive(2.into(), 5.into()))
.await
}

#[tokio::test]
#[instrument]
async fn request_response_works_with_sealed_header() {
Expand Down
6 changes: 6 additions & 0 deletions crates/services/p2p/src/ports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ pub trait P2pDb: Send + Sync {
height: &BlockHeight,
) -> StorageResult<Option<SealedBlock>>;

fn get_sealed_blocks_inclusive(
&self,
start_height: &BlockHeight,
end_height: &BlockHeight,
) -> StorageResult<Vec<SealedBlock>>;

fn get_sealed_header(
&self,
height: &BlockHeight,
Expand Down
5 changes: 5 additions & 0 deletions crates/services/p2p/src/request_response/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ pub type ChannelItem<T> = oneshot::Sender<Option<T>>;
#[derive(Serialize, Deserialize, Eq, PartialEq, Debug, Clone, Copy)]
pub enum RequestMessage {
Block(BlockHeight),
BlocksInclusive(BlockHeight, BlockHeight),
SealedHeader(BlockHeight),
Transactions(#[serde_as(as = "FromInto<[u8; 32]>")] BlockId),
}
Expand All @@ -49,6 +50,7 @@ pub enum RequestMessage {
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum ResponseMessage {
SealedBlock(Option<SealedBlock>),
SealedBlocksInclusive(Vec<SealedBlock>),
xgreenx marked this conversation as resolved.
Show resolved Hide resolved
SealedHeader(Option<SealedBlockHeader>),
Transactions(Option<Vec<Transaction>>),
}
Expand All @@ -57,6 +59,7 @@ pub enum ResponseMessage {
#[derive(Debug)]
pub enum ResponseChannelItem {
Block(ChannelItem<SealedBlock>),
BlocksInclusive(ChannelItem<Vec<SealedBlock>>),
SealedHeader(ChannelItem<(PeerId, SealedBlockHeader)>),
Transactions(ChannelItem<Vec<Transaction>>),
}
Expand All @@ -66,6 +69,7 @@ pub enum ResponseChannelItem {
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum NetworkResponse {
Block(Option<Vec<u8>>),
BlocksInclusive(Vec<u8>),
xgreenx marked this conversation as resolved.
Show resolved Hide resolved
Header(Option<Vec<u8>>),
Transactions(Option<Vec<u8>>),
}
Expand All @@ -75,6 +79,7 @@ pub enum NetworkResponse {
#[derive(Debug, Clone)]
pub enum OutboundResponse {
Block(Option<Arc<SealedBlock>>),
BlocksInclusive(Vec<SealedBlock>),
xgreenx marked this conversation as resolved.
Show resolved Hide resolved
SealedHeader(Option<Arc<SealedBlockHeader>>),
Transactions(Option<Arc<Vec<Transaction>>>),
xgreenx marked this conversation as resolved.
Show resolved Hide resolved
}
Expand Down
42 changes: 42 additions & 0 deletions crates/services/p2p/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ enum TaskRequest {
height: BlockHeight,
channel: oneshot::Sender<Option<SealedBlock>>,
},
GetBlocksInclusive {
from_height: BlockHeight,
to_height: BlockHeight,
channel: oneshot::Sender<Option<Vec<SealedBlock>>>,
},
GetSealedHeader {
height: BlockHeight,
channel: oneshot::Sender<Option<(PeerId, SealedBlockHeader)>>,
Expand Down Expand Up @@ -222,6 +227,12 @@ where
let peer = self.p2p_service.peer_manager().get_peer_id_with_height(&height);
let _ = self.p2p_service.send_request_msg(peer, request_msg, channel_item);
}
Some(TaskRequest::GetBlocksInclusive { from_height, to_height, channel }) => {
let request_msg = RequestMessage::BlocksInclusive(from_height, to_height);
let channel_item = ResponseChannelItem::BlocksInclusive(channel);
let peer = self.p2p_service.peer_manager().get_peer_id_with_height(&from_height);
let _ = self.p2p_service.send_request_msg(peer, request_msg, channel_item);
}
Some(TaskRequest::GetSealedHeader{ height, channel: response }) => {
let request_msg = RequestMessage::SealedHeader(height);
let channel_item = ResponseChannelItem::SealedHeader(response);
Expand Down Expand Up @@ -282,6 +293,11 @@ where
.map(Arc::new);
let _ = self.p2p_service.send_response_msg(request_id, OutboundResponse::Block(block_response));
}
RequestMessage::BlocksInclusive(from_height, to_height) => {
let blocks_response = self.db.get_sealed_blocks_inclusive(&from_height, &to_height)?;

let _ = self.p2p_service.send_response_msg(request_id, OutboundResponse::BlocksInclusive(blocks_response));
}
RequestMessage::Transactions(block_id) => {
let transactions_response = self.db.get_transactions(&block_id)?
.map(Arc::new);
Expand Down Expand Up @@ -366,6 +382,24 @@ impl SharedState {
receiver.await.map_err(|e| anyhow!("{}", e))
}

pub async fn get_blocks_inclusive(
&self,
from_height: BlockHeight,
to_height: BlockHeight,
) -> anyhow::Result<Option<Vec<SealedBlock>>> {
let (sender, receiver) = oneshot::channel();

self.request_sender
.send(TaskRequest::GetBlocksInclusive {
from_height,
to_height,
channel: sender,
})
.await?;

receiver.await.map_err(|e| anyhow!("{}", e))
}

pub async fn get_sealed_block_header(
&self,
height: BlockHeight,
Expand Down Expand Up @@ -556,6 +590,14 @@ pub mod tests {
}))
}

fn get_sealed_blocks_inclusive(
&self,
_start_height: &BlockHeight,
_end_height: &BlockHeight,
) -> StorageResult<Vec<SealedBlock>> {
todo!()
}

fn get_sealed_header(
&self,
_height: &BlockHeight,
Expand Down
Loading
Loading