|
Hi,
We used RabbitMQ to implement a simple producer-consumer architecture. It was fine until very recently, when we encountered the blocking calls. I ran two nodes in a cluster; in cases where the consumer took more than the usual time and all subsequent messages kept on waiting. Then, I came to know that I used QueuingConsumer which is a blocking architecture, now I know I have to use my own implementation of the DefaultConsumer. But, I am clueless as to whether I should manage the threading model or the RabbitMQ itself has any provision on that. Another thing that is confusing is, whether we run different type of consumers (with different duties) separately or we maintain a single main class which will start consumers as the messages come in. Thanks in advance, Mahesh _______________________________________________ rabbitmq-discuss mailing list [hidden email] https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss |
|
Hi Mahesh,
On 27 Aug 2012, at 08:58, Mahesh Viraktamath wrote: > Hi, > > We used RabbitMQ to implement a simple producer-consumer architecture. > It was fine until very recently, when we encountered the blocking > calls. I ran two nodes in a cluster; in cases where the consumer took > more than the usual time and all subsequent messages kept on waiting. > Then, I came to know that I used QueuingConsumer which is a blocking > architecture, now I know I have to use my own implementation of the > DefaultConsumer. But, I am clueless as to whether I should manage the > threading model or the RabbitMQ itself has any provision on that. > Another thing that is confusing is, whether we run different type of > consumers (with different duties) separately or we maintain a single > main class which will start consumers as the messages come in. > Have you read the API guide (http://www.rabbitmq.com/api-guide.html) for details about threading and using your own executor service? That's probably a good place to start. The latter question (about choosing how to structure your code) is really something you'll have to decide yourself, based on the responsibilities of each class and whether or not and to what extent separation makes sense in the design. Cheers, Tim _______________________________________________ rabbitmq-discuss mailing list [hidden email] https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss |
|
Thanks Tim,
I read the API guide and still stuck to the QueuingConsumer, for each incoming message I create a new service thread (with common channel, without that the service won't know which message to acknowledge). But in the guide, it is mentioned that the channel should be created for each thread. Now, my question is does this guide assumes that we run different consumer threads? I can't understand the concept of different consumer threads. I run a single consumer and use basicConsume() to listen to the messages and each message (nextDelivery()) will give rise to a new service thread. Again, each message creating a thread is scary ! If it helps, I am running the consumer as a java application and use basicConsume() and nextDelivery() in a while loop to process the messages. Our application sends messages to these non-stop. So, I am looking for a consumer which will not block the incoming messages when it is processing a large message.
-Mahesh On Mon, Aug 27, 2012 at 2:47 PM, Tim Watson <[hidden email]> wrote: > Hi Mahesh, > > On 27 Aug 2012, at 08:58, Mahesh Viraktamath wrote: > >> Hi, >> >> We used RabbitMQ to implement a simple producer-consumer architecture. >> It was fine until very recently, when we encountered the blocking >> calls. I ran two nodes in a cluster; in cases where the consumer took >> more than the usual time and all subsequent messages kept on waiting. >> Then, I came to know that I used QueuingConsumer which is a blocking >> architecture, now I know I have to use my own implementation of the >> DefaultConsumer. But, I am clueless as to whether I should manage the >> threading model or the RabbitMQ itself has any provision on that. >> Another thing that is confusing is, whether we run different type of >> consumers (with different duties) separately or we maintain a single >> main class which will start consumers as the messages come in. >> > > Have you read the API guide (http://www.rabbitmq.com/api-guide.html) for details about threading and using your own executor service? That's probably a good place to start. The latter question (about choosing how to structure your code) is really something you'll have to decide yourself, based on the responsibilities of each class and whether or not and to what extent separation makes sense in the design. > > Cheers, > Tim _______________________________________________ rabbitmq-discuss mailing list [hidden email] https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss |
|
Mahesh,
On 27/08/12 13:37, Mahesh Viraktamath wrote: > I read the API guide and still stuck to the QueuingConsumer, for each > incoming message I create a new service thread (with common channel, > without that the service won't know which message to acknowledge). But > in the guide, it is mentioned that the channel should be created for > each thread. Now, my question is does this guide assumes that we run > different consumer threads? *I can't understand the concept of different > consumer threads*. I run a single consumer and use basicConsume() to > listen to the messages and each message (nextDelivery()) will give rise > to a new service thread. Again, each message creating a thread is scary ! > > If it helps, I am running the consumer as a java application and use > basicConsume() and nextDelivery() in a while loop to process the > messages. Our application sends messages to these non-stop. So, I am > looking for a consumer which will not block the incoming messages when > it is processing a large message. I suggest you create N channels and for each channel: - set the basic.qos prefetch to 1 or some other low-ish value (for fair dispatch) - create a consumer as shown in http://www.rabbitmq.com/api-guide.html#consuming You write above that you only have one channel because "without that the service won't know which message to acknowledge", but the channel is accessible via the context (as shown in the example), so there is no ambiguity. Obviously message ordering goes out of the window, but that's the case for any scheme where messages are handled by multiple threads. Regards, Matthias. _______________________________________________ rabbitmq-discuss mailing list [hidden email] https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss |
|
Hi,
As per your suggestion, I created 5 channels as follows: for (int i = 0; i < 5; i++) { final Channel channel = connection.createChannel();
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); channel.basicQos(1); log.info(" [x] Awaiting RPC requests");
boolean autoAck = false; channel.basicConsume(RPC_QUEUE_NAME, autoAck, "watermark" + i, new DefaultConsumer(channel) {
@Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException { String routingKey = envelope.getRoutingKey(); String contentType = properties.getContentType();
long deliveryTag = envelope.getDeliveryTag(); //process the message here channel.basicPublish("", properties.getReplyTo(),
new BasicProperties.Builder().correlationId(properties.getCorrelationId()).build(), response.getBytes("UTF-8"));
channel.basicAck(deliveryTag, false);
} }); } But, the consumer doesn't seem to listen to the queue, as soon as I run this application, it exists. Earlier I used the while(true) loop to get the messages. How do I make sure if the consumer listens to the intended queue and process messages?
-Mahesh On Mon, Aug 27, 2012 at 6:54 PM, Matthias Radestock <[hidden email]> wrote: Mahesh, _______________________________________________ rabbitmq-discuss mailing list [hidden email] https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss |
|
Mahesh,
On 27/08/12 19:30, Mahesh Viraktamath wrote: > as soon as I run this application, it exists So you want your application to never terminate? Simply stick something like this at the end: synchronized(this) { while (true) this.wait(); } or while(true) Thread.sleep(Long.MAX_VALUE); or something similar. Regards, Matthias. _______________________________________________ rabbitmq-discuss mailing list [hidden email] https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss |
| Powered by Nabble | Edit this page |
