使用flume將kafka數據sink到HBase【轉】


1. hbase sink介紹

如果還不了解flume請查看我寫的其他flume下的博客。

接下來的內容主要來自flume官方文檔的學習。

順便也強烈推薦flume 1.6 官方API

hbase的sink主要有以下兩種。兩種方式都提供和HBASE一樣的一致性保證,即行級原子性

1.1 HbaseSink

agent的配置時提供兩種序列化模式:

  1. SimpleHbaseEventSerializer: 將整個事件body部分當做完整的一列寫入hbase
  2. RegexHbaseEventSerializer: 根據正則表達式將event body拆分到不同的列當中

優點:
安全性較高:支持往secure hbase寫數據(hbase可以開啟kerberos校驗)

缺點:
性能沒有后面的那種AsyncHBaseSink高

1.2 AsyncHbaseSink

異步的Sink,可見速度是比前者快的,但是不支持往Secure Hbase寫數據。

采用的序列化器是:SimpleAsyncHbaseEventSerializer,也支持將event body分割成多個列,插入到對應KEY的ROW里

2. 配置flume

我們這里hbase沒有開啟安全相關選項,一般這集群也主要在內網環境。所以我們這里采用AsyncHbaseSink來進行本次操作。source則為kafka。

channel我們也選用kafka channel。之所以選擇kafka channel的依據可以參考flume中各類channel分析對比

配置文件如下:

# ------------------- 定義數據流----------------------
# source的名字 agent.sources = kafkaSource # channels的名字,建議按照type來命名 agent.channels = kafkaChannel # sink的名字,建議按照目標來命名 agent.sinks = hbaseSink # ---------------------定義source和sink的綁定關系---------------- # 指定source使用的channel名字 agent.sources.kafkaSource.channels = kafkaChannel # 指定sink需要使用的channel的名字,注意這里是channel agent.sinks.hbaseSink.channel = kafkaChannel #-------- kafkaSource相關配置----------------- # 定義消息源類型 agent.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource # 定義kafka所在zk的地址 agent.sources.kafkaSource.zookeeperConnect = 10.45.9.139:2181 # 配置消費的kafka topic agent.sources.kafkaSource.topic = my-topic-test # 配置消費者組的id agent.sources.kafkaSource.groupId = flume # 消費超時時間,參照如下寫法可以配置其他所有kafka的consumer選項。注意格式從kafka.xxx開始是consumer的配置屬性 agent.sources.kafkaSource.kafka.consumer.timeout.ms = 100 #------- kafkaChannel相關配置------------------------- # channel類型 agent.channels.kafkaChannel.type = org.aprache.flume.channel.kafka.KafkaChannel # channel存儲的事件容量,即隊列長度 agent.channels.kafkaChannel.capacity=10000 # 事務容量 agent.channels.kafkaChannel.transactionCapacity=1000 # kafka broker list agent.channels.kafkaChannel.brokerList=mysql1:9092,mysql4:9092 # 指定topic agent.channels.topic=flume # 指定zk地址 agent.channels.kafkaChannel.zookeeperConnect=10.45.9.139:2181 # 指定producer的選項,關鍵是指定acks的值,保證消息發送的可靠性,retries采用默認的3 agent.channels.kafkaChannel.kafka.producer.acks=all #---------hbaseSink 相關配置------------------ # 指定sink類型。PS:如果使用RegexHbaseEventSerializer只能使用hbase類型 # agent.sinks.hbaseSink.type = hbase agent.sinks.hbaseSink.type = asynchbase # 指定hbase中的表名 agent.sinks.hbaseSink.table = student # 指明column family agent.sinks.hbaseSink.columnFamily= info # 使用的serializer agent.sinks.hbaseSink.serializer=org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer # 如果需要使用正則處理value可以使用以下的serializer #agent.sinks.hbaseSink.serializer=org.apache.flume.sink.hbase.RegexHbaseEventSerializer # 指定正則表達式,這里用的正則是匹配逗號分隔的字符串 #agent.sinks.hbaseSink.serializer.regex= ^([^,]+),([^,]+),([^,]+),([^,]+)$ # 指定在列族中對應的的colName # agent.sinks.hbaseSink.serializer.colNames=c1,c2,c3 # 指定hbase所用的zk集合 agent.sinks.hbaseSink.zookeeperQuorum = mysql3:2181,mysql4:2181,mysql5:2181 

3. 運行測試flume

在$FLUME_HOME/bin下執行以下命令運行。后台會開啟一個Application 的Java進程

nohup sh flume-ng agent --conf-file ../conf/flume-conf.properties --name agent -Dflume.root.logger=INFO,console &

然后寫個kafka的producer往我們前面定義的my-topic-test中寫消息。
producer的代碼如下:

import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; /**  * @author Wan Kaiming on 2016/8/1  * @version 1.0  */ public class MyProducer { public static void main(String[] args) { Properties props = new Properties(); //broker地址 這里用域名,記得修改本地的hosts文件 props.put("bootstrap.servers", "mysql1:9092,mysql4:9092"); //消息可靠性語義 props.put("acks", "all"); //請求broker失敗進行重試的次數,避免由於請求broker失敗造成的消息重復 props.put("retries", 0); //按批發送,每批的消息數量 props.put("batch.size", 16384); //防止來不及發送,延遲一點點時間,使得能夠批量發送消息 props.put("linger.ms", 1); //緩沖大小,bytes props.put("buffer.memory", 33554432); //key的序列化類 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //value的序列化類 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //創建一個Producer對象,加載配置上下文信息 Producer<String, String> producer = new KafkaProducer<String,String>(props); for(int i=0;i<6;i++){ producer.send(new ProducerRecord<String, String>("my-topic-test", "hello", "world")); } // while(true){ // //調用send方法進行發送。send方法將消息加到緩存,異步發送 // producer.send(new ProducerRecord<String, String>("my-topic", "hello", "world")); // } producer.close(); } } 

運行完畢后可以去hbase shell中檢查下:

總結:可見key全部是由flume自動生成的。發送給kafka的 value值"world"全部成功保存到HBASE

PS:

  1. 最后一行多出來的incRow是Flume的SimpleAsyncHbaseEventSerializer中使用的。用來統計行數的,每次都在最后一行,效果就是一個計數的count。
  2. 這里產生的行的名字是pCol和iCol都是SimpleAsyncHbaseEventSerializer的默認值,其實可以自定義指定

總結:可見,如果需要更加自由的對寫入HBASE的數據做自定義,建議需要了解下這個Event序列化類的源碼,然后可以自定義序列化類

4. 使用RegexHbaseEventSerializer來處理些HBASE的值

重要提示:使用RegexHbaseEventSerializer不支持最新的hbase1.2.2。請換成hbase-0.98.20-hadoop2-bin.tar.gz,否則可能會報錯找不到類。該BUG說明見:Flume 1.6 HBase 1.12 java.lang.NoSuchMethodError

  1. 修改flume的配置文件,改用RegexHbaseEventSerializer,我使用的配置文件如下
# ------------------- 定義數據流----------------------
# source的名字 agent.sources = kafkaSource # channels的名字,建議按照type來命名 agent.channels = kafkaChannel # sink的名字,建議按照目標來命名 agent.sinks = hbaseSink # ---------------------定義source和sink的綁定關系---------------- # 指定source使用的channel名字 agent.sources.kafkaSource.channels = kafkaChannel # 指定sink需要使用的channel的名字,注意這里是channel agent.sinks.hbaseSink.channel = kafkaChannel #-------- kafkaSource相關配置----------------- # 定義消息源類型 agent.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource # 定義kafka所在zk的地址 agent.sources.kafkaSource.zookeeperConnect = 10.45.9.139:2181 # 配置消費的kafka topic agent.sources.kafkaSource.topic = my-topic-regex # 配置消費者組的id agent.sources.kafkaSource.groupId = flume # 消費超時時間,參照如下寫法可以配置其他所有kafka的consumer選項。注意格式從kafka.xxx開始是consumer的配置屬性 agent.sources.kafkaSource.kafka.consumer.timeout.ms = 100 #------- kafkaChannel相關配置------------------------- # channel類型 agent.channels.kafkaChannel.type = org.apache.flume.channel.kafka.KafkaChannel # channel存儲的事件容量,即隊列長度 agent.channels.kafkaChannel.capacity=10000 # 事務容量 agent.channels.kafkaChannel.transactionCapacity=1000 # kafka broker list agent.channels.kafkaChannel.brokerList=mysql1:9092,mysql4:9092 # 指定topic agent.channels.kafkaChannel.topic=flume-regex-channel # 指定zk地址 agent.channels.kafkaChannel.zookeeperConnect=10.45.9.139:2181 # 指定producer的選項,關鍵是指定acks的值,保證消息發送的可靠性,retries采用默認的3 # agent.channels.kafkaChannel.kafka.producer.acks=all #---------hbaseSink 相關配置------------------ # 指定sink類型 # agent.sinks.hbaseSink.type = asynchbase agent.sinks.hbaseSink.type = hbase # 指定hbase中的表名 agent.sinks.hbaseSink.table = student # 指明column family agent.sinks.hbaseSink.columnFamily = info # 使用的serializer # agent.sinks.hbaseSink.serializer=org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer # 如果需要使用正則處理value可以使用以下的serializer agent.sinks.hbaseSink.serializer= org.apache.flume.sink.hbase.RegexHbaseEventSerializer # 指定某一列來當主鍵,而不是用隨機生成的key # agent.sinks.hbaseSink.serializer.rowKeyIndex = 0 # 指定正則表達式,這里用的正則是匹配逗號分隔的字符串 agent.sinks.hbaseSink.serializer.regex=^([^,]+),([^,]+),([^,]+),([^,]+)$ # 指定在列族中對應的的colName agent.sinks.hbaseSink.serializer.colNames=c1,c2,c3,c4 # 指定hbase所用的zk集合 agent.sinks.hbaseSink.zookeeperQuorum = mysql3:2181,mysql4:2181,mysql5:2181 
  1. 修改producer文件,將value值發送"one,two,three,four"可以匹配正則。執行結果如下圖:

PS:建議把lfume默認的JVM大小改大點,並且開啟JMX方便監控JVM

vi $FLUME_HOME/bin/flume-ng
# set default params # 若干內容... JAVA_OPTS="-Xmx1500m -Dcom.sun.management.jmxremote.port=9999 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false " LD_LIBRARY_PATH="" # 若干內容... 

5. 效率測試

測試按照第四節的配置來進行。生產者代碼如下:

import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Date; import java.util.Properties; /**  * @author Wan Kaiming on 2016/8/1  * @version 1.0  */ public class MyProducer { public static void main(String[] args) { //統計時間 System.out.println("程序開始時間戳信息:"+new Date()); final long startTime=System.currentTimeMillis(); Properties props = new Properties(); //broker地址 這里用域名,記得修改本地的hosts文件 props.put("bootstrap.servers", "mysql1:9092,mysql4:9092"); //消息可靠性語義 props.put("acks", "all"); //請求broker失敗進行重試的次數,避免由於請求broker失敗造成的消息重復 props.put("retries", 3); //按批發送,每批的消息數量 //props.put("batch.size", 16384); props.put("batch.size", 16384); //防止來不及發送,延遲一點點時間,使得能夠批量發送消息 props.put("linger.ms", 1); //緩沖大小,bytes props.put("buffer.memory", 33554432); //key的序列化類 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //value的序列化類 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //創建一個Producer對象,加載配置上下文信息 Producer<String, String> producer = new KafkaProducer<String,String>(props); for(int i=0;i<1000000;i++){ producer.send(new ProducerRecord<String, String>("my-topic-regex", Integer.toString(i), "one,two,three,four")); } producer.close(); final long endTime=System.currentTimeMillis(); float excTime=(float)(endTime-startTime)/1000; System.out.println("執行時間:"+excTime+"s"); System.out.println("當前時間為:"+ new Date()); } } 

測試基本信息:

名稱 信息
PC硬件信息 Intel(R) Xeon(R) CPU E7- 4830 @ 2.13GHz,內存4G
JAVA JDK1.8
KAFKA 2台broker(heap size =1200M),1個topic,5個分區,2個復制分區,acks=all
flume 1.60版本,heap size=1.5G,kafkachannel,kafka source,hbase sink。
負載信息 100萬條消息,每條消息20byte的樣子
hbase 1台master,3台slave,其中1台slave和master在一台機器,版本0.98-hadoop2

測試結果1(kafkachannel=0):

  1. kafka發送消息時間:6.912s
  2. hbase接受完全部消息:4分33s
  3. 延遲時間:4分26s

測試結果2(kafkachannel=all):

  1. kafka發送消息時間:8.25s
  2. hbase接受完全部消息:4分59s
  3. 延遲時間:4分51s

PS: 測試會存在一定誤差。因為讀取hbase的時候是按照1000條的批大小批量讀取的,count完整個HBASE的記錄本身也會花很多時間。也就是意味着,實際的延遲時間肯定比我測試的要小。測試1的時候,32.8萬條消息,花費時間約為120s,得到吞吐量TPS=2733。該值基本比較准確。

綜上,在保證消息可靠性前提下,kafka消息通過flume寫hbase的吞吐量TPS基本在3K左右這個數量級。相信經過更多的配置優化、硬件性能提升、增大JVM堆等方式,提升TPS不是問題。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM