使用librdkafka庫實現kafka的生產和消費實例--消費者


一、消費者

1、創建kafka配置

rd_kafka_conf_t *rd_kafka_conf_new (void) 

2、創建kafka topic的配置

rd_kafka_topic_conf_t *rd_kafka_topic_conf_new (void)

3、創建kafka各項參數

rd_kafka_conf_res_t rd_kafka_conf_set (rd_kafka_conf_t *conf,  
                                       const char *name,  
                                       const char *value,  
                                       char *errstr, size_t errstr_size)  

4、配置kafka topic各項參數

rd_kafka_conf_res_t rd_kafka_topic_conf_set (rd_kafka_topic_conf_t *conf,  
                         const char *name,  
                         const char *value,  
                         char *errstr, size_t errstr_size)  

5、創建consumer實例

rd_kafka_t *rd_kafka_new (rd_kafka_type_t type, rd_kafka_conf_t *conf, char *errstr, size_t errstr_size)  

6、為consumer實例添加broker list

int rd_kafka_brokers_add (rd_kafka_t *rk, const char *brokerlist)

7、開啟consumer訂閱

rd_kafka_subscribe (rd_kafka_t *rk, const rd_kafka_topic_partition_list_t *topics) 

8、輪詢消息或事件,並調用回掉函數

rd_kafka_message_t *rd_kafka_consumer_poll (rd_kafka_t *rk,int timeout_ms)  

9、關閉consumer實例

rd_kafka_resp_err_t rd_kafka_consumer_close (rd_kafka_t *rk)

10、釋放topic list 資源

rd_kafka_topic_partition_list_destroy (rd_kafka_topic_partition_list_t *rktparlist)

11、銷毀consumer實例

void rd_kafka_destroy (rd_kafka_t *rk)

12、等待consumer對象的銷毀

int rd_kafka_wait_destroyed (int timeout_ms)

代碼示例consumer.c:

#include <string.h>
#include <stdlib.h>
#include <syslog.h>
#include <signal.h>
#include <error.h>
#include <getopt.h>

#include "../src/rdkafka.h"

static int run = 1;
//`rd_kafka_t`自帶一個可選的配置API,如果沒有調用API,Librdkafka將會使用CONFIGURATION.md中的默認配置。
static rd_kafka_t *rk;
static rd_kafka_topic_partition_list_t *topics;

static void stop (int sig) {
  if (!run)
    exit(1);
  run = 0;
  fclose(stdin); /* abort fgets() */
}

static void sig_usr1 (int sig) {
  rd_kafka_dump(stdout, rk);
}

/**
 * 處理並打印已消費的消息
 */
static void msg_consume (rd_kafka_message_t *rkmessage,
       void *opaque) {
  if (rkmessage->err) {
    if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
      fprintf(stderr,
        "%% Consumer reached end of %s [%"PRId32"] "
             "message queue at offset %"PRId64"\n",
             rd_kafka_topic_name(rkmessage->rkt),
             rkmessage->partition, rkmessage->offset);

      return;
    }

    if (rkmessage->rkt)
            fprintf(stderr, "%% Consume error for "
                    "topic \"%s\" [%"PRId32"] "
                    "offset %"PRId64": %s\n",
                    rd_kafka_topic_name(rkmessage->rkt),
                    rkmessage->partition,
                    rkmessage->offset,
                    rd_kafka_message_errstr(rkmessage));
    else
            fprintf(stderr, "%% Consumer error: %s: %s\n",
                    rd_kafka_err2str(rkmessage->err),
                    rd_kafka_message_errstr(rkmessage));

    if (rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION ||
        rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC)
          run = 0;
    return;
  }

  fprintf(stdout, "%% Message (topic %s [%"PRId32"], "
                      "offset %"PRId64", %zd bytes):\n",
                      rd_kafka_topic_name(rkmessage->rkt),
                      rkmessage->partition,
    rkmessage->offset, rkmessage->len);

  if (rkmessage->key_len) {
    printf("Key: %.*s\n",
             (int)rkmessage->key_len, (char *)rkmessage->key);
  }

  printf("%.*s\n",
           (int)rkmessage->len, (char *)rkmessage->payload);
  
}

/*
  init all configuration of kafka
 */
