Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return Pending rather than loop around if no new finalized hash in submit_transaction #1378

Merged
merged 2 commits into from
Jan 18, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 38 additions & 58 deletions subxt/src/backend/unstable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -480,61 +480,43 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
// with chainHead_follow.
let mut finalized_hash: Option<T::Hash> = None;

// Monitor usage to help track down an obscure issue.
// Record the start time so that we can time out if things appear to take too long.
let start_instant = instant::Instant::now();
let mut last_seen_follow_event_str = None;
let mut last_seen_follow_event_time = None;
let mut last_seen_tx_event_str = None;
let mut last_seen_tx_event_time = None;
let mut num_polls: usize = 0;
let mut num_loops: usize = 0;

macro_rules! bail_with_stats {
($msg:literal) => {{
let msg = $msg;
let err = format!(
"submit_transaction error: {msg}. Please raise an issue. Details: \
start_time={start_instant:?}, \
num_polls={num_polls}, \
num_loops={num_loops}, \
last_follow_event_time={last_seen_follow_event_time:?}, \
last_follow_event={last_seen_follow_event_str:?}, \
last_tx_event_time={last_seen_tx_event_time:?}, \
last_tx_event={last_seen_tx_event_str:?}",
);
return Poll::Ready(Some(Err(Error::Other(err))));
}};
}

// A quick helper to return a generic error.
let err_other = |s: &str| Some(Err(Error::Other(s.into())));

// Now we can attempt to associate tx events with pinned blocks.
let tx_stream = futures::stream::poll_fn(move |cx| {
num_polls = num_polls.saturating_add(1);

loop {
num_loops = num_loops.saturating_add(1);

// Bail early if no more tx events; we don't want to keep polling for pinned blocks.
// Bail early if we're finished; nothing else to do.
if done {
return Poll::Ready(None);
}

// Bail if we exceed 4 mins; something very likely went wrong. We'll remove this
// once we get to the bottom of a spurious error that causes this to hang.
// Bail if we exceed 4 mins; something very likely went wrong.
if start_instant.elapsed().as_secs() > 240 {
bail_with_stats!("finalized block expected by now");
return Poll::Ready(err_other(
"Timeout waiting for the transaction to be finalized",
));
}

if let Poll::Ready(maybe_follow_event) = seen_blocks_sub.poll_next_unpin(cx) {
// Bail if we get back `None`, and we didn't previously see a `Stop`, which would be
// caught first below. Something unexpected has happened here.
let Some(follow_event) = maybe_follow_event else {
bail_with_stats!("chainHead_follow stream ended unexpectedly");
};

last_seen_follow_event_str = Some(format!("{follow_event:?}"));
last_seen_follow_event_time = Some(instant::Instant::now());

match follow_event {
// Poll for a follow event, and error if the stream has unexpectedly ended.
let follow_ev_poll = match seen_blocks_sub.poll_next_unpin(cx) {
Poll::Ready(None) => {
return Poll::Ready(err_other("chainHead_follow stream ended unexpectedly"))
}
Poll::Ready(Some(follow_ev)) => Poll::Ready(follow_ev),
Poll::Pending => Poll::Pending,
};
let follow_ev_is_pending = follow_ev_poll.is_pending();

// If there was a follow event, then handle it and loop around to see if there are more.
// We want to buffer follow events until we hit Pending, so that we are as uptodate as possible
jsdw marked this conversation as resolved.
Show resolved Hide resolved
// for when we see a BestBlockChanged event, so that we have the best change of already having
// seen the block that it mentions and returning a proper pinned block.
if let Poll::Ready(follow_ev) = follow_ev_poll {
match follow_ev {
FollowEvent::NewBlock(ev) => {
// Optimization: once we have a `finalized_hash`, we only care about finalized
// block refs now and can avoid bothering to save new blocks.
Expand All @@ -558,7 +540,7 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
// in which we may lose the finaliuzed block that the TX is in. For now, just error if
// this happens, to prevent the case in which we never see a finalized block and wait
// forever.
return Poll::Ready(Some(Err(Error::Other("chainHead_follow emitted 'stop' event during transaction submission".into()))));
return Poll::Ready(err_other("chainHead_follow emitted 'stop' event during transaction submission"));
}
_ => {}
}
Expand All @@ -577,29 +559,27 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
};
return Poll::Ready(Some(Ok(ev)));
} else {
// Keep waiting for more finalized blocks until we find it (get rid of any other block refs
// now, since none of them were what we were looking for anyway).
// Not found it! If follow ev is pending, then return pending here and wait for
// a new one to come in, else loop around and see if we get another one immediately.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: We could probably remove the else blocks to get rid of some extra indentation; up to you 👍

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did actually think abotu this, but you'd end up with a bit much for my liking in the new else thing eg

let Some((SeenBlockMarker::Finalized, block_ref)) = seen_blocks.remove(hash) else {
    seen_blocks.clear();
    if follow_ev_is_pending {
        return Poll::Pending;
    } else {
        continue;
    }
};

// ..

Unless you had another idea? :)

seen_blocks.clear();
continue;
if follow_ev_is_pending {
return Poll::Pending;
} else {
continue;
}
}
}

// Otherwise, we are still watching for tx events:
let ev = match tx_progress.poll_next_unpin(cx) {
// If we don't have a finalized block yet, we keep polling for tx progress events.
let tx_progress_ev = match tx_progress.poll_next_unpin(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(None) => {
done = true;
return Poll::Ready(None);
}
Poll::Ready(None) => return Poll::Ready(err_other("No more transaction progress events, but we haven't seen a Finalized one yet")),
Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
Poll::Ready(Some(Ok(ev))) => ev,
};

last_seen_tx_event_str = Some(format!("{ev:?}"));
last_seen_tx_event_time = Some(instant::Instant::now());

// When we get one, map it to the correct format (or for finalized ev, wait for the pinned block):
let ev = match ev {
let tx_progress_ev = match tx_progress_ev {
rpc_methods::TransactionStatus::Finalized { block } => {
// We'll wait until we have seen this hash, to try to guarantee
// that when we return this event, the corresponding block is
Expand Down Expand Up @@ -637,7 +617,7 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
}
rpc_methods::TransactionStatus::Validated => TransactionStatus::Validated,
};
return Poll::Ready(Some(Ok(ev)));
return Poll::Ready(Some(Ok(tx_progress_ev)));
}
});

Expand Down
Loading