【轉】c++(11)使用librdkafka庫實現kafka的消費實例


版權聲明:本文為博主原創文章,遵循 CC 4.0 by-sa 版權協議,轉載請附上原文出處鏈接和本聲明。
本文鏈接:https://blog.csdn.net/lijinqi1987/article/details/76691170

 

librdkafka在c語言的基礎上封裝了一層c++的API,可以實現kafka的消費操作,基本操作步驟如下


1、創建kafka 配置

RdKafka::Conf *conf = nullptr;
conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);

 

2、設置kafka各項參數


/*設置broker list*/
conf->set("bootstrap.servers", brokers_, errstr);

/*設置consumer group*/
conf->set("group.id", groupid_, errstr);

/*每次從單個分區中拉取消息的最大尺寸*/
conf->set("max.partition.fetch.bytes", strfetch_num, errstr);


3、創建kafka topic配置
RdKafka::Conf *tconf = nullptr;
tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);


4、設置kafka topic參數

if(tconf->set("auto.offset.reset", "smallest", errstr)


5、創建kafka consumer實例

kafka_consumer_ = RdKafka::Consumer::create(conf, errstr);


6、創建kafka topic

RdKafka::Topic::create(kafka_consumer_, topics_, tconf, errstr);


7、啟動kafka consumer實例

RdKafka::ErrorCode resp = kafka_consumer_->start(topic_, partition_, offset_);


8、消費kafka

kafka_consumer_->consume(topic_, partition_, timeout_ms);


9、阻塞等待消息

kafka_consumer_->poll(0);


10、停止消費

kafka_consumer_->stop(topic_, partition_);


11、銷毀consumer實例

RdKafka::wait_destroyed(5000);

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM