Author: Syn良子  Source: http://www.cnblogs.com/cssdongl  Please credit the source when reposting.
Taking some time to write down the process of ingesting Kafka data with Gobblin. Without further ado, let's get to it.
1. Preparing the Gobblin Environment Variables
First, set up the environment variables Gobblin 0.7.0 needs at runtime. You can configure them in gobblin-env.sh under Gobblin's bin directory, for example:
export GOBBLIN_JOB_CONFIG_DIR=~/gobblin/gobblin-config-dir
export GOBBLIN_WORK_DIR=~/gobblin/gobblin-work-dir
export HADOOP_BIN_DIR=/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/lib/hadoop/bin
You can also set them in your own user's .bashrc; either way, make sure JAVA_HOME is configured as well.
These set Gobblin's job configuration directory, its working directory, and the Hadoop bin directory needed when running MR jobs.
2. Gobblin Standalone Mode: Configuration and Usage
As the name suggests, this mode ingests Kafka data on the single node where Gobblin is deployed, without using Hadoop MR. The configuration goes as follows.
First, under GOBBLIN_JOB_CONFIG_DIR, create a new gobblinStandalone.pull configuration file with the following content:
job.name=GobblinKafkaQuickStart
job.group=GobblinKafka
job.description=Gobblin quick start job for Kafka
job.lock.enabled=false
job.schedule=0 0/3 * * * ?
kafka.brokers=datanode01:9092
source.class=gobblin.source.extractor.extract.kafka.KafkaSimpleSource
extract.namespace=gobblin.extract.kafka
writer.builder.class=gobblin.writer.SimpleDataWriterBuilder
writer.file.path.type=tablename
writer.destination.type=HDFS
writer.output.format=txt
data.publisher.type=gobblin.publisher.BaseDataPublisher
mr.job.max.mappers=1
metrics.reporting.file.enabled=true
metrics.log.dir=${env:GOBBLIN_WORK_DIR}/metrics
metrics.reporting.file.suffix=txt
bootstrap.with.offset=earliest
Here you configure the Kafka brokers to pull from and Gobblin's working components, such as the source, extract, writer, and publisher. If anything is unclear, the Gobblin wiki covers all of this in detail.
I added an extra job.schedule so that Gobblin checks all Kafka topics for new data every three minutes, and the extraction job then runs on that three-minute schedule. This uses Gobblin's built-in Quartz scheduler.
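For reference, Quartz cron expressions read second, minute, hour, day-of-month, month, day-of-week, so the schedule above fires at second 0 of every third minute (0, 3, 6, ...), every hour of every day:

# Quartz field order: second minute hour day-of-month month day-of-week
# 0/3 in the minute field = every 3 minutes starting from minute 0; ? = no specific day-of-week
job.schedule=0 0/3 * * * ?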
OK, with that in place, go to the Gobblin root directory and start it with:
bin/gobblin-standalone.sh --conffile $GOBBLIN_JOB_CONFIG_DIR/gobblinStandalone.pull start
My GOBBLIN_JOB_CONFIG_DIR holds several .pull files, so the file has to be named explicitly; if there is only one configuration file under GOBBLIN_JOB_CONFIG_DIR, a plain bin/gobblin-standalone.sh start will do.
The ingested data ends up in GOBBLIN_WORK_DIR/job-output.
3. Gobblin MapReduce Mode: Configuration and Usage
This time Gobblin will use MapReduce to pull the Kafka data into HDFS. Create a new gobblin-mr.pull file with the following content:
job.name=GobblinKafkaToHdfs
job.group=GobblinToHdfs1
job.description=Pull data from kafka to hdfs using Gobblin
job.lock.enabled=false
kafka.brokers=datanode01:9092
source.class=gobblin.source.extractor.extract.kafka.KafkaSimpleSource
extract.namespace=gobblin.extract.kafka
topic.whitelist=jsonTest
writer.builder.class=gobblin.writer.SimpleDataWriterBuilder
simple.writer.delimiter=\n
simple.writer.prepend.size=false
writer.file.path.type=tablename
writer.destination.type=HDFS
writer.output.format=txt
writer.partitioner.class=gobblin.example.simplejson.TimeBasedJsonWriterPartitioner
writer.partition.level=hourly
writer.partition.pattern=yyyy/MM/dd/HH
writer.partition.columns=time
writer.partition.timezone=Asia/Shanghai
data.publisher.type=gobblin.publisher.TimePartitionedDataPublisher
mr.job.max.mappers=1
metrics.reporting.file.enabled=true
metrics.log.dir=/gobblin-kafka/metrics
metrics.reporting.file.suffix=txt
bootstrap.with.offset=earliest
fs.uri=master:8020
writer.fs.uri=${fs.uri}
state.store.fs.uri=${fs.uri}
mr.job.root.dir=/gobblin-kafka/working
state.store.dir=/gobblin-kafka/state-store
task.data.root.dir=/jobs/kafkaetl/gobblin/gobblin-kafka/task-data
data.publisher.final.dir=/gobblintest/job-output
Note the topic.whitelist setting: I added a topic filter there, since only the topic named jsonTest is of interest.
The requirement is to partition the ingested topic data into per-day and per-hour directories, with the exact partition directory derived from a time field inside each Kafka record.
My records are JSON, with a time field shaped like {…"time":"2016-10-12 00:30:20"…}, so I subclassed Gobblin's TimeBasedWriterPartitioner and overrode its method to partition the HDFS directories by that time field.
The following settings deserve attention:
fs.uri=master:8020
Change this to your own cluster's HDFS address.
writer.partition.columns=time
The time here just has to match the name of the time field in the JSON.
writer.partition.level=hourly
Partitions the HDFS directories down to the hour.
writer.partition.pattern=yyyy/MM/dd/HH
The directory format of the final HDFS partitions (customize it to your own partitioning needs); for example, a record with time 2016-10-12 00:30:20 lands under 2016/10/12/00.
writer.partitioner.class=gobblin.example.simplejson.TimeBasedJsonWriterPartitioner
The subclass that partitions the HDFS directories by the JSON time field. I have pushed the code to GitHub, see the link for reference; a rough sketch also follows below.
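In case the link is unavailable, here is a minimal sketch of what such a subclass can look like. It assumes the Gobblin 0.7.x API, where TimeBasedWriterPartitioner<D> declares an abstract getRecordTimestamp(D) returning epoch milliseconds and is constructed with (State, numBranches, branchId), and it assumes records arrive as byte[] because SimpleDataWriterBuilder is used. Treat it as illustrative, not the exact code from my repo:

package gobblin.example.simplejson;

import java.text.SimpleDateFormat;
import java.util.TimeZone;

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;

import gobblin.configuration.State;
import gobblin.writer.partitioner.TimeBasedWriterPartitioner;

public class TimeBasedJsonWriterPartitioner extends TimeBasedWriterPartitioner<byte[]> {

  // Must match writer.partition.columns in the .pull file
  private static final String TIME_FIELD = "time";

  public TimeBasedJsonWriterPartitioner(State state, int numBranches, int branchId) {
    super(state, numBranches, branchId);
  }

  @Override
  public long getRecordTimestamp(byte[] record) {
    try {
      // Parse the JSON record and pull out the "time" field
      JsonObject json = new JsonParser().parse(new String(record, "UTF-8")).getAsJsonObject();
      SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
      format.setTimeZone(TimeZone.getTimeZone("Asia/Shanghai"));
      return format.parse(json.get(TIME_FIELD).getAsString()).getTime();
    } catch (Exception e) {
      // Records without a parsable time field fall back to ingestion time
      return System.currentTimeMillis();
    }
  }
}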
Add the extended class to the appropriate Gobblin module (I dropped it into the gobblin-example module) and rebuild; if the build gives you trouble, refer to this article.
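For the rebuild itself, something along these lines from the Gobblin source root should work (a sketch assuming the standard Gradle wrapper; skipping tests speeds things up):

./gradlew clean build -x test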
The paths at the end of the configuration above are all HDFS paths; make sure Gobblin has read and write permission on them.
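For example, the directories can be created and handed to the user running Gobblin ahead of time (a sketch; the gobblin user and group names are assumptions):

hadoop fs -mkdir -p /gobblin-kafka /gobblintest/job-output /jobs/kafkaetl/gobblin
hadoop fs -chown -R gobblin:gobblin /gobblin-kafka /gobblintest /jobs/kafkaetl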
隨后啟動命令
bin/gobblin-mapreduce.sh --conf $GOBBLIN_JOB_CONFIG_DIR/gobblin-mr.pull
After a successful run, HDFS contains directories like the following, with the jsonTest directory generated from the corresponding topic name.
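Based on the settings above (data.publisher.final.dir=/gobblintest/job-output, tablename-based paths, and the hourly yyyy/MM/dd/HH pattern), a record with time 2016-10-12 00:30:20 should be published roughly like this:

/gobblintest/job-output/jsonTest/2016/10/12/00/<output files>.txt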
Note that I tried several times to get the Quartz schedule working in MR mode and it never took effect, so if you need the extraction to run on a schedule, use an external tool; Linux crontab, Oozie, or Azkaban all work.
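For example, a crontab entry along these lines reproduces the three-minute schedule (the Gobblin install path and log location are assumptions):

# crontab -e: run the MR ingestion every 3 minutes
*/3 * * * * cd /home/user/gobblin && bin/gobblin-mapreduce.sh --conf /home/user/gobblin/gobblin-config-dir/gobblin-mr.pull >> /tmp/gobblin-mr.log 2>&1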
4. Gobblin Usage Summary
1> Get familiar with the official Gobblin wiki first; it is very thorough.
2> Fork the source on GitHub and read through the source, extract, and partitioner code carefully.
3> When you hit problems, dig into Gobblin's logs and Hadoop's logs.
References:
http://gobblin.readthedocs.io/en/latest/case-studies/Kafka-HDFS-Ingestion/
http://gobblin.readthedocs.io/en/latest/user-guide/Partitioned-Writers/
http://gobblin.readthedocs.io/en/latest/developer-guide/IDE-setup/