Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support reorged logs in eth_filter namespace #11105

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions crates/rpc/rpc-builder/src/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,15 +145,18 @@ impl EthFilterApiBuilder {
Provider: Send + Sync + Clone + 'static,
Pool: Send + Sync + Clone + 'static,
Tasks: TaskSpawner + Clone + 'static,
Events: CanonStateSubscriptions + Clone + 'static,
Eth: EthApiTypes + 'static,
{
EthFilter::new(
let eth_filter = EthFilter::new(
ctx.provider.clone(),
ctx.pool.clone(),
ctx.cache.clone(),
ctx.config.filter_config(),
Box::new(ctx.executor.clone()),
)
);
eth_filter.spawn_watch_reorgs(ctx.events.clone());
eth_filter
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/rpc/rpc-eth-api/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use jsonrpsee::{core::RpcResult, proc_macros::rpc};
#[cfg_attr(not(feature = "client"), rpc(server, namespace = "eth"))]
#[cfg_attr(feature = "client", rpc(server, client, namespace = "eth"))]
pub trait EthFilterApi<T: RpcObject> {
/// Creates anew filter and returns its id.
/// Creates a new filter and returns its id.
#[method(name = "newFilter")]
async fn new_filter(&self, filter: Filter) -> RpcResult<FilterId>;

Expand Down
151 changes: 130 additions & 21 deletions crates/rpc/rpc/src/eth/filter.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! `eth_` `Filter` RPC handler implementation

use std::{
collections::HashMap,
collections::{BTreeMap, HashMap},
fmt,
iter::StepBy,
marker::PhantomData,
Expand All @@ -10,17 +10,23 @@ use std::{
time::{Duration, Instant},
};

use alloy_primitives::TxHash;
use alloy_primitives::{BlockNumber, TxHash, B256};
use alloy_rpc_types::{
BlockNumHash, Filter, FilterBlockOption, FilterChanges, FilterId, FilteredParams, Log,
PendingTransactionFilterKind,
};
use async_trait::async_trait;
use jsonrpsee::{core::RpcResult, server::IdProvider};
use parking_lot::RwLock;
use reth_chainspec::ChainInfo;
use reth_node_api::EthApiTypes;
use reth_primitives::{IntoRecoveredTransaction, TransactionSignedEcRecovered};
use reth_provider::{BlockIdReader, BlockReader, EvmEnvProvider, ProviderError};
use reth_primitives::{
IntoRecoveredTransaction, SealedBlockWithSenders, TransactionSignedEcRecovered,
};
use reth_provider::{
BlockIdReader, BlockReader, CanonStateNotification, CanonStateSubscriptions, EvmEnvProvider,
ProviderError,
};
use reth_rpc_eth_api::{EthFilterApiServer, FullEthApiTypes, RpcTransaction, TransactionCompat};
use reth_rpc_eth_types::{
logs_utils::{self, append_matching_block_logs},
Expand Down Expand Up @@ -106,6 +112,20 @@ where
eth_filter
}

/// Create a background task and listener for reorged blocks updates relevant active filters
pub fn spawn_watch_reorgs<Events>(&self, events: Events)
where
Events: CanonStateSubscriptions + 'static,
{
let this = self.clone();
self.inner.task_spawner.spawn_critical(
"eth-filters-watch-reorgs",
Box::pin(async move {
this.watch_reorgs(events).await;
}),
);
}

/// Returns all currently active filters
pub fn active_filters(&self) -> &ActiveFilters<RpcTransaction<Eth::NetworkTypes>> {
&self.inner.active_filters
Expand Down Expand Up @@ -139,6 +159,58 @@ where
is_valid
})
}

/// Watch block reorgs and update filters accordingly
async fn watch_reorgs<Events>(&self, events: Events)
where
Events: CanonStateSubscriptions,
{
let mut canon_state_notifications = events.subscribe_to_canonical_state();
while let Ok(notification) = canon_state_notifications.recv().await {
if let CanonStateNotification::Reorg { old, .. } = notification {
self.update_reorg(old.blocks()).await;
}
}
}

/// update a reorg block for all active filters
async fn update_reorg(&self, old_blocks: &BTreeMap<BlockNumber, SealedBlockWithSenders>) {
let mut reorg_blocks = HashMap::<B256, BlockNumber>::with_capacity(old_blocks.len());
for block in old_blocks.values() {
reorg_blocks.insert(block.header.hash(), block.header.header().number);
}

for active_filter in self.active_filters().inner.lock().await.values_mut() {
if let FilterKind::Log(filter) = &active_filter.kind {
match filter.block_option {
FilterBlockOption::AtBlockHash(block_hash) => {
if let Some(block_number) = reorg_blocks.get(&block_hash) {
active_filter.reorg_blocks.write().insert(block_hash, *block_number);
}
}
FilterBlockOption::Range { ref from_block, ref to_block } => {
if let (Some(from), Some(to)) = (
from_block.and_then(|from| from.as_number()),
to_block.and_then(|to| to.as_number()),
) {
if from > to {
continue
}

for block_number in from..=to {
if let Some(block) = old_blocks.get(&block_number) {
active_filter
.reorg_blocks
.write()
.insert(block.header.hash(), block_number);
}
}
}
}
}
}
}
}
}

impl<Provider, Pool, Eth> EthFilter<Provider, Pool, Eth>
Expand All @@ -158,7 +230,7 @@ where

// start_block is the block from which we should start fetching changes, the next block from
// the last time changes were polled, in other words the best block at last poll + 1
let (start_block, kind) = {
let (start_block, kind, reorg_blocks) = {
let mut filters = self.inner.active_filters.inner.lock().await;
let filter = filters.get_mut(&id).ok_or(EthFilterError::FilterNotFound(id))?;

Expand All @@ -174,7 +246,7 @@ where
std::mem::swap(&mut filter.block, &mut block);
filter.last_poll_timestamp = Instant::now();

(block, filter.kind.clone())
(block, filter.kind.clone(), filter.reorg_blocks.clone())
};

match kind {
Expand Down Expand Up @@ -211,7 +283,13 @@ where
};
let logs = self
.inner
.get_logs_in_block_range(&filter, from_block_number, to_block_number, info)
.get_logs_in_block_range(
&filter,
from_block_number,
to_block_number,
info,
reorg_blocks,
)
.await?;
Ok(FilterChanges::Logs(logs))
}
Expand All @@ -224,19 +302,19 @@ where
///
/// Handler for `eth_getFilterLogs`
pub async fn filter_logs(&self, id: FilterId) -> Result<Vec<Log>, EthFilterError> {
let filter = {
let (filter, reorg_blocks) = {
let filters = self.inner.active_filters.inner.lock().await;
if let FilterKind::Log(ref filter) =
filters.get(&id).ok_or_else(|| EthFilterError::FilterNotFound(id.clone()))?.kind
{
*filter.clone()
let active_filter =
filters.get(&id).ok_or_else(|| EthFilterError::FilterNotFound(id.clone()))?;
if let FilterKind::Log(ref filter) = active_filter.kind {
(*filter.clone(), active_filter.reorg_blocks.clone())
} else {
// Not a log filter
return Err(EthFilterError::FilterNotFound(id))
}
};

self.inner.logs_for_filter(filter).await
self.inner.logs_for_filter(filter, reorg_blocks).await
}
}

Expand Down Expand Up @@ -327,7 +405,7 @@ where
/// Handler for `eth_getLogs`
async fn logs(&self, filter: Filter) -> RpcResult<Vec<Log>> {
trace!(target: "rpc::eth", "Serving eth_getLogs");
Ok(self.inner.logs_for_filter(filter).await?)
Ok(self.inner.logs_for_filter(filter, Arc::new(RwLock::new(HashMap::new()))).await?)
}
}

Expand Down Expand Up @@ -371,7 +449,11 @@ where
Pool: TransactionPool + 'static,
{
/// Returns logs matching given filter object.
async fn logs_for_filter(&self, filter: Filter) -> Result<Vec<Log>, EthFilterError> {
async fn logs_for_filter(
&self,
filter: Filter,
reorg_blocks: Arc<RwLock<HashMap<B256, BlockNumber>>>,
) -> Result<Vec<Log>, EthFilterError> {
match filter.block_option {
FilterBlockOption::AtBlockHash(block_hash) => {
// for all matching logs in the block
Expand All @@ -388,7 +470,12 @@ where
.get_receipts(block_hash)
.await?
.ok_or(EthApiError::HeaderNotFound(block_hash.into()))?;

let removed = if let Some(block_number) = reorg_blocks.read().get(&block_hash) {
trace!(target: "rpc::eth::filter", block_number=block_number, "reorged");
true
} else {
false
};
let mut all_logs = Vec::new();
let filter = FilteredParams::new(Some(filter));
logs_utils::append_matching_block_logs(
Expand All @@ -397,7 +484,7 @@ where
&filter,
(block_hash, block.number).into(),
&receipts,
false,
removed,
block.timestamp,
)?;

Expand All @@ -419,8 +506,14 @@ where
.flatten();
let (from_block_number, to_block_number) =
logs_utils::get_filter_block_range(from, to, start_block, info);
self.get_logs_in_block_range(&filter, from_block_number, to_block_number, info)
.await
self.get_logs_in_block_range(
&filter,
from_block_number,
to_block_number,
info,
Arc::new(RwLock::new(HashMap::new())),
)
.await
}
}
}
Expand All @@ -436,6 +529,7 @@ where
block: last_poll_block_number,
last_poll_timestamp: Instant::now(),
kind,
reorg_blocks: Arc::new(RwLock::new(HashMap::new())),
},
);
Ok(id)
Expand All @@ -452,6 +546,7 @@ where
from_block: u64,
to_block: u64,
chain_info: ChainInfo,
reorg_blocks: Arc<RwLock<HashMap<B256, BlockNumber>>>,
) -> Result<Vec<Log>, EthFilterError> {
trace!(target: "rpc::eth::filter", from=from_block, to=to_block, ?filter, "finding logs in range");
let best_number = chain_info.best_number;
Expand All @@ -475,13 +570,17 @@ where
if let Some((block, receipts)) =
self.eth_cache.get_block_and_receipts(chain_info.best_hash).await?
{
let removed = reorg_blocks.read().contains_key(&chain_info.best_hash);
if removed {
trace!(target: "rpc::eth::filter", block_number=best_number, "reorged");
}
logs_utils::append_matching_block_logs(
&mut all_logs,
&self.provider,
&filter_params,
chain_info.into(),
&receipts,
false,
removed,
block.header.timestamp,
)?;
}
Expand Down Expand Up @@ -515,13 +614,21 @@ where
};

if let Some(receipts) = self.eth_cache.get_receipts(block_hash).await? {
let removed = if let Some(block_number) =
reorg_blocks.read().get(&block_hash)
{
trace!(target: "rpc::eth::filter", block_number=block_number, "reorged");
true
} else {
false
};
append_matching_block_logs(
&mut all_logs,
&self.provider,
&filter_params,
BlockNumHash::new(header.number, block_hash),
&receipts,
false,
removed,
header.timestamp,
)?;

Expand Down Expand Up @@ -564,6 +671,8 @@ struct ActiveFilter<T> {
last_poll_timestamp: Instant,
/// What kind of filter it is.
kind: FilterKind<T>,
/// reorg blocks that are relevant to this filter
reorg_blocks: Arc<RwLock<HashMap<B256, BlockNumber>>>,
}

/// A receiver for pending transactions that returns all new transactions since the last poll.
Expand Down
Loading