我現在使用的是librdkafka 的C/C++ 的客戶端來生產消息,用flume來輔助處理異常的數據,,,
但是在前段時間,單獨使用flume測試的時候發現,flume不能對分區進行負載均衡!同一個集群中,一個broker的一個分區已經有10億條數據,另外一台的另一個分區只有8億條數據;
因此,我對flume參照別人的做法,增加了攔截器;
即在flume配置文件中 增加以下字段;
-----
stage_nginx.sources.tailSource.interceptors = i2
stage_nginx.sources.tailSource.interceptors.i2.type=org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
stage_nginx.sources.tailSource.interceptors.i2.headerName=key
stage_nginx.sources.tailSource.interceptors.i2.preserveExisting=false
----特別注意,,此處的sources是你自己的sources name.
增加完后,要先進行自己測試,驗證flume攔截器的負載均衡功能;
好,下來話不多少,,看測試步驟;
1,創建topic 相關聯的分區 (因現場暫時只有2個分區,所以我這邊暫時取2個分區做測試)
(我暫時使用的kafka版本是kafka_2.11-0.9.0.1,以下都是在kafka相關版本的bin路徑下操作命令)
./kafka-topics.sh --create --zookeeper 192.165.1.91:12181,192.165.1.92:12181,192.165.1.64:12181 --replication-factor 1 --partitions 2 --topic test3
創建topic test3 不要分區 zookeeper 3台 分區2個 zookeeper端口號12181(我本地的broker端口號是19091,這個在kafka conf/ server.properties里邊配置)
2,查看topic的創建情況
在broker的每台機器的目錄下,分別查看topic的創建情況!
下邊是我91機器的情況:
./kafka-topics.sh --describe --zookeeper 192.165.1.91:12181 --topic test3
Topic:test3 PartitionCount:2 ReplicationFactor:1 Configs:
Topic: test3 Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: test3 Partition: 1 Leader: 2 Replicas: 2 Isr: 2
-------------意思是 他有倆個分區,,每個分區他的備份分區都是他們自己,即沒有分區,,你們可以根據你們自身的現狀做不同的操作;
3,啟動flume生產消息,並且查看消息是否生產成功;
a, 啟動flume:/home/hadoop/wgjflume/apache-flume-1.5.0-cdh5.4.9-bin/bin/flume-ng agent -n stage_nginx -c /home/hadoop/wgjflume/apache-flume-1.5.0-cdh5.4.9-bin/conf -f /home/hadoop/wgjflume/apache-flume-1.5.0-cdh5.4.9-bin/conf/flume-conf.properties -Dflume.root.logger=INFO,console
次處生產了30條消息!!!
b, 查看消息是否消費成功!
./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.165.1.91:19092,192.165.1.92:19092,192.165.1.64:19092 --topic test6 --time -1
4,要查消費情況,必須的建立消費組,,下來創建消費group
./kafka-console-consumer.sh --bootstrap-server 192.165.1.91:19092,192.165.1.92:19092,192.165.1.64:19092 --topic test3 --from-beginning --new-consumer
此處會消費信息!
5,(此處,關閉消費程序,或者另外開一個窗口來)查看自己創建的 group id號;
./kafka-consumer-groups.sh --bootstrap-server 192.165.1.91:19092,192.165.1.92:19092,192.165.1.64:19092 --list --new-consumer
本地我顯示的是:console-consumer-54762
6,查詢__consumer_offsets topic所有內容
注意:運行下面命令前先要在consumer.properties中設置exclude.internal.topics=false(同時要配置好你的consumer.properties中有關zookeeper和broker相關的IP和端口信息)
./kafka-console-consumer.sh --topic __consumer_offsets --zookeeper 192.165.1.91:12181,192.165.1.92:12181,192.165.1.64:12181 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config ../config/consumer.properties --from-beginning
此處需要注意的是 consumer.properties 的路徑!!!
7. 計算指定consumer group在__consumer_offsets topic中分區信息
這時候就用到了第4步獲取的group.id(本例中是console-consumer-54762)。Kafka會使用下面公式計算該group位移保存在__consumer_offsets的哪個分區上:
Math.abs(groupID.hashCode()) % numPartitions
所以在本例中,對應的分區=Math.abs("console-consumer-54762".hashCode()) % 50 = 22,即__consumer_offsets的分區22保存了這個consumer group的位移信息,下面讓我們驗證一下。
注意:Math.abs("console-consumer-54762".hashCode()) % 50 這個使用java輸出的一個值,Math.abs是java的一個函數,可以直接將前邊這個做參數,打印出他的值.
8. 獲取指定consumer group的位移信息
bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 22 --broker-list 192.165.1.91:19092,192.165.1.92:19092,192.165.1.64:19092 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"
9,生產消息,查看負載均衡情況,,,,我項目中用的flume是exec方式的,,所有使用 cat 一個文件中的內容追加到flume exec 的文件末尾,然后運行命令8,,,查看消費位移!!!
結果顯示,,,攔截器,,分發消息成功,,倆個分區數目基本是持衡的。
本文鏈接地址:https://i.cnblogs.com/EditPosts.aspx?postid=6339111
特別鳴謝,胡夕,參考了他的博文,他的博客鏈接地址:http://www.cnblogs.com/huxi2b/p/6061110.html