rabbitmq java client keeps pulling data?

DeepNightTwo
I created a consumer like this:

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost(host);
    factory.setVirtualHost(vhost);
    factory.setUsername(username);
    factory.setPassword(passwd);
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.basicQos(4096);
    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume(queueName, true, consumer);

    while (true) {
        consumer.nextDelivery();
    }

I found that a thread keeps pulling data from the RabbitMQ server. If the data is consumed more slowly than it is produced, the accumulated Delivery instances can cause an OOM. If I create the QueueingConsumer with a fixed-size blocking queue, it still keeps pulling data, which causes a "Queue full" exception:

2014-09-19 14:46:16 STDIO [ERROR] DefaultExceptionHandler: Consumer com.rabbitmq.client.QueueingConsumer@2316ea4 (amq.ctag-1Ee7d70sWHCKBdbunbBw-w) method handleDelivery for channel AMQChannel(amqp://sec-read@10.8.91.153:5672infosec,1) threw an exception for channel AMQChannel(amqp://sec-read@10.8.91.153:5672infosec,1):
2014-09-19 14:46:16 STDIO [ERROR] java.lang.IllegalStateException: Queue full
2014-09-19 14:46:16 STDIO [ERROR] at java.util.AbstractQueue.add(AbstractQueue.java:98)
2014-09-19 14:46:16 STDIO [ERROR] at com.rabbitmq.client.QueueingConsumer.handleDelivery(QueueingConsumer.java:125)
2014-09-19 14:46:16 STDIO [ERROR] at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:140)
2014-09-19 14:46:16 STDIO [ERROR] at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:85)
2014-09-19 14:46:16 STDIO [ERROR] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
2014-09-19 14:46:16 STDIO [ERROR] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)



My question is: is there a size setting for the consumer that limits the number of unconsumed deliveries? Or should I use the basicGet API?
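
A note on the "Queue full" error in the stack trace above: the top frame is AbstractQueue.add, which throws IllegalStateException when a bounded queue has no free slot instead of waiting for one. The following is a minimal sketch of that behaviour using only the JDK. It is not code from the RabbitMQ client, and the class name BoundedQueueDemo and its helper methods are made up for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class BoundedQueueDemo {

    /** Adds a second element to a full queue with add() and reports what happened. */
    static String addToFullQueue() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.add("first");                 // fits: capacity is 1
        try {
            queue.add("second");            // no free slot: add() throws instead of waiting
            return "accepted";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    /** Offers a second element to a full queue, giving up after a short timeout. */
    static boolean offerToFullQueue() throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.add("first");
        // offer() with a timeout waits briefly, then returns false if there is still no room.
        return queue.offer("second", 10, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("add   -> " + addToFullQueue());
        System.out.println("offer -> " + offerToFullQueue());
    }
}
```

A third option, put(), blocks the calling thread until a slot becomes free. None of these stops the broker from sending more messages; they only change what the receiving thread does when the local queue is full.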




Re: rabbitmq java client keeps pulling data?

Michael Klishin-4
On 19 September 2014 at 11:03:34, DeepNightTwo ([hidden email]) wrote:
> my question is, is there a size setting for consumer to limit the count of
> un-consumed delivery? Or I should use the basicGet api?

Yes: use basic.qos together with manual acknowledgements (see tutorial 2). I'd also
extend DefaultConsumer instead of using QueueingConsumer.

http://www.rabbitmq.com/tutorials/tutorial-two-java.html
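
A minimal sketch of what this advice might look like in code, assuming the amqp-client library is on the classpath and a broker is reachable. The host, the queue name "work", the prefetch value of 64, and the decode helper are placeholders chosen for illustration, not values from this thread.

```java
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

class BoundedPrefetchConsumer {

    /** Stand-in for real message processing (hypothetical helper). */
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws Exception {
        String host = "localhost";   // assumption: placeholder host
        String queueName = "work";   // assumption: placeholder; the queue must already exist

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        // At most 64 deliveries may be outstanding (sent but not yet acknowledged).
        channel.basicQos(64);

        // autoAck = false: the prefetch limit only applies with manual acknowledgements.
        boolean autoAck = false;
        channel.basicConsume(queueName, autoAck, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                System.out.println("received: " + decode(body));
                // Acknowledge only after processing, which frees one prefetch slot.
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}
```

Because the acknowledgement is sent only after the message has been handled, a slow consumer holds at most the prefetch count of messages in memory, and anything unacknowledged is re-queued by the broker if the connection drops.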
--  
MK  

Staff Software Engineer, Pivotal/RabbitMQ
_______________________________________________
rabbitmq-discuss mailing list has moved to https://groups.google.com/forum/#!forum/rabbitmq-users,
please subscribe to the new list!

[hidden email]
https://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss

Re: rabbitmq java client keeps pulling data?

DeepNightTwo
Is it designed this way for a reason? My understanding is that if a fixed-size queue is used to create a consumer, the consumer is expected to stop pulling data when the queue is full.

I extended QueueingConsumer, and it works for me.


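import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.QueueingConsumer;

class BlockingConsumer extends QueueingConsumer {

    // Own reference to the queue, because QueueingConsumer keeps its copy private.
    private final BlockingQueue<Delivery> q;

    public BlockingConsumer(Channel ch) {
        super(ch);
        this.q = null; // no queue supplied: fall back to the parent's behaviour
    }

    public BlockingConsumer(Channel ch, LinkedBlockingDeque<Delivery> q) {
        super(ch, q);
        this.q = q;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        if (q == null) {
            super.handleDelivery(consumerTag, envelope, properties, body);
        } else {
            try {
                // put() blocks until a slot is free instead of throwing like add().
                this.q.put(new Delivery(envelope, properties, body));
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of swallowing the interruption.
                Thread.currentThread().interrupt();
            }
        }
    }
}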

q.put will block until the queue has a slot available. But I'm not sure whether this will cause network issues. For example, if the queue stays full, will the connection time out or be reset?

This implementation is also ugly: because _queue is private, I have to store a reference to the queue in the subclass, and because the _checkShutdown method is private, I couldn't invoke it in the subclass.


Re: rabbitmq java client keeps pulling data?

Michael Klishin
You are reinventing basic.qos in the client, except that if your entire JVM goes down, RabbitMQ won't notice and re-queue unacknowledged deliveries.

basic.qos exists precisely for consumer-controlled delivery rate limiting.

MK

> On 19/9/2014, at 12:08, DeepNightTwo <[hidden email]> wrote:
>
> Is it designed so for some purpose? My understanding is, if a fixed-size
> queue is used to create a consumer, it is expected the consumer should stop
> pulling data if the queue is full.

Re: rabbitmq java client keeps pulling data?

DeepNightTwo
    Channel channel = connection.createChannel();
    channel.basicQos(4096);


I've set basic.qos as shown above, but it doesn't seem to work as I expected: the client keeps pulling data, which causes an OOM.

Re: rabbitmq java client keeps pulling data?

Michael Klishin
Did you switch basicConsume to use manual acknowledgements?
basic.qos has no effect without them.

This is covered in tutorial 2, please do read it.
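
Why prefetch does nothing with automatic acknowledgements can be pictured with a toy model: the prefetch limit counts deliveries that are still unacknowledged, and an auto-acked delivery is treated as settled the moment it is sent, so the count never rises. The sketch below is only an illustration under that assumption. PrefetchModel and its methods are hypothetical names, and this is not how RabbitMQ is implemented.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model of a prefetch window; an illustration only. */
class PrefetchModel {
    private final Deque<Integer> ready = new ArrayDeque<>();
    private final int prefetch;
    private final boolean autoAck;
    private int unacked = 0;
    private int delivered = 0;

    PrefetchModel(int messages, int prefetch, boolean autoAck) {
        for (int i = 0; i < messages; i++) {
            ready.add(i);
        }
        this.prefetch = prefetch;
        this.autoAck = autoAck;
    }

    /** Pushes messages until the window is full or nothing is left; returns the total delivered so far. */
    int deliver() {
        while (!ready.isEmpty() && (autoAck || unacked < prefetch)) {
            ready.poll();
            delivered++;
            if (!autoAck) {
                unacked++;   // only manually acknowledged deliveries occupy the window
            }
        }
        return delivered;
    }

    /** The consumer acknowledges one message, freeing one slot in the window. */
    void ack() {
        if (unacked > 0) {
            unacked--;
        }
    }

    public static void main(String[] args) {
        PrefetchModel auto = new PrefetchModel(10, 3, true);
        System.out.println("autoAck=true:  delivered " + auto.deliver());    // 10: the limit is ignored

        PrefetchModel manual = new PrefetchModel(10, 3, false);
        System.out.println("autoAck=false: delivered " + manual.deliver());  // 3: the window is full
        manual.ack();
        System.out.println("after one ack: delivered " + manual.deliver());  // 4: one slot was freed
    }
}
```

In the model, the manual-ack consumer never holds more than three messages at a time, while the auto-ack consumer receives everything at once regardless of the limit.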

MK

> On 19/9/2014, at 12:41, DeepNightTwo <[hidden email]> wrote:
>
> I've set the basic.qos like above. But it seems not work as I expected and
> keeps pulling data and cause OOM.

Re: rabbitmq java client keeps pulling data?

DeepNightTwo
Do you mean I need to use manual acks to make basicQos work? I did use autoack in my previous code:

channel.basicConsume(queueName, true, consumer);


So if I use

channel.basicConsume(queueName, false, consumer);

and ack each message after it has been processed, basicQos will work, right?

Re: rabbitmq java client keeps pulling data?

Michael Klishin
Tutorial 2 answers this.

MK

> On 19/9/2014, at 13:30, DeepNightTwo <[hidden email]> wrote:
>
> Do you mean I need to use manual ack to make basicQos work? I did use autoack
> in previous use case :
>
> channel.basicConsume(queueName, true, consumer);
>
>
> So if I use
>
> channel.basicConsume(queueName, false, consumer);  
>
> and ack it after the message is proceed. BasicQos would works, right?

Re: rabbitmq java client keeps pulling data?

Michael Klishin-4
In reply to this post by DeepNightTwo
On 19 September 2014 at 13:31:48, DeepNightTwo ([hidden email]) wrote:

> Do you mean I need to use manual ack to make basicQos work? I did use autoack
> in previous use case :
>
> channel.basicConsume(queueName, true, consumer);
>
> So if I use
>
> channel.basicConsume(queueName, false, consumer);
>
> and ack it after the message is proceed. BasicQos would works, right

Also, I've noticed this was posted to rabbitmq-discuss. rabbitmq-discuss
is the legacy list, which was deprecated in late July. It will be
discontinued on Oct 1st.

Please start all new discussions on rabbitmq-users:
https://groups.google.com/forum/#!forum/rabbitmq-users
--  
MK  

Staff Software Engineer, Pivotal/RabbitMQ