Quantcast

not receiving messages as after amqp_queue_bind

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

not receiving messages as after amqp_queue_bind

sayedmomeen
This post has NOT been accepted by the mailing list yet.
Hello, I'm having trouble receiving messages. I have a sender and a receiver.
The sender is simple code that just sends messages.
The receiver simply receives them, but when it receives the 1st message it adds a binding using amqp_queue_bind. After that it does not receive any more messages, even though many are still pending. If I send messages again from the sender, it starts receiving again.

I'll paste the sample code and the output below.

Sender.cpp
---------------------------
#include "amqp_send.h"
#include <unistd.h>
#include <stdio.h>
extern "C"
{

int main(int argc, char ** argv)
{
    char const                  *       hostname;
    char const                  *       exchange;
    char const                  *       routingkey;
    char                        *       cMsg            = NULL;
    int                                 port;
    int                                 sockfd;

    int                                 iTimesToSendMsg  = 1;
    int                                 iBytesToSend     = 1;
    long                                lTotalTime       = 0;
    long                                lAvgTime         = 0;
    int                                 iLoop            = 0;
    struct timeval                      sStartTime;
    struct timeval                      sEndTime;
    amqp_connection_state_t             conn;

    //Check for command-line parameters
    if (argc < 4)
    {
        printf("Usage: amqp_sendstring host port exchange routingkey bytes_to_send no_of_times msg\n");
        return 1;
    }

    //Reading the command-line parameters
    hostname        = argv[1];
    port            = atoi(argv[2]);
    exchange        = argv[3];
    routingkey      = argv[4];
    if(argv[5])
        iBytesToSend    = atoi(argv[5]);
   if(argv[6])
        iTimesToSendMsg = atoi(argv[6]);

   char msg = argv[7][0];
    cMsg = new char[iBytesToSend + 1];
    memset(cMsg,msg,iBytesToSend);
    cMsg[iBytesToSend+1] = '\0';

     printf("\n\n\n/*************************** Amqp_Send ****************************************/ \n");
     printf("\n Connected To    : %s:%s\n", argv[1],argv[2]);
     printf("\n No of Messages  : %d \n",iTimesToSendMsg);
     printf("\n Size of Message : %d \n", iBytesToSend);
     printf("\n/*************************** Amqp_Send *******************************************/ \n");


    // Create a new amqp connection
    conn = amqp_new_connection();

    // open a socket to hostname on portnumber
    die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket");

    //Sets the sockfd associated with the connection.
    amqp_set_sockfd(conn, sockfd);

    // connecting to rabbitmq broker
    die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
                      "Logging in");
    //opens the channel
    amqp_channel_open(conn, 1);
    die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");


    amqp_basic_properties_t props;
    props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;

    props.delivery_mode = 1; // Non persistent delivery mode

    //Get Time before send
    gettimeofday(&sStartTime,NULL);
    while (iLoop < iTimesToSendMsg)
    {


        die_on_error(amqp_basic_publish(conn,
                                        1,
                                        amqp_cstring_bytes(exchange),
                                        amqp_cstring_bytes(routingkey),
                                        0,
                                        0,
                                        &props,
                                        amqp_cstring_bytes(cMsg)),
                     "Publishing");

        iLoop++;

    }

     //Get Time After send
    gettimeofday(&sEndTime,NULL);

    lTotalTime = (sEndTime.tv_sec + sEndTime.tv_usec) - (sStartTime.tv_sec + sStartTime.tv_usec);
    lAvgTime = lTotalTime/iTimesToSendMsg;


    printf("\n Start Time : %ld +%ld \n",sStartTime.tv_sec,sStartTime.tv_usec);
    printf("\n End Time : %ld +%ld \n",sEndTime.tv_sec,sEndTime.tv_usec);
    printf("\n Total Time Taken : %ld \n",lTotalTime);
    printf("\n Avg. Time Taken : %ld \n",lAvgTime);
    printf("\n/***************************** Finished *****************************************/ \n");


    delete(cMsg);
    die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
    die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");
    die_on_error(amqp_destroy_connection(conn), "Ending connection");
    return 0;
}
}


-------------------------------------------------
Receiver.cpp

-------------------------------------------------
#include "amqp_listen.h"
#include <unistd.h>
#include <stdio.h>

extern "C"
{
int main(int argc, char const * const *argv)
{
    char const      *               hostname;
    char const      *               exchange;
    char const      *               bindingkey;
    static int                      iStartTime          = 0;
    int                             iBytesToRecv        = 0;
    int                             iBytesRcvd          = 0;
    int                             port;
    int                             amqp_listen_fd      = 0;
    int                             iCount              = 0;
    long                            lTotalTime          = 0;
    long                            lAvgTime            = 0;
    struct timeval                  sStartTime;
    struct timeval                  sEndTime;
    fd_set                          read_flags;
    amqp_bytes_t                    queuename;
    amqp_connection_state_t         conn;



    if (argc < 7)
    {
        fprintf(stderr, "Usage: amqp_listen host port exchange bindingkey Size_of_msg No_of_Msgs_To_Recv \n");
        return 1;
    }

    hostname     = argv[1];
    port         = atoi(argv[2]);
    exchange     = argv[3];
    bindingkey   = argv[4];
    iBytesToRecv = atoi(argv[5]) * atoi(argv[6]);

    printf("\n\n\n/*************************** Amqp_Listen ****************************************/ \n");
    printf("\n Listening on    : %s:%s\n", argv[1],argv[2]);
    printf("\n/*************************** Amqp_Listen *******************************************/ \n");

    // Create a new amqp connection
    conn = amqp_new_connection();

    // open a socket to hostname on portnumber
    die_on_error(amqp_listen_fd = amqp_open_socket(hostname, port), "Opening socket");

     //Sets the sockfd associated with the connection.
    amqp_set_sockfd(conn, amqp_listen_fd);

    // connecting to rabbitmq broker
    die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
                      "Logging in");

    //opens the channel
    amqp_channel_open(conn, 1);
    die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel");


    // declare a queue
    amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, amqp_empty_bytes, 0, 0, 0, 1,
                                                    amqp_empty_table);
    die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue");
    queuename = amqp_bytes_malloc_dup(r->queue);

    if (queuename.bytes == NULL)
    {
        fprintf(stderr, "Out of memory while copying queue name");
        return 1;
    }

   // Bind with the queuename to the exchange with the binding key
    amqp_queue_bind(conn, 1, queuename, amqp_cstring_bytes(exchange), amqp_cstring_bytes(bindingkey),
                    amqp_empty_table);

    die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding queue");

    // accept the incoming connection
    amqp_basic_consume(conn, 1, queuename, amqp_empty_bytes, 0, 1, 0, amqp_empty_table);

    die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming");

    FD_ZERO(&read_flags);
    FD_SET(amqp_listen_fd, &read_flags);
    while(iBytesRcvd < iBytesToRecv)
    {
        int iSel = select(amqp_listen_fd + 1, &read_flags, NULL, NULL, NULL);

        if (iSel <= 0)
        {
            continue;
        }
        if (iSel == 0)
        {
            break;
        }
        if(iStartTime == 0)
        {
            gettimeofday(&sStartTime,NULL);
            iStartTime++;
        }

        amqp_frame_t                frame;
        int                         result;

        amqp_basic_deliver_t    *   d;
        amqp_basic_properties_t *   p;
        size_t                      body_target;
        size_t                      body_received;

        amqp_maybe_release_buffers(conn);
        result = amqp_simple_wait_frame(conn, &frame);

        if (result < 0)
            break;

        if (frame.frame_type != AMQP_FRAME_METHOD)
            continue;

        if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD)
            continue;

        d = (amqp_basic_deliver_t *) frame.payload.method.decoded;

        result = amqp_simple_wait_frame(conn, &frame);

        if (result < 0)
            break;

        if (frame.frame_type != AMQP_FRAME_HEADER)
        {
            fprintf(stderr, "Expected header!");
            abort();
        }


        p = (amqp_basic_properties_t *) frame.payload.properties.decoded;

        body_target   = frame.payload.properties.body_size;
        body_received = 0;

        while (body_received < body_target)
        {
            result = amqp_simple_wait_frame(conn, &frame);
            if (result < 0)
                break;

            if (frame.frame_type != AMQP_FRAME_BODY)
            {
                fprintf(stderr, "Expected body!");
                abort();
            }

            body_received += frame.payload.body_fragment.len;
            assert(body_received <= body_target);

        }

        if (body_received != body_target)
        {
            /* Can only happen when amqp_simple_wait_frame returns <= 0 */
            /* We break here to close the connection */
            break;
        }
        iBytesRcvd += (unsigned int)body_target;
        iCount++;
        char data[200];
        memcpy(data,
               frame.payload.body_fragment.bytes,
               frame.payload.body_fragment.len);
        data[frame.payload.body_fragment.len] = '\0';
        fprintf(stderr, "\nmsg no: %d  msg:%s",iCount,data);
        if(iCount==1 || iCount == 4)
          {
                if(iCount==1)
                        bindingkey = "Hello";
                else
                        bindingkey = "ohhhhh";
            fprintf(stderr, "\n add sub!");
            amqp_queue_bind(conn,
                            1,
                            queuename,
                            amqp_cstring_bytes(exchange),
                            amqp_cstring_bytes(bindingkey),
                            amqp_empty_table);
            die_on_amqp_error(amqp_get_rpc_reply(conn), "Binding queue");
          }
    }
    gettimeofday(&sEndTime,NULL);

    lTotalTime = (sEndTime.tv_sec + sEndTime.tv_usec) - (sStartTime.tv_sec + sStartTime.tv_usec);
    lAvgTime = lTotalTime/iCount;

    printf("\n No of Msgs       : %d \n",iCount);
    printf("\n Start Time       : %ld +%ld \n",sStartTime.tv_sec,sStartTime.tv_usec);
    printf("\n End Time         : %ld +%ld \n",sEndTime.tv_sec,sEndTime.tv_usec);
    printf("\n Total Time Taken : %ld \n",lTotalTime);
    printf("\n Avg. Time Taken  : %ld \n",lAvgTime);
    printf("\n/***************************** Finished *****************************************/ \n");

    die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel");
    die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");
    die_on_error(amqp_destroy_connection(conn), "Ending connection");

    return 0;
}
}
---------------------------------------------------------------------------------------------------





