rabbitmq-c: Nonblocking recv

classic Classic list List threaded Threaded
8 messages Options
Reply | Threaded
Open this post in threaded view
|

rabbitmq-c: Nonblocking recv

Arun Chandrasekaran
Hi all,

How do we make a non blocking recv from librabbitmq? amqp_simple_wait_frame blocks if the queue doesn't contain any message.

Should we call amqp_data_in_buffer() and based on the result call amqp_simple_wait_frame or implement a solution like this?

http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/2011-September/014868.html

Thanks in advance.

- Arun
_______________________________________________
rabbitmq-discuss mailing list
[hidden email]
https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
Reply | Threaded
Open this post in threaded view
|

Re: rabbitmq-c: Nonblocking recv

alan.antonuk
Arun;

I agree with the method used in the gist linked in your email: 

First check amqp_frames_enqueued(). If this returns true, there are decoded frames ready, amqp_simple_wait_frame() will not block.

Second check amqp_data_in_buffer(). If this returns true, there is data that has already been received (with recv()) but not decoded. It is likely if you call amqp_simple_wait_frame() it will not block. However if the data in the buffer doesn't complete a frame, recv() will be called and may block.

Third, call select() or poll() on the socket associated with the connection (you can use amqp_get_sockfd() to get the socket descriptor). If this system call shows that the socket can be read from amqp_simple_wait_frame() will call recv() and likely won't block - assuming a full frame is received.

As you may notice the last two steps don't give you a "correct all the time" answer to whether amqp_simple_wait_frame() will block or not. In practice I've found the majority of the time the above works well enough for RPC-style AMQP messaging for the following two reasons:
1. When the broker sends data - typically it sends it as an entire frame, if your select() call returns that there is data in the buffer likely you already have, or soon will have an entire frame ready to be read by recv()
2. AMQP is intended to be run on a low-latency, high-bandwidth LAN, so if you do get a partial frame when recv is called, within a short time period you will receive the rest of the frame, otherwise it is likely something serious has happened and that will cause the whole connection to die at some point in the near future (possibly time out - which I grant you can possibly block for a lengthy amount of time).

There is definitely room for improvement in the rabbitmq-c library handles non-blocking behavior.

HTH
-Alan

On Feb 15, 2012, at 2:28 PM, Brett Cameron wrote:

Arun,

I'd look at implementing something along the lines of what Alex describes in the link using select() or poll(). Have cc'd Alan for his consideration as to how this might best be done. The approach outlined by Alex in the link is okay, but you could still potentially find yourself hanging on a blocked read if something bad happened between the initial check for data available and a read operation. An alternative might be to add a timeout parameter to the wait_frame call or create a variant of this function that includes a timeout...

Regards,
Brett

On Thu, Feb 16, 2012 at 7:58 AM, Arun Chandrasekaran <[hidden email]> wrote:
Hi all,

How do we make a non blocking recv from librabbitmq? amqp_simple_wait_frame blocks if the queue doesn't contain any message.

Should we call amqp_data_in_buffer() and based on the result call amqp_simple_wait_frame or implement a solution like this?

http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/2011-September/014868.html

Thanks in advance.

- Arun

_______________________________________________
rabbitmq-discuss mailing list
[hidden email]
https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss




_______________________________________________
rabbitmq-discuss mailing list
[hidden email]
https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
Reply | Threaded
Open this post in threaded view
|

Re: rabbitmq-c: Nonblocking recv

Arun Chandrasekaran
Thanks for reply Alan.

I modified the amqp_consumer.c sample code to simulate the non-blocking behaviour as you mentioned.

    /* if (!amqp_frames_enqueued(conn) && !amqp_data_in_buffer(conn)) { */
    if (1) {
       int sock = amqp_get_sockfd(conn);
       printf("socket: %d\n", sock);

       /* Watch socket fd to see when it has input. */
       fd_set read_flags;
       FD_ZERO(&read_flags);
       FD_SET(sock), &read_flags);
       int ret = 0;
       do {
          struct timeval timeout;

          /* Wait upto a second. */
          timeout.tv_sec = 1;
          timeout.tv_usec = 0;

          ret = select(sock+1, &read_flags, NULL, NULL, &timeout);
          if (ret == -1)
             printf("select: %s\n", strerror(errno));
          else if (ret == 0)
             printf("select timedout\n");
          if (FD_ISSET(sock, &read_flags)) {
             printf("Flag is set\n");
          }
       } while (ret == 0);
    }

But this always results in a timeout. Any idea where I might be going wrong? I have commented the first two checks that you mentioned just for sake of clarity on select().

-Arun

_______________________________________________
rabbitmq-discuss mailing list
[hidden email]
https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
Reply | Threaded
Open this post in threaded view
|

Re: rabbitmq-c: Nonblocking recv

Brett Cameron
Arun,

A couple of random early in the morning pre-caffeine thoughts: is sock set to non-blocking? What's the errno value after the select() returns?

Brett


On Fri, Feb 17, 2012 at 6:14 AM, Arun Chandrasekaran <[hidden email]> wrote:
Thanks for reply Alan.

I modified the amqp_consumer.c sample code to simulate the non-blocking behaviour as you mentioned.

    /* if (!amqp_frames_enqueued(conn) && !amqp_data_in_buffer(conn)) { */
    if (1) {
       int sock = amqp_get_sockfd(conn);
       printf("socket: %d\n", sock);

       /* Watch socket fd to see when it has input. */
       fd_set read_flags;
       FD_ZERO(&read_flags);
       FD_SET(sock), &read_flags);
       int ret = 0;
       do {
          struct timeval timeout;

          /* Wait upto a second. */
          timeout.tv_sec = 1;
          timeout.tv_usec = 0;

          ret = select(sock+1, &read_flags, NULL, NULL, &timeout);
          if (ret == -1)
             printf("select: %s\n", strerror(errno));
          else if (ret == 0)
             printf("select timedout\n");
          if (FD_ISSET(sock, &read_flags)) {
             printf("Flag is set\n");
          }
       } while (ret == 0);
    }

But this always results in a timeout. Any idea where I might be going wrong? I have commented the first two checks that you mentioned just for sake of clarity on select().

-Arun


_______________________________________________
rabbitmq-discuss mailing list
[hidden email]
https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
Reply | Threaded
Open this post in threaded view
|

Re: rabbitmq-c: Nonblocking recv

alan.antonuk
Brief glance at the code - it looks good to me, except for the FD_SET(...) line which seems to have an extra ) after the sock variable.

I would uncomment the line you have commented. Its likely that if you run this program after amqp_producer.c program, that a frame or two is cached and that the result of amqp_frames_enqueued() is returning true

Also check with the management plugin see if there is a message in the queue ready to be consumed.

-Alan

On Thu, Feb 16, 2012 at 1:08 PM, Brett Cameron <[hidden email]> wrote:
Arun,

A couple of random early in the morning pre-caffeine thoughts: is sock set to non-blocking? What's the errno value after the select() returns?

Brett


On Fri, Feb 17, 2012 at 6:14 AM, Arun Chandrasekaran <[hidden email]> wrote:
Thanks for reply Alan.

I modified the amqp_consumer.c sample code to simulate the non-blocking behaviour as you mentioned.

    /* if (!amqp_frames_enqueued(conn) && !amqp_data_in_buffer(conn)) { */
    if (1) {
       int sock = amqp_get_sockfd(conn);
       printf("socket: %d\n", sock);

       /* Watch socket fd to see when it has input. */
       fd_set read_flags;
       FD_ZERO(&read_flags);
       FD_SET(sock), &read_flags);
       int ret = 0;
       do {
          struct timeval timeout;

          /* Wait upto a second. */
          timeout.tv_sec = 1;
          timeout.tv_usec = 0;

          ret = select(sock+1, &read_flags, NULL, NULL, &timeout);
          if (ret == -1)
             printf("select: %s\n", strerror(errno));
          else if (ret == 0)
             printf("select timedout\n");
          if (FD_ISSET(sock, &read_flags)) {
             printf("Flag is set\n");
          }
       } while (ret == 0);
    }

But this always results in a timeout. Any idea where I might be going wrong? I have commented the first two checks that you mentioned just for sake of clarity on select().

-Arun



_______________________________________________
rabbitmq-discuss mailing list
[hidden email]
https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
Reply | Threaded
Open this post in threaded view
|

Re: rabbitmq-c: Nonblocking recv

Arun Chandrasekaran
Hi Alan,
 
Brief glance at the code - it looks good to me, except for the FD_SET(...) line which seems to have an extra ) after the sock variable.

I would uncomment the line you have commented. Its likely that if you run this program after amqp_producer.c program, that a frame or two is cached and that the result of amqp_frames_enqueued() is returning true


Thanks, I've done that. But I may be missing something here. So I've attached the hg diff to make sure the changes are in the right place for the non-blocking consumer.
 
Also check with the management plugin see if there is a message in the queue ready to be consumed.

Currently I run on RHEL5. So I'll have to install the latest Erlang, I'll do that today. Alternatively I have made sure that the message is ready to be consumed by starting the consumer ahead of the producer. Will that suffice?

Thanks,
Arun

_______________________________________________
rabbitmq-discuss mailing list
[hidden email]
https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss

non-block-consume.diff (2K) Download Attachment
Reply | Threaded
Open this post in threaded view
|

Re: rabbitmq-c: Nonblocking recv

alan.antonuk
You need to have the FD_ZERO() and FD_SET() inside the do loop.  select() clears the fd_set if nothing has been seen on a socket.

-Alan

On Fri, Feb 17, 2012 at 1:06 AM, Arun Chandrasekaran <[hidden email]> wrote:
Hi Alan,
 
Brief glance at the code - it looks good to me, except for the FD_SET(...) line which seems to have an extra ) after the sock variable.

I would uncomment the line you have commented. Its likely that if you run this program after amqp_producer.c program, that a frame or two is cached and that the result of amqp_frames_enqueued() is returning true


Thanks, I've done that. But I may be missing something here. So I've attached the hg diff to make sure the changes are in the right place for the non-blocking consumer.
 
Also check with the management plugin see if there is a message in the queue ready to be consumed.

Currently I run on RHEL5. So I'll have to install the latest Erlang, I'll do that today. Alternatively I have made sure that the message is ready to be consumed by starting the consumer ahead of the producer. Will that suffice?

Thanks,
Arun


_______________________________________________
rabbitmq-discuss mailing list
[hidden email]
https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
Reply | Threaded
Open this post in threaded view
|

Re: rabbitmq-c: Nonblocking recv

Arun Chandrasekaran
Hi Alan,

I was out of country. So I was not able to reply.

Thanks, that made it work. I was actually trying with the sample consumer (amqp_consumer.c) which operates with the unnamed queues. The messages were actually dropped by the broker.

-Arun

_______________________________________________
rabbitmq-discuss mailing list
[hidden email]
https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss