kafka C客戶端librdkafka producer源碼分析


簡介

kafka網站上提供了C語言的客戶端librdkafka,地址在這

librdkafka是使用C語言根據apache kafka 協議實現的客戶端。另外這個客戶端還有簡單的c++接口。客戶端作者對這個客戶端比較上心,經常會修改bug並提交新功能。

librdkafka的基本原理和我之前博客說的java版producer類似,一個線程向隊列中加數據,另一個線程通過非阻塞的方式從隊列中取出數據,並寫入到broker。

 

源碼分析

源碼包含兩個文件夾src和src-cpp

image image

src是用c實現的源碼,而src-cpp是在c接口上包裝的一層c++類,實現了基本的功能。

代碼運行流程如下

1、rd_kafka_conf_set設置全局配置

2、rd_kafka_topic_conf_set設置topic配置

3、rd_kafka_brokers_add設置broker地址,啟動向broker發送消息的線程

4、rd_kafka_new啟動kafka主線程

5、rd_kafka_topic_new建topic

6、rd_kafka_produce使用本函數發送消息

7、rd_kafka_poll調用回調函數

還是看發送一條消息的過程

入隊列過程

 調用rd_kafka_produce可以將消息寫到隊列

1 int rd_kafka_produce (...) {
2     //調用rd_kafka_msg_new
3     return rd_kafka_msg_new(...);
4 }
首先先將消息包裝成rd_kafka_msg_t類型,然后獲取分區並相應的隊列
1 int rd_kafka_msg_new (...) {
2     ...
3     //創建消息,將傳入的參數轉換為rkm
4     rkm = rd_kafka_msg_new0(...);
5     //分區並入隊
6     err = rd_kafka_msg_partitioner(rkt, rkm, 1);
7     ...
8     return -1;
9 }
 1 int rd_kafka_msg_partitioner (...) {
 2      ...
 3      //獲取分區號
 4      switch (rkt->rkt_state)
 5      {
 6          ...
 7      }
 8     //獲取分區
 9     rktp_new = rd_kafka_toppar_get(rkt, partition, 0);
10     ...
11     //加入隊列
12     rd_kafka_toppar_enq_msg(rktp_new, rkm);
13     return 0;
14 }

 

出隊列過程

添加broker的過程中就啟動了掃描隊列的操作

 

 1 static rd_kafka_broker_t *rd_kafka_broker_add (rd_kafka_t *rk,
 2                            rd_kafka_confsource_t source,
 3                            const char *name, uint16_t port,
 4                            int32_t nodeid) {
 5     ...
 6     pthread_attr_init(&attr);
 7     pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
 8     //啟動向broker發送消息的主線程
 9     if ((err = pthread_create(&rkb->rkb_thread, &attr,
10                   rd_kafka_broker_thread_main, rkb))) {
11         ...
12         return NULL;
13     }
14     //將broker加到broker隊列中
15     TAILQ_INSERT_TAIL(&rkb->rkb_rk->rk_brokers, rkb, rkb_link);
16     (void)rd_atomic_add(&rkb->rkb_rk->rk_broker_cnt, 1);
17     ...
18     return rkb;
19 }

 

啟動rd_kafka_broker_thread_main主線程

 1 static void *rd_kafka_broker_thread_main (void *arg) {
 2     ...
 3     while (!rkb->rkb_rk->rk_terminate) {
 4         switch (rkb->rkb_state)
 5         {
 6         //如果broker連接未初始化,或中斷,則不斷重連broker
 7         case RD_KAFKA_BROKER_STATE_INIT:
 8         case RD_KAFKA_BROKER_STATE_DOWN:
 9             if (rd_kafka_broker_connect(rkb) == -1) {
10                 ...
11             }
12             break;
13         //如果broker連接已經建立,則調用serve函數
14         case RD_KAFKA_BROKER_STATE_UP:
15             if (rkb->rkb_nodeid == RD_KAFKA_NODEID_UA)
16                 rd_kafka_broker_ua_idle(rkb);
17             else if (rk->rk_type == RD_KAFKA_PRODUCER)
18                 rd_kafka_broker_producer_serve(rkb);
19             else if (rk->rk_type == RD_KAFKA_CONSUMER)
20                 rd_kafka_broker_consumer_serve(rkb);
21             break;
22         }
23     }
24     ...
25     return NULL;
26 }

 

只看producer的處理函數,該函數掃描消息並發送

 1 static void rd_kafka_broker_producer_serve (rd_kafka_broker_t *rkb) {
 2     ...
 3     while (!rkb->rkb_rk->rk_terminate &&
 4            rkb->rkb_state == RD_KAFKA_BROKER_STATE_UP) {
 5         ...
 6         do {
 7             cnt = 0;
 8             ...
 9             //掃描所有的topic-partitions,並發送消息
10             TAILQ_FOREACH(rktp, &rkb->rkb_toppars, rktp_rkblink) {
11                 ...
12                 //將入隊過程中的隊列rktp_msgq加到rktp_xmit_msgq中
13                 if (rktp->rktp_msgq.rkmq_msg_cnt > 0)
14                     rd_kafka_msgq_concat(&rktp->
15                                  rktp_xmit_msgq,
16                                  &rktp->rktp_msgq);
17                 rd_kafka_toppar_unlock(rktp);
18                 //掃描消息隊列中數據是否超時
19                 if (unlikely(do_timeout_scan))
20                     rd_kafka_msgq_age_scan(&rktp->
21                                    rktp_xmit_msgq,
22                                    &timedout,
23                                    now);
24                 //隊列為空則從頭繼續
25                 if (rktp->rktp_xmit_msgq.rkmq_msg_cnt == 0)
26                     continue;
27                 
28                 //如果沒有超時,或者沒達到處理消息數量的閾值,則從頭繼續,這樣批處理可以提高性能
29                 if (rktp->rktp_ts_last_xmit +
30                     (rkb->rkb_rk->rk_conf.
31                      buffering_max_ms * 1000) > now &&
32                     rktp->rktp_xmit_msgq.rkmq_msg_cnt <
33                     rkb->rkb_rk->rk_conf.
34                     batch_num_messages) {
35                     /* Wait for more messages */
36                     continue;
37                 }
38 
39                 rktp->rktp_ts_last_xmit = now;
40 
41                 //按協議轉換並填充數據到rkb中
42                 while (rktp->rktp_xmit_msgq.rkmq_msg_cnt > 0) {
43                     int r = rd_kafka_broker_produce_toppar(
44                         rkb, rktp);
45                     if (likely(r > 0))
46                         cnt += r;
47                     else
48                         break;
49                 }
50             }
51 
52         } while (cnt);
53 
54         //觸發數據發送情況的回調函數,將發送失敗的寫到一個操作結果隊列中
55         if (unlikely(isrfailed.rkmq_msg_cnt > 0))
56             rd_kafka_dr_msgq(rkb->rkb_rk, &isrfailed,
57                      RD_KAFKA_RESP_ERR__ISR_INSUFF);
58 
59         if (unlikely(timedout.rkmq_msg_cnt > 0))
60             rd_kafka_dr_msgq(rkb->rkb_rk, &timedout,
61                      RD_KAFKA_RESP_ERR__MSG_TIMED_OUT);
62 
63         rd_kafka_broker_toppars_unlock(rkb);
64 
65         /* Check and move retry buffers */
66         if (unlikely(rkb->rkb_retrybufs.rkbq_cnt) > 0)
67             rd_kafka_broker_retry_bufs_move(rkb);
68 
69         rd_kafka_broker_unlock(rkb);
70 
71         //開始在網絡上發送數據
72                 rd_kafka_broker_io_serve(rkb);
73 
74         /* Scan wait-response queue
75          * Note: 'now' may be a bit outdated by now. */
76         if (do_timeout_scan)
77             rd_kafka_broker_waitresp_timeout_scan(rkb, now);
78 
79         rd_kafka_broker_lock(rkb);
80     }
81 
82     rd_kafka_broker_unlock(rkb);
83 } 
通過poll處理網絡事件,將消息從網絡發送到broker
 1 static void rd_kafka_broker_io_serve (rd_kafka_broker_t *rkb) {
 2     rd_kafka_op_t *rko;
 3     rd_ts_t now = rd_clock();
 4     //處理broker操作
 5     if (unlikely(rd_kafka_q_len(&rkb->rkb_ops) > 0))
 6         while ((rko = rd_kafka_q_pop(&rkb->rkb_ops, RD_POLL_NOWAIT)))
 7             rd_kafka_broker_op_serve(rkb, rko);
 8     //請求metadata
 9     if (unlikely(now >= rkb->rkb_ts_metadata_poll))
10         rd_kafka_broker_metadata_req(rkb, 1 /* all topics */, NULL,
11                                              NULL, "periodic refresh");
12     //如果有消息,手動增加寫事件
13     if (rkb->rkb_outbufs.rkbq_cnt > 0)
14         rkb->rkb_pfd.events |= POLLOUT;
15     else
16         rkb->rkb_pfd.events &= ~POLLOUT;
17     if (poll(&rkb->rkb_pfd, 1,
18          rkb->rkb_rk->rk_conf.buffering_max_ms) <= 0)
19         return;
20     //poll函數,處理各種事件,發送消息時,只處理寫事件,當請求metadata時,處理讀事件
21     if (rkb->rkb_pfd.revents & POLLIN)
22         while (rd_kafka_recv(rkb) > 0)
23             ;
24     if (rkb->rkb_pfd.revents & POLLHUP)
25         return rd_kafka_broker_fail(rkb, RD_KAFKA_RESP_ERR__TRANSPORT,
26                         "Connection closed");
27     if (rkb->rkb_pfd.revents & POLLOUT)
28         while (rd_kafka_send(rkb) > 0)
29             ;
30 }
 
        

問題

librdkafka不像java客戶端那樣,可以通過future.get()實現同步發送。所以,如果broker不能連通的話,send方法還是可以正常將消息放入隊列。這會導致兩個問題

1、我們的客戶端是不會知道broker已經掛掉了,因而不能對這種情況作出及時處理,導致消息全部堆積在內存中,如果此時不幸,我們的客戶端也掛掉了,那這部分消息就全部丟失了。

2、如果broker一直沒有恢復,而我們一直向隊列中寫數據的話,producer中有一個選項message.timeout.ms,如果超過了設定的消息超時時間,那么會有線程清理隊列中的數據,導致消息丟失,而如果將時間設置為0(永不超時)的話,將導致客戶端內存撐滿。

上面這個問題可以通過如下方法實現的同步發送來解決

 1 void dr_cb (...err, , void *msg_opaque) {
 2      int *produce_statusp = (int *)msg_opaque;
 3 
 4      /* set sync_produce()'s produce_status value to the error code (which can be NO_ERROR) */
 5      *produce_statusp = err;
 6 }
 7 
 8 int sync_produce (rkt, msg..) {
 9    int produce_status = -100000; /* or some other magic value that is not proper value in rd_kafka_resp_err_t */
10 
11    rd_kafka_produce(rkt, ..msg, .., &produce_status /* msg_opaque */);
12 
13    do {
14      /* poll dr and error callbacks. */
15      rd_kafka_poll(rk, 1000);
16     /* wait for dr_cb to be called and setting produce_status to the error value. */
17    } while (produce_status == -100000);
18 
19   if (produce_status == RD_KAFKA_RESP_ERR_NO_ERROR)
20    return SUCCESS!;
21   else
22    return FAILURE;
23 }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM