關於librdkafka庫的介紹,可以參考《kafka的c/c++高性能客戶端librdkafka簡介》一文。本文使用librdkafka庫實現簡單的kafka生產與消費。
一、producer
librdkafka進行kafka生產操作的大致步驟如下:
1、創建kafka配置
- rd_kafka_conf_t *rd_kafka_conf_new (void)
2、配置kafka各項參數
- rd_kafka_conf_res_t rd_kafka_conf_set (rd_kafka_conf_t *conf,
- const char *name,
- const char *value,
- char *errstr, size_t errstr_size)
3、設置發送回調函數
- void rd_kafka_conf_set_dr_msg_cb (rd_kafka_conf_t *conf,
- void (*dr_msg_cb) (rd_kafka_t *rk,
- const rd_kafka_message_t *
- rkmessage,
- void *opaque))
4、創建producer實例
- rd_kafka_t *rd_kafka_new (rd_kafka_type_t type, rd_kafka_conf_t *conf,char *errstr, size_t errstr_size)
5、實例化topic
- rd_kafka_topic_t *rd_kafka_topic_new (rd_kafka_t *rk, const char *topic, rd_kafka_topic_conf_t *conf)
6、異步調用將消息發送到指定的topic
- int rd_kafka_produce (rd_kafka_topic_t *rkt, int32_t partition,
- int msgflags,
- void *payload, size_t len,
- const void *key, size_t keylen,
- void *msg_opaque)
7、輪詢事件並觸發發送報告回調函數(timeout_ms為最大阻塞時間,設為0時為非阻塞調用)
- int rd_kafka_poll (rd_kafka_t *rk, int timeout_ms)
8、等待所有未完成的producer請求完成
- rd_kafka_resp_err_t rd_kafka_flush (rd_kafka_t *rk, int timeout_ms)
9、銷毀topic
- void rd_kafka_topic_destroy (rd_kafka_topic_t *app_rkt)
10、銷毀producer實例
- void rd_kafka_destroy (rd_kafka_t *rk)
完整代碼如下(my_producer.c):
- #include <stdio.h>
- #include <signal.h>
- #include <string.h>
- #include "../src/rdkafka.h"
/* Main-loop flag, cleared by the SIGINT handler below.
 * volatile sig_atomic_t is the object type the C standard allows a signal
 * handler to write and the interrupted code to read reliably; a plain int
 * may be cached in a register by the compiler. */
static volatile sig_atomic_t run = 1;

/**
 * SIGINT handler: ask the produce loop in main() to stop.
 *
 * Clearing `run` ends the loop at its next check; closing stdin is meant to
 * make a pending fgets() on stdin fail so the loop notices promptly.
 * NOTE(review): fclose() is not on the POSIX async-signal-safe list; it is
 * kept because it is how this example interrupts the blocking fgets().
 *
 * @param sig  signal number (unused)
 */
static void stop(int sig) {
    (void)sig;
    run = 0;
    fclose(stdin);
}
- /*
- 每條消息調用一次該回調函數,說明消息是傳遞成功(rkmessage->err == RD_KAFKA_RESP_ERR_NO_ERROR)
- 還是傳遞失敗(rkmessage->err != RD_KAFKA_RESP_ERR_NO_ERROR)
- 該回調函數由rd_kafka_poll()觸發,在應用程序的線程上執行
- */
- static void dr_msg_cb(rd_kafka_t *rk,
- const rd_kafka_message_t *rkmessage, void *opaque){
- if(rkmessage->err)
- fprintf(stderr, "%% Message delivery failed: %s\n",
- rd_kafka_err2str(rkmessage->err));
- else
- fprintf(stderr,
- "%% Message delivered (%zd bytes, "
- "partition %"PRId32")\n",
- rkmessage->len, rkmessage->partition);
- /* rkmessage被librdkafka自動銷毀*/
- }
- int main(int argc, char **argv){
- rd_kafka_t *rk; /*Producer instance handle*/
- rd_kafka_topic_t *rkt; /*topic對象*/
- rd_kafka_conf_t *conf; /*臨時配置對象*/
- char errstr[512];
- char buf[512];
- const char *brokers;
- const char *topic;
- if(argc != 3){
- fprintf(stderr, "%% Usage: %s <broker> <topic>\n", argv[0]);
- return 1;
- }
- brokers = argv[1];
- topic = argv[2];
- /* 創建一個kafka配置占位 */
- conf = rd_kafka_conf_new();
- /*創建broker集群*/
- if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr,
- sizeof(errstr)) != RD_KAFKA_CONF_OK){
- fprintf(stderr, "%s\n", errstr);
- return 1;
- }
- /*設置發送報告回調函數,rd_kafka_produce()接收的每條消息都會調用一次該回調函數
- *應用程序需要定期調用rd_kafka_poll()來服務排隊的發送報告回調函數*/
- rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);
- /*創建producer實例
- rd_kafka_new()獲取conf對象的所有權,應用程序在此調用之后不得再次引用它*/
- rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
- if(!rk){
- fprintf(stderr, "%% Failed to create new producer:%s\n", errstr);
- return 1;
- }
- /*實例化一個或多個topics(`rd_kafka_topic_t`)來提供生產或消費,topic
- 對象保存topic特定的配置,並在內部填充所有可用分區和leader brokers,*/
- rkt = rd_kafka_topic_new(rk, topic, NULL);
- if (!rkt){
- fprintf(stderr, "%% Failed to create topic object: %s\n",
- rd_kafka_err2str(rd_kafka_last_error()));
- rd_kafka_destroy(rk);
- return 1;
- }
- /*用於中斷的信號*/
- signal(SIGINT, stop);
- fprintf(stderr,
- "%% Type some text and hit enter to produce message\n"
- "%% Or just hit enter to only serve delivery reports\n"
- "%% Press Ctrl-C or Ctrl-D to exit\n");
- while(run && fgets(buf, sizeof(buf), stdin)){
- size_t len = strlen(buf);
- if(buf[len-1] == '\n')
- buf[--len] = '\0';
- if(len == 0){
- /*輪詢用於事件的kafka handle,
- 事件將導致應用程序提供的回調函數被調用
- 第二個參數是最大阻塞時間,如果設為0,將會是非阻塞的調用*/
- rd_kafka_poll(rk, 0);
- continue;
- }
- retry:
- /*Send/Produce message.
- 這是一個異步調用,在成功的情況下,只會將消息排入內部producer隊列,
- 對broker的實際傳遞嘗試由后台線程處理,之前注冊的傳遞回調函數(dr_msg_cb)
- 用於在消息傳遞成功或失敗時向應用程序發回信號*/
- if (rd_kafka_produce(
- /* Topic object */
- rkt,
- /*使用內置的分區來選擇分區*/
- RD_KAFKA_PARTITION_UA,
- /*生成payload的副本*/
- RD_KAFKA_MSG_F_COPY,
- /*消息體和長度*/
- buf, len,
- /*可選鍵及其長度*/
- NULL, 0,
- NULL) == -1){
- fprintf(stderr,
- "%% Failed to produce to topic %s: %s\n",
- rd_kafka_topic_name(rkt),
- rd_kafka_err2str(rd_kafka_last_error()));
- if (rd_kafka_last_error() == RD_KAFKA_RESP_ERR__QUEUE_FULL){
- /*如果內部隊列滿,等待消息傳輸完成並retry,
- 內部隊列表示要發送的消息和已發送或失敗的消息,
- 內部隊列受限於queue.buffering.max.messages配置項*/
- rd_kafka_poll(rk, 1000);
- goto retry;
- }
- }else{
- fprintf(stderr, "%% Enqueued message (%zd bytes) for topic %s\n",
- len, rd_kafka_topic_name(rkt));
- }
- /*producer應用程序應不斷地通過以頻繁的間隔調用rd_kafka_poll()來為
- 傳送報告隊列提供服務。在沒有生成消息以確定先前生成的消息已發送了其
- 發送報告回調函數(和其他注冊過的回調函數)期間,要確保rd_kafka_poll()
- 仍然被調用*/
- rd_kafka_poll(rk, 0);
- }
- fprintf(stderr, "%% Flushing final message.. \n");
- /*rd_kafka_flush是rd_kafka_poll()的抽象化,
- 等待所有未完成的produce請求完成,通常在銷毀producer實例前完成
- 以確保所有排列中和正在傳輸的produce請求在銷毀前完成*/
- rd_kafka_flush(rk, 10*1000);
- /* Destroy topic object */
- rd_kafka_topic_destroy(rkt);
- /* Destroy the producer instance */
- rd_kafka_destroy(rk);
- return 0;
- }
二、consumer
librdkafka進行kafka消費操作的大致步驟如下:
1、創建kafka配置
- rd_kafka_conf_t *rd_kafka_conf_new (void)
2、創建kafka topic的配置
- rd_kafka_topic_conf_t *rd_kafka_topic_conf_new (void)
3、配置kafka各項參數
- rd_kafka_conf_res_t rd_kafka_conf_set (rd_kafka_conf_t *conf,
- const char *name,
- const char *value,
- char *errstr, size_t errstr_size)
4、配置kafka topic各項參數
- rd_kafka_conf_res_t rd_kafka_topic_conf_set (rd_kafka_topic_conf_t *conf,
- const char *name,
- const char *value,
- char *errstr, size_t errstr_size)
5、創建consumer實例
- rd_kafka_t *rd_kafka_new (rd_kafka_type_t type, rd_kafka_conf_t *conf, char *errstr, size_t errstr_size)
6、為consumer實例添加brokerlist
- int rd_kafka_brokers_add (rd_kafka_t *rk, const char *brokerlist)
7、開啟consumer訂閱
- rd_kafka_resp_err_t rd_kafka_subscribe (rd_kafka_t *rk, const rd_kafka_topic_partition_list_t *topics)
8、輪詢消息或事件,並調用回調函數
- rd_kafka_message_t *rd_kafka_consumer_poll (rd_kafka_t *rk,int timeout_ms)
9、關閉consumer實例
- rd_kafka_resp_err_t rd_kafka_consumer_close (rd_kafka_t *rk)
10、釋放topic list資源
- void rd_kafka_topic_partition_list_destroy (rd_kafka_topic_partition_list_t *rktparlist)
11、銷毀consumer實例
- void rd_kafka_destroy (rd_kafka_t *rk)
12、等待consumer對象的銷毀
- int rd_kafka_wait_destroyed (int timeout_ms)
完整代碼如下(my_consumer.c):
- #include <string.h>
- #include <stdlib.h>
- #include <syslog.h>
- #include <signal.h>
- #include <error.h>
- #include <getopt.h>
- #include "../src/rdkafka.h"
- static int run = 1;
- //`rd_kafka_t`自帶一個可選的配置API,如果沒有調用API,Librdkafka將會使用CONFIGURATION.md中的默認配置。
- static rd_kafka_t *rk;
- static rd_kafka_topic_partition_list_t *topics;
- static void stop (int sig) {
- if (!run)
- exit(1);
- run = 0;
- fclose(stdin); /* abort fgets() */
- }
- static void sig_usr1 (int sig) {
- rd_kafka_dump(stdout, rk);
- }
- /**
- * 處理並打印已消費的消息
- */
- static void msg_consume (rd_kafka_message_t *rkmessage,
- void *opaque) {
- if (rkmessage->err) {
- if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
- fprintf(stderr,
- "%% Consumer reached end of %s [%"PRId32"] "
- "message queue at offset %"PRId64"\n",
- rd_kafka_topic_name(rkmessage->rkt),
- rkmessage->partition, rkmessage->offset);
- return;
- }
- if (rkmessage->rkt)
- fprintf(stderr, "%% Consume error for "
- "topic \"%s\" [%"PRId32"] "
- "offset %"PRId64": %s\n",
- rd_kafka_topic_name(rkmessage->rkt),
- rkmessage->partition,
- rkmessage->offset,
- rd_kafka_message_errstr(rkmessage));
- else
- fprintf(stderr, "%% Consumer error: %s: %s\n",
- rd_kafka_err2str(rkmessage->err),
- rd_kafka_message_errstr(rkmessage));
- if (rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION ||
- rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC)
- run = 0;
- return;
- }
- fprintf(stdout, "%% Message (topic %s [%"PRId32"], "
- "offset %"PRId64", %zd bytes):\n",
- rd_kafka_topic_name(rkmessage->rkt),
- rkmessage->partition,
- rkmessage->offset, rkmessage->len);
- if (rkmessage->key_len) {
- printf("Key: %.*s\n",
- (int)rkmessage->key_len, (char *)rkmessage->key);
- }
- printf("%.*s\n",
- (int)rkmessage->len, (char *)rkmessage->payload);
- }
- /*
- init all configuration of kafka
- */
- int initKafka(char *brokers, char *group,char *topic){
- rd_kafka_conf_t *conf;
- rd_kafka_topic_conf_t *topic_conf;
- rd_kafka_resp_err_t err;
- char tmp[16];
- char errstr[512];
- /* Kafka configuration */
- conf = rd_kafka_conf_new();
- //quick termination
- snprintf(tmp, sizeof(tmp), "%i", SIGIO);
- rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0);
- //topic configuration
- topic_conf = rd_kafka_topic_conf_new();
- /* Consumer groups require a group id */
- if (!group)
- group = "rdkafka_consumer_example";
- if (rd_kafka_conf_set(conf, "group.id", group,
- errstr, sizeof(errstr)) !=
- RD_KAFKA_CONF_OK) {
- fprintf(stderr, "%% %s\n", errstr);
- return -1;
- }
- /* Consumer groups always use broker based offset storage */
- if (rd_kafka_topic_conf_set(topic_conf, "offset.store.method",
- "broker",
- errstr, sizeof(errstr)) !=
- RD_KAFKA_CONF_OK) {
- fprintf(stderr, "%% %s\n", errstr);
- return -1;
- }
- /* Set default topic config for pattern-matched topics. */
- rd_kafka_conf_set_default_topic_conf(conf, topic_conf);
- //實例化一個頂級對象rd_kafka_t作為基礎容器,提供全局配置和共享狀態
- rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
- if(!rk){
- fprintf(stderr, "%% Failed to create new consumer:%s\n", errstr);
- return -1;
- }
- //Librdkafka需要至少一個brokers的初始化list
- if (rd_kafka_brokers_add(rk, brokers) == 0){
- fprintf(stderr, "%% No valid brokers specified\n");
- return -1;
- }
- //重定向 rd_kafka_poll()隊列到consumer_poll()隊列
- rd_kafka_poll_set_consumer(rk);
- //創建一個Topic+Partition的存儲空間(list/vector)
- topics = rd_kafka_topic_partition_list_new(1);
- //把Topic+Partition加入list
- rd_kafka_topic_partition_list_add(topics, topic, -1);
- //開啟consumer訂閱,匹配的topic將被添加到訂閱列表中
- if((err = rd_kafka_subscribe(rk, topics))){
- fprintf(stderr, "%% Failed to start consuming topics: %s\n", rd_kafka_err2str(err));
- return -1;
- }
- return 1;
- }
- int main(int argc, char **argv){
- char *brokers = "localhost:9092";
- char *group = NULL;
- char *topic = NULL;
- int opt;
- rd_kafka_resp_err_t err;
- while ((opt = getopt(argc, argv, "g:b:t:qd:eX:As:DO")) != -1){
- switch (opt) {
- case 'b':
- brokers = optarg;
- break;
- case 'g':
- group = optarg;
- break;
- case 't':
- topic = optarg;
- break;
- default:
- break;
- }
- }
- signal(SIGINT, stop);
- signal(SIGUSR1, sig_usr1);
- if(!initKafka(brokers, group, topic)){
- fprintf(stderr, "kafka server initialize error\n");
- }else{
- while(run){
- rd_kafka_message_t *rkmessage;
- /*-輪詢消費者的消息或事件,最多阻塞timeout_ms
- -應用程序應該定期調用consumer_poll(),即使沒有預期的消息,以服務
- 所有排隊等待的回調函數,當注冊過rebalance_cb,該操作尤為重要,
- 因為它需要被正確地調用和處理以同步內部消費者狀態 */
- rkmessage = rd_kafka_consumer_poll(rk, 1000);
- if(rkmessage){
- msg_consume(rkmessage, NULL);
- /*釋放rkmessage的資源,並把所有權還給rdkafka*/
- rd_kafka_message_destroy(rkmessage);
- }
- }
- }
- done:
- /*此調用將會阻塞,直到consumer撤銷其分配,調用rebalance_cb(如果已設置),
- commit offset到broker,並離開consumer group
- 最大阻塞時間被設置為session.timeout.ms
- */
- err = rd_kafka_consumer_close(rk);
- if(err){
- fprintf(stderr, "%% Failed to close consumer: %s\n", rd_kafka_err2str(err));
- }else{
- fprintf(stderr, "%% Consumer closed\n");
- }
- //釋放topics list使用的所有資源和它自己
- rd_kafka_topic_partition_list_destroy(topics);
- //destroy kafka handle
- rd_kafka_destroy(rk);
- run = 5;
- //等待所有rd_kafka_t對象銷毀,所有kafka對象被銷毀,返回0,超時返回-1
- while(run-- > 0 && rd_kafka_wait_destroyed(1000) == -1){
- printf("Waiting for librdkafka to decommission\n");
- }
- if(run <= 0){
- //dump rdkafka內部狀態到stdout流
- rd_kafka_dump(stdout, rk);
- }
- return 0;
- }
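在linux下編譯producer和consumer的代碼(代碼中以"../src/rdkafka.h"引用頭文件,需放在librdkafka源碼中與src同級的目錄(如examples)下編譯,或改為#include <librdkafka/rdkafka.h>):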
- gcc my_producer.c -o my_producer -lrdkafka -lz -lpthread -lrt
- gcc my_consumer.c -o my_consumer -lrdkafka -lz -lpthread -lrt
在運行my_producer或my_consumer時可能會報錯"error while loading shared libraries: xxx.so",此時需要把xxx.so所在的目錄加入/etc/ld.so.conf,然後執行ldconfig使其生效。
在本地啟動一個簡單的kafka服務,設置broker集群為localhost:9092並創建一個叫“test_topic”的topic
啟動方式可參考 kafka0.8.2集群的環境搭建並實現基本的生產消費
啟動consumer(例如:./my_consumer -b localhost:9092 -t test_topic):

啟動producer(例如:./my_producer localhost:9092 test_topic),輸入一條數據“hello world”並按回車發送:

consumer處成功收到producer發送的“hello world”:

http://orchome.com/5
https://github.com/edenhill/librdkafka
https://github.com/mfontanini/cppkafka
https://github.com/zengyuxing007/kafka_test_cpp
