From 1d1f871153b636fb18efe7091f7382e4fa57c41d Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Fri, 22 May 2020 23:50:26 -0700 Subject: [PATCH 1/4] Add thread cache MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit On my Linux laptop, this makes 'await trio.to_thread.run_sync(lambda: None)' about twice as fast, from ~150 µs to ~75 µs. Closes: gh-6 Test program: import trio import time COUNT = 10000 async def main(): while True: start = time.monotonic() for _ in range(COUNT): await trio.to_thread.run_sync(lambda: None) end = time.monotonic() print("{:.2f} µs/job".format((end - start) / COUNT * 1e6)) trio.run(main) --- docs/source/conf.py | 1 + docs/source/reference-lowlevel.rst | 6 + newsfragments/6.feature.rst | 6 + notes-to-self/tiny-thread-pool.py | 143 ---------------------- trio/_core/__init__.py | 2 + trio/_core/_thread_cache.py | 168 ++++++++++++++++++++++++++ trio/_core/tests/test_thread_cache.py | 120 ++++++++++++++++++ trio/_threads.py | 37 +++--- trio/lowlevel.py | 1 + trio/tests/test_threads.py | 8 +- 10 files changed, 325 insertions(+), 167 deletions(-) create mode 100644 newsfragments/6.feature.rst delete mode 100644 notes-to-self/tiny-thread-pool.py create mode 100644 trio/_core/_thread_cache.py create mode 100644 trio/_core/tests/test_thread_cache.py diff --git a/docs/source/conf.py b/docs/source/conf.py index 9dddc9340..6c7fb0245 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -50,6 +50,7 @@ ("py:class", "math.inf"), ("py:exc", "Anything else"), ("py:class", "async function"), + ("py:class", "sync function"), ] autodoc_inherit_docstrings = False default_role = "obj" diff --git a/docs/source/reference-lowlevel.rst b/docs/source/reference-lowlevel.rst index 53e5cf8a8..b196cd0b4 100644 --- a/docs/source/reference-lowlevel.rst +++ b/docs/source/reference-lowlevel.rst @@ -270,6 +270,12 @@ Trio tokens .. autofunction:: current_trio_token +Spawning threads +================ + +.. autofunction:: start_thread_soon + + Safer KeyboardInterrupt handling ================================ diff --git a/newsfragments/6.feature.rst b/newsfragments/6.feature.rst new file mode 100644 index 000000000..8baa3a27d --- /dev/null +++ b/newsfragments/6.feature.rst @@ -0,0 +1,6 @@ +To speed up `trio.to_thread.run_sync`, Trio now caches and re-uses +worker threads. + +And in case you have some exotic use case where you need to spawn +threads manually, but want to take advantage of Trio's cache, you can +do that using the new `trio.lowlevel.start_thread_soon`. 
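A minimal sketch of that hand-rolled usage (using the `(deliver, fn)` argument order this patch introduces; patch 3 below swaps it to `(fn, deliver)`):

    import queue
    from trio.lowlevel import start_thread_soon

    q = queue.Queue()
    # deliver=q.put hands the outcome.Outcome back to this thread;
    # fn is the blocking work.
    start_thread_soon(q.put, lambda: 2 + 2)
    print(q.get().unwrap())  # prints 4; unwrap() re-raises if fn raised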
diff --git a/notes-to-self/tiny-thread-pool.py b/notes-to-self/tiny-thread-pool.py deleted file mode 100644 index b5de1c37b..000000000 --- a/notes-to-self/tiny-thread-pool.py +++ /dev/null @@ -1,143 +0,0 @@ -# This is some very messy notes on how we might implement a thread cache - -import threading -import Queue - -# idea: -# -# unbounded thread pool; tracks how many threads are "available" and how much -# work there is to do; if work > available threads, spawn a new thread -# -# if a thread sits idle for >N ms, exit -# -# we don't need to support job cancellation -# -# we do need to mark a thread as "available" just before it -# signals back to Trio that it's done, to maintain the invariant that all -# unavailable threads are inside the limiter= protection -# -# maintaining this invariant while exiting can be a bit tricky -# -# maybe a simple target should be to always have 1 idle thread - -# XX we can't use a single shared dispatch queue, because we need LIFO -# scheduling, or else the idle-thread timeout won't work! -# -# instead, keep a list/deque/OrderedDict/something of idle threads, and -# dispatch by popping one off; put things back by pushing them on the end -# maybe one shared dispatch Lock, plus a Condition for each thread -# dispatch by dropping the job into the place where the thread can see it and -# then signalling its Condition? or could have separate locks - -@attr.s(frozen=True) -class Job: - main = attr.ib() - main_args = attr.ib() - finish = attr.ib() - finish_args = attr.ib() - -class EXIT: - pass - -class ThreadCache: - def __init__(self): - self._lock = threading.Lock() - self._idle_workers = deque() - self._closed = False - - def close(self): - self._closed = True - with self._lock: - while self._idle_workers: - self._idle_workers.pop().submit(None) - - def submit(self, job): - with self._lock: - if not self._idle_workers: - WorkerThread(self, self._lock, job) - else: - worker = self._idle_workers.pop() - worker.submit(job) - - # Called from another thread - # Must be called with the lock held - def remove_idle_worker(self, worker): - self._idle_workers.remove(worker) - - # Called from another thread - # Lock is *not* held - def add_idle_worker(self, worker): - if self._closed: - with self._lock: - worker.submit - self._idle_workers.append(worker) - -# XX thread name - -IDLE_TIMEOUT = 1.0 - -class WorkerThread: - def __init__(self, cache, lock, initial_job): - self._cache = cache - self._condition = threading.Condition(lock) - self._job = None - self._thread = threading.Thread( - target=self._loop, args=(initial_job,), daemon=True) - self._thread.start() - - # Must be called with the lock held - def submit(self, job): - assert self._job is None - self._job = job - self._condition.notify() - - def _loop(self, initial_job): - self._run_job(initial_job) - while True: - with self._condition: - self._condition.wait(IDLE_TIMEOUT): - job = self._job - self._job = None - if job is None: - self._cache.remove_idle_worker(self) - return - # Dropped the lock, and have a job to do - self._run_job(job) - - def _run_job(self, job): - job.main(*job.main_args) - self._cache.add_idle_worker(self) - job.finish(*job.finish_args) - - -# Probably the interface should be: trio.lowlevel.call_soon_in_worker_thread? 
- -# Enqueueing work: -# put into unbounded queue -# with lock: -# if idle_threads: -# idle_threads -= 1 -# else: -# spawn a new thread (it starts out non-idle) -# -# Thread shutdown: -# with lock: -# idle_threads -= 1 -# check for work one last time, and then either exit or do it -# -# Thread startup: -# -# check for work -# while True: -# mark self as idle -# check for work (with timeout) -# either do work or shutdown - -# if we want to support QueueUserAPC cancellation, we need a way to get back -# the thread id... maybe that just works like -# -# def WaitForSingleObjectEx_thread_fn(...): -# with lock: -# check if already cancelled -# put our thread id where main thread can find it -# WaitForSingleObjectEx(...) diff --git a/trio/_core/__init__.py b/trio/_core/__init__.py index c28b7f407..136bfe6b9 100644 --- a/trio/_core/__init__.py +++ b/trio/_core/__init__.py @@ -68,6 +68,8 @@ from ._local import RunVar +from ._thread_cache import start_thread_soon + # Kqueue imports try: from ._run import current_kqueue, monitor_kevent, wait_kevent diff --git a/trio/_core/_thread_cache.py b/trio/_core/_thread_cache.py new file mode 100644 index 000000000..1ecd4bdfe --- /dev/null +++ b/trio/_core/_thread_cache.py @@ -0,0 +1,168 @@ +from threading import Thread, Lock +import sys +import outcome +from itertools import count + +# The "thread cache" is a simple unbounded thread pool, i.e., it automatically +# spawns as many threads as needed to handle all the requests it's given. Its +# only purpose is to cache worker threads so that they don't have to be +# started from scratch every time we want to delegate some work to a thread. +# It's expected that some higher-level code will track how many threads are in +# use to avoid overwhelming the system (e.g. the limiter= argument to +# trio.to_thread.run_sync). +# +# To maximize sharing, there's only one thread cache per process, even if you +# have multiple calls to trio.run. +# +# Guarantees: +# +# It's safe to call start_thread_soon simultaneously from +# multiple threads. +# +# Idle threads are chosen in LIFO order, i.e. we *don't* spread work evenly +# over all threads. Instead we try to let some threads do most of the work +# while others sit idle as much as possible. Compared to FIFO, this has better +# memory cache behavior, and it makes it easier to detect when we have too +# many threads, so idle ones can exit. +# +# This code assumes that 'dict' has the following properties: +# +# - __setitem__, __delitem__, and popitem are all thread-safe and atomic with +# respect to each other. This is guaranteed by the GIL. +# +# - popitem returns the most-recently-added item (i.e., __setitem__ + popitem +# give you a LIFO queue). This relies on dicts being insertion-ordered, like +# they are in py36+. + +# How long a thread will idle waiting for new work before it gives up and +# exits. This value is pretty arbitrary; I don't think it matters too much. +IDLE_TIMEOUT = 10 # seconds + +name_counter = count() + + +class WorkerThread: + def __init__(self, thread_cache): + self._job = None + self._thread_cache = thread_cache + # This Lock is used in an unconventional way. + # + # "Unlocked" means we have a pending job that's been assigned to us; + # "locked" means that we don't. + # + # Initially we have no job, so it starts out in locked state.
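+        #
+        # A minimal sketch of the handshake this gives us (illustrative
+        # only, assuming a single submitter; the real protocol is in _work
+        # and ThreadCache.start_thread_soon below):
+        #
+        #   lock = Lock()
+        #   lock.acquire()                  # worker starts with no job
+        #   # submitter: stash the job somewhere, then signal it:
+        #   lock.release()                  # "unlocked" = job pending
+        #   # worker: wakes only once a job arrives (or times out):
+        #   assert lock.acquire(timeout=IDLE_TIMEOUT)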
+ self._worker_lock = Lock() + self._worker_lock.acquire() + thread = Thread(target=self._work, daemon=True) + thread.name = f"Trio worker thread {next(name_counter)}" + thread.start() + + def _work(self): + while True: + if self._worker_lock.acquire(timeout=IDLE_TIMEOUT): + # We got a job + fn, deliver = self._job + self._job = None + result = outcome.capture(fn) + # Tell the cache that we're available to be assigned a new + # job. We do this *before* calling 'deliver', so that if + # 'deliver' triggers a new job, it can be assigned to us + # instead of spawning a new thread. + self._thread_cache._idle_workers[self] = None + deliver(result) + else: + # Timeout acquiring lock, so we can probably exit. But, + # there's a race condition: we might be assigned a job *just* + # as we're about to exit. So we have to check. + try: + del self._thread_cache._idle_workers[self] + except KeyError: + # Someone else removed us from the idle worker queue, so + # they must be in the process of assigning us a job - loop + # around and wait for it. + continue + else: + # We successfully removed ourselves from the idle + # worker queue, so no more jobs are incoming; it's safe to + # exit. + return + + +class ThreadCache: + def __init__(self): + self._idle_workers = {} + self._cache_lock = Lock() + + def start_thread_soon(self, deliver, fn): + try: + worker, _ = self._idle_workers.popitem() + except KeyError: + worker = WorkerThread(self) + worker._job = (fn, deliver) + worker._worker_lock.release() + + +THREAD_CACHE = ThreadCache() + + +def start_thread_soon(deliver, fn): + """Runs ``deliver(outcome.capture(fn))`` in a worker thread. + + Generally ``fn`` does some blocking work, and ``deliver`` delivers the + result back to whoever is interested. + + This is a low-level, no-frills interface, very similar to using + `threading.Thread` to spawn a thread directly. The main difference is + that this function tries to re-use threads when possible, so it can be + a bit faster than `threading.Thread`. + + Worker threads have the `~threading.Thread.daemon` flag set, which means + that if your main thread exits, worker threads will automatically be + killed. If you want to make sure that your ``fn`` runs to completion, then + you should make sure that the main thread remains alive until ``deliver`` + is called. + + It is safe to call this function simultaneously from multiple threads. + + Args: + + deliver (sync function): Takes the `outcome.Outcome` of ``fn``, and + delivers it. *Must not block.* + + fn (sync function): Performs arbitrary blocking work. + + Because worker threads are cached and reused for multiple calls, neither + function should mutate thread-level state, like `threading.local` objects + – or if they do, they should be careful to revert their changes before + returning. + + Note: + + The split between ``fn`` and ``deliver`` serves two purposes. First, + it's convenient, since most callers need something like this anyway. + + Second, it avoids a small race condition that could cause too many + threads to be spawned. Consider a program that wants to run several + jobs sequentially on a thread, so the main thread submits a job, waits + for it to finish, submits another job, etc. In theory, this program + should only need one worker thread. But what could happen is: + + 1. Worker thread: First job finishes, and calls ``deliver``. + + 2. Main thread: receives notification that the job finished, and calls + ``start_thread_soon``. + + 3. 
Main thread: sees that no worker threads are marked idle, so spawns + a second worker thread. + + 4. Original worker thread: marks itself as idle. + + To avoid this, threads mark themselves as idle *before* calling + ``deliver``. + + Is this potential extra thread a major problem? Maybe not, but it's + easy enough to avoid, and we figure that if the user is trying to + limit how many threads they're using then it's polite to respect that. + + """ + THREAD_CACHE.start_thread_soon(deliver, fn) diff --git a/trio/_core/tests/test_thread_cache.py b/trio/_core/tests/test_thread_cache.py new file mode 100644 index 000000000..6c6fc3a10 --- /dev/null +++ b/trio/_core/tests/test_thread_cache.py @@ -0,0 +1,120 @@ +import pytest +import threading +from queue import Queue +import time + +from .tutil import slow +from .. import _thread_cache +from .._thread_cache import start_thread_soon, ThreadCache + + +def test_thread_cache_basics(): + q = Queue() + + def fn(): + raise RuntimeError("hi") + + def deliver(outcome): + q.put(outcome) + + start_thread_soon(deliver, fn) + + outcome = q.get() + with pytest.raises(RuntimeError, match="hi"): + outcome.unwrap() + + +@slow +def test_spawning_new_thread_from_deliver_reuses_starting_thread(): + # We know that no-one else is using the thread cache, so if we keep + # submitting new jobs the instant the previous one is finished, we should + # keep getting the same thread over and over. This tests both that the + # thread cache is LIFO, and that threads can be assigned new work *before* + # deliver exits. + + # Make sure there are a few threads running, so if we weren't LIFO then we + # could grab the wrong one. + q = Queue() + COUNT = 5 + for _ in range(COUNT): + start_thread_soon(lambda result: q.put(result), lambda: time.sleep(1)) + for _ in range(COUNT): + q.get().unwrap() + + seen_threads = set() + done = threading.Event() + + def deliver(n, _): + print(n) + seen_threads.add(threading.current_thread()) + if n == 0: + done.set() + else: + start_thread_soon(lambda _: deliver(n - 1, _), lambda: None) + + start_thread_soon(lambda _: deliver(5, _), lambda: None) + + done.wait() + + assert len(seen_threads) == 1 + + +@slow +def test_idle_threads_exit(monkeypatch): + # Temporarily set the idle timeout to something tiny, to speed up the + # test. (But non-zero, so that the worker loop will at least yield the + # CPU.) + monkeypatch.setattr(_thread_cache, "IDLE_TIMEOUT", 0.0001) + + q = Queue() + start_thread_soon(lambda _: q.put(threading.current_thread()), lambda: None) + seen_thread = q.get() + # Since the idle timeout is effectively zero, after sleeping for 1 second, + # the thread should have exited + time.sleep(1) + assert not seen_thread.is_alive() + + +def test_race_between_idle_exit_and_job_assignment(monkeypatch): + # This is a lock where the first few times you try to acquire it with a + # timeout, it waits until the lock is available and then pretends to time + # out. Using this in our thread cache implementation causes the following + # sequence: + # + # 1. start_thread_soon grabs the worker thread, assigns it a job, and + # releases its lock. + # 2. The worker thread wakes up (because the lock has been released), but + # the JankyLock lies to it and tells it that the lock timed out. So the + # worker thread tries to exit. + # 3. The worker thread checks for the race between exiting and being + # assigned a job, and discovers that it *is* in the process of being + # assigned a job, so it loops around and tries to acquire the lock + # again. + # 4.
Eventually the JankyLock admits that the lock is available, and + # everything proceeds as normal. + + class JankyLock: + def __init__(self): + self._lock = threading.Lock() + self._counter = 3 + + def acquire(self, timeout=None): + self._lock.acquire() + if timeout is None: + return True + else: + if self._counter > 0: + self._counter -= 1 + self._lock.release() + return False + return True + + def release(self): + self._lock.release() + + monkeypatch.setattr(_thread_cache, "Lock", JankyLock) + + tc = ThreadCache() + done = threading.Event() + tc.start_thread_soon(lambda _: done.set(), lambda: None) + done.wait() diff --git a/trio/_threads.py b/trio/_threads.py index 92e2b5dc0..f441952b5 100644 --- a/trio/_threads.py +++ b/trio/_threads.py @@ -14,6 +14,7 @@ disable_ki_protection, RunVar, TrioToken, + start_thread_soon, ) from ._util import coroutine_or_error @@ -253,7 +254,7 @@ async def to_thread_run_sync(sync_fn, *args, cancellable=False, limiter=None): # for the result – or None if this function was cancelled and we should # discard the result. task_register = [trio.lowlevel.current_task()] - name = "trio-worker-{}".format(next(_thread_counter)) + name = f"trio.to_thread.run_sync-{next(_thread_counter)}" placeholder = ThreadPlaceholder(name) # This function gets scheduled into the Trio run loop to deliver the @@ -273,32 +274,26 @@ def do_release_then_return_result(): if task_register[0] is not None: trio.lowlevel.reschedule(task_register[0], result) - # This is the function that runs in the worker thread to do the actual - # work and then schedule the call to report_back_in_trio_thread_fn - # Since this is spawned in a new thread, the trio token needs to be passed - # explicitly to it so it can inject it into thread local storage - def worker_thread_fn(trio_token): - TOKEN_LOCAL.token = trio_token + current_trio_token = trio.lowlevel.current_trio_token() + + def worker_fn(): + TOKEN_LOCAL.token = current_trio_token try: - result = outcome.capture(sync_fn, *args) - try: - trio_token.run_sync_soon(report_back_in_trio_thread_fn, result) - except trio.RunFinishedError: - # The entire run finished, so our particular task is certainly - # long gone -- it must have cancelled. - pass + return sync_fn(*args) finally: del TOKEN_LOCAL.token + def deliver_worker_fn_result(result): + try: + current_trio_token.run_sync_soon(report_back_in_trio_thread_fn, result) + except trio.RunFinishedError: + # The entire run finished, so our particular task is certainly + # long gone -- it must have been cancelled and abandoned us. + pass + await limiter.acquire_on_behalf_of(placeholder) try: - # daemon=True because it might get left behind if we cancel, and in - # this case shouldn't block process exit. 
- current_trio_token = trio.lowlevel.current_trio_token() - thread = threading.Thread( - target=worker_thread_fn, args=(current_trio_token,), name=name, daemon=True, - ) - thread.start() + start_thread_soon(deliver_worker_fn_result, worker_fn) except: limiter.release_on_behalf_of(placeholder) raise diff --git a/trio/lowlevel.py b/trio/lowlevel.py index 21ec0597d..3ce3e741b 100644 --- a/trio/lowlevel.py +++ b/trio/lowlevel.py @@ -41,6 +41,7 @@ wait_readable, wait_writable, notify_closing, + start_thread_soon, ) # Unix-specific symbols diff --git a/trio/tests/test_threads.py b/trio/tests/test_threads.py index b4acae8b5..632ce1365 100644 --- a/trio/tests/test_threads.py +++ b/trio/tests/test_threads.py @@ -239,7 +239,9 @@ async def child(q, cancellable): # Make sure that if trio.run exits, and then the thread finishes, then that's # handled gracefully. (Requires that the thread result machinery be prepared # for call_soon to raise RunFinishedError.) -def test_run_in_worker_thread_abandoned(capfd): +def test_run_in_worker_thread_abandoned(capfd, monkeypatch): + monkeypatch.setattr(_core._thread_cache, "IDLE_TIMEOUT", 0.01) + q1 = stdlib_queue.Queue() q2 = stdlib_queue.Queue() @@ -426,10 +428,10 @@ def release_on_behalf_of(self, borrower): async def test_run_in_worker_thread_fail_to_spawn(monkeypatch): # Test the unlikely but possible case where trying to spawn a thread fails - def bad_start(self): + def bad_start(self, *args): raise RuntimeError("the engines canna take it captain") - monkeypatch.setattr(threading.Thread, "start", bad_start) + monkeypatch.setattr(_core._thread_cache.ThreadCache, "start_thread_soon", bad_start) limiter = current_default_thread_limiter() assert limiter.borrowed_tokens == 0 From f4a57d5a9de99ac67d991f1c090e3814e9b86687 Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Sat, 23 May 2020 01:01:23 -0700 Subject: [PATCH 2/4] Remove long, obsolete comment --- trio/_threads.py | 100 ----------------------------------------------- 1 file changed, 100 deletions(-) diff --git a/trio/_threads.py b/trio/_threads.py index f441952b5..922e102d3 100644 --- a/trio/_threads.py +++ b/trio/_threads.py @@ -35,106 +35,6 @@ def run_sync(self, fn, *args): return from_thread_run_sync(fn, *args, trio_token=self._trio_token) -################################################################ - -# XX at some point it probably makes sense to implement some sort of thread -# pool? Or at least that's what everyone says. -# -# There are two arguments for thread pools: -# - speed (re-using threads instead of starting new ones) -# - throttling (if you have 1000 tasks, queue them up instead of spawning 1000 -# threads and running out of memory) -# -# Regarding speed, it's not clear how much of an advantage this is. Some -# numbers on my Linux laptop: -# -# Spawning and then joining a thread: -# -# In [25]: %timeit t = threading.Thread(target=lambda: None); t.start(); t.join() -# 10000 loops, best of 3: 44 µs per loop -# -# Using a thread pool: -# -# In [26]: tpp = concurrent.futures.ThreadPoolExecutor() -# In [27]: %timeit tpp.submit(lambda: None).result() -# -# In [28]: %timeit tpp.submit(lambda: None).result() -# 10000 loops, best of 3: 40.8 µs per loop -# -# What's a fast getaddrinfo look like? -# -# # with hot DNS cache: -# In [23]: %timeit socket.getaddrinfo("google.com", "80") -# 10 loops, best of 3: 50.9 ms per loop -# -# In [29]: %timeit socket.getaddrinfo("127.0.0.1", "80") -# 100000 loops, best of 3: 9.73 µs per loop -# -# -# So... 
maybe we can beat concurrent.futures with a super-efficient thread -# pool or something, but there really is not a lot of headroom here. -# -# Of course other systems might be different... here's CPython 3.6 in a -# Virtualbox VM running Windows 10 on that same Linux laptop: -# -# In [13]: %timeit t = threading.Thread(target=lambda: None); t.start(); t.join() -# 10000 loops, best of 3: 127 µs per loop -# -# In [18]: %timeit tpp.submit(lambda: None).result() -# 10000 loops, best of 3: 31.9 µs per loop -# -# So on Windows there *might* be an advantage? You've gotta be doing a lot of -# connections, with very fast DNS indeed, for that 100 us to matter. But maybe -# someone is. -# -# -# Regarding throttling: this is very much a trade-off. On the one hand, you -# don't want to overwhelm the machine, obviously. On the other hand, queueing -# up work on a central thread-pool creates a central coordination point which -# can potentially create deadlocks and all kinds of fun things. This is very -# context dependent. For getaddrinfo, whatever, they'll make progress and -# complete (we hope), and you want to throttle them to some reasonable -# amount. For calling waitpid() (because just say no to SIGCHLD), then you -# really want one thread-per-waitpid(), because for all you know the user has -# written some ridiculous thing like: -# -# for p in processes: -# await spawn(p.wait) -# # Deadlock here if there are enough processes: -# await some_other_subprocess.wait() -# for p in processes: -# p.terminate() -# -# This goes doubly for the sort of wacky thread usage we see in curio.abide -# (though, I'm not sure if that's actually useful in practice in our context, -# run_in_trio_thread seems like it might be a nicer synchronization primitive -# for most uses than trying to make threading.Lock awaitable). -# -# See also this very relevant discussion: -# -# https://twistedmatrix.com/trac/ticket/5298 -# -# "Interacting with the products at Rackspace which use Twisted, I've seen -# problems caused by thread-pool maximum sizes with some annoying -# regularity. The basic problem is this: if you have a hard limit on the -# number of threads, *it is not possible to write a correct program which may -# require starting a new thread to un-block a blocked pool thread*" - glyph -# -# For now, if we want to throttle getaddrinfo I think the simplest thing is -# for the socket code to have a semaphore for getaddrinfo calls. -# -# Regarding the memory overhead of threads, in theory one should be able to -# reduce this a *lot* for a thread that's just calling getaddrinfo or -# (especially) waitpid. Windows and pthreads both offer the ability to set -# thread stack size on a thread-by-thread basis. Unfortunately as of 3.6 -# CPython doesn't expose this in a useful way (all you can do is set it -# globally for the whole process, so it's - ironically - not thread safe). -# -# (It's also unclear how much stack size actually matters; on a 64-bit Linux -# server with overcommit -- i.e., the most common configuration -- then AFAICT -# really the only real limit is on stack size actually *used*; how much you -# *allocate* should be pretty much irrelevant.) - _limiter_local = RunVar("limiter") # I pulled this number out of the air; it isn't based on anything. Probably we # should make some kind of measurements to pick a good value. From 2751323b52cada57cb021a751a047830a72c14c0 Mon Sep 17 00:00:00 2001 From: "Nathaniel J. 
Smith" Date: Tue, 26 May 2020 23:39:58 -0700 Subject: [PATCH 3/4] Swap order of arguments to start_thread_soon --- trio/_core/_thread_cache.py | 10 +++++----- trio/_core/tests/test_thread_cache.py | 12 ++++++------ trio/_threads.py | 7 ++++--- 3 files changed, 15 insertions(+), 14 deletions(-) diff --git a/trio/_core/_thread_cache.py b/trio/_core/_thread_cache.py index 1ecd4bdfe..8716e206a 100644 --- a/trio/_core/_thread_cache.py +++ b/trio/_core/_thread_cache.py @@ -93,7 +93,7 @@ def __init__(self): self._idle_workers = {} self._cache_lock = Lock() - def start_thread_soon(self, deliver, fn): + def start_thread_soon(self, fn, deliver): try: worker, _ = self._idle_workers.popitem() except KeyError: @@ -105,7 +105,7 @@ def start_thread_soon(self, deliver, fn): THREAD_CACHE = ThreadCache() -def start_thread_soon(deliver, fn): +def start_thread_soon(fn, deliver): """Runs ``deliver(outcome.capture(fn))`` in a worker thread. Generally ``fn`` does some blocking work, and ``deliver`` delivers the @@ -126,11 +126,11 @@ def start_thread_soon(deliver, fn): Args: + fn (sync function): Performs arbitrary blocking work. + deliver (sync function): Takes the `outcome.Outcome` of ``fn``, and delivers it. *Must not block.* - fn (sync function): Performs arbitrary blocking work. - Because worker threads are cached and reused for multiple calls, neither function should mutate thread-level state, like `threading.local` objects – or if they do, they should be careful to revert their changes before @@ -165,4 +165,4 @@ def start_thread_soon(deliver, fn): limit how many threads they're using then it's polite to respect that. """ - THREAD_CACHE.start_thread_soon(deliver, fn) + THREAD_CACHE.start_thread_soon(fn, deliver) diff --git a/trio/_core/tests/test_thread_cache.py b/trio/_core/tests/test_thread_cache.py index 6c6fc3a10..895c1c2e5 100644 --- a/trio/_core/tests/test_thread_cache.py +++ b/trio/_core/tests/test_thread_cache.py @@ -17,7 +17,7 @@ def fn(): def deliver(outcome): q.put(outcome) - start_thread_soon(deliver, fn) + start_thread_soon(fn, deliver) outcome = q.get() with pytest.raises(RuntimeError, match="hi"): @@ -37,7 +37,7 @@ def test_spawning_new_thread_from_deliver_reuses_starting_thread(): q = Queue() COUNT = 5 for _ in range(COUNT): - start_thread_soon(lambda result: q.put(result), lambda: time.sleep(1)) + start_thread_soon(lambda: time.sleep(1), lambda result: q.put(result)) for _ in range(COUNT): q.get().unwrap() @@ -50,9 +50,9 @@ def deliver(n, _): if n == 0: done.set() else: - start_thread_soon(lambda _: deliver(n - 1, _), lambda: None) + start_thread_soon(lambda: None, lambda _: deliver(n - 1, _)) - start_thread_soon(lambda _: deliver(5, _), lambda: None) + start_thread_soon(lambda: None, lambda _: deliver(5, _)) done.wait() @@ -67,7 +67,7 @@ def test_idle_threads_exit(monkeypatch): monkeypatch.setattr(_thread_cache, "IDLE_TIMEOUT", 0.0001) q = Queue() - start_thread_soon(lambda _: q.put(threading.current_thread()), lambda: None) + start_thread_soon(lambda: None, lambda _: q.put(threading.current_thread())) seen_thread = q.get() # Since the idle timeout is 0, after sleeping for 1 second, the thread # should have exited @@ -116,5 +116,5 @@ def release(self): tc = ThreadCache() done = threading.Event() - tc.start_thread_soon(lambda _: done.set(), lambda: None) + tc.start_thread_soon(lambda: None, lambda _: done.set()) done.wait() diff --git a/trio/_threads.py b/trio/_threads.py index 922e102d3..8867388bd 100644 --- a/trio/_threads.py +++ b/trio/_threads.py @@ -187,13 +187,14 @@ def 
deliver_worker_fn_result(result): try: current_trio_token.run_sync_soon(report_back_in_trio_thread_fn, result) except trio.RunFinishedError: - # The entire run finished, so our particular task is certainly - # long gone -- it must have been cancelled and abandoned us. + # The entire run finished, so the task we're trying to contact is + # certainly long gone -- it must have been cancelled and abandoned + # us. pass await limiter.acquire_on_behalf_of(placeholder) try: - start_thread_soon(deliver_worker_fn_result, worker_fn) + start_thread_soon(worker_fn, deliver_worker_fn_result) except: limiter.release_on_behalf_of(placeholder) raise From ca6cc7caffc0aa9d0cfc4b07abbdbbdd320a3a79 Mon Sep 17 00:00:00 2001 From: "Nathaniel J. Smith" Date: Tue, 26 May 2020 23:40:35 -0700 Subject: [PATCH 4/4] Remove unused/unnecessary lock --- trio/_core/_thread_cache.py | 1 - 1 file changed, 1 deletion(-) diff --git a/trio/_core/_thread_cache.py b/trio/_core/_thread_cache.py index 8716e206a..c71a9a37e 100644 --- a/trio/_core/_thread_cache.py +++ b/trio/_core/_thread_cache.py @@ -91,7 +91,6 @@ def _work(self): class ThreadCache: def __init__(self): self._idle_workers = {} - self._cache_lock = Lock() def start_thread_soon(self, fn, deliver): try:
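With all four patches applied, the public signature ends up as
start_thread_soon(fn, deliver). An end-to-end sketch of the final API
(the thread_future helper is hypothetical, not part of Trio; it relies
on deliver being allowed to do quick non-blocking work, which
Future.set_result is):

    import concurrent.futures
    from trio.lowlevel import start_thread_soon

    def thread_future(fn):
        # Hypothetical adapter: run fn on a cached worker thread and
        # expose the resulting outcome.Outcome through a Future.
        f = concurrent.futures.Future()
        start_thread_soon(fn, f.set_result)
        return f

    fut = thread_future(lambda: sum(range(10**6)))
    print(fut.result().unwrap())  # 499999500000; unwrap() re-raises errors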