-
Notifications
You must be signed in to change notification settings - Fork 4
/
CancellationToken.cpp
206 lines (178 loc) · 7.38 KB
/
CancellationToken.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
// ==++==
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// ==--==
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// CancellationToken.cpp
//
// Cancellation token implementation
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
#include "concrtinternal.h"
namespace Concurrency
{
namespace details
{
_CRTIMP _CancellationTokenState * _CancellationTokenState::_NewTokenState()
{
return new _CancellationTokenState();
}
_CRTIMP void _CancellationTokenState::_Destroy()
{
delete this;
}
_CRTIMP _CancellationTokenState::_CancellationTokenState() :
_M_stateFlag(0)
{
static_assert(sizeof(SafeRWList<ListEntry>) <= sizeof(_M_registrations), "size of _M_registrations too small for list entry");
new(_M_registrations) SafeRWList<ListEntry>();
}
_CRTIMP _CancellationTokenState::~_CancellationTokenState()
{
List<ListEntry> rundownList;
SafeRWList<ListEntry> *pList = reinterpret_cast<SafeRWList<ListEntry> *>(_M_registrations);
pList->Swap(&rundownList);
ListEntry *pNext = NULL;
for(ListEntry *pLE = rundownList.First(); pLE != NULL; pLE = pNext)
{
pNext = rundownList.Next(pLE);
_CancellationTokenRegistration *pRegistration = CONTAINING_RECORD(pLE, _CancellationTokenRegistration, _M_listEntry);
pRegistration->_M_state = ::_CancellationTokenRegistration::_STATE_SYNCHRONIZE;
pRegistration->_Release();
}
}
_CRTIMP void _CancellationTokenState::_Cancel()
{
if (InterlockedCompareExchange(&_M_stateFlag, 1, 0) == 0)
{
List<ListEntry> rundownList;
SafeRWList<ListEntry> *pList = reinterpret_cast<SafeRWList<ListEntry> *>(_M_registrations);
pList->Swap(&rundownList);
ListEntry *pNext = NULL;
for(ListEntry *pLE = rundownList.First(); pLE != NULL; pLE = pNext)
{
pNext = rundownList.Next(pLE);
_CancellationTokenRegistration *pRegistration = CONTAINING_RECORD(pLE, _CancellationTokenRegistration, _M_listEntry);
pRegistration->_Invoke();
}
_M_stateFlag = 2;
_M_cancelComplete.set();
}
}
_CRTIMP void _CancellationTokenState::_RegisterCallback(_CancellationTokenRegistration *_PRegistration)
{
_PRegistration->_M_state = _CancellationTokenRegistration::_STATE_CLEAR;
_PRegistration->_Reference();
_PRegistration->_M_pTokenState = this;
bool invoke = true;
if (!_IsCanceled())
{
SafeRWList<ListEntry> *pList = reinterpret_cast<SafeRWList<ListEntry> *>(_M_registrations);
SafeRWList<ListEntry>::_Scoped_lock _lock(*pList);
if (!_IsCanceled())
{
invoke = false;
pList->UnlockedAddTail(reinterpret_cast<ListEntry *>(&_PRegistration->_M_listEntry));
}
}
if (invoke)
{
_PRegistration->_Invoke();
}
}
_CRTIMP _CancellationTokenRegistration * _CancellationTokenState::_RegisterCallback(TaskProc _Proc, void *_PData, int _InitialRefs)
{
_CancellationTokenRegistration *pRegistration = new CancellationTokenRegistration_TaskProc(_Proc, _PData, _InitialRefs);
_RegisterCallback(pRegistration);
return pRegistration;
}
_CRTIMP void _CancellationTokenState::_DeregisterCallback(_CancellationTokenRegistration *_PRegistration)
{
bool synchronize = false;
{
SafeRWList<ListEntry> *pList = reinterpret_cast<SafeRWList<ListEntry> *>(_M_registrations);
SafeRWList<ListEntry>::_Scoped_lock _lock(*pList);
//
// If a cancellation has occurred, the registration list is guaranteed to be empty if we've observed it under the auspices of the
// lock. In this case, we must synchronize with the cancelling thread to guarantee that the cancellation is finished by the time
// we return from this method.
//
if (!pList->Empty())
{
pList->UnlockedRemove(reinterpret_cast<ListEntry *>(&_PRegistration->_M_listEntry));
_PRegistration->_M_state = ::_CancellationTokenRegistration::_STATE_SYNCHRONIZE;
_PRegistration->_Release();
}
else
{
synchronize = true;
}
}
//
// If the list is empty, we are in one of several situations:
//
// - The callback has already been made --> do nothing
// - The callback is about to be made --> flag it so it doesn't happen and return
// - The callback is in progress elsewhere --> synchronize with it
// - The callback is in progress on this thread --> do nothing
//
if (synchronize)
{
long result = InterlockedCompareExchange(
&_PRegistration->_M_state,
_CancellationTokenRegistration::_STATE_DEFER_DELETE,
_CancellationTokenRegistration::_STATE_CLEAR
);
switch(result)
{
case _CancellationTokenRegistration::_STATE_CLEAR:
case _CancellationTokenRegistration::_STATE_CALLED:
break;
case _CancellationTokenRegistration::_STATE_DEFER_DELETE:
case _CancellationTokenRegistration::_STATE_SYNCHRONIZE:
ASSERT(false);
break;
default:
{
DWORD tid = (long)result;
if (tid == GetCurrentThreadId())
{
//
// It is entirely legal for a caller to Deregister during a callback instead of having to provide their own synchronization
// mechanism between the two. In this case, we do *NOT* need to explicitly synchronize with the callback as doing so would
// deadlock. If the call happens during, skip any extra synchronization.
//
break;
}
event e;
_PRegistration->_M_pSyncBlock = &e;
long result = InterlockedExchange(&_PRegistration->_M_state, _CancellationTokenRegistration::_STATE_SYNCHRONIZE);
if (result != _CancellationTokenRegistration::_STATE_CALLED)
{
e.wait();
}
break;
}
}
}
}
_CRTIMP void _CancellationTokenRegistration::_Invoke()
{
DWORD tid = GetCurrentThreadId();
ASSERT(tid > 3); // If this ever fires, we need a different encoding for this.
long result = InterlockedCompareExchange(&_M_state, (long)tid, _STATE_CLEAR);
if (result == _STATE_CLEAR)
{
_Exec();
result = InterlockedCompareExchange(&_M_state, _STATE_CALLED, (long)tid);
if (result == _STATE_SYNCHRONIZE)
{
_M_pSyncBlock->set();
}
}
_Release();
}
} // namespace details
} // namespace Concurrency