使用librdkafka庫實現kafka的生產和消費實例--生產者


一、生產者

librdkafka進行kafka生產操作的大致步驟如下:

1、創建kafka配置

rd_kafka_conf_t *rd_kafka_conf_new (void)

2、配置kafka各項參數

rd_kafka_conf_res_t rd_kafka_conf_set (rd_kafka_conf_t *conf,  
                                       const char *name,  
                                       const char *value,  
                                       char *errstr, size_t errstr_size) 

3、設置發送回調函數

void rd_kafka_conf_set_dr_msg_cb (rd_kafka_conf_t *conf,  
                                  void (*dr_msg_cb) (rd_kafka_t *rk,  
                                  const rd_kafka_message_t *  
                                  rkmessage,  
                                  void *opaque)) 

4、創建producer實例

rd_kafka_t *rd_kafka_new (rd_kafka_type_t type, rd_kafka_conf_t *conf,char *errstr, size_t errstr_size)  

5、創建topic

rd_kafka_topic_t *rd_kafka_topic_new (rd_kafka_t *rk, const char *topic, rd_kafka_topic_conf_t *conf)  

6、異步調用將消息發送到指定的topic

int rd_kafka_produce (rd_kafka_topic_t *rkt, int32_t partition,  
              int msgflags,  
              void *payload, size_t len,  
              const void *key, size_t keylen,  
              void *msg_opaque)  

7、阻塞等待消息發送完成

int rd_kafka_poll (rd_kafka_t *rk, int timeout_ms) 

8、等待完成product請求完成

rd_kafka_resp_err_t rd_kafka_flush (rd_kafka_t *rk, int timeout_ms)

9、銷毀topic

void rd_kafka_topic_destroy (rd_kafka_topic_t *app_rkt)  

10、銷毀producer實例

void rd_kafka_destroy (rd_kafka_t *rk)

代碼示例product.c

#include <stdio.h>  
#include <signal.h>  
#include <string.h>  
  
#include "../src/rdkafka.h"  
  
static int run = 1;  
  
static void stop(int sig){  
    run = 0;  
    fclose(stdin);  
}  
  
/* 
    每條消息調用一次該回調函數,說明消息是傳遞成功(rkmessage->err == RD_KAFKA_RESP_ERR_NO_ERROR) 
    還是傳遞失敗(rkmessage->err != RD_KAFKA_RESP_ERR_NO_ERROR) 
    該回調函數由rd_kafka_poll()觸發,在應用程序的線程上執行 
 */  
static void dr_msg_cb(rd_kafka_t *rk,  
                      const rd_kafka_message_t *rkmessage, void *opaque)
{
if(rkmessage->err) fprintf(stderr, "%% Message delivery failed: %s\n", rd_kafka_err2str(rkmessage->err)); else fprintf(stderr, "%% Message delivered (%zd bytes, " "partition %"PRId32")\n", rkmessage->len, rkmessage->partition); /* rkmessage被librdkafka自動銷毀*/ } int main(int argc, char **argv){ rd_kafka_t *rk; /*Producer instance handle*/ rd_kafka_topic_t *rkt; /*topic對象*/ rd_kafka_conf_t *conf; /*臨時配置對象*/ char errstr[512]; char buf[512]; const char *brokers; const char *topic; if(argc != 3){ fprintf(stderr, "%% Usage: %s <broker> <topic>\n", argv[0]); return 1; } brokers = argv[1]; topic = argv[2]; /* 創建一個kafka配置占位 */ conf = rd_kafka_conf_new(); /*創建broker集群*/ if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK){ fprintf(stderr, "%s\n", errstr); return 1; } /*設置發送報告回調函數,rd_kafka_produce()接收的每條消息都會調用一次該回調函數 *應用程序需要定期調用rd_kafka_poll()來服務排隊的發送報告回調函數*/ rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb); /*創建producer實例 rd_kafka_new()獲取conf對象的所有權,應用程序在此調用之后不得再次引用它*/ rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr)); if(!rk){ fprintf(stderr, "%% Failed to create new producer:%s\n", errstr); return 1; } /*實例化一個或多個topics(`rd_kafka_topic_t`)來提供生產或消費,topic 對象保存topic特定的配置,並在內部填充所有可用分區和leader brokers,*/ rkt = rd_kafka_topic_new(rk, topic, NULL); if (!rkt){ fprintf(stderr, "%% Failed to create topic object: %s\n", rd_kafka_err2str(rd_kafka_last_error())); rd_kafka_destroy(rk); return 1; } /*用於中斷的信號*/ signal(SIGINT, stop); fprintf(stderr, "%% Type some text and hit enter to produce message\n" "%% Or just hit enter to only serve delivery reports\n" "%% Press Ctrl-C or Ctrl-D to exit\n"); while(run && fgets(buf, sizeof(buf), stdin)){ size_t len = strlen(buf); if(buf[len-1] == '\n') buf[--len] = '\0'; if(len == 0){ /*輪詢用於事件的kafka handle, 事件將導致應用程序提供的回調函數被調用 第二個參數是最大阻塞時間,如果設為0,將會是非阻塞的調用*/ rd_kafka_poll(rk, 0); continue; } retry: /*Send/Produce message. 這是一個異步調用,在成功的情況下,只會將消息排入內部producer隊列, 對broker的實際傳遞嘗試由后台線程處理,之前注冊的傳遞回調函數(dr_msg_cb) 用於在消息傳遞成功或失敗時向應用程序發回信號*/ if (rd_kafka_produce( /* Topic object */ rkt, /*使用內置的分區來選擇分區*/ RD_KAFKA_PARTITION_UA, /*生成payload的副本*/ RD_KAFKA_MSG_F_COPY, /*消息體和長度*/ buf, len, /*可選鍵及其長度*/ NULL, 0, NULL) == -1){ fprintf(stderr, "%% Failed to produce to topic %s: %s\n", rd_kafka_topic_name(rkt), rd_kafka_err2str(rd_kafka_last_error())); if (rd_kafka_last_error() == RD_KAFKA_RESP_ERR__QUEUE_FULL){ /*如果內部隊列滿,等待消息傳輸完成並retry, 內部隊列表示要發送的消息和已發送或失敗的消息, 內部隊列受限於queue.buffering.max.messages配置項*/ rd_kafka_poll(rk, 1000); goto retry; } }else{ fprintf(stderr, "%% Enqueued message (%zd bytes) for topic %s\n", len, rd_kafka_topic_name(rkt)); } /*producer應用程序應不斷地通過以頻繁的間隔調用rd_kafka_poll()來為 傳送報告隊列提供服務。在沒有生成消息以確定先前生成的消息已發送了其 發送報告回調函數(和其他注冊過的回調函數)期間,要確保rd_kafka_poll() 仍然被調用*/ rd_kafka_poll(rk, 0); } fprintf(stderr, "%% Flushing final message.. \n"); /*rd_kafka_flush是rd_kafka_poll()的抽象化, 等待所有未完成的produce請求完成,通常在銷毀producer實例前完成 以確保所有排列中和正在傳輸的produce請求在銷毀前完成*/ rd_kafka_flush(rk, 10*1000); /* Destroy topic object */ rd_kafka_topic_destroy(rkt); /* Destroy the producer instance */ rd_kafka_destroy(rk); return 0; }

 

原文博客:http://blog.csdn.net/lijinqi1987/article/details/76582067


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM