
Make it easier to farm out work #685

Closed
Nikratio opened this issue Sep 27, 2018 · 3 comments
Comments

@Nikratio
I have a bunch of work items that need processing. This involves calling expensive functions that release the GIL, so I want to use worker threads. Ideally, I would like to write code like this:

limiter = trio.CapacityLimiter(max_workers)
with something_something:
  for item in worklist:
    await trio.run_in_worker_thread(do_work, item, limiter=limiter)

This should iterate over worklist, keeping up to max_workers worker threads busy at all times. At the end of the with block, all workers should have completed.

Sadly, there seems to be no something_something that I could use for the with block, nor is there a variant of run_in_worker_thread that is not synchronous. So I think what I would need to write instead is something like:

END_SENTINEL = object()
async def worker_loop(q):
    while True:
        item = await q.get()
        if item is END_SENTINEL:
            break
        await trio.run_sync_in_worker_thread(do_work, item)

queue = trio.Queue(1)
async with trio.open_nursery() as nursery:
    for _ in range(max_workers):
        nursery.start_soon(worker_loop, queue)
    for item in worklist:
        await queue.put(item)
    for _ in range(max_workers):
        await queue.put(END_SENTINEL)

This is rather verbose. I was wondering if Trio might be able to offer something more concise, along the lines of the first (currently hypothetical) code?
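For comparison, the same queue-plus-sentinel worker-pool pattern can be sketched with the standard library's asyncio. This is only an illustration of the structure, not Trio code: `asyncio.to_thread` stands in for `run_sync_in_worker_thread`, a toy squaring function stands in for `do_work`, and `main` is a hypothetical driver.

```python
import asyncio

END_SENTINEL = object()  # unique marker telling each worker to shut down

async def worker_loop(q, results):
    # Pull items until our sentinel arrives; each worker consumes exactly one.
    while True:
        item = await q.get()
        if item is END_SENTINEL:
            break
        # Stand-in for the expensive, GIL-releasing do_work: square the item
        # on a real OS thread via asyncio's thread-pool helper.
        results.append(await asyncio.to_thread(lambda x: x * x, item))

async def main(worklist, max_workers=4):
    # Size-1 queue: the producer blocks until some worker is free to take an item.
    q = asyncio.Queue(1)
    results = []
    workers = [asyncio.create_task(worker_loop(q, results))
               for _ in range(max_workers)]
    for item in worklist:
        await q.put(item)
    for _ in range(max_workers):
        await q.put(END_SENTINEL)  # one shutdown signal per worker
    await asyncio.gather(*workers)
    return results

print(sorted(asyncio.run(main(range(5)))))  # -> [0, 1, 4, 9, 16]
```

Completion order depends on thread scheduling, which is why the results are sorted before printing.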

@njsmith
Member

njsmith commented Sep 28, 2018

How big is worklist, roughly?

The reason I ask: right now you can write:

from functools import partial

limiter = trio.CapacityLimiter(max_workers)
async with trio.open_nursery() as nursery:
  for item in worklist:
    nursery.start_soon(partial(trio.run_in_worker_thread, do_work, item, limiter=limiter))

and... I suspect this will actually work pretty well? The downside compared to your hypothetical code is that it allocates a bunch of Trio tasks in memory up front, which then sit around waiting to acquire the capacity limiter before actually putting work onto a thread. A sleeping Trio task isn't free, but it's fairly cheap – just some Python data structures that take up a bit of memory, on the order of 5-10 kilobytes per task. That's wasting some RAM compared to your hypothetical approach, and if your worklist is large enough it might be a problem... but if you've got, say, 1000 items, then you're only wasting like 5-10 megabytes.
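The spawn-everything-up-front shape described above can also be sketched with stdlib asyncio, where an `asyncio.Semaphore` plays the role of Trio's CapacityLimiter. This is a hypothetical analogue, not Trio's API: `process_all` and the `x + 1` stand-in for `do_work` are made up for illustration.

```python
import asyncio

async def process_all(worklist, max_workers=4):
    # One task per work item, all created up front. Each task sleeps cheaply
    # until the semaphore admits it, so at most max_workers threads run at once.
    limiter = asyncio.Semaphore(max_workers)

    async def one(item):
        async with limiter:
            # Stand-in for the expensive do_work call, run on a real thread.
            return await asyncio.to_thread(lambda x: x + 1, item)

    # gather preserves input order, regardless of completion order.
    return await asyncio.gather(*(one(item) for item in worklist))

print(asyncio.run(process_all(range(5))))  # -> [1, 2, 3, 4, 5]
```

The memory cost njsmith describes shows up here too: every item gets a task object immediately, even though only max_workers of them make progress at a time.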

In the longer run, this is an example of a general problem that lots of people run into, regardless of threads: how do I run a bunch of tasks concurrently, but not too many at once? After #586 gets finished and we have a solid API for sending work into/out of this kind of task pool, the next thing I want to do is implement a kind of generic "concurrent for loop" primitive, with built-in support for capacity limiting and other niceties like that. I've even squatted the name on pypi :-)

@Nikratio
Author

Typically worklist has fewer than a million items, so there wouldn't be a memory problem. I think I would still go with the longer code, though: to me it feels rather verbose, but less hacky.

@Zac-HD
Member

Zac-HD commented Mar 17, 2023

Closing because I don't think there are any action items.

@Zac-HD closed this as completed Mar 17, 2023