
Implement async versions of long-running methods #657

Closed

carver opened this issue Feb 20, 2018 · 5 comments

@carver (Collaborator) commented Feb 20, 2018

  • Version: 4.1

What was wrong?

Some async callers want asyncio versions of methods available.

Initial discussion on #574

@pipermerriam
What do you think about introducing a web3.eth.async module with a coroutine version of this as well? It'd be nice to start supporting native asyncio. We might need to go as far as implementing new providers as well with coroutine versions of Provider.make_request and the corresponding Manager methods. This would also open the door for a Filter API with callbacks for logs.

@carver
Or maybe web3.async.* that replicates the whole web3 namespace? Seems reasonable, but probably after getting v4 stable.
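
For illustration only, the caller-facing surface might look something like the sketch below. Every name here is hypothetical, and note that async itself became a reserved word in Python 3.7, so a literal web3.async attribute would need a different spelling.

import asyncio


class FakeAsyncEth:
    '''stand-in for a hypothetical coroutine mirror of web3.eth'''
    async def getBalance(self, account, block='latest'):
        # a real version would await an RPC round trip here
        return 0


async def main():
    # hypothetical: something like w3.async_eth.getBalance(...)
    async_eth = FakeAsyncEth()
    balance = await async_eth.getBalance('0x' + '00' * 20)
    print(balance)

asyncio.run(main())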

How can it be implemented?

TODO

@pipermerriam (Member)

Things that stand out as must-be-coroutine APIs are:

  • All module methods (eg: web3.eth.getBalance).
  • Underlying Manager methods, eg: Manager.request_blocking (this should maybe be renamed to just make_rpc_request or something similar).
  • Provider.make_request
  • middlewares

We'll need to deal with computed properties as well, or potentially not support them in the async portion of the API. eg: web3.eth.accounts

New APIs will need to be created to expose some sort of callback-based API for filters.
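
Purely as a sketch of how those layers might chain together (all names hypothetical, nothing here is a committed design):

import asyncio


class AsyncProvider:
    async def make_request(self, method, params):
        # a real implementation would await an HTTP/IPC round trip here
        return {'jsonrpc': '2.0', 'id': 1, 'result': None}


class AsyncManager:
    def __init__(self, provider, middlewares=()):
        self.provider = provider
        self.middlewares = middlewares

    async def make_rpc_request(self, method, params):
        # each middleware wraps the provider call; first in the list is outermost
        call = self.provider.make_request
        for mw in reversed(self.middlewares):
            call = mw(call)
        return await call(method, params)


class AsyncEth:
    def __init__(self, manager):
        self.manager = manager

    async def getBalance(self, account, block='latest'):
        resp = await self.manager.make_rpc_request('eth_getBalance', [account, block])
        return resp['result']


asyncio.run(AsyncEth(AsyncManager(AsyncProvider())).getBalance('0x' + '00' * 20))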

@boneyard93501 (Contributor)

To put a stake in the ground and get the blood boiling, below are a few approaches (unoptimized, exploratory in scope) to async in the context of batch processing.

Without re-writing the providers, concurrent.futures is the best (performance) bet. However, the GIL doesn't care where and how threads are spawned, and generally performance degrades at an increasing rate as threads are added beyond some "golden" number; on smaller AWS EC2 t-type instances that seems to be around 30 to 50 threads, on my old desktop it's about 120 threads, and on my laptop about 150 threads. So there is a contextual aspect to this.

The aiohttp example is a proxy for where a re-written HTTP provider method might fall performance-wise compared to "wrapped" existing (HTTP) provider methods. I haven't done any serious profiling, but the ranking seems about right, although the RPC methods used are too simple to make any serious claims.

As you can see, I had to use new_event_loop(s), which is pretty much inevitable without an overarching event loop policy in place. So one aspect of async to consider is spawning one event loop as part of web3.async. In addition to re-writing core methods, such as the providers, to be async, we'd have to consider event loop registration and de-registration for various tasks; sort of a pluggable loop system.

#!/usr/bin/env python3
# -*- coding: utf8 -*-
import aiohttp
import asyncio
import datetime
import random
import ujson
import time

from concurrent.futures import ThreadPoolExecutor
from collections import deque
from typing import Any, Iterator

from web3 import Web3, HTTPProvider, IPCProvider, WebsocketProvider


class BatchHandlerThreadPool:
    '''wrap existing providers and fan requests out over a thread pool'''
    provider_map = {'http': HTTPProvider, 'ipc': IPCProvider, 'ws': WebsocketProvider}

    def __init__(self, uri: str = '127.0.0.1:8545', provider_type: str = 'http',
                 chunk_size: int = 50, max_pool_size: int = 10):
        self.uri = uri
        self.provider = self._check_providers(provider_type)
        self.chunk_size = chunk_size
        self.max_pool_size = max_pool_size

    def _check_providers(self, provider_type: str) -> type:
        if provider_type.lower() not in BatchHandlerThreadPool.provider_map:
            msg = 'invalid provider type {} requested. Valid types are {}.'
            raise TypeError(msg.format(provider_type, list(BatchHandlerThreadPool.provider_map)))
        return BatchHandlerThreadPool.provider_map[provider_type]

    def make_requests(self, job: dict) -> tuple:
        '''execute a single RPC call through a throwaway Web3 instance'''
        try:
            w3 = Web3(self.provider(self.uri))
            res = w3.providers[0].make_request(job['method'], job['params'])
            result = (job, res, '')
        except Exception as exc:
            print("exc: ", exc)
            result = (job, '', exc)
        return result

    def chunked_payload(self, payload: list) -> Iterator[list]:
        '''yield payload slices of at most chunk_size jobs'''
        for i in range(0, len(payload), self.chunk_size):
            yield payload[i: i + self.chunk_size]

    def run_batch(self, payload: list, callback_fn: Any):
        '''submit each chunk to the pool and hand results to callback_fn'''
        with ThreadPoolExecutor(max_workers=self.max_pool_size) as executor:
            for jobs in self.chunked_payload(payload):
                futures = [executor.submit(self.make_requests, job) for job in jobs]
                # keep things in line with the "generic" client: block per chunk
                for future in futures:
                    callback_fn(future.result())


class BatchHandlerTasks:
    '''wrap existing (blocking) providers in coroutines'''
    provider_map = {'http': HTTPProvider, 'ipc': IPCProvider, 'ws': WebsocketProvider}

    def __init__(self, uri: str = '127.0.0.1:8545', chunk_size: int = 50, provider_type: str = 'http'):
        self.uri = uri
        self.provider = self._check_providers(provider_type)
        self.chunk_size = chunk_size

    def _check_providers(self, provider_type: str) -> type:
        if provider_type.lower() not in BatchHandlerTasks.provider_map:
            msg = 'invalid provider type {} requested. Valid types are {}.'
            raise TypeError(msg.format(provider_type, list(BatchHandlerTasks.provider_map)))
        return BatchHandlerTasks.provider_map[provider_type]

    async def make_request(self, job: dict) -> tuple:
        '''note: the provider call itself still blocks the event loop;
        this is the naive wrap that the timings below penalize'''
        provider = Web3(self.provider(self.uri)).providers[0]
        try:
            res = provider.make_request(job['method'], job['params'])
            result = (job, res, '')
        except Exception as exc:
            result = (job, '', exc)
        return result

    async def req_wrapper(self, job, callback_fn):
        result = await self.make_request(job)
        # the illustrative client passes a plain (sync) callback
        callback_fn(result)

    def chunked_payload(self, payload: list) -> Iterator[list]:
        '''yield payload slices of at most chunk_size jobs'''
        for i in range(0, len(payload), self.chunk_size):
            yield payload[i: i + self.chunk_size]

    def run_batch(self, payload: list, callback_fn: Any):
        '''run each chunk to completion on a fresh event loop'''
        batch_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(batch_loop)
        for jobs in self.chunked_payload(payload):
            tasks = [self.req_wrapper(job, callback_fn) for job in jobs]
            batch_loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True))
        batch_loop.close()


class BatchHandlerAioHttpTasks:
    '''use aiohttp as a proxy for native asyncio providers (streams for ipc)'''
    provider_map = {'http': HTTPProvider, 'ipc': IPCProvider, 'ws': WebsocketProvider}

    def __init__(self, uri: str = '127.0.0.1:8545', chunk_size: int = 50, provider_type: str = 'http'):
        self.uri = uri
        self.provider = self._check_providers(provider_type)
        self.chunk_size = chunk_size

    def _check_providers(self, provider_type: str) -> type:
        if provider_type.lower() not in BatchHandlerAioHttpTasks.provider_map:
            msg = 'invalid provider type {} requested. Valid types are {}.'
            raise TypeError(msg.format(provider_type, list(BatchHandlerAioHttpTasks.provider_map)))
        return BatchHandlerAioHttpTasks.provider_map[provider_type]

    async def make_requests(self, job: dict, callback_fn: Any):
        '''POST the JSON-RPC payload directly, bypassing the web3 provider'''
        _rpc = {"jsonrpc": "2.0", "id": 1001}
        payload = {**_rpc, **job}
        async with aiohttp.ClientSession(json_serialize=ujson.dumps) as session:
            async with session.post(self.uri, json=payload) as res:
                try:
                    res.raise_for_status()  # non-2xx -> ClientResponseError
                    result = (job, await res.json(), '')
                except Exception as exc:
                    result = (job, '', exc)
        # the illustrative client passes a plain (sync) callback
        callback_fn(result)

    def chunked_payload(self, payload: list) -> Iterator[list]:
        '''yield payload slices of at most chunk_size jobs'''
        for i in range(0, len(payload), self.chunk_size):
            yield payload[i: i + self.chunk_size]

    def run_batch(self, payload: list, callback_fn: Any):
        '''run each chunk to completion on a fresh event loop'''
        batch_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(batch_loop)
        for jobs in self.chunked_payload(payload):
            tasks = [self.make_requests(job, callback_fn) for job in jobs]
            batch_loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True))
        batch_loop.close()


def illustrative_async_client(batch_handler_cls, batch_q: deque, chunk_size: int, batch_multiplier: int = 1):
    def callback_fn(result):
        batch_q.append(result)

    uri = 'https://ropsten.infura.io'
    provider_type = 'http'
    payload = []
    payload.extend([{'method': 'web3_clientVersion', 'params': []}] * batch_multiplier)
    payload.extend([{'method': 'net_version', 'params': []}] * batch_multiplier)
    random.shuffle(payload)

    batch_handler = batch_handler_cls(uri=uri, provider_type=provider_type, chunk_size=chunk_size)
    dt_start = datetime.datetime.utcnow()
    batch_handler.run_batch(payload, callback_fn)

    while len(batch_q) < len(payload):
        time.sleep(0.1)

    good_jobs, exc_jobs = [], []
    for res in batch_q:
        if res[1]:
            good_jobs.append(res)
        else:
            exc_jobs.append(res)

    dt_end = datetime.datetime.utcnow()
    res_vals = (batch_handler_cls.__name__, len(payload), len(good_jobs), len(exc_jobs),
                (dt_end - dt_start).total_seconds())
    msg = '{} processed {} jobs, {} successes and {} failures, in {} seconds.'
    print(msg.format(*res_vals))


if __name__ == '__main__':
    n = 100
    chunk_size = 15
    for batch_cls in [BatchHandlerThreadPool, BatchHandlerAioHttpTasks, BatchHandlerTasks]:
        illustrative_async_client(batch_cls, deque(), chunk_size, n)
Wouldn't put too much stock into the actual numbers other than maybe the ranking:
(web3-4.2.1) 93501@iMac27 ~/devcloud/web3-async-batch/src $ python3 multiple_async_batcher.py
BatchHandlerThreadPool processed 200 jobs, 200 successes and 0 failures, in 3.489139 seconds.
BatchHandlerAioHttpTasks processed 200 jobs, 200 successes and 0 failures, in 9.77327 seconds.
BatchHandlerTasks processed 200 jobs, 200 successes and 0 failures, in 17.100553 seconds.
(web3-4.2.1) 93501@iMac27 ~/devcloud/web3-async-batch/src $

I've messed around a bit with asyncio streams for IPC provider examples but ran out of time. Writing async versions of the web3 HTTP and IPC providers doesn't seem all that challenging (did I just write that? nooooo), but in the end it depends on the general approach.
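
For reference, a bare-bones asyncio-streams IPC request might look roughly like the following; the socket path and the newline-per-response framing are assumptions, not something tested against a real node:

import asyncio
import json


async def ipc_request(method, params, path='/tmp/geth.ipc'):
    # open_unix_connection returns a (StreamReader, StreamWriter) pair
    reader, writer = await asyncio.open_unix_connection(path)
    payload = {'jsonrpc': '2.0', 'id': 1, 'method': method, 'params': params}
    writer.write(json.dumps(payload).encode('utf-8'))
    await writer.drain()
    raw = await reader.readline()  # assumes one newline-terminated response
    writer.close()
    return json.loads(raw)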

Finally, there are performance improvements to be had by going outside the standard asyncio module. Using uvloop, for example, speeds up the aiohttp-based example significantly (by about 3 seconds); I didn't see much, if any, improvement with the "wrapped" HTTPProvider.
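
For completeness, dropping in uvloop is a two-liner, as long as it runs before any event loop is created:

import asyncio
import uvloop

# swap the default loop implementation for all subsequently created loops
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())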

Torch it, shred it, tear it apart.

@boneyard93501 (Contributor)

@carver
Some async callers want asyncio versions of methods available.

Is the assumption that the (async) users will handle the event loop correct? And, as a corollary, that the web3.async namespace/API comprises a collection of awaitable primitives, e.g., async providers?

@pipermerriam (Member)

@boneyard93501 Correct on both counts.

Clarification on how I think our providers should work:

Rather than having sync and async providers, I think we should have the standard provider API be:

class BaseProvider:
    def make_request(self, method, params):
        # standard blocking request fn
        ...

    async def make_async_request(self, method, params):
        # awaitable request fn
        ...

The web3.async namespaced providers would all use the make_async_request API for provider requests.
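
To make the two code paths concrete, a toy sketch (hypothetical wiring, not an actual web3 API):

import asyncio


class EchoProvider:
    '''toy provider implementing both halves of the proposed API'''
    def make_request(self, method, params):
        return {'jsonrpc': '2.0', 'id': 1, 'result': [method, params]}

    async def make_async_request(self, method, params):
        # a real implementation would await a socket/HTTP round trip here
        await asyncio.sleep(0)
        return self.make_request(method, params)


provider = EchoProvider()
print(provider.make_request('web3_clientVersion', []))  # blocking path
print(asyncio.run(provider.make_async_request('web3_clientVersion', [])))  # coroutine path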

@pacrob (Contributor) commented Feb 4, 2022

Closing, tracking async in Issue #1413

pacrob closed this as completed Feb 4, 2022