RabbitMQ-C 客戶端接口使用說明


  rabbitmq-c是一個用於C語言的,與AMQP server進行交互的client庫。AMQP協議為版本0-9-1。rabbitmq-c與server進行交互前需要首先進行login操作,在操作后,可以根據AMQP協議規范,執行一系列操作。

  這里,根據項目需求,只進行部分接口說明,文后附demo的github地址。

接口描述

  • 接口說明:聲明一個新的amqp connection

    amqp_connection_state_t amqp_new_connection(void);

  • 接口說明:獲取socket

     參數說明:hostname         RabbitMQ server所在主機

       portnumber      RabbitMQ server監聽端口   

    int amqp_open_socket(char const *hostname, int portnumber);

  • 接口說明:將amqp connection和sockfd進行綁定

    void amqp_set_sockfd(amqp_connection_state_t state,int sockfd);

  • 接口說明:用於登錄RabbitMQ server,主要目的為了進行權限管理;

    參數說明:state    amqp connection

                     vhost   rabbit-mq的虛機主機,是rabbit-mq進行權限管理的最小單位

                     channel_max  最大鏈接數,此處設成0即可

                     frame_max  和客戶端通信時所允許的最大的frame size.默認值為131072,增大這個值有助於提高吞吐,降低這個值有利於降低時延

                     heartbeat 含義未知,默認值填0

                     sasl_method  用於SSL鑒權,默認值參考后文demo

    amqp_rpc_reply_t amqp_login(amqp_connection_state_t state, char const *vhost,int channel_max,int frame_max,int heartbeat,amqp_sasl_method_enum sasl_method, ...);

  • 接口說明:用於關聯conn和channel

    amqp_channel_open_ok_t *amqp_channel_open(amqp_connection_state_t state, amqp_channel_t channel);

  • 接口說明:聲明declare

   參數說明:state

                         channel

                           exchange

                           type     "fanout"  "direct" "topic"三選一

                           passive

                           curable

                           arguments

          amqp_exchange_declare_ok_t *amqp_exchange_declare(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t exchange, amqp_bytes_t type, amqp_boolean_t passive, amqp_boolean_t durable, amqp_table_t arguments); 

  • 接口說明:聲明queue

      參數說明:state   amqp connection

                  channel 

                  queue  queue name

                  passive 

                  durable  隊列是否持久化

                  exclusive  當前連接不在時,隊列是否自動刪除

                  aoto_delete 沒有consumer時,隊列是否自動刪除

                  arguments 用於拓展參數,比如x-ha-policy用於mirrored queue

   amqp_queue_declare_ok_t *amqp_queue_declare(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t queue, amqp_boolean_t passive, amqp_boolean_t durable, amqp_boolean_t exclusive, amqp_boolean_t auto_delete, amqp_table_t arguments); 

 

  • 接口說明:聲明binding   

   amqp_queue_bind_ok_t *amqp_queue_bind(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t queue, amqp_bytes_t exchange, amqp_bytes_t routing_key, amqp_tab le_t arguments);

  • 接口說明:qos是 quality of service,我們這里使用主要用於控制預取消息數,避免消息按條數均勻分配,需要和no_ack配合使用

        參數說明:state

                   channel

                   prefetch_size 以bytes為單位,0為unlimited

                   prefetch_count 預取的消息條數

                   global

  amqp_basic_qos_ok_t *amqp_basic_qos(amqp_connection_state_t state, amqp_channel_t channel, uint32_t prefetch_size, uint16_t prefetch_count, amqp_boolean_t global);

  • 接口說明:開始一個queue consumer

        參數說明:state

                   channel

                   queue

                   consumer_tag

                   no_local

                   no_ack    是否需要確認消息后再從隊列中刪除消息

                   exclusive

                   arguments

  amqp_basic_consume_ok_t *amqp_basic_consume(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t queue, amqp_bytes_t consumer_tag, amqp_boolean_t no_local, amqp_boolean_t no_ack, amqp_boolean_t exclusive, amqp_table_t arguments); 

 

  • 接口說明:發布消息

      參數說明:state 

                  channel

                  exchange  

                  routing_key  當exchange為默認“”時,此處填寫queue_name,當exchange為direct,此處為binding_key

                  mandatory 參見參考文獻2

                  immediate 同上

                  properties 更多屬性,如何設置消息持久化,參見文后demo

                  body 消息體

   int amqp_basic_publish(amqp_connection_state_t state,amqp_channel_t channel,amqp_bytes_t exchange,amqp_bytes_t routing_key,amqp_boolean_t mandatory,amqp_boolean_t immediate,struct amqp_basic_properties_t_ const *properties,amqp_bytes_t body);

 

  amqp_rpc_reply_t amqp_channel_close(amqp_connection_state_t state,amqp_channel_t channel,int code);

  amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state,int code);

  int amqp_destroy_connection(amqp_connection_state_t state);

   如何consume消息,參見文后demo。

 demo

https://github.com/liuhaobupt/rabbitmq_work_queues_demo-with-rabbit-c-client-lib

其中 rmq_new_task.c和rmq_worker.c對應於RabbitMQ tutorial里的work queues章節(http://www.rabbitmq.com/tutorials/tutorial-two-python.html),emit_log_direct.c和receive_logs_direct.c對應於RabbitMQ tutorial里的routing章節(http://www.rabbitmq.com/tutorials/tutorial-four-python.html),這兩個demo覆蓋了RabbitMQ的常用應用場景。

編譯需要librabbitmq.a庫,同時需要rabbitmq-c提供的幾個頭文件(amqp.h和amqp_framing.h)以及utils.c文件,這些在github project頁面均可獲得。

參考文獻

  1. rabbitmq-c主頁 http://hg.rabbitmq.com/rabbitmq-c/summary
  2. http://www.rabbitmq.com/amqp-0-9-1-reference.html
 轉自:http://www.cnblogs.com/liuhao/archive/2012/04/13/2445641.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM