Support reorged logs in eth_filter namespace
nkysg committed Sep 22, 2024
1 parent 159bf2c commit 1317990
Showing 3 changed files with 136 additions and 24 deletions.
7 changes: 5 additions & 2 deletions crates/rpc/rpc-builder/src/eth.rs
@@ -145,15 +145,18 @@ impl EthFilterApiBuilder {
Provider: Send + Sync + Clone + 'static,
Pool: Send + Sync + Clone + 'static,
Tasks: TaskSpawner + Clone + 'static,
Events: CanonStateSubscriptions + Clone + 'static,
Eth: EthApiTypes + 'static,
{
EthFilter::new(
let eth_filter = EthFilter::new(
ctx.provider.clone(),
ctx.pool.clone(),
ctx.cache.clone(),
ctx.config.filter_config(),
Box::new(ctx.executor.clone()),
)
);
eth_filter.spawn_watch_reorgs(ctx.events.clone());
eth_filter
}
}
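The hunk above replaces a single constructor expression with a bind-spawn-return sequence: the new `Events: CanonStateSubscriptions` bound gives the builder a canonical-state source to hand to `spawn_watch_reorgs` before the handler is returned. A minimal, self-contained sketch of that shape — `Handler`, `spawn_watch`, and `build` are illustrative stand-ins, not reth types:

```rust
/// Illustrative stand-in for the constructed `EthFilter` handler.
struct Handler;

impl Handler {
    /// In reth this boxes a future and hands it to the node's task spawner;
    /// here it is a no-op placeholder.
    fn spawn_watch<E: Clone + Send + 'static>(&self, _events: E) {}
}

/// Build the handler, start its background watcher, then return it.
fn build<E: Clone + Send + 'static>(events: E) -> Handler {
    let handler = Handler;
    handler.spawn_watch(events.clone());
    handler
}

fn main() {
    let _filter = build(());
}
```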

2 changes: 1 addition & 1 deletion crates/rpc/rpc-eth-api/src/filter.rs
@@ -8,7 +8,7 @@ use jsonrpsee::{core::RpcResult, proc_macros::rpc};
#[cfg_attr(not(feature = "client"), rpc(server, namespace = "eth"))]
#[cfg_attr(feature = "client", rpc(server, client, namespace = "eth"))]
pub trait EthFilterApi<T: RpcObject> {
/// Creates anew filter and returns its id.
/// Creates a new filter and returns its id.
#[method(name = "newFilter")]
async fn new_filter(&self, filter: Filter) -> RpcResult<FilterId>;

151 changes: 130 additions & 21 deletions crates/rpc/rpc/src/eth/filter.rs
@@ -1,7 +1,7 @@
//! `eth_` `Filter` RPC handler implementation

use std::{
collections::HashMap,
collections::{BTreeMap, HashMap},
fmt,
iter::StepBy,
marker::PhantomData,
@@ -10,17 +10,23 @@ use std::{
time::{Duration, Instant},
};

use alloy_primitives::TxHash;
use alloy_primitives::{TxHash, B256};
use alloy_rpc_types::{
BlockNumHash, Filter, FilterBlockOption, FilterChanges, FilterId, FilteredParams, Log,
PendingTransactionFilterKind,
};
use async_trait::async_trait;
use jsonrpsee::{core::RpcResult, server::IdProvider};
use parking_lot::RwLock;
use reth_chainspec::ChainInfo;
use reth_node_api::EthApiTypes;
use reth_primitives::{IntoRecoveredTransaction, TransactionSignedEcRecovered};
use reth_provider::{BlockIdReader, BlockReader, EvmEnvProvider, ProviderError};
use reth_primitives::{
IntoRecoveredTransaction, SealedBlockWithSenders, TransactionSignedEcRecovered,
};
use reth_provider::{
BlockIdReader, BlockReader, CanonStateNotification, CanonStateSubscriptions, EvmEnvProvider,
ProviderError,
};
use reth_rpc_eth_api::{EthFilterApiServer, FullEthApiTypes, RpcTransaction, TransactionCompat};
use reth_rpc_eth_types::{
logs_utils::{self, append_matching_block_logs},
@@ -106,6 +112,20 @@ where
eth_filter
}

/// Creates a background task that listens for reorged blocks and updates the relevant active filters
pub fn spawn_watch_reorgs<Events>(&self, events: Events)
where
Events: CanonStateSubscriptions + 'static,
{
let this = self.clone();
self.inner.task_spawner.spawn_critical(
"eth-filters-watch-reorgs",
Box::pin(async move {
this.watch_reorgs(events).await;
}),
);
}

/// Returns all currently active filters
pub fn active_filters(&self) -> &ActiveFilters<RpcTransaction<Eth::NetworkTypes>> {
&self.inner.active_filters
@@ -139,6 +159,58 @@ where
is_valid
})
}

/// Watches block reorgs and updates the active filters accordingly
async fn watch_reorgs<Events>(&self, events: Events)
where
Events: CanonStateSubscriptions,
{
let mut canon_state_notifications = events.subscribe_to_canonical_state();
while let Ok(notification) = canon_state_notifications.recv().await {
if let CanonStateNotification::Reorg { old, .. } = notification {
self.update_reorg(old.blocks()).await;
}
}
}

/// Records the blocks unwound by a reorg for all active filters
async fn update_reorg(&self, old_blocks: &BTreeMap<BlockNumber, SealedBlockWithSenders>) {
let mut reorg_blocks = HashMap::<B256, BlockNumber>::with_capacity(old_blocks.len());
for block in old_blocks.values() {
reorg_blocks.insert(block.header.hash(), block.header.header().number);
}

for active_filter in self.active_filters().inner.lock().await.values_mut() {
if let FilterKind::Log(filter) = &active_filter.kind {
match filter.block_option {
FilterBlockOption::AtBlockHash(block_hash) => {
if let Some(block_number) = reorg_blocks.get(&block_hash) {
active_filter.reorg_blocks.write().insert(block_hash, *block_number);
}
}
FilterBlockOption::Range { ref from_block, ref to_block } => {
if let (Some(from), Some(to)) = (
from_block.and_then(|from| from.as_number()),
to_block.and_then(|to| to.as_number()),
) {
if from > to {
continue
}

for block_number in from..=to {
if let Some(block) = old_blocks.get(&block_number) {
active_filter
.reorg_blocks
.write()
.insert(block.header.hash(), block_number);
}
}
}
}
}
}
}
}
}
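Taken together, the two methods above form a small pipeline: `watch_reorgs` is a plain `recv()` loop over the canonical-state broadcast that only reacts to the `Reorg` variant, and `update_reorg` indexes the unwound (old-chain) blocks by hash and records the ones each active log filter could have matched — the single hash for `AtBlockHash`, or every unwound block whose number falls inside the filter's `from..=to` range. A stripped-down, std-only sketch of that selection step (hashes are shortened to `u64` and block tags to plain numbers for readability; these types are stand-ins, not reth's):

```rust
use std::collections::{BTreeMap, HashMap};

type Hash = u64; // stand-in for B256

/// Stand-in for FilterBlockOption with the bounds already resolved to numbers.
enum BlockOption {
    AtBlockHash(Hash),
    Range { from: u64, to: u64 },
}

/// Returns the (hash, number) pairs a filter should later report as removed.
fn reorged_blocks_for_filter(
    old_blocks: &BTreeMap<u64, Hash>, // number -> hash of the unwound chain
    filter: &BlockOption,
) -> HashMap<Hash, u64> {
    // index the unwound chain by hash, mirroring the map built in update_reorg
    let by_hash: HashMap<Hash, u64> =
        old_blocks.iter().map(|(number, hash)| (*hash, *number)).collect();

    let mut reorged = HashMap::new();
    match filter {
        BlockOption::AtBlockHash(hash) => {
            if let Some(number) = by_hash.get(hash) {
                reorged.insert(*hash, *number);
            }
        }
        BlockOption::Range { from, to } => {
            // an empty range (from > to) simply yields nothing here
            for number in *from..=*to {
                if let Some(hash) = old_blocks.get(&number) {
                    reorged.insert(*hash, number);
                }
            }
        }
    }
    reorged
}

fn main() {
    // blocks 100 and 101 were unwound by a reorg
    let old: BTreeMap<u64, Hash> = [(100, 0xaa), (101, 0xbb)].into_iter().collect();
    let hits = reorged_blocks_for_filter(&old, &BlockOption::Range { from: 90, to: 100 });
    assert_eq!(hits.get(&0xaa), Some(&100)); // inside the filter's range
    assert!(!hits.contains_key(&0xbb)); // outside the range, not recorded
}
```

In the actual change the selected pairs are written into the filter's shared `reorg_blocks` map rather than returned.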

impl<Provider, Pool, Eth> EthFilter<Provider, Pool, Eth>
@@ -158,7 +230,7 @@ where

// start_block is the block from which we should start fetching changes, the next block from
// the last time changes were polled, in other words the best block at last poll + 1
let (start_block, kind) = {
let (start_block, kind, reorg_blocks) = {
let mut filters = self.inner.active_filters.inner.lock().await;
let filter = filters.get_mut(&id).ok_or(EthFilterError::FilterNotFound(id))?;

@@ -174,7 +246,7 @@
std::mem::swap(&mut filter.block, &mut block);
filter.last_poll_timestamp = Instant::now();

(block, filter.kind.clone())
(block, filter.kind.clone(), filter.reorg_blocks.clone())
};

match kind {
@@ -211,7 +283,13 @@ where
};
let logs = self
.inner
.get_logs_in_block_range(&filter, from_block_number, to_block_number, info)
.get_logs_in_block_range(
&filter,
from_block_number,
to_block_number,
info,
reorg_blocks,
)
.await?;
Ok(FilterChanges::Logs(logs))
}
@@ -224,19 +302,19 @@
///
/// Handler for `eth_getFilterLogs`
pub async fn filter_logs(&self, id: FilterId) -> Result<Vec<Log>, EthFilterError> {
let filter = {
let (filter, reorg_blocks) = {
let filters = self.inner.active_filters.inner.lock().await;
if let FilterKind::Log(ref filter) =
filters.get(&id).ok_or_else(|| EthFilterError::FilterNotFound(id.clone()))?.kind
{
*filter.clone()
let active_filter =
filters.get(&id).ok_or_else(|| EthFilterError::FilterNotFound(id.clone()))?;
if let FilterKind::Log(ref filter) = active_filter.kind {
(*filter.clone(), active_filter.reorg_blocks.clone())
} else {
// Not a log filter
return Err(EthFilterError::FilterNotFound(id))
}
};

self.inner.logs_for_filter(filter).await
self.inner.logs_for_filter(filter, reorg_blocks).await
}
}
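The visible effect for callers is that `eth_getFilterLogs` and `eth_getFilterChanges` can now include logs flagged `removed: true` when their block has been recorded as reorged (the `removed` flag handed to `append_matching_block_logs` in the hunks below), mirroring the convention subscription-based log streams use. Clients polling a filter should therefore split results rather than assume every entry is canonical — a hedged sketch, with a tiny local stand-in for alloy's `Log`:

```rust
/// Minimal stand-in carrying only the `removed` flag of alloy's `Log`.
struct Log {
    address: &'static str,
    removed: bool,
}

/// Split one poll into logs that are still canonical and logs to roll back.
fn split_by_reorg(logs: Vec<Log>) -> (Vec<Log>, Vec<Log>) {
    logs.into_iter().partition(|log| !log.removed)
}

fn main() {
    let polled = vec![
        Log { address: "0xdead", removed: false },
        Log { address: "0xbeef", removed: true }, // came from an unwound block
    ];
    let (canonical, reorged) = split_by_reorg(polled);
    assert_eq!(canonical.len(), 1);
    assert_eq!(reorged[0].address, "0xbeef");
}
```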

@@ -327,7 +405,7 @@ where
/// Handler for `eth_getLogs`
async fn logs(&self, filter: Filter) -> RpcResult<Vec<Log>> {
trace!(target: "rpc::eth", "Serving eth_getLogs");
Ok(self.inner.logs_for_filter(filter).await?)
Ok(self.inner.logs_for_filter(filter, Arc::new(RwLock::new(HashMap::new()))).await?)
}
}

@@ -371,7 +449,11 @@ where
Pool: TransactionPool + 'static,
{
/// Returns logs matching given filter object.
async fn logs_for_filter(&self, filter: Filter) -> Result<Vec<Log>, EthFilterError> {
async fn logs_for_filter(
&self,
filter: Filter,
reorg_blocks: Arc<RwLock<HashMap<B256, BlockNumber>>>,
) -> Result<Vec<Log>, EthFilterError> {
match filter.block_option {
FilterBlockOption::AtBlockHash(block_hash) => {
// for all matching logs in the block
@@ -388,7 +470,12 @@ where
.get_receipts(block_hash)
.await?
.ok_or(EthApiError::HeaderNotFound(block_hash.into()))?;

let removed = if let Some(block_number) = reorg_blocks.read().get(&block_hash) {
trace!(target: "rpc::eth::filter", block_number=block_number, "reorged");
true
} else {
false
};
let mut all_logs = Vec::new();
let filter = FilteredParams::new(Some(filter));
logs_utils::append_matching_block_logs(
@@ -397,7 +484,7 @@
&filter,
(block_hash, block.number).into(),
&receipts,
false,
removed,
block.timestamp,
)?;

@@ -419,8 +506,14 @@ where
.flatten();
let (from_block_number, to_block_number) =
logs_utils::get_filter_block_range(from, to, start_block, info);
self.get_logs_in_block_range(&filter, from_block_number, to_block_number, info)
.await
self.get_logs_in_block_range(
&filter,
from_block_number,
to_block_number,
info,
Arc::new(RwLock::new(HashMap::new())),
)
.await
}
}
}
@@ -436,6 +529,7 @@ where
block: last_poll_block_number,
last_poll_timestamp: Instant::now(),
kind,
reorg_blocks: Arc::new(RwLock::new(HashMap::new())),
},
);
Ok(id)
@@ -452,6 +546,7 @@
from_block: u64,
to_block: u64,
chain_info: ChainInfo,
reorg_blocks: Arc<RwLock<HashMap<B256, BlockNumber>>>,
) -> Result<Vec<Log>, EthFilterError> {
trace!(target: "rpc::eth::filter", from=from_block, to=to_block, ?filter, "finding logs in range");
let best_number = chain_info.best_number;
@@ -475,13 +570,17 @@
if let Some((block, receipts)) =
self.eth_cache.get_block_and_receipts(chain_info.best_hash).await?
{
let removed = reorg_blocks.read().contains_key(&chain_info.best_hash);
if removed {
trace!(target: "rpc::eth::filter", block_number=best_number, "reorged");
}
logs_utils::append_matching_block_logs(
&mut all_logs,
&self.provider,
&filter_params,
chain_info.into(),
&receipts,
false,
removed,
block.header.timestamp,
)?;
}
@@ -515,13 +614,21 @@ where
};

if let Some(receipts) = self.eth_cache.get_receipts(block_hash).await? {
let removed = if let Some(block_number) =
reorg_blocks.read().get(&block_hash)
{
trace!(target: "rpc::eth::filter", block_number=block_number, "reorged");
true
} else {
false
};
append_matching_block_logs(
&mut all_logs,
&self.provider,
&filter_params,
BlockNumHash::new(header.number, block_hash),
&receipts,
false,
removed,
header.timestamp,
)?;

@@ -564,6 +671,8 @@ struct ActiveFilter<T> {
last_poll_timestamp: Instant,
/// What kind of filter it is.
kind: FilterKind<T>,
/// Reorged blocks that are relevant to this filter.
reorg_blocks: Arc<RwLock<HashMap<B256, BlockNumber>>>,
}

/// A receiver for pending transactions that returns all new transactions since the last poll.
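The new `reorg_blocks` field is the shared state the feature hangs on: the watcher task writes into it, and `filter_changes` / `filter_logs` clone the `Arc` out and read it while assembling logs. A condensed illustration of that handle-sharing pattern — reth uses `parking_lot::RwLock` with `B256` keys, plain `std` types stand in here:

```rust
use std::{
    collections::HashMap,
    sync::{Arc, RwLock},
    thread,
};

type Hash = u64; // stand-in for B256
type SharedReorgBlocks = Arc<RwLock<HashMap<Hash, u64>>>;

fn main() {
    let reorg_blocks: SharedReorgBlocks = Arc::new(RwLock::new(HashMap::new()));

    // "watcher" side: records an unwound block through its clone of the handle
    let writer = Arc::clone(&reorg_blocks);
    let watcher = thread::spawn(move || {
        writer.write().unwrap().insert(0xaa, 100);
    });
    watcher.join().unwrap();

    // "poll" side: a cheap Arc clone, then a read while building the response
    let reader = Arc::clone(&reorg_blocks);
    let removed = reader.read().unwrap().contains_key(&0xaa);
    assert!(removed); // logs from block 0xaa would be reported with removed = true
}
```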
