QueueDeclare - when a TimeoutException occurs, subsequent commands cause a series of NotSupportedException: pipelining of requests forbidden #402

Closed
sandra-matts-hp-com opened this issue Apr 6, 2018 · 6 comments

@sandra-matts-hp-com

sandra-matts-hp-com commented Apr 6, 2018

  • Description: In a production environment with 12 Windows clients and one RabbitMQ server, occasionally a TimeoutException will occur. The TimeoutException is followed by several System.NotSupportedExceptions.
    The System.NotSupportedException message is "Pipelining of requests forbidden". It seems that the TimeoutException doesn't clear the channel of the current command. The subsequent
    commands that are sent through the channel end up with the pipelining error.

  • RabbitMQ version: 3.6.15

  • Erlang version: 20.1

  • RabbitMQ plugin information via rabbitmq-plugins list:
    amqp_client 3.6.15
    cowboy 1.0.4
    cowlib 1.0.2
    rabbitmq_management 3.6.15
    rabbitmq_management_agent 3.6.15
    rabbitmq_web_dispatch 3.6.15

  • Client library version (for all libraries used): 5.0.1

  • Operating system, version, and patch level: Windows Server 2008 R2 Datacenter SP1 and
    Windows 10 [Version 10.0.16299.125]. It is reproducible on both versions.

  • Output of rabbitmqctl status:

Status of node 'rabbit@xxxx'
[{pid,2024},
 {running_applications,
     [{rabbitmq_management,"RabbitMQ Management Console","3.6.15"},
      {rabbitmq_management_agent,"RabbitMQ Management Agent","3.6.15"},
      {rabbitmq_web_dispatch,"RabbitMQ Web Dispatcher","3.6.15"},
      {cowboy,"Small, fast, modular HTTP server.","1.0.4"},
      {rabbit,"RabbitMQ","3.6.15"},
      {ranch,"Socket acceptor pool for TCP protocols.","1.3.2"},
      {ssl,"Erlang/OTP SSL application","8.2.1"},
      {public_key,"Public key infrastructure","1.5"},
      {asn1,"The Erlang ASN1 compiler version 5.0.3","5.0.3"},
      {inets,"INETS  CXC 138 49","6.4.2"},
      {cowlib,"Support library for manipulating Web protocols.","1.0.2"},
      {mnesia,"MNESIA  CXC 138 12","4.15.1"},
      {amqp_client,"RabbitMQ AMQP Client","3.6.15"},
      {rabbit_common,
          "Modules shared by rabbitmq-server and rabbitmq-erlang-client",
          "3.6.15"},
      {os_mon,"CPO  CXC 138 46","2.4.3"},
      {syntax_tools,"Syntax tools","2.1.3"},
      {xmerl,"XML parser","1.3.15"},
      {compiler,"ERTS  CXC 138 10","7.1.2"},
      {crypto,"CRYPTO","4.1"},
      {recon,"Diagnostic tools for production use","2.3.2"},
      {sasl,"SASL  CXC 138 11","3.1"},
      {stdlib,"ERTS  CXC 138 10","3.4.2"},
      {kernel,"ERTS  CXC 138 10","5.4"}]},
 {os,{win32,nt}},
 {erlang_version,
     "Erlang/OTP 20 [erts-9.1] [64-bit] [smp:16:16] [ds:16:16:10] [async-threads:64]\n"},
 {memory,
     [{connection_readers,11826864},
      {connection_writers,2022600},
      {connection_channels,31796864},
      {connection_other,30418776},
      {queue_procs,1650404896},
      {queue_slave_procs,0},
      {plugins,469388216},
      {other_proc,185655824},
      {metrics,74806136},
      {mgmt_db,413794752},
      {mnesia,73149272},
      {other_ets,18589208},
      {binary,527300256},
      {msg_index,233706840},
      {code,24976922},
      {atom,1041593},
      {other_system,48424949},
      {allocated_unused,2877865312},
      {reserved_unallocated,0},
      {total,6670974976}]},
 {alarms,[]},
 {listeners,
     [{clustering,44002,"::"},
      {amqp,5672,"::"},
      {amqp,5672,"0.0.0.0"},
      {http,15672,"::"},
      {http,15672,"0.0.0.0"}]},
 {vm_memory_calculation_strategy,rss},
 {vm_memory_high_watermark,0.7},
 {vm_memory_limit,91697267916},
 {disk_free_limit,50000000},
 {disk_free,107146006528},
 {file_descriptors,
     [{total_limit,8092},
      {total_used,7202},
      {sockets_limit,7280},
      {sockets_used,322}]},
 {processes,[{limit,1048576},{used,46025}]},
 {run_queue,0},
 {uptime,1383719},
 {kernel,{net_ticktime,60}}]
  • I was able to reproduce the defect with a simple client written in .NET Core. The publish client sends messages for 6 minutes. A consumer client consumes the messages. I also use a program called "Clumsy" to throttle the network connection and force the timeout to occur while the publish client is publishing to the RabbitMQ server.

This is the stack trace of the TimeoutException followed by a Pipelining error:

System.TimeoutException: The operation has timed out.
   at RabbitMQ.Util.BlockingCell.GetValue(TimeSpan timeout)
   at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply(TimeSpan timeout)
   at RabbitMQ.Client.Impl.ModelBase.QueueDeclare(String queue, Boolean passive, Boolean durable, Boolean exclusive, Boolean autoDelete, IDictionary`2 arguments)
   at RabbitMQ.Client.Impl.AutorecoveringModel.QueueDeclare(String queue, Boolean durable, Boolean exclusive, Boolean autoDelete, IDictionary`2 arguments)
   at SendMessages.Program.Main(String[] args) in SendMessages\Program.cs:line 35

Unhandled Exception: System.NotSupportedException: Pipelining of requests forbidden
   at RabbitMQ.Client.Impl.RpcContinuationQueue.Enqueue(IRpcContinuation k)
   at RabbitMQ.Client.Impl.ModelBase.Enqueue(IRpcContinuation k)
   at RabbitMQ.Client.Impl.ModelBase.QueueDeclare(String queue, Boolean passive, Boolean durable, Boolean exclusive, Boolean autoDelete, IDictionary`2 arguments)
   at RabbitMQ.Client.Impl.AutorecoveringModel.QueueDeclare(String queue, Boolean durable, Boolean exclusive, Boolean autoDelete, IDictionary`2 arguments)
   at SendMessages.Program.Main(String[] args) in SendMessages\Program.cs:line 35
  • The code used to reproduce the error (simple publish client):
    using System;
    using System.Text;
    using RabbitMQ.Client;

    namespace SendMessages
    {
        class Program
        {
            static void Main(string[] args)
            {
                var factory = new ConnectionFactory()
                {
                    HostName = "localhost",
                    Password = "xxxx",
                    Port = 5672,
                    UserName = "xxxx"
                };

                using (var connection = factory.CreateConnection())
                using (var channel = connection.CreateModel())
                {
                    // Roughly 360 KB per message, so network throttling slows each publish.
                    string message = CreateMessage();
                    var body = Encoding.UTF8.GetBytes(message);

                    // Declare and publish in a tight loop for six minutes.
                    var currentTime = DateTime.UtcNow;
                    var runTime = TimeSpan.FromMinutes(6);
                    var finishTime = currentTime.Add(runTime);
                    var timesUp = false;
                    while (!timesUp)
                    {
                        try
                        {
                            // Synchronous request: waits for the broker's reply.
                            channel.QueueDeclare(queue: "timeout",
                                durable: true,
                                exclusive: false,
                                autoDelete: false,
                                arguments: null);

                            channel.BasicPublish(exchange: "",
                                routingKey: "timeout",
                                basicProperties: null,
                                body: body);
                            Console.WriteLine(" [x] Sent message, byteCount={0}", body.Length);
                        }
                        catch (System.TimeoutException e)
                        {
                            // Only the timeout is caught. The NotSupportedException thrown
                            // by the next QueueDeclare on this channel is unhandled.
                            Console.WriteLine(e);
                        }

                        if (DateTime.UtcNow > finishTime)
                        {
                            timesUp = true;
                        }
                    }
                }

                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }

            // Builds a 360,000-character payload (36 characters repeated 10,000 times).
            private static string CreateMessage()
            {
                var builder = new StringBuilder();
                for (int i = 0; i < 10000; i++)
                {
                    builder.Append("abcdefghijklmnopqrstuvwxyz0123456789");
                }
                return builder.ToString();
            }
        }
    }
@michaelklishin
Member

Thank you for your time.

Team RabbitMQ uses GitHub issues for specific actionable items engineers can work on. GitHub issues are not used for questions, investigations, root cause analysis, discussions of potential issues, etc (as defined by this team).

We get at least a dozen questions through various venues every single day, often light on details.
At that rate GitHub issues can very quickly turn into something impossible to navigate and make sense of, even for our team. Because GitHub is a tool our team uses heavily nearly every day, the signal/noise ratio of issues is something we care about a lot.

Please post this to rabbitmq-users.

Thank you.

@michaelklishin
Member

michaelklishin commented Apr 6, 2018

IModel continuations are not designed to support operation pipelining due to the semantics of the protocol (queue.declare is a synchronous method that requires a response). basic.publish does not require (or even have, besides Publisher confirms) a response and that confuses the continuation implementation in case it never received any. There could be ways to work around this but without significant changes to the client — see #83, #356 — I doubt it's going to be side-effect free.
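
To make the failure mode concrete, here is a simplified, self-contained model of a channel that tracks only one outstanding synchronous request. It is a sketch under assumptions, not the client's actual source: `SingleSlotContinuationQueue` and `Demo` are hypothetical names, and only the exception type and message are taken from the stack trace in the report. It assumes, as the report suggests, that a timed-out request is not removed from the slot.

```csharp
using System;
using System.Threading;

// Hypothetical, simplified model of a channel that allows only one
// outstanding synchronous request (such as queue.declare) at a time.
class SingleSlotContinuationQueue
{
    private object _outstanding; // null means no request is in flight

    // Register a pending request; fail if one is still waiting for a reply.
    public void Enqueue(object continuation)
    {
        if (Interlocked.CompareExchange(ref _outstanding, continuation, null) != null)
        {
            throw new NotSupportedException("Pipelining of requests forbidden");
        }
    }

    // Called when the broker's reply arrives: frees the slot.
    public object Next()
    {
        return Interlocked.Exchange(ref _outstanding, null);
    }
}

static class Demo
{
    static void Main()
    {
        var queue = new SingleSlotContinuationQueue();

        // First request: the caller stops waiting (a TimeoutException in the
        // real client), but in this model nothing clears the slot.
        queue.Enqueue("queue.declare #1");

        try
        {
            // Second request on the same channel finds the slot occupied.
            queue.Enqueue("queue.declare #2");
        }
        catch (NotSupportedException e)
        {
            Console.WriteLine(e.Message);
        }

        // Once a reply frees the slot, a new request is accepted again.
        queue.Next();
        queue.Enqueue("queue.declare #3");
    }
}
```

In this model the second `Enqueue` fails for the same reason every later `QueueDeclare` fails in the report: the slot stays occupied until a reply arrives or the channel is replaced.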

In most cases making sure the channel is closed and opening a new one will be an acceptable workaround.
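
A minimal sketch of that workaround, assuming the synchronous 5.x/6.x API used elsewhere in this thread and a connection that is still open. `ChannelReset` and `DeclareOrReplace` are hypothetical names introduced for illustration; the RabbitMQ calls themselves (`QueueDeclare`, `Abort`, `Dispose`, `CreateModel`) are part of the client.

```csharp
using System;
using RabbitMQ.Client;

static class ChannelReset
{
    // Hypothetical helper: declares the queue and, if the declare times out,
    // discards the channel and returns a fresh one from the same connection.
    // The caller should use the returned channel from then on and retry.
    public static IModel DeclareOrReplace(IConnection connection, IModel channel, string queue)
    {
        try
        {
            channel.QueueDeclare(queue: queue,
                durable: true,
                exclusive: false,
                autoDelete: false,
                arguments: null);
            return channel;
        }
        catch (TimeoutException)
        {
            // The timed-out request may still occupy the channel, so stop using it.
            // Abort() closes the channel without throwing if it is already closed.
            channel.Abort();
            channel.Dispose();

            // Opening a channel is itself a request to the broker and can fail
            // or time out if the network is still degraded.
            return connection.CreateModel();
        }
    }
}
```

The helper does not retry the declare itself; under sustained network trouble the caller probably wants a bounded retry with a delay rather than an immediate loop.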

Network slowdowns and similar conditions that trigger a continuation timeout in practice are also likely to kick off connection recovery, which has other limitations for publishers.

@pabermod

pabermod commented Aug 8, 2023

Hi, I'm facing the same problem.

Is the only solution to create a new IModel? What if the timeout occurs when publishing a message or confirming a message? Do we have to close the channel and create it again?

Why should an error in one operation affect other operations?

This behaviour should be documented, because as a user of this library I don't expect it.

@Pliner
Contributor

Pliner commented Aug 8, 2023

> What if the timeout occurs when publishing a message or confirming a message? Do we have to close the channel and create it again?

You might consider creating multiple channels: for topology operations and for publishing. The same was done in EasyNetQ (EasyNetQ/EasyNetQ#1063) years ago, and I didn't hear any complaints about that.
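
As a rough sketch of that suggestion (not EasyNetQ's implementation), one connection can carry a channel for topology operations and a separate one for publishing. It assumes the synchronous 5.x/6.x API; `TwoChannels` and the variable names are hypothetical, and the queue name is borrowed from the reproduction code.

```csharp
using System.Text;
using RabbitMQ.Client;

static class TwoChannels
{
    static void Main()
    {
        var factory = new ConnectionFactory { HostName = "localhost" };

        using (var connection = factory.CreateConnection())
        using (var topologyChannel = connection.CreateModel())
        using (var publishChannel = connection.CreateModel())
        {
            // Synchronous topology request: if it times out, only
            // topologyChannel needs to be discarded and replaced.
            topologyChannel.QueueDeclare(queue: "timeout",
                durable: true,
                exclusive: false,
                autoDelete: false,
                arguments: null);

            // Publishing goes through its own channel.
            var body = Encoding.UTF8.GetBytes("hello");
            publishChannel.BasicPublish(exchange: "",
                routingKey: "timeout",
                basicProperties: null,
                body: body);
        }
    }
}
```

Both channels still share one TCP connection, so a network stall affects both; the separation only keeps a stuck topology request from blocking later requests on the publishing channel.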

@pabermod

pabermod commented Aug 8, 2023

> > What if the timeout occurs when publishing a message or confirming a message? Do we have to close the channel and create it again?
>
> You might consider creating multiple channels: for topology operations and for publishing. The same was done in EasyNetQ (EasyNetQ/EasyNetQ#1063) years ago, and I didn't hear any complaints about that.

Hi, thanks for your answer. In my case I already have different channels for consuming and publishing. In my publishing client I don't define queues.

My question is whether this exception can occur when sending messages or confirming them, or whether it only happens in topology operations.

If it happens only in topology operations, is it safe to dispose the channel and create a new one?

I don't use autorecovery.

My client consumer application lifecycle is:

In a loop:

  • Connect (if not connected)
  • Create channel (if not created)
  • Create and bind queue (if not created)
  • Create consumer and start it (if not created)
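
The loop described above could look roughly like the following sketch. It assumes the synchronous 5.x/6.x API without automatic recovery, and it resets everything on any failure, which is one possible policy rather than a recommendation from the maintainers. `ConsumerLoop`, the queue name `work`, the binding to `amq.direct`, and the five-second delay are all hypothetical choices.

```csharp
using System;
using System.Threading;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

class ConsumerLoop
{
    private IConnection _connection;
    private IModel _channel;
    private string _consumerTag;

    public void Run(ConnectionFactory factory, CancellationToken stop)
    {
        while (!stop.IsCancellationRequested)
        {
            try
            {
                EnsureConsumer(factory);
            }
            catch (Exception e)
            {
                // Includes TimeoutException from a topology request:
                // drop the channel and connection and start over.
                Console.WriteLine(e);
                Reset();
            }
            Thread.Sleep(TimeSpan.FromSeconds(5));
        }
        Reset();
    }

    private void EnsureConsumer(ConnectionFactory factory)
    {
        if (_connection == null || !_connection.IsOpen)
        {
            Reset();
            _connection = factory.CreateConnection();
        }
        if (_channel == null || !_channel.IsOpen)
        {
            _channel = _connection.CreateModel();
            _consumerTag = null; // a consumer does not survive its channel
        }
        if (_consumerTag == null)
        {
            var channel = _channel; // capture this channel for the handler
            channel.QueueDeclare(queue: "work", durable: true, exclusive: false,
                autoDelete: false, arguments: null);
            channel.QueueBind(queue: "work", exchange: "amq.direct",
                routingKey: "work", arguments: null);

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (sender, ea) =>
                channel.BasicAck(ea.DeliveryTag, multiple: false);
            _consumerTag = channel.BasicConsume(queue: "work", autoAck: false,
                consumer: consumer);
        }
    }

    private void Reset()
    {
        // Abort() does not throw if the object is already closed.
        _channel?.Abort();
        _channel?.Dispose();
        _connection?.Abort();
        _connection?.Dispose();
        _channel = null;
        _connection = null;
        _consumerTag = null;
    }
}
```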

Regards

@lukebakken
Contributor

@pabermod in this case it would be great for you to open a new issue in this repo, with code to reproduce this issue.
