JoinAll blocks on tokio #2140

Closed
najamelan opened this issue Apr 28, 2020 · 10 comments

@najamelan
Contributor

najamelan commented Apr 28, 2020

Related to #2047, #2130. JoinAll hangs (consuming 100% CPU) in some situations with 128 tasks or more on a tokio LocalSet, starting with tokio 0.2.14 (tokio-rs/tokio#2160). I don't know the tokio executor internals very well, so I have a hard time telling whether this is the fault of JoinAll or of the budget system tokio introduced. In any case, JoinAll only yields when it cannot make progress, so it seems the scheduler no longer manages to wake it up here. Only the prints from the inner tasks show. With 127 tasks, everything runs to completion.

@jonhoo do you see what's going on here?

```rust
use
{
	tokio   :: { task::LocalSet, runtime::Builder } ,
	futures :: { future::join_all                 } ,
};


fn main()
{
	let mut exec    = Builder::new().basic_scheduler().build().unwrap();
	let     set     = LocalSet::new();
	let mut handles = Vec::new();

	for i in 1..=128
	{
		handles.push( set.spawn_local( async move
		{
			tokio::task::spawn_local( async move
			{
				println!( "Running inner {}", i );

			}).await.unwrap();

			println!( "Running {}", i );
		}));
	}

	exec.block_on( set.run_until( join_all( handles ) ) );
}
```
@jonhoo
Contributor

jonhoo commented Apr 28, 2020

Hmm, looking at the implementation of join_all, it certainly suffers from the problem of polling all the non-ready futures on every top-level poll, which isn't great. It should probably have a restriction like the one FuturesUnordered has, or alternatively it should just switch to using FuturesUnordered under the hood. Doing so would also fix some other inefficiencies in join_all, such as having to walk all the futures even when only a few aren't ready.
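To make that cost concrete, here is a minimal, std-only sketch of a join_all-style combinator. It is a simplified model, not the futures crate's actual code: `NaiveJoinAll`, `naive_join_all`, `CountDown` and the tiny `block_on` executor are hypothetical names. Every top-level poll walks all slots and polls each child that has not finished, so the total work grows with both the number of children and the number of wake-ups.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical join_all-style combinator: every top-level poll walks all
/// slots and polls each child that has not finished yet.
pub struct NaiveJoinAll<F: Future> {
    slots: Vec<Slot<F>>,
}

enum Slot<F: Future> {
    Pending(Pin<Box<F>>),
    Done(Option<F::Output>),
}

// Children are boxed and outputs are never pinned, so moving the combinator is fine.
impl<F: Future> Unpin for NaiveJoinAll<F> {}

pub fn naive_join_all<F: Future>(futures: Vec<F>) -> NaiveJoinAll<F> {
    let slots = futures.into_iter().map(|f| Slot::Pending(Box::pin(f))).collect();
    NaiveJoinAll { slots }
}

impl<F: Future> Future for NaiveJoinAll<F> {
    type Output = Vec<F::Output>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut all_done = true;
        // O(n) on every call: walk all slots, including ones that already finished.
        for slot in self.slots.iter_mut() {
            if let Slot::Pending(fut) = slot {
                let result = fut.as_mut().poll(cx);
                match result {
                    Poll::Ready(out) => *slot = Slot::Done(Some(out)),
                    Poll::Pending => all_done = false,
                }
            }
        }
        if !all_done {
            return Poll::Pending;
        }
        // Every child finished: hand out the outputs in their original order.
        let outputs: Vec<F::Output> = self
            .slots
            .iter_mut()
            .map(|slot| match slot {
                Slot::Done(out) => out.take().expect("polled after completion"),
                Slot::Pending(_) => unreachable!("all children are done"),
            })
            .collect();
        Poll::Ready(outputs)
    }
}

/// Test child: wakes itself and returns Pending `remaining` times, then Ready(value).
pub struct CountDown {
    pub remaining: usize,
    pub value: u32,
    pub polls: Arc<AtomicUsize>,
}

impl Future for CountDown {
    type Output = u32;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        self.polls.fetch_add(1, Ordering::SeqCst);
        if self.remaining == 0 {
            return Poll::Ready(self.value);
        }
        self.remaining -= 1;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// Minimal executor: poll, and park the thread until the waker unparks it.
struct ThreadWaker(std::thread::Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

pub fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(ThreadWaker(std::thread::current())));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => std::thread::park(),
        }
    }
}
```

For example, joining four `CountDown` children that need 0, 1, 2 and 3 extra polls yields `[0, 10, 20, 30]` in the original order, and the children are polled 1 + 2 + 3 + 4 = 10 times in total.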

As for why it hangs with the new coop stuff, I don't immediately see a reason why. In particular, it doesn't have the same problem as FuturesUnordered where it'd enter an infinite loop if a future was always marked as ready during polling — join_all only ever walks all the futures once for any given call to poll. My guess would actually be that it is related to LocalSet, not to join_all. Can you reproduce without using LocalSet?
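The contrast can be sketched with a std-only model of a FuturesUnordered-style ready queue. This is an illustration under assumptions, not FuturesUnordered's real implementation: `poll_ready`, `ChildWaker`, `AlwaysYield`, `CountWake` and the limit of 32 are hypothetical. Only woken children are polled, but a child that wakes itself on every poll goes straight back onto the queue, so the loop needs an explicit cap after which it re-wakes its own task and returns Pending. A combinator that walks each child once per poll needs no such cap.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Indices of children that have been woken and are waiting to be polled.
pub type ReadyQueue = Arc<Mutex<VecDeque<usize>>>;

/// Per-child waker: waking child `idx` pushes it back onto the ready queue.
struct ChildWaker {
    idx: usize,
    queue: ReadyQueue,
}

impl Wake for ChildWaker {
    fn wake(self: Arc<Self>) {
        self.queue.lock().unwrap().push_back(self.idx);
    }
}

/// A child that wakes itself on every poll and never completes.
pub struct AlwaysYield;

impl Future for AlwaysYield {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// Outer waker that only counts how often the combinator rescheduled itself.
pub struct CountWake(pub AtomicUsize);

impl Wake for CountWake {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Hypothetical FuturesUnordered-style step: poll only woken children, but stop
/// after `limit` polls, re-wake our own task and return Pending. Without the
/// limit, a self-waking child would keep this loop spinning forever.
/// (Completed children are not tracked; a real implementation must drop them.)
pub fn poll_ready<F: Future<Output = ()> + Unpin>(
    children: &mut [F],
    queue: &ReadyQueue,
    cx: &mut Context<'_>,
    limit: usize,
) -> (Poll<()>, usize) {
    let mut polled = 0;
    loop {
        // Hold the lock only briefly: child wakers lock the same queue.
        let next = queue.lock().unwrap().pop_front();
        let idx = match next {
            Some(idx) => idx,
            None => return (Poll::Pending, polled), // nothing ready: wait for a wake-up
        };
        if polled == limit {
            queue.lock().unwrap().push_front(idx);
            cx.waker().wake_by_ref();
            return (Poll::Pending, polled);
        }
        let waker = Waker::from(Arc::new(ChildWaker { idx, queue: queue.clone() }));
        let _ = Pin::new(&mut children[idx]).poll(&mut Context::from_waker(&waker));
        polled += 1;
    }
}
```

With three `AlwaysYield` children, one call stops after exactly 32 child polls and wakes the outer task once, instead of looping forever.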

@najamelan
Contributor Author

najamelan commented Apr 28, 2020

Yes, there do indeed seem to be two issues here. I didn't really know whether to report this here or at tokio.

In my tests, it didn't happen with the threadpool. I will add some logging to LocalSet and look at what's looping: since the CPU keeps working, it must be doing something. However, I bisected it, and it only happens since the budget was introduced.

@hawkw
Contributor

hawkw commented Apr 28, 2020

@najamelan thanks for looking into this, it would be really helpful to know if the issue can be reproduced with just join_all or just LocalSet — if the issue doesn't exist with Tokio's threadpool, it might also be worth trying with Tokio's single-threaded basic_scheduler but no LocalSet.

@hawkw
Contributor

hawkw commented Apr 28, 2020

FWIW, I haven't been able to get the issue to occur without LocalSet. However, I did notice that it still occurs if I change the inner spawn to be a tokio::spawn rather than a spawn_local, even when the threadpool is used rather than the basic scheduler. This does seem like a LocalSet bug — might be worth opening a new ticket against Tokio's issue tracker.

@hawkw
Contributor

hawkw commented Apr 28, 2020

Hmm, if I replace join_all with FuturesUnordered, I can no longer reproduce the hang:

```rust
use futures::stream::{futures_unordered::FuturesUnordered, StreamExt};
use tokio::{runtime::Builder, task::LocalSet};
fn main() {
    let mut exec = Builder::new().basic_scheduler().build().unwrap();
    let set = LocalSet::new();
    let mut handles = FuturesUnordered::new();

    for i in 1..=128 {
        handles.push(set.spawn_local(async move {
            tokio::task::spawn_local(async move {
                println!("Running inner {}", i);
            })
            .await
            .unwrap();

            println!("Running {}", i);
        }));
    }

    exec.block_on(set.run_until(handles.for_each(|_| async {})));
}
```

This code runs to completion.

@najamelan
Contributor Author

Here is a flamegraph of it looping. From what I can tell, it's just polling the JoinAll over and over, and that always returns Pending because there are always futures that are still pending. They seem to be stuck on the inner task, but that task has already printed to the console and really has nothing more to do. Why this happens, and how the budget interferes with it, is not yet clear to me:

https://sendeyo.com/up/d/f1de51b8fd

@najamelan
Contributor Author

Oh yes, of course: JoinAll holds the JoinHandles, and polling a JoinHandle doesn't actually poll the inner task, so the problem is simply that LocalSet never schedules the inner tasks again. In principle they should never have been pending, because they don't await anything, but somehow the budget must cause the scheduler not to consider them complete and to never schedule them again.
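A simplified, std-only model of a join handle shows why polling it cannot make the inner task finish. `JoinHandleModel`, `Completer`, `join_pair` and `CountWake` are hypothetical names, and tokio's real JoinHandle is considerably more involved. Polling the handle never runs the spawned task: it only checks whether the task has stored its output and otherwise registers the caller's waker. If the task is never run to completion, or its completion never reaches the handle, the handle stays Pending however often JoinAll polls it.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// State shared between a (hypothetical) spawned task and its join handle.
struct Shared<T> {
    output: Option<T>,
    waker: Option<Waker>,
}

/// Join-handle side: polling it never runs the task, it only checks for output.
pub struct JoinHandleModel<T> {
    shared: Arc<Mutex<Shared<T>>>,
}

/// Task side: the scheduler calls `complete` once it has run the task to the end.
pub struct Completer<T> {
    shared: Arc<Mutex<Shared<T>>>,
}

pub fn join_pair<T>() -> (Completer<T>, JoinHandleModel<T>) {
    let shared = Arc::new(Mutex::new(Shared { output: None, waker: None }));
    (Completer { shared: shared.clone() }, JoinHandleModel { shared })
}

impl<T> Completer<T> {
    pub fn complete(self, value: T) {
        let mut state = self.shared.lock().unwrap();
        state.output = Some(value);
        let waker = state.waker.take();
        drop(state); // release the lock before waking the joiner
        // If this wake never happens, the joiner stays Pending forever.
        if let Some(waker) = waker {
            waker.wake();
        }
    }
}

impl<T> Future for JoinHandleModel<T> {
    type Output = T;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        let mut state = self.shared.lock().unwrap();
        if let Some(value) = state.output.take() {
            return Poll::Ready(value);
        }
        // Not finished: remember whom to wake when the task completes.
        state.waker = Some(cx.waker().clone());
        Poll::Pending
    }
}

/// Waker that just counts wake-ups, for the usage check.
pub struct CountWake(pub AtomicUsize);

impl Wake for CountWake {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}
```

Polling the handle repeatedly before `complete` is called returns Pending every time and wakes nobody; only `complete` wakes the joiner, after which the next poll is Ready.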

@najamelan
Contributor Author

najamelan commented Apr 28, 2020

Ok, I'm starting to see a pattern. Without a budget this happens:

```text
Outer::poll 1
Outer::poll - inner joinhandle returned pending 1
Outer::poll 2
Outer::poll - inner joinhandle returned pending 2
Outer::poll 3
Outer::poll - inner joinhandle returned pending 3
Outer::poll 4
Outer::poll - inner joinhandle returned pending 4
Outer::poll 5
Outer::poll - inner joinhandle returned pending 5
Polling inner 1
Polling inner 2
Polling inner 3
Polling inner 4
Polling inner 5
Outer::poll 1
Running 1
Outer::poll 2
Running 2
Outer::poll 3
Running 3
Outer::poll 4
Running 4
Outer::poll 5
Running 5
```
  • Step 1: It polls the outers, each of which immediately awaits the JoinHandle of the inner task it just spawned. Those JoinHandles return Pending at this point, because the inners haven't run yet.
  • Step 2: It then polls the inner tasks, which hadn't been polled yet. They all return Ready.
  • Step 3: It polls the outers again, and all the JoinHandles return Ready.

Here the budget does not interfere: it is 128, and with only 5 tasks we never go that long without returning Pending.
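The budget-free schedule above can be reproduced with a toy FIFO run queue. This is a model under stated assumptions, not LocalSet's implementation: it assumes that spawned and woken tasks are pushed to the back of a single queue, and `simulate` and `Task` are hypothetical names. With `n = 2` it produces the same ordering as the 5-task log: all outers, then all inners, then all outers again.

```rust
use std::collections::VecDeque;

/// Hypothetical toy task: an outer task `i`, or the inner task it spawns.
#[derive(Clone, Copy)]
enum Task {
    Outer(usize),
    Inner(usize),
}

/// Toy model of a single-threaded FIFO run queue with no budget. Assumes that
/// spawned and woken tasks go to the back of one queue. Returns the lines the
/// outer/inner example would print under that assumption.
pub fn simulate(n: usize) -> Vec<String> {
    let mut queue: VecDeque<Task> = (1..=n).map(Task::Outer).collect();
    let mut inner_done = vec![false; n + 1]; // index 0 unused
    let mut log = Vec::new();

    while let Some(task) = queue.pop_front() {
        match task {
            Task::Outer(i) if inner_done[i] => {
                // Second poll: the JoinHandle is now Ready.
                log.push(format!("Outer::poll {}", i));
                log.push(format!("Running {}", i));
            }
            Task::Outer(i) => {
                // First poll: spawn the inner task; its JoinHandle is still Pending.
                log.push(format!("Outer::poll {}", i));
                log.push(format!("Outer::poll - inner joinhandle returned pending {}", i));
                queue.push_back(Task::Inner(i));
            }
            Task::Inner(i) => {
                log.push(format!("Polling inner {}", i));
                inner_done[i] = true;
                // Completion wakes the outer task, which goes to the back of the queue.
                queue.push_back(Task::Outer(i));
            }
        }
    }
    log
}
```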

Now with 127 tasks, things change quite a bit:

```text
Outer::poll 1
Outer::poll - inner joinhandle returned pending 1
Outer::poll 2
...
Outer::poll 126
Outer::poll - inner joinhandle returned pending 126
Outer::poll 127
Outer::poll - inner joinhandle returned pending 127
--- RUN 2
Polling inner 1
Polling inner 2
Outer::poll 2
Outer::poll - inner joinhandle returned pending 2      <-- Strange, inner is done but JoinHandle doesn't think so.
Polling inner 3
Outer::poll 3
Outer::poll - inner joinhandle returned pending 3
...
Polling inner 28
Outer::poll 28
Outer::poll - inner joinhandle returned pending 28
Polling inner 29
Outer::poll 29
Running 29                                             <-- outer finished
Polling inner 30
Outer::poll 30
Outer::poll - inner joinhandle returned pending 30
...
Polling inner 59
Outer::poll 59
Outer::poll - inner joinhandle returned pending 59
Polling inner 60
Outer::poll 60
Running 60                                             <-- outer finished
Polling inner 61
Outer::poll 61
Outer::poll - inner joinhandle returned pending 61
Polling inner 62
Polling inner 63
Outer::poll 63
Outer::poll - inner joinhandle returned pending 63
...
```

Step 1 is the same: it runs through all the outers, and all the inner JoinHandles return Pending. But then it starts interleaving them, with each inner always running before its outer, and yet the JoinHandles still return Pending, except for a few. Overall, it takes 6 passes through the LocalSet to complete, with more and more JoinHandles gradually returning Ready, even though in theory it could be done in 2. The interleaving looks like an effect of the budget.
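The effect can be illustrated with a simplified model of cooperative budgeting. It assumes, as the logs suggest but without reference to tokio's source, that each task poll starts with a fixed budget, that polling a JoinHandle spends one unit, and that once the budget is spent the leaf returns Pending and immediately re-wakes its task even though its result is available. `BUDGET`, `run_with_budget`, `consume_budget`, `ReadyLeaf` and `CountWake` are hypothetical names; the usage check uses a budget of 2 rather than 128 to keep it short.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

thread_local! {
    /// Budget left for the task poll in progress (None = no budget installed).
    static BUDGET: Cell<Option<usize>> = Cell::new(None);
}

/// Hypothetical scheduler hook: run one task poll (`f`) with a fresh budget.
pub fn run_with_budget<R>(budget: usize, f: impl FnOnce() -> R) -> R {
    BUDGET.with(|b| b.set(Some(budget)));
    let result = f();
    BUDGET.with(|b| b.set(None));
    result
}

/// Leaf-level check: spend one unit, or, if the budget is exhausted, re-wake
/// the task and force Pending so the scheduler gets a chance to run others.
fn consume_budget(cx: &mut Context<'_>) -> Poll<()> {
    BUDGET.with(|b| match b.get() {
        Some(0) => {
            cx.waker().wake_by_ref();
            Poll::Pending
        }
        Some(n) => {
            b.set(Some(n - 1));
            Poll::Ready(())
        }
        None => Poll::Ready(()),
    })
}

/// Stand-in for the JoinHandle of a task that has already finished: its value
/// is available, but it still has to pass the budget check first.
pub struct ReadyLeaf(pub u32);

impl Future for ReadyLeaf {
    type Output = u32;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        if consume_budget(cx).is_pending() {
            return Poll::Pending;
        }
        Poll::Ready(self.0)
    }
}

/// Waker that just counts wake-ups, for the usage check.
pub struct CountWake(pub AtomicUsize);

impl Wake for CountWake {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Usage helper: poll every leaf once inside one budgeted task poll.
pub fn poll_all(leaves: &mut [ReadyLeaf], cx: &mut Context<'_>, budget: usize) -> Vec<Poll<u32>> {
    run_with_budget(budget, || leaves.iter_mut().map(|leaf| Pin::new(leaf).poll(cx)).collect())
}

/// Convenience: build a counting waker and return it with its counter.
pub fn counting_waker() -> (Waker, Arc<CountWake>) {
    let count = Arc::new(CountWake(AtomicUsize::new(0)));
    (Waker::from(count.clone()), count)
}
```

With a budget of 2, the third leaf returns Pending even though its value is available, and the task is re-woken once; polled again with no budget installed, the same leaf is Ready. Under these assumptions, that mirrors the "inner is done but JoinHandle doesn't think so" line in the 127-task log.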

Now with 128 and over:

```text
Outer::poll 1
Outer::poll - inner joinhandle returned pending 1
Outer::poll 2
...
Outer::poll 127
Outer::poll - inner joinhandle returned pending 127
Outer::poll 128
Outer::poll - inner joinhandle returned pending 128
---
Polling inner 1
Outer::poll 1
Outer::poll - inner joinhandle returned pending 1
Polling inner 2
Outer::poll 2
Outer::poll - inner joinhandle returned pending 2
...
Polling inner 127
Outer::poll 127
Outer::poll - inner joinhandle returned pending 127
Polling inner 128
Outer::poll 128
Outer::poll - inner joinhandle returned pending 128
---
Outer::poll 1
Outer::poll - inner joinhandle returned pending 1
...
Outer::poll 127
Outer::poll - inner joinhandle returned pending 127
Outer::poll 128
Outer::poll - inner joinhandle returned pending 128
```

The JoinHandles never return Ready. They stay Pending forever, and it just keeps looping over the outers.

It looks like the underlying issue is an inefficiency in how the JoinHandles are synchronized, with the budget delivering the finishing blow. I'm not familiar with the internals of the tokio schedulers, so I'm not sure what exactly happens.

@najamelan
Contributor Author

The code that produced the logs:

```rust
use
{
	tokio   :: { task::*, runtime::Builder } ,
	futures :: { future::join_all                 } ,
	std     :: { future::*, task::*, pin::*       } ,
};

fn main()
{
	let mut exec    = Builder::new().basic_scheduler().build().unwrap();
	let     set     = LocalSet::new();
	let mut handles = Vec::new();

	for i in 1..=5
	{
		handles.push( set.spawn_local( Outer{i, inner: None} ));
	}

	exec.block_on( set.run_until( join_all( handles ) ) );
}


struct Outer
{
	i: usize,
	inner: Option<JoinHandle<()>>,
}


struct Inner
{
	i: usize,
}


impl Future for Inner
{
	type Output = ();

	fn poll( self: Pin< &mut Self>, _cx: &mut Context<'_> ) -> Poll<()>
	{
		println!( "Polling inner {}", self.i );
		Poll::Ready(())
	}
}


impl Future for Outer
{
	type Output = ();

	fn poll( mut self: Pin< &mut Self>, cx: &mut Context<'_> ) -> Poll<()>
	{
		println!( "Outer::poll {}", self.i );

		let mut handle = self.inner.take().unwrap_or_else( ||
		{
			tokio::task::spawn_local( Inner{i: self.i} )
		});

		match Pin::new( &mut handle ).poll(cx)
		{
			Poll::Pending =>
			{
				println!( "Outer::poll - inner joinhandle returned pending {}", self.i );
				self.inner = handle.into();
				return Poll::Pending
			}

			Poll::Ready(_) => {}
		}

		println!( "Running {}", self.i );
		Poll::Ready(())
	}
}
```

@najamelan
Contributor Author

Closing in favor of the tokio issue. I will open a new issue to question the implementation of JoinAll.
