RabbitMq學習筆記——RabbitMQ C的使用


1、需要用到的參數:

  主機名:hostname、端口號:port、交換器:exchange、路由key:routingkey、綁定路由:bindingkey、用戶名:user、密碼:psw,默認用戶名和密碼均為guest。

2、步驟:

1)、建立TCP連接:

  conn = amqp_new_connection();

  socket = amqp_tcp_socket_new(conn);

2)、打開建立的TCP連接,使用socket,主機名和端口號:

  status = amqp_socket_open(socket, hostname, port);

3)、登錄

  amqp_login(conn,"/",0,131072,0,AMQP_SASL_METHOD_PLAIN,"guest","guest");

4)、打開信道:

  amqp_channel_open(conn, 1);

  amqp_get_rpc_reply(conn);

5)、開始發送或者監聽消息

6)、關閉信道、關閉連接、銷毀連接:

  amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);

  amqp_connection_close(conn, AMQP_REPLY_SUCCESS);

  amqp_destroy_connection(conn);

3、發送端

  pro文件

  # rabbitmq-c public headers used by main.cpp.
  # Forward slashes are used because qmake treats them as the portable
  # path separator on every platform, including Windows.
  HEADERS += \
      include/amqp.h \
      include/amqp_framing.h \
      include/amqp_tcp_socket.h

  # main.cpp includes the headers as "amqp.h" etc., so the directory that
  # holds them must be on the include search path.
  INCLUDEPATH += $$PWD/include

  # The qmake variable for source files is SOURCES (not SOURCE). The list
  # must not end with a trailing backslash, otherwise the next assignment
  # is swallowed by the line continuation.
  SOURCES += \
      main.cpp

  # Import library for the rabbitmq-c DLL.
  LIBS += $$PWD/lib/librabbitmq.4.dll.a

 

main.cpp文件

  #include <QtCore/QCoreApplication>

  #include <QDebug>

  #include "amqp.h"

  #include "amqp_framing.h"

  #include "amqp_tcp_socket.h"

  int main(int argc, char *argv[])
  {
    QCoreApplication a(argc, argv);

    const char* hostname = "localhost";

    int port = 5672;

    const char* exchange = "amq.direct";

    const char* routingkey = "test";

    const char* messagebody = "Hello World!";發送"Hello World!”

    amqp_connection_state_t  conn = amqp_new_connection();

    amqp_socket_t* socket = amqp_tcp_socket_new(conn);

    if (!socket)

    {
      qDebug() << "creating TCP socket";
    }

    int status = amqp_socket_open(socket, hostname, port);
    if (status)

    {
      qDebug() << "opening TCP socket";
    }

    amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest");
    amqp_channel_open(conn, 1);
    amqp_get_rpc_reply(conn);

    {
      amqp_basic_properties_t props;
      props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
      props.content_type = amqp_cstring_bytes("text/plain");
      props.delivery_mode = 2; /* persistent delivery mode */
      amqp_basic_publish(conn,1,amqp_cstring_bytes(exchange),amqp_cstring_bytes(routingkey),0,0,&props,amqp_cstring_bytes(messagebody));
    }

    amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
    amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
    amqp_destroy_connection(conn);

    return a.exec();
  }

4、接收端

  pro文件

  # rabbitmq-c public headers used by main.cpp.
  # Forward slashes are used because qmake treats them as the portable
  # path separator on every platform, including Windows.
  HEADERS += \
      include/amqp.h \
      include/amqp_framing.h \
      include/amqp_tcp_socket.h

  # Lets main.cpp include the rabbitmq-c headers by bare file name.
  INCLUDEPATH += $$PWD/include

  # The qmake variable for source files is SOURCES (not SOURCE). The list
  # must not end with a trailing backslash, otherwise the next assignment
  # is swallowed by the line continuation.
  SOURCES += \
      main.cpp

  # Import library for the rabbitmq-c DLL.
  LIBS += $$PWD/lib/librabbitmq.4.dll.a

 

main.cpp文件

  int main(int argc, char *argv[])

  {

    QApplication a(argc, argv);
int port = 5672;
const char* hostname = "192.168.250.223";
const char* exchange = "amq.direct";
const char* bindingkey = "test";
 
        
    amqp_connection_state_t conn = amqp_new_connection();
amqp_socket_t* socket = amqp_tcp_socket_new(conn);
    if(!socket)
{
qDebug()<<"creating TCP socket";
}
 
        
    int status = amqp_socket_open(socket,hostname,port);
if(status)
{
qDebug() << "opening TCP socket";
}
    amqp_login(conn,"/",0,131072,0,AMQP_SASL_METHOD_PLAIN,"guest","guest");
amqp_channel_open(conn,1);
amqp_get_rpc_reply(conn);
    amqp_queue_declare_ok_t *r = amqp_queue_declare(conn,1,amqp_empty_bytes,0,0,0,1,amqp_empty_table);
amqp_get_rpc_reply(conn);
    amqp_bytes_t queuename = amqp_bytes_malloc_dup(r->queue);
if(queuename.bytes == nullptr)
{
fprintf(stderr,"Out of memory while copying queue name");
return 1;
}
    amqp_queue_bind(conn,1,queuename,amqp_cstring_bytes(exchange),amqp_cstring_bytes(bindingkey),amqp_empty_table);
amqp_get_rpc_reply(conn);
amqp_basic_consume(conn,1,queuename,amqp_empty_bytes,0,1,0,amqp_empty_table);
amqp_get_rpc_reply(conn);

while(1)
{
    amqp_maybe_release_buffers(conn);
    amqp_envelope_t envelope;
    amqp_rpc_reply_t res = amqp_consume_message(conn,&envelope,nullptr,0);
    if(AMQP_RESPONSE_NORMAL != res.reply_type)
    {
      break;
    }
    printf("Delivery %u, exchange %.*s routingkey %.*s\n",
          static_cast<uint>(envelope.delivery_tag),
          static_cast<int>(envelope.exchange.len),
          static_cast<char*>(envelope.exchange.bytes),
          static_cast<int>(envelope.routing_key.len),
          static_cast<char*>(envelope.routing_key.bytes));
    if(envelope.message.properties._flags & AMQP_BASIC_CONTENT_TYPE_FLAG)
    {
      printf("Content-type: %.*s\n",
            static_cast<int>(envelope.message.properties.content_type.len),
static_cast<char*>(envelope.message.properties.content_type.bytes));
    }
    amqp_dump(envelope.message.body.bytes,envelope.message.body.len);
    amqp_destroy_envelope(&envelope);
}
    amqp_channel_close(conn,1,AMQP_REPLY_SUCCESS);
amqp_connection_close(conn,AMQP_REPLY_SUCCESS);
amqp_destroy_connection(conn);
Widget w;
w.show();
return a.exec();
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯繫本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM