Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add three-stage enqueuer/dispatcher scheme to SocketAsyncEngine, ThreadPoolWorkQueue and ThreadPoolTypedWorkItemQueue #100506

Merged
merged 21 commits into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,23 @@ private static SocketAsyncEngine[] CreateEngines()
//
private readonly ConcurrentQueue<SocketIOEvent> _eventQueue = new ConcurrentQueue<SocketIOEvent>();

// The scheme works as following:
// From NotScheduled, the only transition is to Scheduled when new events are enqueued and a work item is enqueued to process them.
// From Scheduled, the only transition is to Determining right before trying to dequeue an event.
// From Determining, it can go to either NotScheduled when no events are present in the queue (the previous work item processed all of them)
// or Scheduled if the queue is still not empty (let the current work item handle parallelization as convinient).
eduardo-vp marked this conversation as resolved.
Show resolved Hide resolved
//
// This field is set to 1 to indicate that a thread pool work item is scheduled to process events in _eventQueue. It is
// set to 0 when the scheduled work item starts running, to indicate that a thread pool work item to process events is
// not scheduled. Changes are protected by atomic operations as appropriate.
//
private int _eventQueueProcessingRequested;
// The goal is to avoid enqueueing more work items than necessary, while still ensuring that all events are processed.
// Another work item isn't enqueued to the thread pool hastily while the state is Determining,
// instead the parallelizer takes care of that. We also ensure that only one thread can be parallelizing at any time.
private enum EventQueueProcessingStage
{
NotScheduled,
Determining,
Scheduled
}

private int _eventQueueProcessingStage;

//
// Registers the Socket with a SocketAsyncEngine, and returns the associated engine.
Expand Down Expand Up @@ -190,9 +201,14 @@ private void EventLoop()
// The native shim is responsible for ensuring this condition.
Debug.Assert(numEvents > 0, $"Unexpected numEvents: {numEvents}");

if (handler.HandleSocketEvents(numEvents))
// Only enqueue a work item if the stage is NotScheduled.
// Otherwise there must be a work item already queued or another thread already handling parallelization.
if (handler.HandleSocketEvents(numEvents) &&
Interlocked.Exchange(
ref _eventQueueProcessingStage,
(int)EventQueueProcessingStage.Scheduled) == (int)EventQueueProcessingStage.NotScheduled)
{
ScheduleToProcessEvents();
ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false);
}
}
}
Expand All @@ -202,42 +218,67 @@ private void EventLoop()
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void ScheduleToProcessEvents()
private void UpdateEventQueueProcessingStage(bool isEventQueueEmpty)
{
// Schedule a thread pool work item to process events. Only one work item is scheduled at any given time to avoid
// over-parallelization. When the work item begins running, this field is reset to 0, allowing for another work item
// to be scheduled for parallelizing processing of events.
if (Interlocked.CompareExchange(ref _eventQueueProcessingRequested, 1, 0) == 0)
if (!isEventQueueEmpty)
{
ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false);
// There are more events to process, set stage to Scheduled and enqueue a work item.
_eventQueueProcessingStage = (int)EventQueueProcessingStage.Scheduled;
}
else
{
// The stage here would be Scheduled if an enqueuer has enqueued work and changed the stage, or Determining
// otherwise. If the stage is Determining, there's no more work to do. If the stage is Scheduled, the enqueuer
// would not have scheduled a work item to process the work, so try to dequeue a work item again.
eduardo-vp marked this conversation as resolved.
Show resolved Hide resolved
int stageBeforeUpdate =
Interlocked.CompareExchange(
ref _eventQueueProcessingStage,
(int)EventQueueProcessingStage.NotScheduled,
(int)EventQueueProcessingStage.Determining);
Debug.Assert(stageBeforeUpdate != (int)EventQueueProcessingStage.NotScheduled);
if (stageBeforeUpdate == (int)EventQueueProcessingStage.Determining)
{
return;
}
}

ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false);
}

void IThreadPoolWorkItem.Execute()
{
// Indicate that a work item is no longer scheduled to process events. The change needs to be visible to enqueuer
// threads (only for EventLoop() currently) before an event is attempted to be dequeued. In particular, if an
// enqueuer queues an event and does not schedule a work item because it is already scheduled, and this thread is
// the last thread processing events, it must see the event queued by the enqueuer.
Interlocked.Exchange(ref _eventQueueProcessingRequested, 0);

ConcurrentQueue<SocketIOEvent> eventQueue = _eventQueue;
if (!eventQueue.TryDequeue(out SocketIOEvent ev))
SocketIOEvent ev;
while (true)
{
return;
}
Debug.Assert(_eventQueueProcessingStage == (int)EventQueueProcessingStage.Scheduled);
_eventQueueProcessingStage = (int)EventQueueProcessingStage.Determining;
Interlocked.MemoryBarrier();

int startTimeMs = Environment.TickCount;
if (eventQueue.TryDequeue(out ev))
{
break;
}
eduardo-vp marked this conversation as resolved.
Show resolved Hide resolved

// An event was successfully dequeued, and there may be more events to process. Schedule a work item to parallelize
// processing of events, before processing more events. Following this, it is the responsibility of the new work
// item and the epoll thread to schedule more work items as necessary. The parallelization may be necessary here if
// the user callback as part of handling the event blocks for some reason that may have a dependency on other queued
// socket events.
ScheduleToProcessEvents();
// The stage here would be Scheduled if an enqueuer has enqueued work and changed the stage, or Determining
// otherwise. If the stage is Determining, there's no more work to do. If the stage is Scheduled, the enqueuer
// would not have scheduled a work item to process the work, so try to dequeue a work item again.
int stageBeforeUpdate =
Interlocked.CompareExchange(
ref _eventQueueProcessingStage,
(int)EventQueueProcessingStage.NotScheduled,
(int)EventQueueProcessingStage.Determining);
Debug.Assert(stageBeforeUpdate != (int)EventQueueProcessingStage.NotScheduled);
if (stageBeforeUpdate == (int)EventQueueProcessingStage.Determining)
{
return;
}
}

while (true)
UpdateEventQueueProcessingStage(eventQueue.IsEmpty);

int startTimeMs = Environment.TickCount;
do
{
ev.Context.HandleEvents(ev.Events);

Expand All @@ -253,19 +294,7 @@ void IThreadPoolWorkItem.Execute()
// using Stopwatch instead (like 1 ms, 5 ms, etc.), from quick tests they appeared to have a slightly greater
// impact on throughput compared to the threshold chosen below, though it is slight enough that it may not
// matter much. Higher thresholds didn't seem to have any noticeable effect.
if (Environment.TickCount - startTimeMs >= 15)
{
break;
}

if (!eventQueue.TryDequeue(out ev))
{
return;
}
}

// The queue was not observed to be empty, schedule another work item before yielding the thread
ScheduleToProcessEvents();
} while (Environment.TickCount - startTimeMs < 15 && eventQueue.TryDequeue(out ev));
}

private void FreeNativeResources()
Expand Down
Loading
Loading