版權聲明:本文為博主原創文章,遵循 CC 4.0 by-sa 版權協議,轉載請附上原文出處鏈接和本聲明。
本文鏈接:https://blog.csdn.net/lijinqi1987/article/details/76691170
librdkafka在c語言的基礎上封裝了一層c++的API,可以實現kafka的消費操作,基本操作步驟如下
1、創建kafka 配置
RdKafka::Conf *conf = nullptr;
conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
2、設置kafka各項參數
/*設置broker list*/
conf->set("bootstrap.servers", brokers_, errstr);
/*設置consumer group*/
conf->set("group.id", groupid_, errstr);
/*每次從單個分區中拉取消息的最大尺寸*/
conf->set("max.partition.fetch.bytes", strfetch_num, errstr);
3、創建kafka topic配置
RdKafka::Conf *tconf = nullptr;
tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
4、設置kafka topic參數
if(tconf->set("auto.offset.reset", "smallest", errstr)
5、創建kafka consumer實例
kafka_consumer_ = RdKafka::Consumer::create(conf, errstr);
6、創建kafka topic
RdKafka::Topic::create(kafka_consumer_, topics_, tconf, errstr);
7、啟動kafka consumer實例
RdKafka::ErrorCode resp = kafka_consumer_->start(topic_, partition_, offset_);
8、消費kafka
kafka_consumer_->consume(topic_, partition_, timeout_ms);
9、阻塞等待消息
kafka_consumer_->poll(0);
10、停止消費
kafka_consumer_->stop(topic_, partition_);
11、銷毀consumer實例
RdKafka::wait_destroyed(5000);