Complete output of the receiver:
--------------------------
[prime@mydev exe]$ ./amqp_listen_rls 192.168.1.2 5672 otp_test test 3 10



/*************************** Amqp_Listen ****************************************/

 Listening on    : 192.168.1.2:5672

/*************************** Amqp_Listen *******************************************/

msg no: 1  msg:HHH
 add sub!
msg no: 2  msg:HHH
msg no: 3  msg:HHH
msg no: 4  msg:HHH
 add sub!
msg no: 5  msg:HHH
msg no: 6  msg:HHH
msg no: 7  msg:HHH
msg no: 8  msg:HHH
msg no: 9  msg:HHH
msg no: 10  msg:HHH
 No of Msgs       : 10

 Start Time       : 1409204692 +602237

 End Time         : 1409204698 +562173

 Total Time Taken : -40058

 Avg. Time Taken  : -4005

/***************************** Finished *****************************************/


-------------------------------------------------------------------------------------------------------
The receiver gets stuck after receiving the 1st message, so I sent messages again. After receiving the 4th message it gets stuck again,
and I need to send messages from the sender once more. In total I need to run the sender 3 times for the receiver to get all the messages.

For complete detail, I'll paste the full sender-side output from the 3 runs.

----------------------------------------------------------------------------------------------
[prime@mydev exe]$ ./amqp_send_rls 192.168.1.2 5672 otp_test test3 10 H



/*************************** Amqp_Send ****************************************/

 Connected To    : 192.168.1.2:5672

 No of Messages  : 10

 Size of Message : 3

/*************************** Amqp_Send *******************************************/

 Start Time : 1409204692 +600655

 End Time : 1409204692 +600842

 Total Time Taken : 187

 Avg. Time Taken : 18

/***************************** Finished *****************************************/
[prime@mydev exe]$ ./amqp_send_rls 192.168.1.2 5672 otp_test test 3 10 G



/*************************** Amqp_Send ****************************************/

 Connected To    : 192.168.1.2:5672

 No of Messages  : 10

 Size of Message : 3

/*************************** Amqp_Send *******************************************/

 Start Time : 1409204694 +705660

 End Time : 1409204694 +705816

 Total Time Taken : 156

 Avg. Time Taken : 15

/***************************** Finished *****************************************/
[prime@mydev exe]$ ./amqp_send_rls 192.168.1.2 5672 otp_test test 3 10 J



/*************************** Amqp_Send ****************************************/

 Connected To    : 192.168.1.2:5672

 No of Messages  : 10

 Size of Message : 3

/*************************** Amqp_Send *******************************************/

 Start Time : 1409204698 +561458

 End Time : 1409204698 +561590

 Total Time Taken : 132

 Avg. Time Taken : 13

/***************************** Finished *****************************************/

Please help me overcome this problem.


Loading...