Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Task.WhenEach to process tasks as they complete #100316

Merged
merged 6 commits into from
Apr 2, 2024

Conversation

stephentoub
Copy link
Member

Fixes #61959

Copy link

Note regarding the new-api-needs-documentation label:

This serves as a reminder for when your PR is modifying a ref *.cs file and adding/modifying public APIs, please make sure the API implementation in the src *.cs file is documented with triple slash comments, so the PR reviewers can sign off that change.

@toupswork

This comment was marked as outdated.

@stephentoub stephentoub merged commit ab2bd48 into dotnet:main Apr 2, 2024
147 of 150 checks passed
@stephentoub stephentoub deleted the taskwheneach branch April 2, 2024 02:06
@MattParkerDev
Copy link

Will this be backported to .NET 8?

@stephentoub
Copy link
Member Author

Will this be backported to .NET 8?

No

@spektor56
Copy link

spektor56 commented Apr 9, 2024

@stephentoub
Any way to use this method with a timeout if the next task doesn't return in the allotted time? With the old way I just do something like:

 while (connectionRequests.Count > 0)
 {
  var cts = new CancellationTokenSource();
  var timeout = Task.Delay(10000, cts.Token);
  var result = await Task.WhenAny(Task.WhenAny(connectionRequests), timeout).ConfigureAwait(false);
  
  if (result != timeout)
  {
      ...
     yield return obj;
  }
  else
  {
      //no new task completed in 10 seconds, lets end this
  }
}

I'm not seeing any way to add a timeout with the new method, that would result in much cleaner code, thanks. I had the code like this before because in older versions of .NET some ping requests would just hang around forever and never return in Android so I just added this 10 second timeout to handle that case.

@davidfowl
Copy link
Member

Maybe something like this:

using var cts = new CancellationTokenSource(10_000);
await foreach (var item in Task.WhenEach(connectionRequests).WithCancellation(cts.Token))
{
     ProcessItem(item.Result);
     cts.TryReset(); // We got an item, so reset the token
}

@am11
Copy link
Member

am11 commented Apr 10, 2024

cts.TryReset(); // We got an item, so reset the token

Is this necessary? e.g.

Task<int>[] tasks = [
    Task.Run(() => 1),
    Task.Run(() => {Task.Delay(20_000).GetAwaiter().GetResult(); return 2; }),
    Task.Run(() => 3),
    Task.Run(() => {Task.Delay(20_000).GetAwaiter().GetResult(); return 4; }),
    Task.Run(() => 5)
];

using var cts = new CancellationTokenSource(10_000);
try
{
    await foreach (var item in Task.WhenEach(tasks).WithCancellation(cts.Token))
    {
        Console.WriteLine($"{item.Result} at {DateTime.UtcNow.ToString("o")}");
        //cts.TryReset(); // We got an item, so reset the token
    }
}
catch (TaskCanceledException ex)
{
    Console.WriteLine($"Exception at {DateTime.UtcNow.ToString("o")} {ex}");
}

dotnet9 run (daily build) outputs the expected:

1 at 2024-04-10T09:11:34.2177597Z
3 at 2024-04-10T09:11:34.2282278Z
5 at 2024-04-10T09:11:34.2282734Z
Exception at 2024-04-10T09:11:44.2201223Z System.Threading.Tasks.TaskCanceledException: A task was canceled.
   at System.Threading.Tasks.Task.WhenEachState.Iterate[T](WhenEachState waiter, CancellationToken cancellationToken)+MoveNext()
   at Program.<Main>$(String[] args) in /tmp/testtask/Program.cs:line 12
   at Program.<Main>$(String[] args) in /tmp/testtask/Program.cs:line 12

after uncommenting cts.TryReset():

1 at 2024-04-10T09:12:36.2741059Z
3 at 2024-04-10T09:12:36.2843617Z
5 at 2024-04-10T09:12:36.2843913Z
2 at 2024-04-10T09:12:56.2735385Z
4 at 2024-04-10T09:12:56.2736148Z

@MihaZupan
Copy link
Member

Try

using var cts = new CancellationTokenSource(10_000);
await foreach (var item in Task.WhenEach(connectionRequests).WithCancellation(cts.Token))
{
    // If TryReset fails, the timeout expired right after the last task completed.
    if (!cts.TryReset()) cts.Token.ThrowIfCancellationRequested();

    ProcessItem(item.Result);

    cts.CancelAfter(10_000); // Start the timer again
}

@davidfowl
Copy link
Member

Ah yes you have to call CancelAfter again! (I should have tested 😄 )

@spektor56
Copy link

didn't know about the "WithCancellation", this should work, thanks.

@davidfowl
Copy link
Member

davidfowl commented Apr 10, 2024

@am11 that works but is pretty inefficient. If you have a list of tasks then you need to create a new set of tasks and timers for each item vs reusing a single timer and cancellation token.

The reset is required because the timeout for the next item, not the entire operation IIUC.

@spektor56
Copy link

spektor56 commented Apr 10, 2024

Oh, i didn't know about it because it looks like it doesn't exist... I looked at the issue and it looks like it was implemented as "WaitAsync" in 2021. Where are you guys getting "WithCancellation" from?

If it was named "WithCancellation" im sure I would have found it when I was looking for a way to cancel a task, I would have never looked for "WaitAsync"

@davidfowl
Copy link
Member

davidfowl commented Apr 10, 2024

Where are you guys getting "WithCancellation" from?

It's an extension method on IAsyncEnumerable<T>

@davidfowl
Copy link
Member

davidfowl commented Apr 10, 2024

WaitAsync is different

@spektor56
Copy link

Where are you guys getting "WithCancellation" from?

It's an extension method on IAsyncEnumerable

Ah I see now, was just looking at Task

@spektor56
Copy link

spektor56 commented Apr 10, 2024

The only problem I see now is that this throws an exception instead of my previous code that did not when using a delay with "WhenAny", because of this I can no longer yield return inside of the "await foreach" block.

Looks like its easier to just do it the original way for my use case.

@spektor56
Copy link

I see there is ConfigureAwaitOptions.SuppressThrowing now but it's not applicable to ConfigureAwait on IAsyncEnumerable, is there a technical reason why it cant be applied here?

@davidfowl
Copy link
Member

How would you use it?

@spektor56
Copy link

spektor56 commented Apr 16, 2024

something like:

await foreach (var item in Task.WhenEach(connectionRequests).WithCancellation(cts.Token).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing))
{
    yield return device;
}

not sure how that would be able to actually end the foreach loop like an exception is able to though. I just can't have an exception here because I'm not allowed to try/catch around a "yield return".

On second thought, I think having the cancellation exception thrown up is actually better for my case since I can just handle it at the higher level and log that the scan was not complete there instead of here.

@stephentoub
Copy link
Member Author

On second thought, I think having the cancellation exception thrown up is actually better for my case since I can just handle it at the higher level and log that the scan was not complete there instead of here.

Yeah, I'd be concerned about exposing/supporting .ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing) in such use. The mental model for how ConfigureAwait applies is that it's replicated for each await, e.g. each await MoveNextAsync(). Those tasks are hidden from you with this usage, so if exceptions were suppressed from there, you'd have no way of knowing that exceptions occurred, and it'd be easy to accidentally call back into the enumerator even after it failed, at which point it could easily be in a corrupted state.

matouskozak pushed a commit to matouskozak/runtime that referenced this pull request Apr 30, 2024
* Add Task.WhenEach to process tasks as they complete

* Address PR feedback

* Fix some async void tests to be async Task across libs

* Remove extra awaiter field from state machine

Also clean up an extra level of indentation.
@github-actions github-actions bot locked and limited conversation to collaborators May 18, 2024
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[API Proposal]: Making "Process asynchronous tasks as they complete" easy by using IAsyncEnumerable
9 participants