Non-blocking Consumers approach

Mahesh Viraktamath
Hi,

We used RabbitMQ to implement a simple producer-consumer architecture.
It worked fine until very recently, when we ran into blocking calls. I
ran two nodes in a cluster; whenever the consumer took longer than
usual on a message, all subsequent messages were kept waiting. I then
learned that QueuingConsumer, which I was using, is a blocking design,
so I know I have to write my own implementation of DefaultConsumer.
But I am unsure whether I should manage the threading model myself or
whether RabbitMQ has any provision for that. Another thing that
confuses me is whether we should run different types of consumers
(with different duties) separately, or maintain a single main class
that starts consumers as the messages come in.



Thanks in advance,
Mahesh
_______________________________________________
rabbitmq-discuss mailing list
[hidden email]
https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss

Re: Non-blocking Consumers approach

Tim Watson-6
Hi Mahesh,

Have you read the API guide (http://www.rabbitmq.com/api-guide.html) for details about threading and using your own executor service? That's probably a good place to start. The latter question (about choosing how to structure your code) is really something you'll have to decide yourself, based on the responsibilities of each class and whether or not and to what extent separation makes sense in the design.
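
As a rough illustration of the executor-service idea, here is a sketch that uses only java.util.concurrent and no RabbitMQ classes. The names BoundedWorkers, Result and handleAll are made up for this example. It only shows that a fixed-size pool can handle many messages without creating one thread per message; how the pool is wired to the client is described in the API guide.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical example class, not part of the RabbitMQ client.
final class BoundedWorkers {

    /** How many messages were handled, and by how many distinct threads. */
    static final class Result {
        final int handled;
        final int threadsUsed;

        Result(int handled, int threadsUsed) {
            this.handled = handled;
            this.threadsUsed = threadsUsed;
        }
    }

    /** Handles messageCount dummy messages on a pool of poolSize threads. */
    static Result handleAll(int messageCount, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        AtomicInteger handled = new AtomicInteger();

        for (int i = 0; i < messageCount; i++) {
            pool.execute(() -> {
                // Stand-in for real message processing.
                threadNames.add(Thread.currentThread().getName());
                handled.incrementAndGet();
            });
        }

        // Accept no new work, then wait for the queued work to finish.
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return new Result(handled.get(), threadNames.size());
    }

    public static void main(String[] args) {
        Result r = handleAll(100, 4);
        System.out.println(r.handled + " messages handled by "
                + r.threadsUsed + " thread(s)");
    }
}
```

The pool size caps the number of worker threads no matter how many messages arrive; work that cannot run yet simply waits in the pool's internal queue.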

Cheers,
Tim

Re: Non-blocking Consumers approach

Mahesh Viraktamath
Thanks Tim,

I read the API guide but have stuck with the QueuingConsumer for now. For each incoming message I create a new service thread. All of these threads share one channel, because otherwise the service would not know which message to acknowledge. The guide, however, says that a channel should be created for each thread. Does the guide assume that we run several consumer threads? I don't understand the concept of different consumer threads. I run a single consumer and use basicConsume() to listen for messages, and each message returned by nextDelivery() gives rise to a new service thread. Creating a thread for every message is scary, though!

If it helps, I am running the consumer as a Java application and use basicConsume() and nextDelivery() in a while loop to process the messages. Our application sends messages to it non-stop. So I am looking for a consumer that will not block incoming messages while it is processing a large message.



-Mahesh


Re: Non-blocking Consumers approach

Matthias Radestock-3
Mahesh,

On 27/08/12 13:37, Mahesh Viraktamath wrote:

> I read the API guide and still stuck to the QueuingConsumer, for each
> incoming message I create a new service thread (with common channel,
> without that the service won't know which message to acknowledge). But
> in the guide, it is mentioned that the channel should be created for
> each thread. Now, my question is does this guide assumes that we run
> different consumer threads? *I can't understand the concept of different
> consumer threads*. I run a single consumer and use basicConsume() to
> listen to the messages and each message (nextDelivery()) will give rise
> to a new service thread. Again, each message creating a thread is scary !
>
> If it helps, I am running the consumer as a java application and use
> basicConsume() and nextDelivery() in a while loop to process the
> messages. Our application sends messages to these non-stop. So, I am
> looking for a consumer which will not block the incoming messages when
> it is processing a large message.

I suggest you create N channels and for each channel:
- set the basic.qos prefetch to 1 or some other low-ish value (for fair
dispatch)
- create a consumer as shown in
http://www.rabbitmq.com/api-guide.html#consuming

You write above that you only have one channel because "without that the
service won't know which message to acknowledge", but the channel is
accessible via the context (as shown in the example), so there is no
ambiguity.

Obviously message ordering goes out of the window, but that's the case
for any scheme where messages are handled by multiple threads.
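
To make the ordering point concrete, here is a small sketch that uses only java.util.concurrent and no RabbitMQ classes. The class OrderingDemo and the message labels "large" and "small" are invented for the example, and a latch stands in for long processing so that the outcome does not depend on timing.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical example class, not part of the RabbitMQ client.
final class OrderingDemo {

    /** Returns the order in which two messages finish on two worker threads. */
    static List<String> completionOrder() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        List<String> done = new CopyOnWriteArrayList<>();
        CountDownLatch smallFinished = new CountDownLatch(1);

        // "large" arrives first but is still being worked on
        // when "small" completes on the other thread.
        pool.execute(() -> {
            try {
                smallFinished.await(); // stand-in for long processing
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            done.add("large");
        });
        pool.execute(() -> {
            done.add("small");
            smallFinished.countDown();
        });

        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return done;
    }

    public static void main(String[] args) {
        System.out.println(completionOrder()); // prints [small, large]
    }
}
```

The large message does not hold up the small one, which is the behaviour asked for, but the messages complete in a different order from the one in which they arrived.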

Regards,

Matthias.

Re: Non-blocking Consumers approach

Mahesh Viraktamath
Hi,

As per your suggestion, I created 5 channels as follows:

for (int i = 0; i < 5; i++) {
    // One channel per consumer; 'final' so the anonymous consumer can use it.
    final Channel channel = connection.createChannel();
    channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
    // Prefetch 1: at most one unacknowledged message per channel.
    channel.basicQos(1);

    log.info(" [x] Awaiting RPC requests");

    boolean autoAck = false;
    channel.basicConsume(RPC_QUEUE_NAME, autoAck, "watermark" + i,
            new DefaultConsumer(channel) {

                @Override
                public void handleDelivery(String consumerTag,
                        Envelope envelope,
                        AMQP.BasicProperties properties,
                        byte[] body)
                        throws IOException {
                    String routingKey = envelope.getRoutingKey();
                    String contentType = properties.getContentType();
                    long deliveryTag = envelope.getDeliveryTag();
                    // process the message here and build the reply
                    // in 'response' (a String, not shown)
                    channel.basicPublish("", properties.getReplyTo(),
                            new AMQP.BasicProperties.Builder()
                                    .correlationId(properties.getCorrelationId())
                                    .build(),
                            response.getBytes("UTF-8"));
                    // Acknowledge only this delivery, on the channel it arrived on.
                    channel.basicAck(deliveryTag, false);
                }
            });
}

But the consumer doesn't seem to listen to the queue: as soon as I run this application, it exits. Earlier I used a while(true) loop to get the messages. How do I make sure that the consumer listens to the intended queue and processes messages?


-Mahesh


Re: Non-blocking Consumers approach

Matthias Radestock-3
Mahesh,

On 27/08/12 19:30, Mahesh Viraktamath wrote:
> as soon as I run this application, it exits

So you want your application to never terminate? Simply stick something
like this at the end:

synchronized(this) { while (true) this.wait(); }

or

while(true) Thread.sleep(Long.MAX_VALUE);

or something similar.
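
One such variant, sketched with only java.util.concurrent, parks the main thread on a CountDownLatch so that it can also be released cleanly when the application should stop. The class KeepAlive and its methods awaitStop and requestStop are made-up names for this example; in a real consumer the registration of the consumers would go where the comment indicates.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical example class, not part of the RabbitMQ client.
final class KeepAlive {

    private final CountDownLatch stop = new CountDownLatch(1);

    /**
     * Blocks the calling thread until requestStop() is called.
     * Returns true if released normally, false if interrupted.
     */
    public boolean awaitStop() {
        try {
            stop.await();
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Releases any thread blocked in awaitStop(). */
    public void requestStop() {
        stop.countDown();
    }

    public static void main(String[] args) {
        KeepAlive keepAlive = new KeepAlive();

        // ... create channels and register consumers here ...

        // For demonstration only: ask for a stop after one second.
        Thread stopper = new Thread(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            keepAlive.requestStop();
        });
        stopper.start();

        keepAlive.awaitStop(); // main thread waits here instead of returning
        System.out.println("stopping");
    }
}
```

Unlike an endless sleep loop, the latch gives the rest of the program a way to end the wait, for example after closing the channels and the connection.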

Regards,

Matthias.