int initKafka(char *brokers, char *group,char *topic){
  rd_kafka_conf_t *conf;
  rd_kafka_topic_conf_t *topic_conf;
  rd_kafka_resp_err_t err;
  char tmp[16];
  char errstr[512];

  /* Kafka configuration */
  conf = rd_kafka_conf_new();

  //quick termination
  snprintf(tmp, sizeof(tmp), "%i", SIGIO);
  rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0);

  //topic configuration
  topic_conf = rd_kafka_topic_conf_new();

  /* Consumer groups require a group id */
  if (!group)
          group = "rdkafka_consumer_example";
  if (rd_kafka_conf_set(conf, "group.id", group,
                        errstr, sizeof(errstr)) !=
      RD_KAFKA_CONF_OK) {
          fprintf(stderr, "%% %s\n", errstr);
          return -1;
  }

  /* Consumer groups always use broker based offset storage */
  if (rd_kafka_topic_conf_set(topic_conf, "offset.store.method",
                              "broker",
                              errstr, sizeof(errstr)) !=
      RD_KAFKA_CONF_OK) {
          fprintf(stderr, "%% %s\n", errstr);
          return -1;
  }

  /* Set default topic config for pattern-matched topics. */
  rd_kafka_conf_set_default_topic_conf(conf, topic_conf);

  //實例化一個頂級對象rd_kafka_t作為基礎容器,提供全局配置和共享狀態
  rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
  if(!rk){
    fprintf(stderr, "%% Failed to create new consumer:%s\n", errstr);
    return -1;
  }

  //Librdkafka需要至少一個brokers的初始化list
  if (rd_kafka_brokers_add(rk, brokers) == 0){
    fprintf(stderr, "%% No valid brokers specified\n");
    return -1;
  }

  //重定向 rd_kafka_poll()隊列到consumer_poll()隊列
  rd_kafka_poll_set_consumer(rk);

  //創建一個Topic+Partition的存儲空間(list/vector)
  topics = rd_kafka_topic_partition_list_new(1);
  //把Topic+Partition加入list
  rd_kafka_topic_partition_list_add(topics, topic, -1);
  //開啟consumer訂閱,匹配的topic將被添加到訂閱列表中
  if((err = rd_kafka_subscribe(rk, topics))){
      fprintf(stderr, "%% Failed to start consuming topics: %s\n", rd_kafka_err2str(err));
      return -1;
  }

  return 1;
}

int main(int argc, char **argv){
  char *brokers = "localhost:9092";
  char *group = NULL;
  char *topic = NULL;
  
  int opt;
  rd_kafka_resp_err_t err;

  while ((opt = getopt(argc, argv, "g:b:t:qd:eX:As:DO")) != -1){
    switch (opt) {
      case 'b':
        brokers = optarg;
        break;
      case 'g':
        group = optarg;
        break;
      case 't':
        topic = optarg;
        break;
      default:
        break;
    }
  } 

  signal(SIGINT, stop);
  signal(SIGUSR1, sig_usr1);

  if(!initKafka(brokers, group, topic)){
    fprintf(stderr, "kafka server initialize error\n");
  }else{
    while(run){
      rd_kafka_message_t *rkmessage;
      /*-輪詢消費者的消息或事件,最多阻塞timeout_ms
        -應用程序應該定期調用consumer_poll(),即使沒有預期的消息,以服務
        所有排隊等待的回調函數,當注冊過rebalance_cb,該操作尤為重要,
        因為它需要被正確地調用和處理以同步內部消費者狀態 */
      rkmessage = rd_kafka_consumer_poll(rk, 1000);
      if(rkmessage){
        msg_consume(rkmessage, NULL);
        /*釋放rkmessage的資源,並把所有權還給rdkafka*/
        rd_kafka_message_destroy(rkmessage);
      }
    }
  }

done:
    /*此調用將會阻塞,直到consumer撤銷其分配,調用rebalance_cb(如果已設置),
    commit offset到broker,並離開consumer group
    最大阻塞時間被設置為session.timeout.ms
    */
    err = rd_kafka_consumer_close(rk);
    if(err){
      fprintf(stderr, "%% Failed to close consumer: %s\n", rd_kafka_err2str(err));
    }else{
      fprintf(stderr, "%% Consumer closed\n");
    }

    //釋放topics list使用的所有資源和它自己
    rd_kafka_topic_partition_list_destroy(topics);

    //destroy kafka handle
    rd_kafka_destroy(rk);
  
    run = 5;
    //等待所有rd_kafka_t對象銷毀,所有kafka對象被銷毀,返回0,超時返回-1
    while(run-- > 0 && rd_kafka_wait_destroyed(1000) == -1){
      printf("Waiting for librdkafka to decommission\n");
    }
    if(run <= 0){
      //dump rdkafka內部狀態到stdout流
      rd_kafka_dump(stdout, rk);
    }

    return 0;
}

 

原文博客:http://blog.csdn.net/lijinqi1987/article/details/76582067


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM