簡介
kafka網站上提供了C語言的客戶端librdkafka,地址在這。
librdkafka是使用C語言根據apache kafka 協議實現的客戶端。另外這個客戶端還有簡單的c++接口。客戶端作者對這個客戶端比較上心,經常會修改bug並提交新功能。
librdkafka的基本原理和我之前博客說的java版producer類似,一個線程向隊列中加數據,另一個線程通過非阻塞的方式從隊列中取出數據,並寫入到broker。
源碼分析
源碼包含兩個文件夾src和src-cpp
src是用c實現的源碼,而src-cpp是在c接口上包裝的一層c++類,實現了基本的功能。
代碼運行流程如下
1、rd_kafka_conf_set設置全局配置
2、rd_kafka_topic_conf_set設置topic配置
3、rd_kafka_brokers_add設置broker地址,啟動向broker發送消息的線程
4、rd_kafka_new啟動kafka主線程
5、rd_kafka_topic_new建topic
6、rd_kafka_produce使用本函數發送消息
7、rd_kafka_poll調用回調函數
還是看發送一條消息的過程
入隊列過程
調用rd_kafka_produce可以將消息寫到隊列
1 int rd_kafka_produce (...) { 2 //調用rd_kafka_msg_new 3 return rd_kafka_msg_new(...); 4 }
首先先將消息包裝成rd_kafka_msg_t類型,然后獲取分區並相應的隊列
1 int rd_kafka_msg_new (...) { 2 ... 3 //創建消息,將傳入的參數轉換為rkm 4 rkm = rd_kafka_msg_new0(...); 5 //分區並入隊 6 err = rd_kafka_msg_partitioner(rkt, rkm, 1); 7 ... 8 return -1; 9 }
1 int rd_kafka_msg_partitioner (...) { 2 ... 3 //獲取分區號 4 switch (rkt->rkt_state) 5 { 6 ... 7 } 8 //獲取分區 9 rktp_new = rd_kafka_toppar_get(rkt, partition, 0); 10 ... 11 //加入隊列 12 rd_kafka_toppar_enq_msg(rktp_new, rkm); 13 return 0; 14 }
出隊列過程
添加broker的過程中就啟動了掃描隊列的操作
1 static rd_kafka_broker_t *rd_kafka_broker_add (rd_kafka_t *rk, 2 rd_kafka_confsource_t source, 3 const char *name, uint16_t port, 4 int32_t nodeid) { 5 ... 6 pthread_attr_init(&attr); 7 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); 8 //啟動向broker發送消息的主線程 9 if ((err = pthread_create(&rkb->rkb_thread, &attr, 10 rd_kafka_broker_thread_main, rkb))) { 11 ... 12 return NULL; 13 } 14 //將broker加到broker隊列中 15 TAILQ_INSERT_TAIL(&rkb->rkb_rk->rk_brokers, rkb, rkb_link); 16 (void)rd_atomic_add(&rkb->rkb_rk->rk_broker_cnt, 1); 17 ... 18 return rkb; 19 }
啟動rd_kafka_broker_thread_main主線程
1 static void *rd_kafka_broker_thread_main (void *arg) { 2 ... 3 while (!rkb->rkb_rk->rk_terminate) { 4 switch (rkb->rkb_state) 5 { 6 //如果broker連接未初始化,或中斷,則不斷重連broker 7 case RD_KAFKA_BROKER_STATE_INIT: 8 case RD_KAFKA_BROKER_STATE_DOWN: 9 if (rd_kafka_broker_connect(rkb) == -1) { 10 ... 11 } 12 break; 13 //如果broker連接已經建立,則調用serve函數 14 case RD_KAFKA_BROKER_STATE_UP: 15 if (rkb->rkb_nodeid == RD_KAFKA_NODEID_UA) 16 rd_kafka_broker_ua_idle(rkb); 17 else if (rk->rk_type == RD_KAFKA_PRODUCER) 18 rd_kafka_broker_producer_serve(rkb); 19 else if (rk->rk_type == RD_KAFKA_CONSUMER) 20 rd_kafka_broker_consumer_serve(rkb); 21 break; 22 } 23 } 24 ... 25 return NULL; 26 }
只看producer的處理函數,該函數掃描消息並發送
1 static void rd_kafka_broker_producer_serve (rd_kafka_broker_t *rkb) { 2 ... 3 while (!rkb->rkb_rk->rk_terminate && 4 rkb->rkb_state == RD_KAFKA_BROKER_STATE_UP) { 5 ... 6 do { 7 cnt = 0; 8 ... 9 //掃描所有的topic-partitions,並發送消息 10 TAILQ_FOREACH(rktp, &rkb->rkb_toppars, rktp_rkblink) { 11 ... 12 //將入隊過程中的隊列rktp_msgq加到rktp_xmit_msgq中 13 if (rktp->rktp_msgq.rkmq_msg_cnt > 0) 14 rd_kafka_msgq_concat(&rktp-> 15 rktp_xmit_msgq, 16 &rktp->rktp_msgq); 17 rd_kafka_toppar_unlock(rktp); 18 //掃描消息隊列中數據是否超時 19 if (unlikely(do_timeout_scan)) 20 rd_kafka_msgq_age_scan(&rktp-> 21 rktp_xmit_msgq, 22 &timedout, 23 now); 24 //隊列為空則從頭繼續 25 if (rktp->rktp_xmit_msgq.rkmq_msg_cnt == 0) 26 continue; 27 28 //如果沒有超時,或者沒達到處理消息數量的閾值,則從頭繼續,這樣批處理可以提高性能 29 if (rktp->rktp_ts_last_xmit + 30 (rkb->rkb_rk->rk_conf. 31 buffering_max_ms * 1000) > now && 32 rktp->rktp_xmit_msgq.rkmq_msg_cnt < 33 rkb->rkb_rk->rk_conf. 34 batch_num_messages) { 35 /* Wait for more messages */ 36 continue; 37 } 38 39 rktp->rktp_ts_last_xmit = now; 40 41 //按協議轉換並填充數據到rkb中 42 while (rktp->rktp_xmit_msgq.rkmq_msg_cnt > 0) { 43 int r = rd_kafka_broker_produce_toppar( 44 rkb, rktp); 45 if (likely(r > 0)) 46 cnt += r; 47 else 48 break; 49 } 50 } 51 52 } while (cnt); 53 54 //觸發數據發送情況的回調函數,將發送失敗的寫到一個操作結果隊列中 55 if (unlikely(isrfailed.rkmq_msg_cnt > 0)) 56 rd_kafka_dr_msgq(rkb->rkb_rk, &isrfailed, 57 RD_KAFKA_RESP_ERR__ISR_INSUFF); 58 59 if (unlikely(timedout.rkmq_msg_cnt > 0)) 60 rd_kafka_dr_msgq(rkb->rkb_rk, &timedout, 61 RD_KAFKA_RESP_ERR__MSG_TIMED_OUT); 62 63 rd_kafka_broker_toppars_unlock(rkb); 64 65 /* Check and move retry buffers */ 66 if (unlikely(rkb->rkb_retrybufs.rkbq_cnt) > 0) 67 rd_kafka_broker_retry_bufs_move(rkb); 68 69 rd_kafka_broker_unlock(rkb); 70 71 //開始在網絡上發送數據 72 rd_kafka_broker_io_serve(rkb); 73 74 /* Scan wait-response queue 75 * Note: 'now' may be a bit outdated by now. */ 76 if (do_timeout_scan) 77 rd_kafka_broker_waitresp_timeout_scan(rkb, now); 78 79 rd_kafka_broker_lock(rkb); 80 } 81 82 rd_kafka_broker_unlock(rkb); 83 }
通過poll處理網絡事件,將消息從網絡發送到broker
1 static void rd_kafka_broker_io_serve (rd_kafka_broker_t *rkb) { 2 rd_kafka_op_t *rko; 3 rd_ts_t now = rd_clock(); 4 //處理broker操作 5 if (unlikely(rd_kafka_q_len(&rkb->rkb_ops) > 0)) 6 while ((rko = rd_kafka_q_pop(&rkb->rkb_ops, RD_POLL_NOWAIT))) 7 rd_kafka_broker_op_serve(rkb, rko); 8 //請求metadata 9 if (unlikely(now >= rkb->rkb_ts_metadata_poll)) 10 rd_kafka_broker_metadata_req(rkb, 1 /* all topics */, NULL, 11 NULL, "periodic refresh"); 12 //如果有消息,手動增加寫事件 13 if (rkb->rkb_outbufs.rkbq_cnt > 0) 14 rkb->rkb_pfd.events |= POLLOUT; 15 else 16 rkb->rkb_pfd.events &= ~POLLOUT; 17 if (poll(&rkb->rkb_pfd, 1, 18 rkb->rkb_rk->rk_conf.buffering_max_ms) <= 0) 19 return; 20 //poll函數,處理各種事件,發送消息時,只處理寫事件,當請求metadata時,處理讀事件 21 if (rkb->rkb_pfd.revents & POLLIN) 22 while (rd_kafka_recv(rkb) > 0) 23 ; 24 if (rkb->rkb_pfd.revents & POLLHUP) 25 return rd_kafka_broker_fail(rkb, RD_KAFKA_RESP_ERR__TRANSPORT, 26 "Connection closed"); 27 if (rkb->rkb_pfd.revents & POLLOUT) 28 while (rd_kafka_send(rkb) > 0) 29 ; 30 }
問題
librdkafka不像java客戶端那樣,可以通過future.get()實現同步發送。所以,如果broker不能連通的話,send方法還是可以正常將消息放入隊列。這會導致兩個問題
1、我們的客戶端是不會知道broker已經掛掉了,因而不能對這種情況作出及時處理,導致消息全部堆積在內存中,如果此時不幸,我們的客戶端也掛掉了,那這部分消息就全部丟失了。
2、如果broker一直沒有恢復,而我們一直向隊列中寫數據的話,producer中有一個選項message.timeout.ms,如果超過了設定的消息超時時間,那么會有線程清理隊列中的數據,導致消息丟失,而如果將時間設置為0(永不超時)的話,將導致客戶端內存撐滿。
上面這個問題可以通過如下方法實現的同步發送來解決
1 void dr_cb (...err, , void *msg_opaque) { 2 int *produce_statusp = (int *)msg_opaque; 3 4 /* set sync_produce()'s produce_status value to the error code (which can be NO_ERROR) */ 5 *produce_statusp = err; 6 } 7 8 int sync_produce (rkt, msg..) { 9 int produce_status = -100000; /* or some other magic value that is not proper value in rd_kafka_resp_err_t */ 10 11 rd_kafka_produce(rkt, ..msg, .., &produce_status /* msg_opaque */); 12 13 do { 14 /* poll dr and error callbacks. */ 15 rd_kafka_poll(rk, 1000); 16 /* wait for dr_cb to be called and setting produce_status to the error value. */ 17 } while (produce_status == -100000); 18 19 if (produce_status == RD_KAFKA_RESP_ERR_NO_ERROR) 20 return SUCCESS!; 21 else 22 return FAILURE; 23 }