Ingesting Kafka Data with Gobblin


Author: Syn良子. Source: http://www.cnblogs.com/cssdongl. Please credit the source when reposting.

I finally found time to write up the process of ingesting Kafka data with Gobblin. Without further ado, let's get into it.

1. Preparing the Gobblin Environment Variables

First, set up the environment variables Gobblin 0.7.0 needs at runtime. You can configure them in gobblin-env.sh under Gobblin's bin directory, for example:

export GOBBLIN_JOB_CONFIG_DIR=~/gobblin/gobblin-config-dir
export GOBBLIN_WORK_DIR=~/gobblin/gobblin-work-dir
export HADOOP_BIN_DIR=/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/lib/hadoop/bin

You can also set them in your own user's .bashrc; either way, make sure JAVA_HOME is configured as well.

These variables point to Gobblin's job configuration directory, its working directory, and the Hadoop bin directory needed to run MR jobs.

2. Gobblin Standalone Mode: Configuration and Usage

As the name implies, this mode ingests Kafka data on the single node where Gobblin is deployed, without using Hadoop MR. The configuration steps are as follows.

First, go to GOBBLIN_JOB_CONFIG_DIR and create a new configuration file named gobblinStandalone.pull with the following contents:

job.name=GobblinKafkaQuickStart
job.group=GobblinKafka
job.description=Gobblin quick start job for Kafka
job.lock.enabled=false
job.schedule=0 0/3 * * * ?
kafka.brokers=datanode01:9092
source.class=gobblin.source.extractor.extract.kafka.KafkaSimpleSource
extract.namespace=gobblin.extract.kafka

writer.builder.class=gobblin.writer.SimpleDataWriterBuilder
writer.file.path.type=tablename
writer.destination.type=HDFS
writer.output.format=txt

data.publisher.type=gobblin.publisher.BaseDataPublisher

mr.job.max.mappers=1

metrics.reporting.file.enabled=true
metrics.log.dir=${env:GOBBLIN_WORK_DIR}/metrics
metrics.reporting.file.suffix=txt

bootstrap.with.offset=earliest

This configures the Kafka brokers to pull from, plus Gobblin's working components: source, extract, writer, publisher, and so on. If anything is unclear, the Gobblin wiki covers all of this in detail.

I also added a job.schedule so that Gobblin checks all Kafka topics for new data every three minutes, and the extraction job then runs on that three-minute schedule. This uses Gobblin's built-in Quartz scheduler.
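For reference, the job.schedule value is a Quartz cron expression whose fields are second, minute, hour, day-of-month, month, and day-of-week. A few illustrative alternatives (my own examples, not values from this job):

```properties
# Every 3 minutes (the value used above)
job.schedule=0 0/3 * * * ?
# Every hour, on the hour
job.schedule=0 0 * * * ?
# Once a day at 02:30
job.schedule=0 30 2 * * ?
```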

OK. With the configuration in place, go to the Gobblin root directory and start it with a command like:

bin/gobblin-standalone.sh --conffile $GOBBLIN_JOB_CONFIG_DIR/gobblinStandalone.pull start

My GOBBLIN_JOB_CONFIG_DIR contains multiple .pull files, so the file has to be specified explicitly. If GOBBLIN_JOB_CONFIG_DIR contains only one configuration file, a plain bin/gobblin-standalone.sh start is enough.

The extracted data ends up under GOBBLIN_WORK_DIR/job-output.

3. Gobblin MapReduce Mode: Configuration and Usage

This time Gobblin uses MapReduce to pull Kafka data into HDFS. Create a new gobblin-mr.pull file with the following contents:

job.name=GobblinKafkaToHdfs
job.group=GobblinToHdfs1
job.description=Pull data from kafka to hdfs use Gobblin
job.lock.enabled=false
kafka.brokers=datanode01:9092

source.class=gobblin.source.extractor.extract.kafka.KafkaSimpleSource
extract.namespace=gobblin.extract.kafka
topic.whitelist=jsonTest

writer.builder.class=gobblin.writer.SimpleDataWriterBuilder
simple.writer.delimiter=\n
simple.writer.prepend.size=false
writer.file.path.type=tablename
writer.destination.type=HDFS
writer.output.format=txt
writer.partitioner.class=gobblin.example.simplejson.TimeBasedJsonWriterPartitioner
writer.partition.level=hourly
writer.partition.pattern=yyyy/MM/dd/HH
writer.partition.columns=time
writer.partition.timezone=Asia/Shanghai
data.publisher.type=gobblin.publisher.TimePartitionedDataPublisher

mr.job.max.mappers=1

metrics.reporting.file.enabled=true
metrics.log.dir=/gobblin-kafka/metrics
metrics.reporting.file.suffix=txt

bootstrap.with.offset=earliest

fs.uri=master:8020
writer.fs.uri=${fs.uri}
state.store.fs.uri=${fs.uri}

mr.job.root.dir=/gobblin-kafka/working
state.store.dir=/gobblin-kafka/state-store
task.data.root.dir=/jobs/kafkaetl/gobblin/gobblin-kafka/task-data
data.publisher.final.dir=/gobblintest/job-output

 

Note the topic.whitelist line near the top: I added a topic filter there so that only the topic named jsonTest is pulled.

The requirement here is to partition each topic's data into HDFS directories by day and hour, with the concrete partition directory derived from a time field inside each Kafka record.

My records are JSON, with a time field like {…"time":"2016-10-12 00:30:20"…}, so I subclassed Gobblin's TimeBasedWriterPartitioner and overrode its method to partition the HDFS directories by that time field.

The following settings deserve attention:

fs.uri=master:8020

Change this to your own cluster's HDFS address.

writer.partition.columns=time

The time here just needs to match the name of the time field in the JSON records.

writer.partition.level=hourly

Partitions the HDFS directories down to the hour.

writer.partition.pattern=yyyy/MM/dd/HH

The directory format of the final HDFS partitions (customize it to match your own partitioning needs).

writer.partitioner.class=gobblin.example.simplejson.TimeBasedJsonWriterPartitioner

This is the subclass I wrote to partition HDFS directories by the JSON time field. I have pushed the code to GitHub; see the following link:

https://github.com/cssdongl/gobblin/blob/master/gobblin-example/src/main/java/gobblin/example/simplejson/TimeBasedJsonWriterPartitioner.java
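The heart of such a partitioner is turning each record's JSON "time" field into an epoch timestamp that Gobblin can then bucket by hour using writer.partition.pattern. Below is a minimal, self-contained sketch of just that parsing logic; in the real TimeBasedWriterPartitioner subclass (see the link above) this would live inside the method that returns a record's timestamp. The field name "time", the "yyyy-MM-dd HH:mm:ss" format, and the fallback behavior are assumptions based on the sample record shown earlier, not a copy of the linked code:

```java
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.TimeZone;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class JsonTimeExtractor {
    // Matches "time":"2016-10-12 00:30:20" inside the raw JSON record
    // (field name and format are assumptions from the sample record).
    private static final Pattern TIME_FIELD =
        Pattern.compile("\"time\"\\s*:\\s*\"([^\"]+)\"");
    private static final String TIME_FORMAT = "yyyy-MM-dd HH:mm:ss";

    /**
     * Extracts the "time" field from a raw JSON record as epoch millis.
     * Gobblin would format this value with writer.partition.pattern
     * (yyyy/MM/dd/HH) to build the HDFS partition path. Falls back to
     * the current time if the field is missing or unparsable.
     */
    public static long getRecordTimestamp(byte[] record) {
        String json = new String(record, StandardCharsets.UTF_8);
        Matcher m = TIME_FIELD.matcher(json);
        if (m.find()) {
            SimpleDateFormat fmt = new SimpleDateFormat(TIME_FORMAT);
            // Matches writer.partition.timezone in the pull file.
            fmt.setTimeZone(TimeZone.getTimeZone("Asia/Shanghai"));
            try {
                return fmt.parse(m.group(1)).getTime();
            } catch (ParseException e) {
                // Fall through to the default below.
            }
        }
        return System.currentTimeMillis();
    }

    public static void main(String[] args) {
        byte[] record = "{\"id\":1,\"time\":\"2016-10-12 00:30:20\"}"
                .getBytes(StandardCharsets.UTF_8);
        System.out.println(getRecordTimestamp(record));
    }
}
```

With an hourly partition level, a record timestamped 2016-10-12 00:30:20 would land under a path ending in 2016/10/12/00 beneath the publisher's output directory.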

Add the extended class to the appropriate Gobblin module (I put it in the gobblin-example module) and rebuild. If the build gives you trouble, the IDE-setup guide in the references below should help.

All of the paths at the end of the configuration above are HDFS paths; make sure Gobblin has read and write permission on them.

Then launch the job with:

bin/gobblin-mapreduce.sh --conf $GOBBLIN_JOB_CONFIG_DIR/gobblin-mr.pull

Once the job succeeds, directories like the following appear on HDFS, with jsonTest generated from the corresponding topic name.

(Two screenshots of the partitioned HDFS output directories are omitted here.)

Note: in MR mode I tried configuring Quartz scheduling several times and it never took effect, so if you need the extraction to run on a schedule, use an external tool; Linux crontab, Oozie, or Azkaban all work.
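For example, a crontab entry along these lines could launch the MR job every three hours (the install path and log path are illustrative; substitute your own):

```shell
# m h dom mon dow  command
0 */3 * * * cd /opt/gobblin && bin/gobblin-mapreduce.sh --conf $GOBBLIN_JOB_CONFIG_DIR/gobblin-mr.pull >> /var/log/gobblin-mr.log 2>&1
```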

4. Summary of Using Gobblin

1> Get familiar with the official Gobblin wiki first; it is very thorough.

2> Fork the source on GitHub and read through the source, extract, and partitioner code carefully.

3> When you hit problems, dig into Gobblin's logs and Hadoop's logs.

References:

http://gobblin.readthedocs.io/en/latest/case-studies/Kafka-HDFS-Ingestion/

http://gobblin.readthedocs.io/en/latest/user-guide/Partitioned-Writers/

http://gobblin.readthedocs.io/en/latest/developer-guide/IDE-setup/

http://gobblin.readthedocs.io/en/latest/user-guide/FAQs/

