From e378310e2eb4082b56fe15e2df87ce8534052173 Mon Sep 17 00:00:00 2001 From: Paul Robinson Date: Tue, 16 Aug 2022 11:42:31 -0600 Subject: [PATCH] [WIP] Async filters feature branch (#2588) * async filters eth module --- web3/_utils/module_testing/eth_module.py | 74 ++++++++++++++--- web3/eth.py | 101 +++++++++++++++-------- 2 files changed, 127 insertions(+), 48 deletions(-) diff --git a/web3/_utils/module_testing/eth_module.py b/web3/_utils/module_testing/eth_module.py index e27974fe90..9f5a5a2843 100644 --- a/web3/_utils/module_testing/eth_module.py +++ b/web3/_utils/module_testing/eth_module.py @@ -1403,6 +1403,64 @@ def test_async_provider_default_block( # reset to default async_w3.eth.default_block = "latest" + @pytest.mark.asyncio + async def test_async_eth_new_filter(self, async_w3: "Web3") -> None: + filter = await async_w3.eth.filter({}) + + changes = await async_w3.eth.get_filter_changes( + filter.filter_id + ) # type: ignore + assert is_list_like(changes) + assert not changes + + logs = await async_w3.eth.get_filter_logs(filter.filter_id) # type: ignore + assert is_list_like(logs) + assert not logs + + result = await async_w3.eth.uninstall_filter(filter.filter_id) # type: ignore + assert result is True + + @pytest.mark.asyncio + async def test_async_eth_new_block_filter(self, async_w3: "Web3") -> None: + filter = await async_w3.eth.filter("latest") + assert is_string(filter.filter_id) + + changes = await async_w3.eth.get_filter_changes( + filter.filter_id + ) # type: ignore + assert is_list_like(changes) + assert not changes + + result = await async_w3.eth.uninstall_filter(filter.filter_id) # type: ignore + assert result is True + + @pytest.mark.asyncio + async def test_async_eth_new_pending_transaction_filter( + self, async_w3: "Web3" + ) -> None: + filter = await async_w3.eth.filter("pending") + assert is_string(filter.filter_id) + + changes = await async_w3.eth.get_filter_changes( + filter.filter_id + ) # type: ignore + assert is_list_like(changes) + assert not changes + + result = await async_w3.eth.uninstall_filter(filter.filter_id) # type: ignore + assert result is True + + @pytest.mark.asyncio + async def test_async_eth_uninstall_filter(self, async_w3: "Web3") -> None: + filter = await async_w3.eth.filter({}) + assert is_string(filter.filter_id) + + success = await async_w3.eth.uninstall_filter(filter.filter_id) # type: ignore + assert success is True + + failure = await async_w3.eth.uninstall_filter(filter.filter_id) # type: ignore + assert failure is False + class EthModuleTest: def test_eth_syncing(self, w3: "Web3") -> None: @@ -3093,7 +3151,7 @@ def test_eth_getUncleByBlockNumberAndIndex(self, w3: "Web3") -> None: # TODO: how do we make uncles.... pass - def test_eth_newFilter(self, w3: "Web3") -> None: + def test_eth_new_filter(self, w3: "Web3") -> None: filter = w3.eth.filter({}) changes = w3.eth.get_filter_changes(filter.filter_id) @@ -3107,7 +3165,7 @@ def test_eth_newFilter(self, w3: "Web3") -> None: result = w3.eth.uninstall_filter(filter.filter_id) assert result is True - def test_eth_newBlockFilter(self, w3: "Web3") -> None: + def test_eth_new_block_filter(self, w3: "Web3") -> None: filter = w3.eth.filter("latest") assert is_string(filter.filter_id) @@ -3115,15 +3173,10 @@ def test_eth_newBlockFilter(self, w3: "Web3") -> None: assert is_list_like(changes) assert not changes - # TODO: figure out why this fails in go-ethereum - # logs = w3.eth.get_filter_logs(filter.filter_id) - # assert is_list_like(logs) - # assert not logs - result = w3.eth.uninstall_filter(filter.filter_id) assert result is True - def test_eth_newPendingTransactionFilter(self, w3: "Web3") -> None: + def test_eth_new_pending_transaction_filter(self, w3: "Web3") -> None: filter = w3.eth.filter("pending") assert is_string(filter.filter_id) @@ -3131,11 +3184,6 @@ def test_eth_newPendingTransactionFilter(self, w3: "Web3") -> None: assert is_list_like(changes) assert not changes - # TODO: figure out why this fails in go-ethereum - # logs = w3.eth.get_filter_logs(filter.filter_id) - # assert is_list_like(logs) - # assert not logs - result = w3.eth.uninstall_filter(filter.filter_id) assert result is True diff --git a/web3/eth.py b/web3/eth.py index cdb08fd346..17b013d6e1 100644 --- a/web3/eth.py +++ b/web3/eth.py @@ -346,6 +346,35 @@ def set_contract_factory( ) -> None: self.defaultContractFactory = contractFactory + def filter_munger( + self, + filter_params: Optional[Union[str, FilterParams]] = None, + filter_id: Optional[HexStr] = None, + ) -> Union[List[FilterParams], List[HexStr], List[str]]: + if filter_id and filter_params: + raise TypeError( + "Ambiguous invocation: provide either a `filter_params` or a " + "`filter_id` argument. Both were supplied." + ) + if isinstance(filter_params, dict): + return [filter_params] + elif is_string(filter_params): + if filter_params in ["latest", "pending"]: + return [filter_params] + else: + raise ValueError( + "The filter API only accepts the values of `pending` or " + "`latest` for string based filters" + ) + elif filter_id and not filter_params: + return [filter_id] + else: + raise TypeError( + "Must provide either filter_params as a string or " + "a valid filter object, or a filter_id as a string " + "or hex." + ) + class AsyncEth(BaseEth): is_async = True @@ -597,6 +626,37 @@ async def get_storage_at( ) -> HexBytes: return await self._get_storage_at(account, position, block_identifier) + filter: Method[Callable[..., Awaitable[Any]]] = Method( + method_choice_depends_on_args=select_filter_method( + if_new_block_filter=RPC.eth_newBlockFilter, + if_new_pending_transaction_filter=RPC.eth_newPendingTransactionFilter, + if_new_filter=RPC.eth_newFilter, + ), + mungers=[BaseEth.filter_munger], + ) + + _get_filter_changes: Method[ + Callable[[HexStr], Awaitable[List[LogReceipt]]] + ] = Method(RPC.eth_getFilterChanges, mungers=[default_root_munger]) + + async def get_filter_changes(self, filter_id: HexStr) -> List[LogReceipt]: + return await self._get_filter_changes(filter_id) + + _get_filter_logs: Method[Callable[[HexStr], Awaitable[List[LogReceipt]]]] = Method( + RPC.eth_getFilterLogs, mungers=[default_root_munger] + ) + + async def get_filter_logs(self, filter_id: HexStr) -> List[LogReceipt]: + return await self._get_filter_logs(filter_id) + + _uninstall_filter: Method[Callable[[HexStr], Awaitable[bool]]] = Method( + RPC.eth_uninstallFilter, + mungers=[default_root_munger], + ) + + async def uninstall_filter(self, filter_id: HexStr) -> bool: + return await self._uninstall_filter(filter_id) + class Eth(BaseEth): defaultContractFactory: Type[Union[Contract, ContractCaller]] = Contract @@ -902,42 +962,13 @@ def fee_history( ) -> FeeHistory: return self._fee_history(block_count, newest_block, reward_percentiles) - def filter_munger( - self, - filter_params: Optional[Union[str, FilterParams]] = None, - filter_id: Optional[HexStr] = None, - ) -> Union[List[FilterParams], List[HexStr], List[str]]: - if filter_id and filter_params: - raise TypeError( - "Ambiguous invocation: provide either a `filter_params` or a " - "`filter_id` argument. Both were supplied." - ) - if isinstance(filter_params, dict): - return [filter_params] - elif is_string(filter_params): - if filter_params in ["latest", "pending"]: - return [filter_params] - else: - raise ValueError( - "The filter API only accepts the values of `pending` or " - "`latest` for string based filters" - ) - elif filter_id and not filter_params: - return [filter_id] - else: - raise TypeError( - "Must provide either filter_params as a string or " - "a valid filter object, or a filter_id as a string " - "or hex." - ) - filter: Method[Callable[..., Any]] = Method( method_choice_depends_on_args=select_filter_method( if_new_block_filter=RPC.eth_newBlockFilter, if_new_pending_transaction_filter=RPC.eth_newPendingTransactionFilter, if_new_filter=RPC.eth_newFilter, ), - mungers=[filter_munger], + mungers=[BaseEth.filter_munger], ) get_filter_changes: Method[Callable[[HexStr], List[LogReceipt]]] = Method( @@ -948,6 +979,11 @@ def filter_munger( RPC.eth_getFilterLogs, mungers=[default_root_munger] ) + uninstall_filter: Method[Callable[[HexStr], bool]] = Method( + RPC.eth_uninstallFilter, + mungers=[default_root_munger], + ) + get_logs: Method[Callable[[FilterParams], List[LogReceipt]]] = Method( RPC.eth_getLogs, mungers=[default_root_munger] ) @@ -962,11 +998,6 @@ def filter_munger( mungers=[default_root_munger], ) - uninstall_filter: Method[Callable[[HexStr], bool]] = Method( - RPC.eth_uninstallFilter, - mungers=[default_root_munger], - ) - get_work: Method[Callable[[], List[HexBytes]]] = Method( RPC.eth_getWork, is_property=True,