Kafka 如何讀取指定topic中的offset -------------用來驗證分區是不是均衡!!!(__consumer_offsets)(已驗證!)


我現在使用的是librdkafka 的C/C++ 的客戶端來生產消息,用flume來輔助處理異常的數據,,,

但是在前段時間,單獨使用flume測試的時候發現,flume不能對分區進行負載均衡!同一個集群中,一個broker的一個分區已經有10億條數據,另外一台的另一個分區只有8億條數據;

因此,我對flume參照別人的做法,增加了攔截器;

即在flume配置文件中 增加以下字段;

-----

stage_nginx.sources.tailSource.interceptors = i2
stage_nginx.sources.tailSource.interceptors.i2.type=org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
stage_nginx.sources.tailSource.interceptors.i2.headerName=key
stage_nginx.sources.tailSource.interceptors.i2.preserveExisting=false

----特別注意,,此處的sources是你自己的sources name.

增加完后,要先進行自己測試,驗證flume攔截器的負載均衡功能;

好,下來話不多少,,看測試步驟;

 

1,創建topic 相關聯的分區 (因現場暫時只有2個分區,所以我這邊暫時取2個分區做測試)

  (我暫時使用的kafka版本是kafka_2.11-0.9.0.1,以下都是在kafka相關版本的bin路徑下操作命令

  ./kafka-topics.sh --create --zookeeper 192.165.1.91:12181,192.165.1.92:12181,192.165.1.64:12181 --replication-factor 1 --partitions 2 --topic test3      

   創建topic test3  不要分區  zookeeper 3台   分區2個  zookeeper端口號12181(我本地的broker端口號是19091,這個在kafka  conf/ server.properties里邊配置)

2,查看topic的創建情況 

  在broker的每台機器的目錄下,分別查看topic的創建情況!  

  下邊是我91機器的情況:

  ./kafka-topics.sh --describe --zookeeper 192.165.1.91:12181 --topic test3 

  

    Topic:test3 PartitionCount:2 ReplicationFactor:1 Configs:

    Topic: test3 Partition: 0 Leader: 1 Replicas: 1 Isr: 1

    Topic: test3 Partition: 1 Leader: 2 Replicas: 2 Isr: 2

-------------意思是  他有倆個分區,,每個分區他的備份分區都是他們自己,即沒有分區,,你們可以根據你們自身的現狀做不同的操作;

3,啟動flume生產消息,並且查看消息是否生產成功;

   a, 啟動flume:/home/hadoop/wgjflume/apache-flume-1.5.0-cdh5.4.9-bin/bin/flume-ng agent -n stage_nginx -c /home/hadoop/wgjflume/apache-flume-1.5.0-cdh5.4.9-bin/conf -f /home/hadoop/wgjflume/apache-flume-1.5.0-cdh5.4.9-bin/conf/flume-conf.properties -Dflume.root.logger=INFO,console      

    次處生產了30條消息!!!

   b, 查看消息是否消費成功!

   ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.165.1.91:19092,192.165.1.92:19092,192.165.1.64:19092 --topic test6 --time -1

4,要查消費情況,必須的建立消費組,,下來創建消費group

   ./kafka-console-consumer.sh --bootstrap-server 192.165.1.91:19092,192.165.1.92:19092,192.165.1.64:19092 --topic test3 --from-beginning --new-consumer

    此處會消費信息!

5,(此處,關閉消費程序,或者另外開一個窗口來)查看自己創建的  group id號;

     ./kafka-consumer-groups.sh --bootstrap-server 192.165.1.91:19092,192.165.1.92:19092,192.165.1.64:19092 --list --new-consumer       

     本地我顯示的是:console-consumer-54762

6,查詢__consumer_offsets topic所有內容

    注意:運行下面命令前先要在consumer.properties中設置exclude.internal.topics=false(同時要配置好你的consumer.properties中有關zookeeper和broker相關的IP和端口信息

     ./kafka-console-consumer.sh --topic __consumer_offsets --zookeeper 192.165.1.91:12181,192.165.1.92:12181,192.165.1.64:12181 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config ../config/consumer.properties --from-beginning

  此處需要注意的是  consumer.properties 的路徑!!!  

7. 計算指定consumer group在__consumer_offsets topic中分區信息

這時候就用到了第4步獲取的group.id(本例中是console-consumer-54762)。Kafka會使用下面公式計算該group位移保存在__consumer_offsets的哪個分區上:

Math.abs(groupID.hashCode()) % numPartitions

所以在本例中,對應的分區=Math.abs("console-consumer-54762".hashCode()) % 50 = 22,即__consumer_offsets的分區22保存了這個consumer group的位移信息,下面讓我們驗證一下。

  注意:Math.abs("console-consumer-54762".hashCode()) % 50  這個使用java輸出的一個值,Math.abs是java的一個函數,可以直接將前邊這個做參數,打印出他的值.

8. 獲取指定consumer group的位移信息 

bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 22 --broker-list 192.165.1.91:19092,192.165.1.92:19092,192.165.1.64:19092 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"

9,生產消息,查看負載均衡情況,,,,我項目中用的flume是exec方式的,,所有使用  cat 一個文件中的內容追加到flume  exec 的文件末尾,然后運行命令8,,,查看消費位移!!!

  結果顯示,,,攔截器,,分發消息成功,,倆個分區數目基本是持衡的。

  

   本文鏈接地址:https://i.cnblogs.com/EditPosts.aspx?postid=6339111

   特別鳴謝,胡夕,參考了他的博文,他的博客鏈接地址:http://www.cnblogs.com/huxi2b/p/6061110.html

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM