Skip to content

Commit

Permalink
feat: consistent handling of edge cases for header sync (#5837)
Browse files Browse the repository at this point in the history
Description
---
This PR fixes an edge case for header sync where the local node has more
headers, but the remote node has better proof-of-work (accumulated
difficulty), thus higher actual blockchain height. It also simplifies
the attempted header sync logic and error handling when a peer does not
return headers and introduces consistent naming, variable re-use and use
of information

Supersedes #5756

Motivation and Context
---
When doing header sync, and determining from which peer to sync, we need
to consider our actual blockchain state when comparing chains. It is
possible that our node has valid block headers that will translate into
a higher proof of work when fully synced than the remote syncing peer,
but lacking the blocks to back the block headers, thus the remote chain
has an actual higher proof of work blockchain.
With the current implementation, race conditions can exist when
determining if the peer is misbehaving, i.e. lying about their
proof-of-work or not wanting to supply block headers, due to a mismatch
in tip height used for the check.

This PR fixes the race conditions by always using the latest local chain
metadata and block headers, ignoring all sync peers that do not exceed
our own accumulated difficulty and using consistent information in all
the checks.

How Has This Been Tested?
---
Added integration-level unit tests:
- `test_header_sync_happy_path`
- `test_header_sync_with_fork_happy_path`
- `test_header_sync_uneven_headers_and_blocks_happy_path`
-
`test_header_sync_uneven_headers_and_blocks_peer_lies_about_pow_no_ban`
-
`test_header_sync_even_headers_and_blocks_peer_lies_about_pow_with_ban`
-
`test_header_sync_even_headers_and_blocks_peer_metadata_improve_with_reorg`

What process can a PR reviewer use to test or verify this change?
---
- Code walk-through
- Review unit tests

<!-- Checklist -->
<!-- 1. Is the title of your PR in the form that would make nice release
notes? The title, excluding the conventional commit
tag, will be included exactly as is in the CHANGELOG, so please think
about it carefully. -->


Breaking Changes
---

- [x] None
- [ ] Requires data directory on base node to be deleted
- [ ] Requires hard fork
- [ ] Other - Please specify

<!-- Does this include a breaking change? If so, include this line as a
footer -->
<!-- BREAKING CHANGE: Description what the user should do, e.g. delete a
database, resync the chain -->

---------

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: SW van Heerden <swvheerden@gmail.com>
  • Loading branch information
3 people authored Oct 24, 2023
1 parent d9e8e22 commit 3e1ec1f
Show file tree
Hide file tree
Showing 22 changed files with 991 additions and 187 deletions.
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,6 @@ impl TransactionsTab {
let status = Span::styled(status_msg, Style::default().fg(Color::White));
let message = Span::styled(tx.message.as_str(), Style::default().fg(Color::White));

// let mined_time = DateTime::<Local>::from_utc(tx.mined_timestamp, Local::now().offset().to_owned());
let mined_timestamp = Span::styled(
match tx.mined_timestamp {
None => String::new(),
Expand All @@ -398,7 +397,6 @@ impl TransactionsTab {
.format("%Y-%m-%d %H:%M:%S")
),
},
// format!("{}", mined_time.format("%Y-%m-%d %H:%M:%S")),
Style::default().fg(Color::White),
);

Expand Down
2 changes: 0 additions & 2 deletions base_layer/core/src/base_node/proto/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ message FindChainSplitResponse {
// `FindChainSplitRequest::block_hashes`. This value could also be used to
// know how far back a split occurs.
uint64 fork_hash_index = 2;
// The current header height of this node
uint64 tip_height = 3;
}

message SyncKernelsRequest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,15 +153,15 @@ impl<B: BlockchainBackend + 'static> BaseNodeStateMachine<B> {
db.set_disable_add_block_flag();
HeaderSync(HeaderSyncState::new(sync_peers, local_metadata))
},
(HeaderSync(s), HeaderSyncFailed) => {
(HeaderSync(s), HeaderSyncFailed(_err)) => {
db.clear_disable_add_block_flag();
Waiting(s.into())
},
(HeaderSync(s), Continue | NetworkSilence) => {
db.clear_disable_add_block_flag();
Listening(s.into())
},
(HeaderSync(s), HeadersSynchronized(_)) => DecideNextSync(s.into()),
(HeaderSync(s), HeadersSynchronized(..)) => DecideNextSync(s.into()),

(DecideNextSync(_), ProceedToHorizonSync(peers)) => HorizonStateSync(peers.into()),
(DecideNextSync(s), Continue) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use crate::base_node::{
Starting,
Waiting,
},
sync::{HorizonSyncInfo, SyncPeer},
sync::{AttemptSyncResult, HorizonSyncInfo, SyncPeer},
};

#[derive(Debug)]
Expand All @@ -57,8 +57,8 @@ pub enum BaseNodeState {
#[derive(Debug, Clone, PartialEq)]
pub enum StateEvent {
Initialized,
HeadersSynchronized(SyncPeer),
HeaderSyncFailed,
HeadersSynchronized(SyncPeer, AttemptSyncResult),
HeaderSyncFailed(String),
ProceedToHorizonSync(Vec<SyncPeer>),
ProceedToBlockSync(Vec<SyncPeer>),
HorizonStateSynchronized,
Expand Down Expand Up @@ -145,8 +145,8 @@ impl Display for StateEvent {
match self {
Initialized => write!(f, "Initialized"),
BlocksSynchronized => write!(f, "Synchronised Blocks"),
HeadersSynchronized(peer) => write!(f, "Headers Synchronized from peer `{}`", peer),
HeaderSyncFailed => write!(f, "Header Synchronization Failed"),
HeadersSynchronized(peer, result) => write!(f, "Headers Synchronized from peer `{}` ({:?})", peer, result),
HeaderSyncFailed(err) => write!(f, "Header Synchronization Failed ({})", err),
ProceedToHorizonSync(_) => write!(f, "Proceed to horizon sync"),
ProceedToBlockSync(_) => write!(f, "Proceed to block sync"),
HorizonStateSynchronized => write!(f, "Horizon State Synchronized"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::{cmp::Ordering, time::Instant};

use log::*;
use tari_common_types::chain_metadata::ChainMetadata;
use tari_comms::peer_manager::NodeId;

use crate::{
base_node::{
Expand All @@ -35,7 +36,6 @@ use crate::{
},
chain_storage::BlockchainBackend,
};

const LOG_TARGET: &str = "c::bn::header_sync";

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -70,12 +70,42 @@ impl HeaderSyncState {
self.sync_peers
}

fn remove_sync_peer(&mut self, node_id: &NodeId) {
if let Some(pos) = self.sync_peers.iter().position(|p| p.node_id() == node_id) {
self.sync_peers.remove(pos);
}
}

// converting u64 to i64 is okay as the future time limit is the hundreds so way below u32 even
#[allow(clippy::too_many_lines)]
#[allow(clippy::cast_possible_wrap)]
pub async fn next_event<B: BlockchainBackend + 'static>(
&mut self,
shared: &mut BaseNodeStateMachine<B>,
) -> StateEvent {
// Only sync to peers with better claimed accumulated difficulty than the local chain: this may be possible
// at this stage due to read-write lock race conditions in the database
match shared.db.get_chain_metadata().await {
Ok(best_block_metadata) => {
let mut remove = Vec::new();
for sync_peer in &self.sync_peers {
if sync_peer.claimed_chain_metadata().accumulated_difficulty() <=
best_block_metadata.accumulated_difficulty()
{
remove.push(sync_peer.node_id().clone());
}
}
for node_id in remove {
self.remove_sync_peer(&node_id);
}
if self.sync_peers.is_empty() {
// Go back to Listening state
return StateEvent::Continue;
}
},
Err(e) => return StateEvent::FatalError(format!("{}", e)),
}

let mut synchronizer = HeaderSynchronizer::new(
shared.config.blockchain_sync_config.clone(),
shared.db.clone(),
Expand Down Expand Up @@ -128,7 +158,7 @@ impl HeaderSyncState {
let mut mdc = vec![];
log_mdc::iter(|k, v| mdc.push((k.to_owned(), v.to_owned())));
match synchronizer.synchronize().await {
Ok(sync_peer) => {
Ok((sync_peer, sync_result)) => {
log_mdc::extend(mdc);
info!(
target: LOG_TARGET,
Expand All @@ -144,7 +174,7 @@ impl HeaderSyncState {
}
}
self.is_synced = true;
StateEvent::HeadersSynchronized(sync_peer)
StateEvent::HeadersSynchronized(sync_peer, sync_result)
},
Err(err) => {
let _ignore = shared.status_event_sender.send(StatusInfo {
Expand All @@ -163,7 +193,7 @@ impl HeaderSyncState {
_ => {
log_mdc::extend(mdc);
debug!(target: LOG_TARGET, "Header sync failed: {}", err);
StateEvent::HeaderSyncFailed
StateEvent::HeaderSyncFailed(err.to_string())
},
}
},
Expand Down
2 changes: 1 addition & 1 deletion base_layer/core/src/base_node/sync/header_sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,4 @@ pub use error::BlockHeaderSyncError;
mod validator;

mod synchronizer;
pub use synchronizer::HeaderSynchronizer;
pub use synchronizer::{AttemptSyncResult, HeaderSyncStatus, HeaderSynchronizer};
Loading

0 comments on commit 3e1ec1f

Please sign in to comment.