Skip to content

Commit

Permalink
[WIP] Async filters feature branch (#2588)
Browse files Browse the repository at this point in the history
* async filters eth module
  • Loading branch information
Paul Robinson committed Sep 12, 2022
1 parent 12b649d commit c282769
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 48 deletions.
74 changes: 61 additions & 13 deletions web3/_utils/module_testing/eth_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -1403,6 +1403,64 @@ def test_async_provider_default_block(
# reset to default
async_w3.eth.default_block = "latest"

@pytest.mark.asyncio
async def test_async_eth_new_filter(self, async_w3: "Web3") -> None:
filter = await async_w3.eth.filter({})

changes = await async_w3.eth.get_filter_changes(
filter.filter_id
) # type: ignore
assert is_list_like(changes)
assert not changes

logs = await async_w3.eth.get_filter_logs(filter.filter_id) # type: ignore
assert is_list_like(logs)
assert not logs

result = await async_w3.eth.uninstall_filter(filter.filter_id) # type: ignore
assert result is True

@pytest.mark.asyncio
async def test_async_eth_new_block_filter(self, async_w3: "Web3") -> None:
filter = await async_w3.eth.filter("latest")
assert is_string(filter.filter_id)

changes = await async_w3.eth.get_filter_changes(
filter.filter_id
) # type: ignore
assert is_list_like(changes)
assert not changes

result = await async_w3.eth.uninstall_filter(filter.filter_id) # type: ignore
assert result is True

@pytest.mark.asyncio
async def test_async_eth_new_pending_transaction_filter(
self, async_w3: "Web3"
) -> None:
filter = await async_w3.eth.filter("pending")
assert is_string(filter.filter_id)

changes = await async_w3.eth.get_filter_changes(
filter.filter_id
) # type: ignore
assert is_list_like(changes)
assert not changes

result = await async_w3.eth.uninstall_filter(filter.filter_id) # type: ignore
assert result is True

@pytest.mark.asyncio
async def test_async_eth_uninstall_filter(self, async_w3: "Web3") -> None:
filter = await async_w3.eth.filter({})
assert is_string(filter.filter_id)

success = await async_w3.eth.uninstall_filter(filter.filter_id) # type: ignore
assert success is True

failure = await async_w3.eth.uninstall_filter(filter.filter_id) # type: ignore
assert failure is False


class EthModuleTest:
def test_eth_syncing(self, w3: "Web3") -> None:
Expand Down Expand Up @@ -3093,7 +3151,7 @@ def test_eth_getUncleByBlockNumberAndIndex(self, w3: "Web3") -> None:
# TODO: how do we make uncles....
pass

def test_eth_newFilter(self, w3: "Web3") -> None:
def test_eth_new_filter(self, w3: "Web3") -> None:
filter = w3.eth.filter({})

changes = w3.eth.get_filter_changes(filter.filter_id)
Expand All @@ -3107,35 +3165,25 @@ def test_eth_newFilter(self, w3: "Web3") -> None:
result = w3.eth.uninstall_filter(filter.filter_id)
assert result is True

def test_eth_newBlockFilter(self, w3: "Web3") -> None:
def test_eth_new_block_filter(self, w3: "Web3") -> None:
filter = w3.eth.filter("latest")
assert is_string(filter.filter_id)

changes = w3.eth.get_filter_changes(filter.filter_id)
assert is_list_like(changes)
assert not changes

# TODO: figure out why this fails in go-ethereum
# logs = w3.eth.get_filter_logs(filter.filter_id)
# assert is_list_like(logs)
# assert not logs

result = w3.eth.uninstall_filter(filter.filter_id)
assert result is True

def test_eth_newPendingTransactionFilter(self, w3: "Web3") -> None:
def test_eth_new_pending_transaction_filter(self, w3: "Web3") -> None:
filter = w3.eth.filter("pending")
assert is_string(filter.filter_id)

changes = w3.eth.get_filter_changes(filter.filter_id)
assert is_list_like(changes)
assert not changes

# TODO: figure out why this fails in go-ethereum
# logs = w3.eth.get_filter_logs(filter.filter_id)
# assert is_list_like(logs)
# assert not logs

result = w3.eth.uninstall_filter(filter.filter_id)
assert result is True

Expand Down
101 changes: 66 additions & 35 deletions web3/eth.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,35 @@ def set_contract_factory(
) -> None:
self.defaultContractFactory = contractFactory

def filter_munger(
self,
filter_params: Optional[Union[str, FilterParams]] = None,
filter_id: Optional[HexStr] = None,
) -> Union[List[FilterParams], List[HexStr], List[str]]:
if filter_id and filter_params:
raise TypeError(
"Ambiguous invocation: provide either a `filter_params` or a "
"`filter_id` argument. Both were supplied."
)
if isinstance(filter_params, dict):
return [filter_params]
elif is_string(filter_params):
if filter_params in ["latest", "pending"]:
return [filter_params]
else:
raise ValueError(
"The filter API only accepts the values of `pending` or "
"`latest` for string based filters"
)
elif filter_id and not filter_params:
return [filter_id]
else:
raise TypeError(
"Must provide either filter_params as a string or "
"a valid filter object, or a filter_id as a string "
"or hex."
)


class AsyncEth(BaseEth):
is_async = True
Expand Down Expand Up @@ -597,6 +626,37 @@ async def get_storage_at(
) -> HexBytes:
return await self._get_storage_at(account, position, block_identifier)

filter: Method[Callable[..., Awaitable[Any]]] = Method(
method_choice_depends_on_args=select_filter_method(
if_new_block_filter=RPC.eth_newBlockFilter,
if_new_pending_transaction_filter=RPC.eth_newPendingTransactionFilter,
if_new_filter=RPC.eth_newFilter,
),
mungers=[BaseEth.filter_munger],
)

_get_filter_changes: Method[
Callable[[HexStr], Awaitable[List[LogReceipt]]]
] = Method(RPC.eth_getFilterChanges, mungers=[default_root_munger])

async def get_filter_changes(self, filter_id: HexStr) -> List[LogReceipt]:
return await self._get_filter_changes(filter_id)

_get_filter_logs: Method[Callable[[HexStr], Awaitable[List[LogReceipt]]]] = Method(
RPC.eth_getFilterLogs, mungers=[default_root_munger]
)

async def get_filter_logs(self, filter_id: HexStr) -> List[LogReceipt]:
return await self._get_filter_logs(filter_id)

_uninstall_filter: Method[Callable[[HexStr], Awaitable[bool]]] = Method(
RPC.eth_uninstallFilter,
mungers=[default_root_munger],
)

async def uninstall_filter(self, filter_id: HexStr) -> bool:
return await self._uninstall_filter(filter_id)


class Eth(BaseEth):
defaultContractFactory: Type[Union[Contract, ContractCaller]] = Contract
Expand Down Expand Up @@ -902,42 +962,13 @@ def fee_history(
) -> FeeHistory:
return self._fee_history(block_count, newest_block, reward_percentiles)

def filter_munger(
self,
filter_params: Optional[Union[str, FilterParams]] = None,
filter_id: Optional[HexStr] = None,
) -> Union[List[FilterParams], List[HexStr], List[str]]:
if filter_id and filter_params:
raise TypeError(
"Ambiguous invocation: provide either a `filter_params` or a "
"`filter_id` argument. Both were supplied."
)
if isinstance(filter_params, dict):
return [filter_params]
elif is_string(filter_params):
if filter_params in ["latest", "pending"]:
return [filter_params]
else:
raise ValueError(
"The filter API only accepts the values of `pending` or "
"`latest` for string based filters"
)
elif filter_id and not filter_params:
return [filter_id]
else:
raise TypeError(
"Must provide either filter_params as a string or "
"a valid filter object, or a filter_id as a string "
"or hex."
)

filter: Method[Callable[..., Any]] = Method(
method_choice_depends_on_args=select_filter_method(
if_new_block_filter=RPC.eth_newBlockFilter,
if_new_pending_transaction_filter=RPC.eth_newPendingTransactionFilter,
if_new_filter=RPC.eth_newFilter,
),
mungers=[filter_munger],
mungers=[BaseEth.filter_munger],
)

get_filter_changes: Method[Callable[[HexStr], List[LogReceipt]]] = Method(
Expand All @@ -948,6 +979,11 @@ def filter_munger(
RPC.eth_getFilterLogs, mungers=[default_root_munger]
)

uninstall_filter: Method[Callable[[HexStr], bool]] = Method(
RPC.eth_uninstallFilter,
mungers=[default_root_munger],
)

get_logs: Method[Callable[[FilterParams], List[LogReceipt]]] = Method(
RPC.eth_getLogs, mungers=[default_root_munger]
)
Expand All @@ -962,11 +998,6 @@ def filter_munger(
mungers=[default_root_munger],
)

uninstall_filter: Method[Callable[[HexStr], bool]] = Method(
RPC.eth_uninstallFilter,
mungers=[default_root_munger],
)

get_work: Method[Callable[[], List[HexBytes]]] = Method(
RPC.eth_getWork,
is_property=True,
Expand Down

0 comments on commit c282769

Please sign in to comment.