Thread-safety bug for the pipeline's stopping state #272

Open
austinbhale opened this issue Jan 11, 2023 · 1 comment

austinbhale commented Jan 11, 2023

Hi, I receive exceptions when trying to dispose of a data replay pipeline close to its end. Two threads enter Pipeline.Stop at nearly the same time, which leads one thread to treat the pipeline as being in the Completed state while the other is still trying to stop it.

Context

I am implementing a seeking capability for replaying data from a store using the PsiStoreStreamReader and Importer. The importer reads data from a store and disposes of the pipeline when there are no more messages. The user can stop the replay at any time, which calls Pipeline.Dispose from another thread.

When (T1) the replay stops on its own and (T2) the user disposes of the pipeline at nearly the same time, there are cases where both threads (T1 and T2) enter Pipeline.Stop simultaneously. The current handling is not sufficient: the IsStopping check at the start of Pipeline.Stop evaluates to false for both threads, because the state is set to Stopping only after the check has been made.
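The check-then-set race described here can be reproduced deterministically in a small sketch. The class below is a hypothetical stand-in, not the actual Pipeline code: RacyStopSketch, its fields, and the Barrier are all assumptions introduced for illustration. The Barrier only forces the unlucky interleaving, so that both threads finish the check before either one sets the flag.

```csharp
using System;
using System.Threading;

// Hypothetical stand-in for a pipeline with a non-atomic stopping check.
public sealed class RacyStopSketch
{
    private bool isStopping;
    private int stopBodyRuns;

    // Illustration-only hook: holds each thread between the check and the
    // set until both threads have performed the check.
    private readonly Barrier afterCheck = new Barrier(2);

    public int StopBodyRuns => this.stopBodyRuns;

    public void Stop()
    {
        // Step 1: check. Both threads read false here.
        bool alreadyStopping = this.isStopping;

        this.afterCheck.SignalAndWait();

        if (alreadyStopping)
        {
            return;
        }

        // Step 2: set. This happens too late to stop the other thread.
        this.isStopping = true;
        Interlocked.Increment(ref this.stopBodyRuns);
    }
}

public static class Program
{
    public static void Main()
    {
        var sketch = new RacyStopSketch();
        var t1 = new Thread(sketch.Stop);
        var t2 = new Thread(sketch.Stop);
        t1.Start();
        t2.Start();
        t1.Join();
        t2.Join();

        // Both threads got past the check, so the stop body ran twice.
        Console.WriteLine($"stop body ran {sketch.StopBodyRuns} time(s)");
    }
}
```

In real code there is no barrier, so the double entry only shows up occasionally, which matches the intermittent exceptions near the end of a replay.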

Solution

I'm not sure whether you want to ensure thread safety for every pipeline state in order to stay consistent with your state pattern. The most pressing fix for me was to change the check in Pipeline.Stop:
if (this.IsStopping) -> if (Interlocked.CompareExchange(ref this.isPipelineStopping, 1, 0) != 0)

This is similar to how you check whether a pipeline has been disposed:

if (Interlocked.CompareExchange(ref this.isPipelineDisposed, 1, 0) != 0)

You could also declare the private field as a long (private long isPipelineStopping = 0;), since Interlocked.Read operates on 64-bit values. That way you can keep the IsStopping getter, written as IsStopping => Interlocked.Read(ref this.isPipelineStopping) == 1;
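Put together, the suggested guard might look like the following minimal sketch. StoppableSketch and StopBodyRuns are hypothetical names used only for illustration, and the early return for the losing thread is an assumption: the real Pipeline.Stop may need to do something else there, such as waiting for the stop to complete.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the pipeline; not the actual Pipeline class.
public sealed class StoppableSketch
{
    // 0 = not stopping, 1 = stopping. A long so that Interlocked.Read applies.
    private long isPipelineStopping = 0;
    private int stopBodyRuns = 0;

    public bool IsStopping => Interlocked.Read(ref this.isPipelineStopping) == 1;

    public int StopBodyRuns => Volatile.Read(ref this.stopBodyRuns);

    public void Stop()
    {
        // Atomically flip 0 -> 1. CompareExchange returns the previous value,
        // so only the first caller sees 0 and proceeds.
        if (Interlocked.CompareExchange(ref this.isPipelineStopping, 1, 0) != 0)
        {
            return; // assumption: another thread is already handling the stop
        }

        // Placeholder for the actual stop logic.
        Interlocked.Increment(ref this.stopBodyRuns);
    }
}

public static class Program
{
    public static void Main()
    {
        var sketch = new StoppableSketch();

        // Eight concurrent callers; exactly one should run the stop body.
        Parallel.For(0, 8, _ => sketch.Stop());

        Console.WriteLine($"IsStopping={sketch.IsStopping}, stop body ran {sketch.StopBodyRuns} time(s)");
    }
}
```

Because the check and the state change happen in one atomic operation, there is no window in which a second thread can observe the old value.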


chitsaw commented Jan 12, 2023

Thanks for pointing this out and for your suggestions! Yes, this has been broken for some time and we should definitely fix it. However, looking more closely at the code, I see a few additional issues that could affect state consistency, so we may need a more holistic solution, which will require a bit more thought. Hopefully the workaround you have above will be adequate until we come up with a more permanent fix.
