[RESOLVED] Why amqp_simple_wait_frame gives a negative value on long wait?


kingsmasher1
I am using rabbitmq-c. I created a consumer method using amqp_simple_wait_frame. When the consumer has been waiting for a long time, amqp_simple_wait_frame returns a negative value and, as a result, my consumer returns. Any clue why it returns a negative value?

Here is my consumer method, in case someone would like to have a look:

std::string amqConnection::consume(amqp_bytes_t queuename, std::string exchange, std::string routingkey) {
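        // queuename.bytes is not guaranteed to be NUL-terminated, so print it with an explicit length
        Debug("%s q %.*s ex %s rk %s- \n", __FUNCTION__, (int) queuename.len, (char*)queuename.bytes, exchange.c_str(), routingkey.c_str());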

        if (exchange.length() > 0) bindQueue(queuename, exchange.c_str(), routingkey.c_str());
        amqp_basic_consume(conn, 1, queuename, amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
        amqp_get_rpc_reply(conn);

        amqp_frame_t frame;
        int result;

        amqp_basic_deliver_t *d;
        amqp_basic_properties_t *p;
        size_t body_target;
        size_t body_received;

        amqp_maybe_release_buffers(conn);
        Debug("%s - wait frame\n", __FUNCTION__);
        result = amqp_simple_wait_frame(conn, &frame);
        Debug("%s Result %d\n",__FUNCTION__, result);
        if (result < 0) {
                Debug("%s Returning as result < 0\n",__FUNCTION__);
                if (exchange.length() > 0) unbindQueue(queuename, exchange.c_str(), routingkey.c_str());
                return "";
        }

        Debug("%s Frame type %d, channel %d\n",__FUNCTION__, frame.frame_type, frame.channel);
        if (frame.frame_type != AMQP_FRAME_METHOD) {
                if (exchange.length() > 0) unbindQueue(queuename, exchange.c_str(), routingkey.c_str());
                return "";
        }

        Debug("%s Method %s\n", __FUNCTION__, amqp_method_name(frame.payload.method.id));
        if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) {
                if (exchange.length() > 0) unbindQueue(queuename, exchange.c_str(), routingkey.c_str());
                return "";
        }

        d = (amqp_basic_deliver_t *) frame.payload.method.decoded;
        Debug("%s Delivery %u, exchange %.*s routingkey %.*s\n",__FUNCTION__,
                        (unsigned) d->delivery_tag,
                        (int) d->exchange.len, (char *) d->exchange.bytes,
                        (int) d->routing_key.len, (char *) d->routing_key.bytes);

        result = amqp_simple_wait_frame(conn, &frame);
        if (result < 0) {
                if (exchange.length() > 0) unbindQueue(queuename, exchange.c_str(), routingkey.c_str());
                return "";
        }
        if (frame.frame_type != AMQP_FRAME_HEADER) {
                ErrLog( "%s Expected header!",__FUNCTION__);
                unbindQueue(queuename, exchange.c_str(), routingkey.c_str());
                return "";
        }
        p = (amqp_basic_properties_t *) frame.payload.properties.decoded;
        if (p->_flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {
                Debug("%s Content-type: %.*s\n",__FUNCTION__,
                                (int) p->content_type.len, (char *) p->content_type.bytes);
        }
        Debug("%s ----\n", __FUNCTION__);

        body_target = frame.payload.properties.body_size;
        body_received = 0;
        std::string response="";
        while (body_received < body_target) {
                result = amqp_simple_wait_frame(conn, &frame);
                if (result < 0)
                        break;

                if (frame.frame_type != AMQP_FRAME_BODY) {
                        ErrLog( " %s Expected body!", __FUNCTION__);
                        break;
                }

                body_received += frame.payload.body_fragment.len;
                //assert(body_received <= body_target);

                //amqp_dump(frame.payload.body_fragment.bytes,
                // frame.payload.body_fragment.len);
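                // append only this fragment's bytes; body_received is the running total, not the fragment length
                response += std::string((char *) frame.payload.body_fragment.bytes, frame.payload.body_fragment.len);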
        }


        if (exchange.length() > 0) unbindQueue(queuename, exchange.c_str(), routingkey.c_str());
        return response;
}
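
A side note on the body loop, separate from the negative return value: when a message arrives in several body frames, each fragment has to be appended with its own length rather than the running total. Below is a minimal sketch of that accumulation. `Fragment` and `assemble_body` are hypothetical names made up for illustration; `Fragment` only mimics the length-plus-pointer shape of amqp_bytes_t so the snippet compiles without rabbitmq-c.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical stand-in for amqp_bytes_t (a length plus a pointer), so this
// sketch builds without the rabbitmq-c headers.
struct Fragment {
    std::size_t len;
    const char *bytes;
};

// Concatenate body fragments until body_target bytes have been collected.
// Each fragment is appended with its own length, not the running total.
std::string assemble_body(const std::vector<Fragment> &fragments,
                          std::size_t body_target) {
    std::string body;
    body.reserve(body_target);
    for (const Fragment &f : fragments) {
        if (body.size() >= body_target) break;  // header's body_size reached
        body.append(f.bytes, f.len);
    }
    return body;
}
```

In the consumer above, the equivalent would be to append frame.payload.body_fragment.len bytes on each pass through the while loop.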

Re: Why amqp_simple_wait_frame gives a negative value on long wait?

kingsmasher1
Okay, it seems I found the solution.
I was passing a heartbeat of 300 seconds to amqp_login. If I set it to 0, amqp_simple_wait_frame waits indefinitely, which is what I was expecting.
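
A likely explanation, offered as an assumption rather than something confirmed in this thread: a non-zero heartbeat asks the broker to expect regular traffic on the connection. The AMQP 0-9-1 specification says a peer that sees no incoming traffic for two heartbeat intervals or longer should close the connection. If the client library does not send heartbeat frames while it is blocked, the broker eventually drops the socket and the pending read fails. A heartbeat of 0 disables the check. The rule can be sketched as below; `peer_would_drop` is a hypothetical helper for illustration, not a rabbitmq-c function, and real brokers may differ in exact timing.

```cpp
// Hypothetical model of the AMQP 0-9-1 heartbeat rule; not part of rabbitmq-c.
// heartbeat_secs: the value negotiated at login (0 means heartbeats are off).
// idle_secs: how long the peer has seen no incoming traffic at all.
bool peer_would_drop(int heartbeat_secs, long idle_secs) {
    if (heartbeat_secs <= 0) return false;    // heartbeats disabled: never times out
    return idle_secs >= 2L * heartbeat_secs;  // two missed intervals or longer
}
```

Under this model, a 300 second heartbeat with a silent client would trip after roughly 600 seconds of idle waiting, whereas a heartbeat of 0 never trips.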


