Problem with BasicProperties (bug?)

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

Problem with BasicProperties (bug?)

radisb
he code below does the following: Sends 1000 messages (each tagged with a key by BasicProperties.getHeaders.put("MESSAGE_KEY", value) and stored in a map) directly to queueA and then waits in a loop for incoming replies on the queue Replies.
It then fires 2 Threads (SimpleForwarder class). The first thread waits for messages on QueueA and upon receipt it sends them to QueueB. The second Thread waits on QueueB and sends to Replies.

When a message is received on Replies , its key (taken from the header in the delivery properties) is removed from a map. The test finishes succesfully if all keys sent where received and removed from the map.

The problem is in the forwarding threads code. If before forwarding the message, the thread recreates the BasicProperties and sends a fresh BasicProperties , this causes the consumer on Replies to lose messages and get duplicates in their place. Sometimes the forwarder throws null pointer exception because it cant find the header that has the key(This cant happen because all messages are assigned one before sending). What is strange is that if the 2 forwarders are started in separate JVMs the problem disappears. I have marked the problematic area with /* -- PROBLEM -- */

Note that the lost messages are random and the problem might not manifest if run for a few messages. Above 400-500 it always happens for me.

I use server 1.7.2 and javaclient 1.7.2.
Any suggestions?

Here is the code:
Tester.java

package test;


import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConnectionParameters;
import com.rabbitmq.client.MessageProperties;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.client.impl.LongString;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;


public class Tester
{
        ConnectionParameters parameters;
        Channel senderChannel;
        Channel receiverChannel;
        QueueingConsumer consumer;
        String sourceQueue;
        String targetRoutingKey;
       
        public Tester(ConnectionParameters parameters, String sourceQueue, String targetRoutingKey)
        {
                this.parameters = parameters;
                this.sourceQueue = sourceQueue;
                this.targetRoutingKey = targetRoutingKey;
        }

        public void test(int messageCount) throws IOException
        {
                senderChannel = new ConnectionFactory(parameters).newConnection("localhost").createChannel();
                receiverChannel = new ConnectionFactory(parameters).newConnection("localhost").createChannel();
                receiverChannel.queueDeclare(sourceQueue, false);
                receiverChannel.queueBind(sourceQueue, "amq.direct", sourceQueue);
                consumer = new QueueingConsumer(receiverChannel);
                String dummyData = "dummyData";
                Set<String>sentKeys = new HashSet<String>();
                int c = 0;
                while(c < messageCount)
                {
                        c++;
                        String key =  String.valueOf(c);
                        BasicProperties props = MessageProperties.PERSISTENT_TEXT_PLAIN;
                        props.setHeaders(new HashMap<String, Object>());
                        props.getHeaders().put("MESSAGE_KEY", key);
                        senderChannel.basicPublish("amq.direct", targetRoutingKey, props, dummyData.getBytes());
                        sentKeys.add(key);
                }
                System.out.println("Sent " + c + " messages. Firing up consumer to wait for replies:\n");
               
                receiverChannel.basicConsume(sourceQueue, false, consumer);

                while(true)
                {
                        try
                        {
                                Delivery delivery = consumer.nextDelivery(1000);
                                if(delivery != null)
                                {
                                        receiverChannel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                                        String receivedKey = new String(((LongString)delivery.getProperties().getHeaders().get("MESSAGE_KEY")).getBytes());
                                        System.out.print("Key: " + receivedKey);
                                        if(!sentKeys.remove(receivedKey))
                                                System.out.println(" ...Already received");
                                        else
                                        {
                                                System.out.println(" ...OK");
                                                if(sentKeys.isEmpty())
                                                {
                                                        System.out.println("All keys removed");
                                                        close();
                                                        return;
                                                }
                                        }
                                }
                                else
                                {
                                        System.out.println("Timed out.");
                                }
                        }
                        catch(InterruptedException ex){System.err.println("Interrupted while waiting for message. Exiting");return;}
                        catch(ShutdownSignalException sse){ System.err.println("Caught shutdown signal. Exiting");return;}
                        catch(IOException ex){System.err.println("Unexpected IO error. Exiting");}
                }
        }

        private void close() throws IOException
        {
                senderChannel.getConnection().close();
                receiverChannel.getConnection().close();
        }

        public static class SimpleForwarder extends Thread
        {
                ConnectionParameters parameters;
                Channel senderChannel;
                Channel receiverChannel;
                QueueingConsumer consumer;
                String sourceQueue;
                String targetRoutingKey;

                public SimpleForwarder(ConnectionParameters parameters, String sourceQueue, String targetRoutingKey)
                {
                        this.parameters = parameters;
                        this.sourceQueue = sourceQueue;
                        this.targetRoutingKey = targetRoutingKey;
                        this.setName(sourceQueue);
                }

                private void init() throws IOException
                {
                        senderChannel = new ConnectionFactory(parameters).newConnection("localhost").createChannel();
                        receiverChannel = new ConnectionFactory(parameters).newConnection("localhost").createChannel();
                        receiverChannel.basicQos(1);
                        receiverChannel.queueDeclare(sourceQueue, false);
                        receiverChannel.queueBind(sourceQueue, "amq.direct", sourceQueue);
                        consumer = new QueueingConsumer(receiverChannel);
                        receiverChannel.basicConsume(sourceQueue, false, consumer);
                }
                @Override
                public void run()
                {
                        try{init();}catch(IOException ex){System.err.println("Error initializing. Exiting.");close();return;}
                        while(!Thread.currentThread().isInterrupted())
                        {
                                try
                                {
                                        Delivery delivery = consumer.nextDelivery(5000);

                                        if(delivery != null)
                                        {
                                                String key = new String(((LongString)delivery.getProperties().getHeaders().get("MESSAGE_KEY")).getBytes());
                                                String data = new String(delivery.getBody());

        /* -- PROBLEM -- */
        //---------   THIS CAUSES the problem. (BUT ONLY WHEN SimpleForwarders are running IN THE SAME JVM).
                                                BasicProperties props = MessageProperties.PERSISTENT_TEXT_PLAIN;

        //--------- THIS NEVER CAUSES PROBLEM EVEN when SimpleForwarders IN THE SAME JVM
        // BasicProperties props = delivery.getProperties();
        /* -- END OF PROBLEM -- */
                                               
                                                props.setHeaders(new HashMap<String, Object>());
                                                props.getHeaders().put("MESSAGE_KEY", key);
                                                senderChannel.basicPublish("amq.direct", targetRoutingKey, props , data.getBytes());
                                                receiverChannel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                                        }
                                }
                                catch(InterruptedException ex){System.err.println("[" + Thread.currentThread().getName() + "] Interrupted while waiting for message. Exiting");close();return;}
                                catch(ShutdownSignalException sse){ System.err.println("[" + Thread.currentThread().getName() + "] Caught shutdown signal. Exiting");close();return;}
                                catch(IOException ex){System.err.println("[" + Thread.currentThread().getName() + "] Unexpected IO error. Exiting");return;}
                        }
                }
                public void close()
                {
                        try
                        {
                                senderChannel.getConnection().close();
                                receiverChannel.getConnection().close();
                        }
                        catch(IOException ex)
                        {
                                System.err.println("Close failed");
                        }
                }
        }

        public static void main(String[] args) throws IOException
        {
                ConnectionParameters params = new ConnectionParameters();
                params.setPassword("guest");
                params.setUsername("guest");
                params.setVirtualHost("/");

                SimpleForwarder f1 = new SimpleForwarder(params, "QueueA", "QueueB");
                SimpleForwarder f2 = new SimpleForwarder(params, "QueueB", "Replies");
                f1.start();
                f2.start();

                Tester tester = new Tester(params, "Replies", "QueueA");
                tester.test(1000);
        }
}

Reply | Threaded
Open this post in threaded view
|

Re: Problem with BasicProperties (bug?)

Matthias Radestock-3
Hi,

radisb wrote:
> The problem is in the forwarding threads code. If before forwarding the
> message, the thread recreates the BasicProperties and sends a fresh
> BasicProperties , this causes the consumer on Replies to lose messages and
> get duplicates in their place. Sometimes the forwarder throws null pointer
> exception because it cant find the header that has the key(This cant happen
> because all messages are assigned one before sending). What is strange is
> that if the 2 forwarders are started in separate JVMs the problem
> disappears.
 > [...]
> BasicProperties props = MessageProperties.PERSISTENT_TEXT_PLAIN;
>       [...]
> props.setHeaders(new HashMap<String, Object>());

That last line ends up modifying the
MessageProperties.PERSISTENT_TEXT_PLAIN object. That's bad in itself,
but made worse by the fact that you have multiple threads doing this
concurrently.

To avoid this, you should clone() the object first and then use/modify
the clone instead.

Regards,

Matthias.

_______________________________________________
rabbitmq-discuss mailing list
[hidden email]
http://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
Reply | Threaded
Open this post in threaded view
|

Re: Problem with BasicProperties (bug?)

radisb
Hi matthias,
 I looked into the source code of MessageProperties.XXXXX and the reason for the problem is that   MessageProperties.XXXXX are static final. This means that either I am mistaken in thinking that their use were as a shortcut for construction of BasicProperties, or they shouldnt be static.
But I then read in the javadoc that they are "Constant holder class with useful static instances of AMQContentHeader. These are intended for use with Channel.basicPublish". This makes sense but the problem is that the BasicProperty class holds user data (the headers) that is to be set and get by the user. So the kind of use  MessageProperties.XXXXX the javadoc suggests, is useless for me because this way how am I going to set my headers before calling basicPublish? I have to create a fresh BasicProperties instance to set the headers and then pass them to basicPublish. And since MessageProperties.XXXX is static , the only way to do that is create BasicProperties by constructor. The constructor has many boilerplate arguments, thats why I though the MessageProperties where a convinience for construction. It seems the current design of MessageProperties assumes the user is only making use of the body and not the headers. For those users that they need access to headers (and I think they are a lot) shouldn't we provide the same convinience?


Thanks a lot,

Bill Radis.

Matthias Radestock-3 wrote
Hi,

radisb wrote:
> The problem is in the forwarding threads code. If before forwarding the
> message, the thread recreates the BasicProperties and sends a fresh
> BasicProperties , this causes the consumer on Replies to lose messages and
> get duplicates in their place. Sometimes the forwarder throws null pointer
> exception because it cant find the header that has the key(This cant happen
> because all messages are assigned one before sending). What is strange is
> that if the 2 forwarders are started in separate JVMs the problem
> disappears.
 > [...]
> BasicProperties props = MessageProperties.PERSISTENT_TEXT_PLAIN;
>       [...]
> props.setHeaders(new HashMap<String, Object>());

That last line ends up modifying the
MessageProperties.PERSISTENT_TEXT_PLAIN object. That's bad in itself,
but made worse by the fact that you have multiple threads doing this
concurrently.

To avoid this, you should clone() the object first and then use/modify
the clone instead.

Regards,

Matthias.

_______________________________________________
rabbitmq-discuss mailing list
rabbitmq-discuss@lists.rabbitmq.com
http://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
Reply | Threaded
Open this post in threaded view
|

Re: Problem with BasicProperties (bug?)

Matthias Radestock-3
Bill,

radisb wrote:
> So the kind of use  MessageProperties.XXXXX the javadoc
> suggests, is useless for me because this way how am I going to set my
> headers before calling basicPublish?

You can call clone() on the MessageProperties.XXXXX and then set the
headers on the result, as I suggested in my first reply.


Matthias.

_______________________________________________
rabbitmq-discuss mailing list
[hidden email]
http://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss