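Prerequisites
This article uses Cloudera Manager to install Kafka, Flume, and Kudu. Note: clock synchronization must be in place when installing Kudu. For how to set it up, see https://blog.csdn.net/u014516601/article/details/81433594.
The Flume and Kudu versions used in this article are: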
<flume.version>1.6.0</flume.version>
<kudu.version>1.7.0</kudu.version>
Data loading flow
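1. Flume does not ship with Kudu integration, so a third-party jar is required: kudu-flume-sink-1.7.0-cdh5.16.1.jar. Place this jar in Flume's lib directory. For an installation managed by Cloudera Manager, it can go directly into /opt/cloudera/parcels/CDH-5.16.1-1.cdh5.16.1.p0.3/lib/flume-ng/lib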
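The jar placement described above can be scripted so that a wrong path fails loudly instead of silently. This is a minimal sketch: `install_sink_jar` is a hypothetical helper name, not something provided by Flume or CDH, and the example call assumes the jar sits in the current directory.

```shell
# Copy a sink jar into Flume's lib directory, failing if either path is wrong.
# install_sink_jar is a hypothetical helper, not part of Flume or CDH.
install_sink_jar() {
  jar="$1"
  lib_dir="$2"
  if [ ! -f "$jar" ]; then
    echo "jar not found: $jar" >&2
    return 1
  fi
  if [ ! -d "$lib_dir" ]; then
    echo "Flume lib directory not found: $lib_dir" >&2
    return 1
  fi
  cp "$jar" "$lib_dir/"
}

# Example call for the parcel layout named in this article:
# install_sink_jar ./kudu-flume-sink-1.7.0-cdh5.16.1.jar \
#   /opt/cloudera/parcels/CDH-5.16.1-1.cdh5.16.1.p0.3/lib/flume-ng/lib
```

Flume loads its jars at startup, so the agent has to be restarted after the copy before the sink class becomes visible.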
2. Write a KuduSink class that implements the KuduOperationsProducer interface. It must override the configure, initialize, getOperations, and close methods. The example code for this article follows:
-
-
3. Edit the Flume agent file
kafka.sources = kafkasource
kafka.sinks = kudusink1 kudusink2
kafka.channels = flumechannel1 flumechannel2
kafka.sources.kafkasource.type = org.apache.flume.source.kafka.KafkaSource
kafka.sources.kafkasource.zookeeperConnect = <zookeeper-address>:2182
kafka.sources.kafkasource.topic = us_general
kafka.sources.kafkasource.kafka.consumer.timeout.ms = 100
kafka.sources.kafkasource.kafka.consumer.group.id = flume-kudu
# Replicating selector: every event is copied to both channels
kafka.sources.kafkasource.selector.type = replicating
kafka.sources.kafkasource.channels = flumechannel1 flumechannel2
kafka.channels.flumechannel1.type = memory
kafka.channels.flumechannel1.capacity = 10000
kafka.channels.flumechannel1.transactionCapacity = 100
kafka.channels.flumechannel2.type = memory
kafka.channels.flumechannel2.capacity = 10000
kafka.channels.flumechannel2.transactionCapacity = 100
kafka.sinks.kudusink1.type = org.apache.kudu.flume.sink.KuduSink
kafka.sinks.kudusink1.masterAddresses = <kudu-master-address>:7051
kafka.sinks.kudusink1.tableName = impala::kududb.hisrealinfo1
kafka.sinks.kudusink1.operation = insert
kafka.sinks.kudusink1.batchSize = 50
kafka.sinks.kudusink1.producer = <fully qualified class name of the producer in the KuduSink jar>
kafka.sinks.kudusink1.channel = flumechannel1
kafka.sinks.kudusink2.type = org.apache.kudu.flume.sink.KuduSink
kafka.sinks.kudusink2.masterAddresses = <kudu-master-address>:7051
kafka.sinks.kudusink2.tableName = impala::kududb.realinfo1
kafka.sinks.kudusink2.operation = insert
kafka.sinks.kudusink2.batchSize = 50
kafka.sinks.kudusink2.producer = <fully qualified class name of the producer in the KuduSink jar>
kafka.sinks.kudusink2.channel = flumechannel2
4. Run flume-ng in command mode
flume-ng agent --conf ./flumekudu/ --conf-file $FLUME_USGENERAL_CONFIG --name kafka -Dflume.root.logger=INFO,console
Note:
When flume-ng is run in command mode, an out-of-memory error may occur. In that case, increase the JVM heap size.
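One common way to raise the heap, sketched here under the assumption that the stock flume-ng launcher is used, is a flume-env.sh file in the directory passed to --conf (./flumekudu/ in the command above). The launcher sources that file and passes JAVA_OPTS to the JVM. The sizes below are placeholders, not values from this article's setup.

```shell
# flume-env.sh, placed in the directory passed to --conf (./flumekudu/ here).
# The sizes are assumed placeholders; tune them to the host's memory and to
# the capacity of the two memory channels.
export JAVA_OPTS="-Xms512m -Xmx2048m"
```

Both memory channels hold up to 10000 events each on the heap, so the maximum heap should leave room for that backlog as well as for the Kudu client's batches.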
