Add conceptual docs for run_in_worker_thread
Fixes gh-231
njsmith committed Jun 22, 2017
1 parent 315bb7f commit f5c4911
Showing 1 changed file with 131 additions and 1 deletion.
132 changes: 131 additions & 1 deletion docs/source/reference-core.rst
@@ -1390,16 +1390,146 @@ like "blocking".
In acknowledgment of this reality, Trio provides two useful utilities
for working with real, operating-system level,
:mod:`threading`\-module-style threads. First, if you're in Trio but
need to push some blocking I/O into a thread, there's
:func:`run_in_worker_thread`. And if you're in a thread and need to
communicate back with trio, there's the closely related
:func:`current_run_in_trio_thread` and
:func:`current_await_in_trio_thread`.
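
For example, here's a quick sketch of what this looks like in
practice (``read_file`` is a made-up blocking helper for
illustration, not part of Trio)::

    import trio

    def read_file(path):
        # Ordinary blocking I/O: fine in a worker thread, but it would
        # stall the whole event loop if called directly in Trio.
        with open(path, "rb") as f:
            return f.read()

    async def main():
        data = await trio.run_in_worker_thread(read_file, "/etc/hosts")
        print(len(data))

    trio.run(main)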


Trio's philosophy about managing worker threads
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

If you've used other I/O frameworks, you may have encountered the
concept of a "thread pool", which is most commonly implemented as a
fixed-size collection of threads that hang around waiting for jobs to
be assigned to them. These solve two different problems: First,
re-using the same threads over and over is more efficient than
starting and stopping a new thread for every job you need done;
basically, the pool acts as a kind of cache for idle threads. And
second, having a fixed size avoids getting into a situation where
100,000 jobs are submitted simultaneously, and then 100,000 threads
are spawned and the system gets overloaded and crashes. Instead, the N
threads start executing the first N jobs, while the other
(100,000 - N) jobs sit in a queue and wait their turn. This is
generally what you want, and it's how
:func:`trio.run_in_worker_thread` works by default.
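
For instance, here's a small sketch that peeks at the default limit
via :func:`current_default_worker_thread_limiter` and the limiter's
``total_tokens`` attribute::

    import trio

    async def show_thread_limit():
        # The default limiter is shared by every run_in_worker_thread
        # call that doesn't pass an explicit limiter= argument.
        limiter = trio.current_default_worker_thread_limiter()
        print("worker thread limit:", limiter.total_tokens)

    trio.run(show_thread_limit)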

The downside of this kind of thread pool is that sometimes, you need
more sophisticated logic for controlling how many threads are run at
once. For example, you might want a policy like "at most 20 threads
total, but no more than 3 of those can be running jobs associated with
the same user account", or you might want a pool whose size is
dynamically adjusted over time in response to system conditions.
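
Since the size of a :class:`CapacityLimiter` can be changed on the
fly, a dynamically adjusted pool can be as simple as assigning to the
default limiter's ``total_tokens``; a rough sketch::

    import trio

    async def tune_thread_limit(new_limit):
        # Raise or lower the global worker thread cap at runtime,
        # e.g. from a monitoring task that watches system load.
        limiter = trio.current_default_worker_thread_limiter()
        limiter.total_tokens = new_limit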

It's even possible for a fixed-size policy to cause unexpected
`deadlocks <https://en.wikipedia.org/wiki/Deadlock>`__. Imagine a
situation where you have two different types of blocking jobs to run
in the thread pool, type A and type B. Type A is simple: it just runs
and completes quickly. But type B is more
complicated: it has to stop in the middle and wait for some other work
to finish, and that other work includes running a type A job. Now,
suppose you submit N jobs of type B to the pool. They all start
running, and then eventually end up submitting one or more jobs of
type A. But since every thread in our pool is already busy, the type A
jobs don't actually start running – they just sit in a queue waiting
for the type B jobs to finish. But the type B jobs will never finish,
because they're waiting for the type A jobs. Our system has
deadlocked. The ideal solution to this problem is to avoid having type
B jobs in the first place – generally it's better to keep complex
synchronization logic in the main Trio thread. But if you can't do
that, then you need a custom thread allocation policy that tracks
separate limits for different types of jobs and makes it impossible
for type B jobs to fill up all the slots that type A jobs need to run.
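
As a minimal sketch of that idea, you could reserve separate thread
slots per job type (``TYPE_A_LIMITER``, ``TYPE_B_LIMITER``, and the
helper functions here are hypothetical names, not part of Trio)::

    import trio

    # Give each job type its own slots, so type B jobs can never
    # occupy the threads that type A jobs need in order to finish.
    TYPE_A_LIMITER = trio.CapacityLimiter(10)
    TYPE_B_LIMITER = trio.CapacityLimiter(10)

    async def run_type_a_job(sync_fn, *args):
        return await trio.run_in_worker_thread(
            sync_fn, *args, limiter=TYPE_A_LIMITER)

    async def run_type_b_job(sync_fn, *args):
        return await trio.run_in_worker_thread(
            sync_fn, *args, limiter=TYPE_B_LIMITER)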

So, we can see that it's important to be able to change the policy
controlling the allocation of threads to jobs. But in many frameworks,
this requires implementing a new thread pool from scratch, which is
highly non-trivial; and if different types of jobs need different
policies, then you may have to create multiple pools, which is
inefficient because now you effectively have two different thread
caches that aren't sharing resources.

Trio's solution to this problem is to split worker thread management
into two layers. The lower layer is responsible for taking blocking
I/O jobs and arranging for them to run immediately on some worker
thread. It takes care of solving the tricky concurrency problems
involved in managing threads and is responsible for optimizations like
re-using threads, but has no admission control policy: if you give it
100,000 jobs, it will spawn 100,000 threads. The upper layer is
responsible for providing the policy to make sure that this doesn't
happen – but since it *only* has to worry about policy, it can be much
simpler. In fact, all there is to it is the ``limiter=`` argument
passed to :func:`run_in_worker_thread`. This defaults to a global
:class:`CapacityLimiter` object, which gives us the classic fixed-size
thread pool behavior. (See
:func:`current_default_worker_thread_limiter`.) But if you want to use
"separate pools" for type A jobs and type B jobs, then it's just a
matter of creating two separate :class:`CapacityLimiter` objects and
passing them in when running these jobs. Or here's an example of
defining a custom policy that respects the global thread limit, while
making sure that no individual user can use more than 3 threads at a
time::

    import weakref

    import trio


    class CombinedLimiter:
        def __init__(self, first, second):
            self._first = first
            self._second = second

        async def acquire_on_behalf_of(self, borrower):
            # Acquire both, being careful to clean up properly on error
            await self._first.acquire_on_behalf_of(borrower)
            try:
                await self._second.acquire_on_behalf_of(borrower)
            except:
                self._first.release_on_behalf_of(borrower)
                raise

        def release_on_behalf_of(self, borrower):
            # Release both, being careful to clean up properly on error
            try:
                self._second.release_on_behalf_of(borrower)
            finally:
                self._first.release_on_behalf_of(borrower)


    # Use a weak value dictionary, so that we don't waste memory holding
    # limiter objects for users who don't have any worker threads running.
    USER_LIMITERS = weakref.WeakValueDictionary()
    MAX_THREADS_PER_USER = 3

    def get_user_limiter(user_id):
        try:
            return USER_LIMITERS[user_id]
        except KeyError:
            per_user_limiter = trio.CapacityLimiter(MAX_THREADS_PER_USER)
            global_limiter = trio.current_default_worker_thread_limiter()
            # IMPORTANT: acquire the per_user_limiter before the global_limiter.
            # If we get 100 jobs for a user at the same time, we want
            # to only allow 3 of them at a time to even compete for the
            # global thread slots.
            combined_limiter = CombinedLimiter(per_user_limiter, global_limiter)
            USER_LIMITERS[user_id] = combined_limiter
            return combined_limiter


    async def run_in_worker_thread_for_user(user_id, sync_fn, *args, **kwargs):
        # *args belong to sync_fn; **kwargs belong to run_in_worker_thread
        kwargs["limiter"] = get_user_limiter(user_id)
        return await trio.run_in_worker_thread(sync_fn, *args, **kwargs)
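
Usage then looks something like this (``query_database`` stands in
for whatever blocking function you need to call)::

    def query_database(query):
        ...  # some blocking library call

    async def handle_request(user_id):
        # At most 3 worker threads per user, while still respecting
        # the global thread limit.
        return await run_in_worker_thread_for_user(
            user_id, query_database, "SELECT ...")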


Putting blocking I/O into worker threads
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. autofunction:: run_in_worker_thread

.. autofunction:: current_default_worker_thread_limiter


Getting back into the trio thread from another thread
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. function:: current_run_in_trio_thread
              current_await_in_trio_thread
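
For example, here's a rough sketch of a round trip between threads,
assuming the portal returned by :func:`current_run_in_trio_thread` is
called as ``run_in_trio_thread(sync_fn, *args)``::

    import trio

    def thread_fn(run_in_trio_thread):
        # We're running in a worker thread here; use the portal that
        # was captured in the trio thread to call back into it.
        run_in_trio_thread(print, "hello from the trio thread")

    async def main():
        # This must be called while in the trio thread:
        run_in_trio_thread = trio.current_run_in_trio_thread()
        await trio.run_in_worker_thread(thread_fn, run_in_trio_thread)

    trio.run(main)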
