Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

C# large streams do not permit async event handlers #186

Open
Niproblema opened this issue Feb 28, 2022 · 8 comments
Open

C# large streams do not permit async event handlers #186

Niproblema opened this issue Feb 28, 2022 · 8 comments

Comments

@Niproblema
Copy link

I have figured that sending large streams just does not work when using async.
For example,

Client side:

                    case "sendlargeasync":
                        Stream stream = GetStream(2000);
                        success = client.SendAsync(stream.Length, stream).Result;
                        Console.WriteLine(success);
                        break;

where GetStream() generates some random large stream. E.g.

        private static MemoryStream GetStream(long mb)
        {
            Random rng = new Random();
            long size = mb * 1024 * 1024;
            if (size < int.MaxValue - 60)
            {

                byte[] streamContents = new byte[mb * 1024 * 1024];
                rng.NextBytes(streamContents);
                MemoryStream memoryStream = manager.GetStream(streamContents);

                memoryStream.Position = 0;
                return memoryStream;
            }
            else
            {
                MemoryStream memoryStream = manager.GetStream();
                memoryStream.WriteByte((byte)rng.Next(256));
                memoryStream.Seek(mb * 1024 * 1024, SeekOrigin.Begin);
                memoryStream.WriteByte((byte)rng.Next(256));
                memoryStream.Position = 0;
                return memoryStream;
            }
        }

Server side:

        private static void StreamReceived(object sender, StreamReceivedEventArgs args)
        {
            try
            {
                MemoryStream ss = new MemoryStream();
                args.DataStream.CopyTo(ss);

This works perfectly, but I require non blocking approach...

        private static async void StreamReceived(object sender, StreamReceivedEventArgs args)
        {
            try
            {
                await Task.Delay(100);
                // At this point Environment.CurrentManagedThreadId is different to the one that the library invoked the event handler with.

                MemoryStream ss = new MemoryStream();
                args.DataStream.CopyTo(ss);   
           ....

In this example deadlock occurs and the copyTo() never returns. Meanwhile the client receives successful sendAsync and returns. This is true only when stream is over MaxProxiedStreamSize . (See #168 (reply in thread)_)

@Niproblema Niproblema changed the title C# large streams require same thread C# large streams do not permit async event handlers Feb 28, 2022
@Niproblema
Copy link
Author

Niproblema commented Feb 28, 2022

Would it be possible to create a callback for async streams? How else am I supposed to use stream.ReadAsync()?

@jchristn
Copy link
Collaborator

Hi @Niproblema perhaps I'm not understanding the problem correctly, but, you can't have multiple readers concurrently on the underlying socket. If I'm understanding what you're asking, you want one stream to be handled by one event, and, another stream handled by a different event. Is that correct?

@Niproblema
Copy link
Author

@jchristn I want to have multiple clients connected to one tcp server. As far as I understand each client is anyway using own dedicated server socket, right? So I should be able to stream files from multiple clients to a single server concurrently, right? It would defeat it's purpose if all but one clients were waiting for the one to finish.

However, I am talking about a different issue here. Server library exposes new stream through a C# event handler. However event handlers are single thread synchronous methods. I cannot use async within them without either blocking the original thread with calling Result (e.g. stream.CopyToAsync().Result;) or returning the task into void event handler, which apparently closes stream and causes a deadlock, if reading is attempted.

@Niproblema
Copy link
Author

Niproblema commented Feb 28, 2022

Looking at WatsonTcpServer.cs in private async Task DataReceiver

...
                        else if (_Events.IsUsingStreams)
                        {
                            StreamReceivedEventArgs sr = null;
                            WatsonStream ws = null;

                            if (msg.ContentLength >= _Settings.MaxProxiedStreamSize)
                            {
                                ws = new WatsonStream(msg.ContentLength, msg.DataStream);
                                sr = new StreamReceivedEventArgs(client.IpPort, msg.Metadata, msg.ContentLength, ws);
                                _Events.HandleStreamReceived(this, sr); 
                            }
                            else
                            {
                                MemoryStream ms = WatsonCommon.DataStreamToMemoryStream(msg.ContentLength, msg.DataStream, _Settings.StreamBufferSize);
                                ws = new WatsonStream(msg.ContentLength, ms); 
                                sr = new StreamReceivedEventArgs(client.IpPort, msg.Metadata, msg.ContentLength, ws);
                                await Task.Run(() => _Events.HandleStreamReceived(this, sr), token);
                            } 
                        }
...

I don't get this part. Why call new stream method with _Events.HandleStreamReceived(this, sr); . Doesn't this break async principle of the program? Why isn't it like await _Callbacks.HandleAsyncStreamReceivedAsync(sr).ConfigureAwait(true);

Wouldn't this lead to thread pool exhaustion, as I always have to block a thread provided by the event handler in order to read the stream contents. Having many concurrent clients this might be problematic, as all of them shall have concurrently blocked threads.

@Niproblema
Copy link
Author

Or perhaps even better, change
public event EventHandler<StreamReceivedEventArgs> StreamReceived; to public event AsyncEventHandler<StreamReceivedEventArgs> StreamReceived;?

@jchristn
Copy link
Collaborator

jchristn commented Feb 28, 2022

The reason in the first case, where msg.ContentLength >= _Settings.MaxProxiedStreamSize is true that the event is handled synchronously is because it is assumed that the client code will be reading the stream out completely, which will read all of the data from the underlying socket. In the case of msg.ContentLength < _Settings.MaxProxiedStreamSize the library itself reads all of the data out of the underlying stream. The DataReceiver loop must not continue until the data is fully read out of the underlying NetworkStream/SslStream, otherwise it's looking for a new message frame (and there is still data in the stream from the last message frame). In the case of the stream being larger than the specified size, it's more optimal to have the client code perform that task.

If you prefer the library do it, you can always set that parameter MaxProxiedStreamSize to an arbitrarily large number.

@Niproblema
Copy link
Author

Niproblema commented Feb 28, 2022

Thanks for reply. Answered exactly what I was rambling about :)

...the event is handled synchronously is because it is assumed that the client code will be reading the stream out completely, which will read all of the data from the underlying socket.

I understand that but having true async event handler or a callback still allows that while also allowing async reading, which according to the documentation should be faster:

Starting with the .NET Framework 4.5, the class includes async methods to simplify asynchronous operations. An async method contains Async in its name, such as ReadAsync, WriteAsync, CopyToAsync, and FlushAsync. These methods enable you to perform resource-intensive I/O operations without blocking the main thread. This performance consideration is particularly important in a Windows 8.x Store app or desktop app where a time-consuming stream operation can block the UI thread and make your app appear as if it is not working. The async methods are used in conjunction with the async and await keywords in Visual Basic and C#.

Seems like a win win to me if you included async event handler or async callback. Even if I didn't use async methods, having async event handler allows me to do either approach.

The DataReceiver loop must not continue until the data is fully read out

That's exactly the issue I am having by returning event handler task instead of blocking thread and awaiting task.

If you prefer the library do it, you can always set that parameter MaxProxiedStreamSize to an arbitrarily large number.
Good suggestion, but I was aiming for 0-copy transfer in order to minimize memory usage. I am dealing with data scale that benefits from having low copy overhead.

@jchristn
Copy link
Collaborator

jchristn commented Mar 1, 2022

Hi @Niproblema thank you, I think I'm on the same page now. Let me jump into the source when I get a moment and see if I can find a way to make this work. Cheers

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants