c語言使用librdkafka庫實現kafka的生產和消費實例(轉)


關於librdkafka庫的介紹,可以參考kafka的c/c++高性能客戶端librdkafka簡介,本文使用librdkafka庫來進行kafka的簡單的生產、消費

 

一、producer

librdkafka進行kafka生產操作的大致步驟如下:

1、創建kafka配置

 

[cpp]  view plain  copy
 
  1. rd_kafka_conf_t *rd_kafka_conf_new (void)  

 

2、配置kafka各項參數

[cpp]  view plain  copy
 
  1. rd_kafka_conf_res_t rd_kafka_conf_set (rd_kafka_conf_t *conf,  
  2.                                        const char *name,  
  3.                                        const char *value,  
  4.                                        char *errstr, size_t errstr_size)  

 

3、設置發送回調函數

 

[cpp]  view plain  copy
 
  1. void rd_kafka_conf_set_dr_msg_cb (rd_kafka_conf_t *conf,  
  2.                                   void (*dr_msg_cb) (rd_kafka_t *rk,  
  3.                                   const rd_kafka_message_t *  
  4.                                   rkmessage,  
  5.                                   void *opaque))  

 

4、創建producer實例

 

[cpp]  view plain  copy
 
  1. rd_kafka_t *rd_kafka_new (rd_kafka_type_t type, rd_kafka_conf_t *conf,char *errstr, size_t errstr_size)  

 

5、實例化topic

[cpp]  view plain  copy
 
  1. rd_kafka_topic_t *rd_kafka_topic_new (rd_kafka_t *rk, const char *topic, rd_kafka_topic_conf_t *conf)  

 

6、異步調用將消息發送到指定的topic

[cpp]  view plain  copy
 
  1. int rd_kafka_produce (rd_kafka_topic_t *rkt, int32_t partition,  
  2.               int msgflags,  
  3.               void *payload, size_t len,  
  4.               const void *key, size_t keylen,  
  5.               void *msg_opaque)  

 

7、阻塞等待消息發送完成

[cpp]  view plain  copy
 
  1. int rd_kafka_poll (rd_kafka_t *rk, int timeout_ms)  

 

8、等待完成producer請求完成

[cpp]  view plain  copy
 
  1. rd_kafka_resp_err_t rd_kafka_flush (rd_kafka_t *rk, int timeout_ms)  

 

9、銷毀topic

[cpp]  view plain  copy
 
  1. void rd_kafka_topic_destroy (rd_kafka_topic_t *app_rkt)  

 

10、銷毀producer實例

[cpp]  view plain  copy
 
  1. void rd_kafka_destroy (rd_kafka_t *rk)  


完整代碼如下my_producer.c:

 

 

[cpp]  view plain  copy
 
  1. #include <stdio.h>  
  2. #include <signal.h>  
  3. #include <string.h>  
  4.   
  5. #include "../src/rdkafka.h"  
  6.   
  7. static int run = 1;  
  8.   
  9. static void stop(int sig){  
  10.     run = 0;  
  11.     fclose(stdin);  
  12. }  
  13.   
  14. /* 
  15.     每條消息調用一次該回調函數,說明消息是傳遞成功(rkmessage->err == RD_KAFKA_RESP_ERR_NO_ERROR) 
  16.     還是傳遞失敗(rkmessage->err != RD_KAFKA_RESP_ERR_NO_ERROR) 
  17.     該回調函數由rd_kafka_poll()觸發,在應用程序的線程上執行 
  18.  */  
  19. static void dr_msg_cb(rd_kafka_t *rk,  
  20.                       const rd_kafka_message_t *rkmessage, void *opaque){  
  21.         if(rkmessage->err)  
  22.             fprintf(stderr, "%% Message delivery failed: %s\n",   
  23.                     rd_kafka_err2str(rkmessage->err));  
  24.         else  
  25.             fprintf(stderr,  
  26.                         "%% Message delivered (%zd bytes, "  
  27.                         "partition %"PRId32")\n",  
  28.                         rkmessage->len, rkmessage->partition);  
  29.         /* rkmessage被librdkafka自動銷毀*/  
  30. }  
  31.   
  32. int main(int argc, char **argv){  
  33.     rd_kafka_t *rk;            /*Producer instance handle*/  
  34.     rd_kafka_topic_t *rkt;     /*topic對象*/  
  35.     rd_kafka_conf_t *conf;     /*臨時配置對象*/  
  36.     char errstr[512];            
  37.     char buf[512];               
  38.     const char *brokers;         
  39.     const char *topic;           
  40.   
  41.     if(argc != 3){  
  42.         fprintf(stderr, "%% Usage: %s <broker> <topic>\n", argv[0]);  
  43.         return 1;  
  44.     }  
  45.   
  46.     brokers = argv[1];  
  47.     topic = argv[2];  
  48.   
  49.     /* 創建一個kafka配置占位 */  
  50.     conf = rd_kafka_conf_new();  
  51.   
  52.     /*創建broker集群*/  
  53.     if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr,  
  54.                 sizeof(errstr)) != RD_KAFKA_CONF_OK){  
  55.         fprintf(stderr, "%s\n", errstr);  
  56.         return 1;  
  57.     }  
  58.   
  59.     /*設置發送報告回調函數,rd_kafka_produce()接收的每條消息都會調用一次該回調函數 
  60.      *應用程序需要定期調用rd_kafka_poll()來服務排隊的發送報告回調函數*/  
  61.     rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);  
  62.   
  63.     /*創建producer實例 
  64.       rd_kafka_new()獲取conf對象的所有權,應用程序在此調用之后不得再次引用它*/  
  65.     rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));  
  66.     if(!rk){  
  67.         fprintf(stderr, "%% Failed to create new producer:%s\n", errstr);  
  68.         return 1;  
  69.     }  
  70.   
  71.     /*實例化一個或多個topics(`rd_kafka_topic_t`)來提供生產或消費,topic 
  72.     對象保存topic特定的配置,並在內部填充所有可用分區和leader brokers,*/  
  73.     rkt = rd_kafka_topic_new(rk, topic, NULL);  
  74.     if (!rkt){  
  75.         fprintf(stderr, "%% Failed to create topic object: %s\n",   
  76.                 rd_kafka_err2str(rd_kafka_last_error()));  
  77.         rd_kafka_destroy(rk);  
  78.         return 1;  
  79.     }  
  80.   
  81.     /*用於中斷的信號*/  
  82.     signal(SIGINT, stop);  
  83.   
  84.     fprintf(stderr,  
  85.                 "%% Type some text and hit enter to produce message\n"  
  86.                 "%% Or just hit enter to only serve delivery reports\n"  
  87.                 "%% Press Ctrl-C or Ctrl-D to exit\n");  
  88.   
  89.      while(run && fgets(buf, sizeof(buf), stdin)){  
  90.         size_t len = strlen(buf);  
  91.   
  92.         if(buf[len-1] == '\n')  
  93.             buf[--len] = '\0';  
  94.   
  95.         if(len == 0){  
  96.             /*輪詢用於事件的kafka handle, 
  97.             事件將導致應用程序提供的回調函數被調用 
  98.             第二個參數是最大阻塞時間,如果設為0,將會是非阻塞的調用*/  
  99.             rd_kafka_poll(rk, 0);  
  100.             continue;  
  101.         }  
  102.   
  103.      retry:  
  104.          /*Send/Produce message. 
  105.            這是一個異步調用,在成功的情況下,只會將消息排入內部producer隊列, 
  106.            對broker的實際傳遞嘗試由后台線程處理,之前注冊的傳遞回調函數(dr_msg_cb) 
  107.            用於在消息傳遞成功或失敗時向應用程序發回信號*/  
  108.         if (rd_kafka_produce(  
  109.                     /* Topic object */  
  110.                     rkt,  
  111.                     /*使用內置的分區來選擇分區*/  
  112.                     RD_KAFKA_PARTITION_UA,  
  113.                     /*生成payload的副本*/  
  114.                     RD_KAFKA_MSG_F_COPY,  
  115.                     /*消息體和長度*/  
  116.                     buf, len,  
  117.                     /*可選鍵及其長度*/  
  118.                     NULL, 0,  
  119.                     NULL) == -1){  
  120.             fprintf(stderr,   
  121.                 "%% Failed to produce to topic %s: %s\n",   
  122.                 rd_kafka_topic_name(rkt),  
  123.                 rd_kafka_err2str(rd_kafka_last_error()));  
  124.   
  125.             if (rd_kafka_last_error() == RD_KAFKA_RESP_ERR__QUEUE_FULL){  
  126.                 /*如果內部隊列滿,等待消息傳輸完成並retry, 
  127.                 內部隊列表示要發送的消息和已發送或失敗的消息, 
  128.                 內部隊列受限於queue.buffering.max.messages配置項*/  
  129.                 rd_kafka_poll(rk, 1000);  
  130.                 goto retry;  
  131.             }     
  132.         }else{  
  133.             fprintf(stderr, "%% Enqueued message (%zd bytes) for topic %s\n",   
  134.                 len, rd_kafka_topic_name(rkt));  
  135.         }  
  136.   
  137.         /*producer應用程序應不斷地通過以頻繁的間隔調用rd_kafka_poll()來為 
  138.         傳送報告隊列提供服務。在沒有生成消息以確定先前生成的消息已發送了其 
  139.         發送報告回調函數(和其他注冊過的回調函數)期間,要確保rd_kafka_poll() 
  140.         仍然被調用*/  
  141.         rd_kafka_poll(rk, 0);  
  142.      }  
  143.   
  144.      fprintf(stderr, "%% Flushing final message.. \n");  
  145.      /*rd_kafka_flush是rd_kafka_poll()的抽象化, 
  146.      等待所有未完成的produce請求完成,通常在銷毀producer實例前完成 
  147.      以確保所有排列中和正在傳輸的produce請求在銷毀前完成*/  
  148.      rd_kafka_flush(rk, 10*1000);  
  149.   
  150.      /* Destroy topic object */  
  151.      rd_kafka_topic_destroy(rkt);  
  152.   
  153.      /* Destroy the producer instance */  
  154.      rd_kafka_destroy(rk);  
  155.   
  156.      return 0;  
  157. }  



 

二、consumer

librdkafka進行kafka消費操作的大致步驟如下:

 

1、創建kafka配置

[cpp]  view plain  copy
 
  1. rd_kafka_conf_t *rd_kafka_conf_new (void)  

 

2、創建kafka topic的配置

[cpp]  view plain  copy
 
  1. rd_kafka_topic_conf_t *rd_kafka_topic_conf_new (void)   

 

3、配置kafka各項參數

[cpp]  view plain  copy
 
  1. rd_kafka_conf_res_t rd_kafka_conf_set (rd_kafka_conf_t *conf,  
  2.                                        const char *name,  
  3.                                        const char *value,  
  4.                                        char *errstr, size_t errstr_size)  

 

4、配置kafka topic各項參數

[cpp]  view plain  copy
 
  1. rd_kafka_conf_res_t rd_kafka_topic_conf_set (rd_kafka_topic_conf_t *conf,  
  2.                          const char *name,  
  3.                          const char *value,  
  4.                          char *errstr, size_t errstr_size)  

 

5、創建consumer實例

[cpp]  view plain  copy
 
  1. rd_kafka_t *rd_kafka_new (rd_kafka_type_t type, rd_kafka_conf_t *conf, char *errstr, size_t errstr_size)  

 

6、為consumer實例添加brokerlist

[cpp]  view plain  copy
 
  1. int rd_kafka_brokers_add (rd_kafka_t *rk, const char *brokerlist)  

 

7、開啟consumer訂閱

[cpp]  view plain  copy
 
  1. rd_kafka_subscribe (rd_kafka_t *rk, const rd_kafka_topic_partition_list_t *topics)  

 

8、輪詢消息或事件,並調用回調函數

[cpp]  view plain  copy
 
  1. rd_kafka_message_t *rd_kafka_consumer_poll (rd_kafka_t *rk,int timeout_ms)  

 

9、關閉consumer實例

[cpp]  view plain  copy
 
  1. rd_kafka_resp_err_t rd_kafka_consumer_close (rd_kafka_t *rk)  

 

10、釋放topic list資源

[cpp]  view plain  copy
 
  1. rd_kafka_topic_partition_list_destroy (rd_kafka_topic_partition_list_t *rktparlist)  

 

11、銷毀consumer實例

[cpp]  view plain  copy
 
  1. void rd_kafka_destroy (rd_kafka_t *rk)   

 

12、等待consumer對象的銷毀

[cpp]  view plain  copy
 
  1. int rd_kafka_wait_destroyed (int timeout_ms)  

 

 

完整代碼如下my_consumer.c

[cpp]  view plain  copy
 
  1. #include <string.h>  
  2. #include <stdlib.h>  
  3. #include <syslog.h>  
  4. #include <signal.h>  
  5. #include <error.h>  
  6. #include <getopt.h>  
  7.   
  8. #include "../src/rdkafka.h"  
  9.   
  10. static int run = 1;  
  11. //`rd_kafka_t`自帶一個可選的配置API,如果沒有調用API,Librdkafka將會使用CONFIGURATION.md中的默認配置。  
  12. static rd_kafka_t *rk;  
  13. static rd_kafka_topic_partition_list_t *topics;  
  14.   
  15. static void stop (int sig) {  
  16.   if (!run)  
  17.     exit(1);  
  18.   run = 0;  
  19.   fclose(stdin); /* abort fgets() */  
  20. }  
  21.   
  22. static void sig_usr1 (int sig) {  
  23.   rd_kafka_dump(stdout, rk);  
  24. }  
  25.   
  26. /** 
  27.  * 處理並打印已消費的消息 
  28.  */  
  29. static void msg_consume (rd_kafka_message_t *rkmessage,  
  30.        void *opaque) {  
  31.   if (rkmessage->err) {  
  32.     if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {  
  33.       fprintf(stderr,  
  34.         "%% Consumer reached end of %s [%"PRId32"] "  
  35.              "message queue at offset %"PRId64"\n",  
  36.              rd_kafka_topic_name(rkmessage->rkt),  
  37.              rkmessage->partition, rkmessage->offset);  
  38.   
  39.       return;  
  40.     }  
  41.   
  42.     if (rkmessage->rkt)  
  43.             fprintf(stderr, "%% Consume error for "  
  44.                     "topic \"%s\" [%"PRId32"] "  
  45.                     "offset %"PRId64": %s\n",  
  46.                     rd_kafka_topic_name(rkmessage->rkt),  
  47.                     rkmessage->partition,  
  48.                     rkmessage->offset,  
  49.                     rd_kafka_message_errstr(rkmessage));  
  50.     else  
  51.             fprintf(stderr, "%% Consumer error: %s: %s\n",  
  52.                     rd_kafka_err2str(rkmessage->err),  
  53.                     rd_kafka_message_errstr(rkmessage));  
  54.   
  55.     if (rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION ||  
  56.         rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC)  
  57.           run = 0;  
  58.     return;  
  59.   }  
  60.   
  61.   fprintf(stdout, "%% Message (topic %s [%"PRId32"], "  
  62.                       "offset %"PRId64", %zd bytes):\n",  
  63.                       rd_kafka_topic_name(rkmessage->rkt),  
  64.                       rkmessage->partition,  
  65.     rkmessage->offset, rkmessage->len);  
  66.   
  67.   if (rkmessage->key_len) {  
  68.     printf("Key: %.*s\n",  
  69.              (int)rkmessage->key_len, (char *)rkmessage->key);  
  70.   }  
  71.   
  72.   printf("%.*s\n",  
  73.            (int)rkmessage->len, (char *)rkmessage->payload);  
  74.     
  75. }  
  76.   
  77. /* 
  78.   init all configuration of kafka 
  79.  */  
  80. int initKafka(char *brokers, char *group,char *topic){  
  81.   rd_kafka_conf_t *conf;  
  82.   rd_kafka_topic_conf_t *topic_conf;  
  83.   rd_kafka_resp_err_t err;  
  84.   char tmp[16];  
  85.   char errstr[512];  
  86.   
  87.   /* Kafka configuration */  
  88.   conf = rd_kafka_conf_new();  
  89.   
  90.   //quick termination  
  91.   snprintf(tmp, sizeof(tmp), "%i", SIGIO);  
  92.   rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0);  
  93.   
  94.   //topic configuration  
  95.   topic_conf = rd_kafka_topic_conf_new();  
  96.   
  97.   /* Consumer groups require a group id */  
  98.   if (!group)  
  99.           group = "rdkafka_consumer_example";  
  100.   if (rd_kafka_conf_set(conf, "group.id", group,  
  101.                         errstr, sizeof(errstr)) !=  
  102.       RD_KAFKA_CONF_OK) {  
  103.           fprintf(stderr, "%% %s\n", errstr);  
  104.           return -1;  
  105.   }  
  106.   
  107.   /* Consumer groups always use broker based offset storage */  
  108.   if (rd_kafka_topic_conf_set(topic_conf, "offset.store.method",  
  109.                               "broker",  
  110.                               errstr, sizeof(errstr)) !=  
  111.       RD_KAFKA_CONF_OK) {  
  112.           fprintf(stderr, "%% %s\n", errstr);  
  113.           return -1;  
  114.   }  
  115.   
  116.   /* Set default topic config for pattern-matched topics. */  
  117.   rd_kafka_conf_set_default_topic_conf(conf, topic_conf);  
  118.   
  119.   //實例化一個頂級對象rd_kafka_t作為基礎容器,提供全局配置和共享狀態  
  120.   rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));  
  121.   if(!rk){  
  122.     fprintf(stderr, "%% Failed to create new consumer:%s\n", errstr);  
  123.     return -1;  
  124.   }  
  125.   
  126.   //Librdkafka需要至少一個brokers的初始化list  
  127.   if (rd_kafka_brokers_add(rk, brokers) == 0){  
  128.     fprintf(stderr, "%% No valid brokers specified\n");  
  129.     return -1;  
  130.   }  
  131.   
  132.   //重定向 rd_kafka_poll()隊列到consumer_poll()隊列  
  133.   rd_kafka_poll_set_consumer(rk);  
  134.   
  135.   //創建一個Topic+Partition的存儲空間(list/vector)  
  136.   topics = rd_kafka_topic_partition_list_new(1);  
  137.   //把Topic+Partition加入list  
  138.   rd_kafka_topic_partition_list_add(topics, topic, -1);  
  139.   //開啟consumer訂閱,匹配的topic將被添加到訂閱列表中  
  140.   if((err = rd_kafka_subscribe(rk, topics))){  
  141.       fprintf(stderr, "%% Failed to start consuming topics: %s\n", rd_kafka_err2str(err));  
  142.       return -1;  
  143.   }  
  144.   
  145.   return 1;  
  146. }  
  147.   
  148. int main(int argc, char **argv){  
  149.   char *brokers = "localhost:9092";  
  150.   char *group = NULL;  
  151.   char *topic = NULL;  
  152.     
  153.   int opt;  
  154.   rd_kafka_resp_err_t err;  
  155.   
  156.   while ((opt = getopt(argc, argv, "g:b:t:qd:eX:As:DO")) != -1){  
  157.     switch (opt) {  
  158.       case 'b':  
  159.         brokers = optarg;  
  160.         break;  
  161.       case 'g':  
  162.         group = optarg;  
  163.         break;  
  164.       case 't':  
  165.         topic = optarg;  
  166.         break;  
  167.       default:  
  168.         break;  
  169.     }  
  170.   }   
  171.   
  172.   signal(SIGINT, stop);  
  173.   signal(SIGUSR1, sig_usr1);  
  174.   
  175.   if(!initKafka(brokers, group, topic)){  
  176.     fprintf(stderr, "kafka server initialize error\n");  
  177.   }else{  
  178.     while(run){  
  179.       rd_kafka_message_t *rkmessage;  
  180.       /*-輪詢消費者的消息或事件,最多阻塞timeout_ms 
  181.         -應用程序應該定期調用consumer_poll(),即使沒有預期的消息,以服務 
  182.         所有排隊等待的回調函數,當注冊過rebalance_cb,該操作尤為重要, 
  183.         因為它需要被正確地調用和處理以同步內部消費者狀態 */  
  184.       rkmessage = rd_kafka_consumer_poll(rk, 1000);  
  185.       if(rkmessage){  
  186.         msg_consume(rkmessage, NULL);  
  187.         /*釋放rkmessage的資源,並把所有權還給rdkafka*/  
  188.         rd_kafka_message_destroy(rkmessage);  
  189.       }  
  190.     }  
  191.   }  
  192.   
  193. done:  
  194.     /*此調用將會阻塞,直到consumer撤銷其分配,調用rebalance_cb(如果已設置), 
  195.     commit offset到broker,並離開consumer group 
  196.     最大阻塞時間被設置為session.timeout.ms 
  197.     */  
  198.     err = rd_kafka_consumer_close(rk);  
  199.     if(err){  
  200.       fprintf(stderr, "%% Failed to close consumer: %s\n", rd_kafka_err2str(err));  
  201.     }else{  
  202.       fprintf(stderr, "%% Consumer closed\n");  
  203.     }  
  204.   
  205.     //釋放topics list使用的所有資源和它自己  
  206.     rd_kafka_topic_partition_list_destroy(topics);  
  207.   
  208.     //destroy kafka handle  
  209.     rd_kafka_destroy(rk);  
  210.     
  211.     run = 5;  
  212.     //等待所有rd_kafka_t對象銷毀,所有kafka對象被銷毀,返回0,超時返回-1  
  213.     while(run-- > 0 && rd_kafka_wait_destroyed(1000) == -1){  
  214.       printf("Waiting for librdkafka to decommission\n");  
  215.     }  
  216.     if(run <= 0){  
  217.       //dump rdkafka內部狀態到stdout流  
  218.       rd_kafka_dump(stdout, rk);  
  219.     }  
  220.   
  221.     return 0;  
  222. }  


在linux下編譯producer和consumer的代碼:

[cpp]  view plain  copy
 
  1. gcc my_producer.c -o my_producer  -lrdkafka -lz -lpthread -lrt  
  2. gcc my_consumer.c -o my_consumer  -lrdkafka -lz -lpthread -lrt  

在運行my_producer或my_consumer時可能會報錯"error while loading shared libraries xxx.so", 此時需要在/etc/ld.so.conf中加入xxx.so所在的目錄


在本地啟動一個簡單的kafka服務,設置broker集群為localhost:9092並創建一個叫“test_topic”的topic
啟動方式可參考 kafka0.8.2集群的環境搭建並實現基本的生產消費

啟動consumer:

 

啟動producer,並發送一條數據“hello world”:


consumer處成功收到producer發送的“hello world”:

 

http://orchome.com/5

https://github.com/edenhill/librdkafka

https://github.com/mfontanini/cppkafka

https://github.com/zengyuxing007/kafka_test_cpp


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM