Linux下librdkafka客戶端的編譯運行
librdkafka是一個開源的Kafka客戶端C/C++實現,提供了Kafka生產者、消費者接口。
由於項目需要,我要將Kafka生產者接口封裝起來給別人調用,所以先安裝了librdkafka,然后在demo上進行修改封裝一個生產者接口。
[一] 安裝librdkafka
首先在github上下載librdkafka源碼,解壓后進行編譯;
cd librdkafka-master
chmod 777 configure lds-gen.py
./configure
make
make install
在make的時候,如果是64位Linux會報下面這個異常
/bin/ld:librdkafka.lds:1: syntax error in VERSION script
只要將Makefile.config里面的WITH_LDS=y這一行注釋掉就不會報錯了。
[二] 封裝librdkafka的生產者接口
#include <ctype.h> #include <signal.h> #include <string.h> #include <unistd.h> #include <stdlib.h> #include <syslog.h> #include <time.h> #include <sys/time.h> #include "librdkafka/rdkafka.h" /* for Kafka driver */ static int run = 1; static rd_kafka_t *rk; rd_kafka_topic_t *rkt; int partition = RD_KAFKA_PARTITION_UA; rd_kafka_topic_conf_t *topic_conf; static void stop (int sig) { run = 0; fclose(stdin); /* abort fgets() */ } static void sig_usr1 (int sig) { rd_kafka_dump(stdout, rk); } int initProducer (char *parameters) { int argc = 1; char **argv; char *para; char *delim = " "; char *brokers = "localhost:9092"; char *topic = NULL; int opt; rd_kafka_conf_t *conf; char errstr[512]; char tmp[16]; char copyParameters[1024]; strcpy(copyParameters, parameters); para = strtok(parameters, delim); argc++; while((para = strtok(NULL, delim)) != NULL){ argc++; } argv = (char**)malloc(argc*sizeof(char*)); argc = 0; argv[argc] = "initProducer"; para = strtok(copyParameters, delim); argc++; argv[argc] = para; while((para = strtok(NULL, delim)) != NULL){ argc++; argv[argc] = para; } argc++; /* Kafka configuration */ conf = rd_kafka_conf_new(); /* Quick termination */ snprintf(tmp, sizeof(tmp), "%i", SIGIO); rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0); /* Topic configuration */ topic_conf = rd_kafka_topic_conf_new(); while ((opt = getopt(argc, argv, "PCLt:p:b:z:qd:o:eX:As:")) != -1) { switch (opt) { case 't': topic = optarg; break; case 'p': partition = atoi(optarg); break; case 'b': brokers = optarg; break; default: fprintf(stderr, "%% Failed to init producer with error parameters\n"); } } if (optind != argc || !topic) { exit(1); } signal(SIGINT, stop); signal(SIGUSR1, sig_usr1); /* Create Kafka handle */ if (!(rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr)))) { fprintf(stderr, "%% Failed to create new producer: %s\n", errstr); exit(1); } rd_kafka_set_log_level(rk, LOG_DEBUG); /* Add brokers */ if (rd_kafka_brokers_add(rk, brokers) == 
0) { fprintf(stderr, "%% No valid brokers specified\n"); exit(1); } /* Create topic */ rkt = rd_kafka_topic_new(rk, topic, topic_conf); topic_conf = NULL; /* Now owned by topic */ return 1; } int freeProducer() { /* Destroy topic */ rd_kafka_topic_destroy(rkt); /* Destroy the handle */ rd_kafka_destroy(rk); if (topic_conf) rd_kafka_topic_conf_destroy(topic_conf); /* Let background threads clean up and terminate cleanly. */ run = 5; while (run-- > 0 && rd_kafka_wait_destroyed(1000) == -1) printf("Waiting for librdkafka to decommission\n"); if (run <= 0) rd_kafka_dump(stdout, rk); return 1; } int main (int argc, char **argv) { char parameter[] = "-t XX-HTTP-KEYWORD-LOG -b 10.10.6.101:9092,10.10.6.102:9092,10.10.6.104:9092"; char buf[1024]; initProducer(parameter); while (run && fgets(buf, sizeof(buf), stdin)) { if(rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY, buf, strlen(buf), NULL, 0, NULL) == -1){ fprintf(stderr, "%% Failed to produce to topic %s partition %i: %s\n", rd_kafka_topic_name(rkt), partition, rd_kafka_err2str(rd_kafka_last_error())); }else{ fprintf(stderr, "%% Sent %zd bytes to topic %s partition %i\n", strlen(buf), rd_kafka_topic_name(rkt), partition); } } freeProducer(); return 0; }
[三] 編譯運行
編譯的時候要加上-lrdkafka -lz -lpthread -lrt這些選項:gcc myProducer.c -o myProducer -lrdkafka -lz -lpthread -lrt
在運行的時候會報error while loading shared libraries: librdkafka.so.1,這是因為make install的時候將librdkafka.so.1放在了/usr/local/lib下,在Linux的默認共享庫路徑/lib和/usr/lib下找不到,只要執行下面兩句就可以了:
echo "/usr/local/lib" >> /etc/ld.so.conf
ldconfig
運行./myProducer,會不斷的從終端讀取鍵入的字符串,然后發送到Kafka,通過Kafka自帶的console consumer能夠消費查看數據。
