Can we make abort functions more composable? #896

Open
oremanj opened this issue Jan 30, 2019 · 8 comments

Comments

@oremanj
Member

oremanj commented Jan 30, 2019

In chat today, @njsmith gave a neat example of how to build a resource-aware job scheduler using trio.Event. He mentioned that doing the bookkeeping in the task that's doing the wakeup, rather than in the task that's being woken up, seems broadly essential to avoiding race conditions (which I agree with).

I think there's a rough edge with respect to how cancellation handling fits into this, though:

  • If you block with wait_task_rescheduled, your abort_fn runs synchronously with the cancellation being delivered. So if the waking task sees that you're still registered, you won't be cancelled before you're woken up; and if you do get cancelled, the waking task won't see you as still eligible to be woken up. This is great. It means the most natural way to handle cancellation is also relatively race-proof.
  • But if you block by calling some other abstraction, such as trio.Event.wait(), the only way you can do cleanup on cancellation is by catching the Cancelled exception after it's injected. Moreover, it's possible that a task gets cancelled, but that before it next runs to receive the cancellation, it is also woken up because the resource it was waiting for has become available. Now we're back to writing bookkeeping logic inside the woken task. This is less great.

It seems to me that we might be well served by having a way of effectively composing abort functions. That is, when I say await event.wait(), I should have the option of designating some code that will run synchronously with a cancellation of event.wait().

Doing this in a way that lets the outer logic block the cancellation (i.e., exposing the full abort_fn power) is very tricky. Doing it in a way that only supports supplying additional code to run when the cancellation succeeds is probably easier though. Do folks have thoughts on whether this is desirable, independent of how it might best be implemented?
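To make the first bullet concrete, the wait_task_rescheduled shape looks roughly like this (a minimal sketch, not code from the chat; waiters and wait_for_wakeup are made-up names):

import trio

waiters = {}  # task -> whatever bookkeeping the waker needs

async def wait_for_wakeup(bookkeeping):
    task = trio.hazmat.current_task()
    waiters[task] = bookkeeping

    def abort_fn(raise_cancel):
        # Runs synchronously, at the moment trio decides to deliver the
        # cancellation: a waker that still sees us in `waiters` knows we
        # haven't been cancelled, and a cancelled task is guaranteed to
        # already be unregistered.
        del waiters[task]
        return trio.hazmat.Abort.SUCCEEDED

    return await trio.hazmat.wait_task_rescheduled(abort_fn)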

@njsmith
Member

njsmith commented Jan 31, 2019

Interesting question!

So first let's think about what kind of scope we're hoping for... obviously we can't support this for all blocking operations. It only seems to make sense for "primitive" blocking operations (those that only call wait_task_rescheduled once?), and not even all of those (e.g. I can't see how it would make sense to use this with sock.send). So I think we're talking about some sort of specific extensions to specific tools like Event, rather than a fundamental change in the trio scheduler.

The original idea of ParkingLot was that it was supposed to be a fairly rich and friendly API for building custom sleep/wake patterns. In practice we've ended up using wait_task_rescheduled a lot more than I expected. To some extent, that's fine... wait_task_rescheduled is more finicky than some things, but it's usable. For example, here's that job scheduler using wait_task_rescheduled: https://gist.github.com/njsmith/c7aa013ca3d15a0c98df06ce8b984c3c
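For readers who don't want to chase the gist link, here is a heavily condensed reconstruction of that pattern (illustrative only; the class name, method names, and the single-integer resource model are simplifications, not the gist's actual code):

import trio

class ResourcePool:
    def __init__(self, total):
        self._available = total
        self._waiters = {}  # task -> amount that task is waiting for

    async def acquire(self, amount):
        if amount <= self._available:
            self._available -= amount
            return
        task = trio.hazmat.current_task()
        self._waiters[task] = amount

        def abort_fn(raise_cancel):
            # Unregister synchronously with the cancellation, as in the
            # sketch above.
            del self._waiters[task]
            return trio.hazmat.Abort.SUCCEEDED

        await trio.hazmat.wait_task_rescheduled(abort_fn)

    def release(self, amount):
        self._available += amount
        # The releaser does the bookkeeping on behalf of the waiters it
        # wakes.  Iterating over a copy and deleting the entry before
        # rescheduling matters; see the bug and fix further down the thread.
        for task, wanted in list(self._waiters.items()):
            if wanted <= self._available:
                self._available -= wanted
                del self._waiters[task]
                trio.hazmat.reschedule(task)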

I do wonder if it's worth trying to extend ParkingLot to handle a richer set of cases. For example, in this case it would be enough to be able to associate some data with each task in the parking lot, iterate over them, and wake up specific tasks? You could also imagine being able to register custom abort callbacks.

Event is another candidate for such extensions, though if you start adding stuff I think you'll very quickly end up at a reimplementation of wait_task_rescheduled?

Or, maybe we should go in the other direction, and try to make wait_task_rescheduled more accessible? Maybe ParkingLot is a failure and that's fine?

What exactly are the things that make wait_task_rescheduled hard to recommend?

  • It supports multiple different wakeup patterns, several of which are exotic and rarely needed (e.g. probably 95% of abort functions ignore the argument, and 95% unconditionally return Abort.SUCCEEDED). Maybe we should have a few different calls to implement the different patterns?
  • If you make a mistake and call reschedule twice, or call reschedule on a task that was already aborted, then your program probably explodes in some dramatic way instead of giving a useful error message. #467 ("Add 'one obvious way' for implementing the common multiplexed request/response pattern") talks about this a bit – we use the task object itself as the identifier for which wait_task_rescheduled you want to make return, which is convenient implementation-wise, but it means that logically unrelated wakeups are constantly re-using the same identifiers, so if you make a mistake it's hard to untangle. (Now that I think about it, this is very similar to the problems with fds being reused, like in #661, "Subtle bugs around closure handling in _unix_pipes.py and _windows_pipes.py"...)

I'll add the potential api breaker label, in case this goes in the direction of redoing wait_task_rescheduled :-)

@ktarplee

An almost-working example of the above gist is here.

I was able to fix all the minor errors in the code and tried to run it, but it still fails catastrophically with an internal trio error:

task 3 started with cpu_count=15, gpu_count=2
task 3 running
task 9 started with cpu_count=38, gpu_count=0
task 4 started with cpu_count=22, gpu_count=2
task 7 started with cpu_count=20, gpu_count=2
task 2 started with cpu_count=14, gpu_count=1
task 1 started with cpu_count=32, gpu_count=0
task 0 started with cpu_count=21, gpu_count=1
task 6 started with cpu_count=1, gpu_count=2
task 8 started with cpu_count=22, gpu_count=0
task 8 running
task 5 started with cpu_count=11, gpu_count=1
task 3 finished
task 2 running
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/trio/_core/_run.py", line 1323, in run
    run_impl(runner, async_fn, args)
  File "/usr/local/lib/python3.7/site-packages/trio/_core/_run.py", line 1471, in run_impl
    runner.task_exited(task, final_outcome)
  File "/usr/local/lib/python3.7/site-packages/trio/_core/_run.py", line 944, in task_exited
    self.tasks.remove(task)
KeyError: <Task '__main__.worker' at 0x10ff8bc18>

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "aux/trio_resource_pool.py", line 100, in <module>
    trio.run(main)
  File "/usr/local/lib/python3.7/site-packages/trio/_core/_run.py", line 1329, in run
    ) from exc
trio.TrioInternalError: internal error in trio - please file a bug!
Exception ignored in: <function Nursery.__del__ at 0x10fd5d510>
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/trio/_core/_run.py", line 530, in __del__
AssertionError: 

@ktarplee

The approach of using trio.Event instead of trio.hazmat.Task for the key of self._waiters works fine (but probably does not have the cancellation behavior that you want).
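For concreteness, the trio.Event-keyed variant presumably looks something like this (an illustrative sketch, not the actual code being described):

import trio

class EventResourcePool:
    def __init__(self, total):
        self._available = total
        self._waiters = {}  # trio.Event -> amount its waiter wants

    async def acquire(self, amount):
        if amount <= self._available:
            self._available -= amount
            return
        event = trio.Event()
        self._waiters[event] = amount
        await event.wait()
        # Note: if this wait is cancelled, nothing removes our entry from
        # self._waiters; that is the cancellation problem discussed below.

    def release(self, amount):
        self._available += amount
        for event, wanted in list(self._waiters.items()):
            if wanted <= self._available:
                self._available -= wanted
                del self._waiters[event]
                event.set()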

@njsmith
Member

njsmith commented Jan 31, 2019

@ktarplee Oh, I see the bug, it's very silly. When we reschedule a task in the for loop at the bottom, we have to also remove it from the self._waiters dict, or else we might try to wake it up again later.

I'm not sure what this says about APIs. Maybe it's an argument for the fancier ParkingLot?

@ktarplee

I have a working trio.Event-based implementation of this same idea, and that is what I had to do; however, I deleted the waiter after the await statement, so I was not modifying the dict that we are iterating over. I tried that with the wait_task_rescheduled implementation and had no luck.

Do I need to collect all tasks to reschedule, then delete them from waiters, then reschedule each?

@ktarplee

Something like...

awake = []
for other_task, other_wanted in self._waiters.items():
    if other_wanted in self._available:
        self._available -= other_wanted
        awake.append(other_task)

for other_task in awake:
    del self._waiters[other_task]
    
for other_task in awake:
    trio.hazmat.reschedule(other_task)

@njsmith
Member

njsmith commented Feb 2, 2019

@ktarplee The lazy solution is:

            for other_task, other_wanted in list(self._waiters.items()):
                if other_wanted in self._available:
                    self._available -= other_wanted
                    del self._waiters[other_task]
                    trio.hazmat.reschedule(other_task)

(Notice the added list() call, so that we're iterating over a copy of the dictionary items instead of the dict itself.) Or, if you want to be less lazy, your solution works too :-) (I'd probably merge the reschedule call into one of the other for loops.)

> I have a working trio.Event based implementation of this same idea

So here's the problem with using trio.Event: suppose that someone does something like:

with trio.move_on_after(10):  # I'm only willing to wait 10 seconds for this
    async with resource_pool.claim(...):
        ...

And further suppose that the timeout expires while claim is waiting to acquire the resources, so it's blocked in event.wait(). When this happens, the event.wait() call raises trio.Cancelled, and we unwind out of the call to claim. And when we do... we leave behind the entry in resource_pool._waiters claiming that we're waiting for those resources, even though we aren't anymore. Later on, some other task will notice us waiting, and hand us those resources, expecting us to hand them back later when we're done. But we're already gone, so we'll never hand back the resources, so they'll just leak. It's a pretty serious correctness bug.

(You might think: well, maybe I can catch that Cancelled exception and remove the entry from the resource_pool._waiters. But this is complicated, because it could happen that someone handed us the resources in between when trio decided to send the Cancelled exception, and when the task was resumed and received the exception... in the wait_task_rescheduled version, this doesn't happen, because the abort_fn is run immediately when trio decides that it wants to send Cancelled, without waiting for the task to be resumed.)
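To spell that parenthetical out, the cleanup would have to look something like this (a sketch against the EventResourcePool shape from earlier, not a recommended fix; it shows how catching Cancelled forces the bookkeeping back into the woken task):

async def acquire(self, amount):
    if amount <= self._available:
        self._available -= amount
        return
    event = trio.Event()
    self._waiters[event] = amount
    try:
        await event.wait()
    except trio.Cancelled:
        if self._waiters.pop(event, None) is None:
            # Our entry is already gone: a releaser granted us the resources
            # after trio decided to cancel us but before this task ran again.
            # We're unwinding anyway, so hand them back here or they leak.
            self.release(amount)
        raise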

@oremanj
Member Author

oremanj commented Feb 4, 2019

If I try to concretize my original idea here, I get something like:

def fn():
    <bar>
with trio.hazmat.unpublish_using(fn):
    <foo>

is equivalent to

try:
    <foo>
finally:
    <bar>

except that if <foo> is cancelled, <bar> runs immediately when the cancellation occurs, without waiting for the cancelled task to actually be scheduled.

If people think this is a good idea, I don't think it would be difficult to implement, but it sounds like we may want to do something with broader scope instead.
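For concreteness, here is how the hypothetical unpublish_using might be used in the Event-based resource pool sketched earlier in the thread (nothing here exists in trio today; it's purely illustrative):

async def acquire(self, amount):
    if amount <= self._available:
        self._available -= amount
        return
    event = trio.Event()
    self._waiters[event] = amount

    def unpublish():
        # With the proposed semantics, on cancellation this runs as soon as
        # the cancellation is decided, before any releaser can hand us the
        # resources.  pop() rather than del, because with try/finally
        # semantics it also runs on normal completion, after the releaser
        # has already removed our entry.
        self._waiters.pop(event, None)

    with trio.hazmat.unpublish_using(unpublish):
        await event.wait()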
