-
Notifications
You must be signed in to change notification settings - Fork 1
/
checkout_queue.py
74 lines (65 loc) · 2.56 KB
/
checkout_queue.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
import heapq
import time
from collections import deque
from itertools import count
class CheckoutQueue:
    """Queue that waits for confirmation before removing items.

    Instead of just popping an item, a client "checks out" an item for a
    specified duration, during which it is unavailable to other clients.
    The client must then separately signal the "completion" of that item
    within the duration (according to the queue's reckoning), or else
    the item *may* be returned to the front of the queue and checked out
    to another client.  (The current implementation only guarantees this
    behavior if the queue receives another checkout request after the
    item's deadline but before the item is completed.)

    This class is not thread safe.  Use asyncio or a message passing
    system for concurrent access.
    """

    def __init__(self, clock=time.monotonic):
        """Create an empty queue.

        ``clock`` is a zero-argument callable returning the current
        time.  Its result is added to a checkout's ``duration`` and
        compared against stored deadlines with ``<``.  It defaults to
        ``time.monotonic``; tests can inject a fake clock.
        """
        self.clock = clock
        self.ids = count()
        self._todo = deque()  # (item_id, obj) pairs waiting to be checked out
        self._deadlines = []  # heap of (deadline, item_id)
        self._checked_out = {}  # item_id -> obj for outstanding checkouts

    def put(self, obj):
        """Append ``obj`` to the back of the queue under a fresh id."""
        self._todo.append((next(self.ids), obj))

    def checkout(self, duration=1):
        """Check out the item at the front of the queue.

        Before an item is taken, at most one overdue checkout is
        returned to the front of the queue (so it is the one handed
        out by this call).

        ``duration`` is added to the current clock reading to form the
        deadline by which the item should be completed.

        Returns an ``(id, obj)`` tuple; pass ``id`` to ``complete()``.

        Raises ``IndexError`` if no item is available.
        """
        self._collect_overdue()
        try:
            item_id, obj = self._todo.popleft()
        except IndexError:
            # Same exception type as before, but with a message that
            # names the queue instead of leaking the internal deque.
            raise IndexError("checkout from an empty CheckoutQueue") from None
        deadline = self.clock() + duration
        heapq.heappush(self._deadlines, (deadline, item_id))
        self._checked_out[item_id] = obj
        return item_id, obj

    def complete(self, id):  # noqa: A002 - public parameter name is kept
        """Mark the checked-out item ``id`` as done.

        Returns ``True`` if ``id`` was checked out at the time of the
        call (it is then forgotten by the queue), ``False`` otherwise.
        Only the set of outstanding checkouts is consulted; the deadline
        is not checked here.
        """
        try:
            del self._checked_out[id]
        except KeyError:
            return False
        return True

    def _collect_overdue(self):
        """Return at most one overdue, uncompleted item to the queue front.

        Expired heap entries whose item has already been completed are
        discarded along the way.
        """
        while self._deadlines:
            # Peek at the checked-out task with the earliest deadline.
            deadline, item_id = self._deadlines[0]
            if self.clock() < deadline:
                # The earliest deadline has not passed, so nothing
                # else on the heap is overdue either.
                return
            heapq.heappop(self._deadlines)
            try:
                obj = self._checked_out.pop(item_id)
            except KeyError:
                # The task was already completed; its heap entry was
                # stale.  Move on to the next one.
                continue
            # Return the task to the start of the to-do list.
            self._todo.appendleft((item_id, obj))
            # We only need to do this once for each checkout
            # in order to prevent starvation.
            return