[Original] Big Data Basics: Gobblin (2) Persisting Kafka to HDFS


gobblin 0.10

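There are many ways to persist Kafka data to HDFS, such as Flume, Logstash, and Gobblin. Flume and Logstash are streaming tools, while Gobblin is batch-oriented: it persists data in runs triggered by a scheduler and performs no reads or writes between runs. This is its biggest difference from Flume and Logstash.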

Gobblin can be deployed in several ways:

1) standalone + cron;

2) MR + Oozie/Azkaban, etc.;

3) Docker;

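The third option is the most convenient. Because Gobblin can write all job state to HDFS, it makes no difference which node Gobblin is started on. In addition, the metadata is updated only after the data has been synchronized, so a failure in Kafka, HDFS, or Gobblin itself does not cause data loss.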
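The "update metadata only after the data is synchronized" guarantee can be pictured with a toy model. The sketch below is not Gobblin code: the class name, the in-memory list standing in for HDFS output, and the `failPublish` switch are all hypothetical. It only illustrates why a failed run loses nothing: the stored offset does not move, so the next run retries the same range.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of "publish first, then advance the checkpoint". Not Gobblin internals.
class CommitAfterPublishSketch {
    // Stands in for the offset kept in the state store.
    long committedOffset = 0;
    // Stands in for the files published to HDFS.
    final List<String> published = new ArrayList<>();

    // Runs one batch over log[committedOffset, end). The offset advances only
    // after the batch has been published; failPublish simulates a failure.
    boolean runOnce(List<String> log, boolean failPublish) {
        List<String> batch = new ArrayList<>(log.subList((int) committedOffset, log.size()));
        if (failPublish) {
            // Nothing was published and the offset is untouched, so the next run retries.
            return false;
        }
        published.addAll(batch);
        committedOffset = log.size();
        return true;
    }
}
```

With this ordering, a crash between publishing and updating the offset can at worst cause a batch to be re-read, not skipped.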

1 Configuration

#job
job.name=test_job
job.group=test_group
job.schedule=0 0 */1 * * ?
job.lock.enabled=false

#source
source.class=gobblin.source.extractor.extract.kafka.KafkaSimpleSource
extract.namespace=gobblin.extract.kafka
kafka.brokers=$kafka_brokers
bootstrap.with.offset=latest
topic.whitelist=$kafka_topics

mr.job.max.mappers=1

#writer
writer.builder.class=gobblin.writer.SimpleDataWriterBuilder
writer.file.path.type=tablename
writer.destination.type=HDFS
writer.output.format=txt
writer.partitioner.class=gobblin.writer.partitioner.TimeBasedWriterPartitioner
writer.partition.columns=time
writer.partition.level=hourly
writer.partition.pattern=yyyyMMdd/HH
writer.partition.timezone=Asia/Shanghai
data.publisher.type=gobblin.publisher.TimePartitionedDataPublisher

#metrics
metrics.reporting.file.enabled=true
metrics.reporting.file.suffix=txt

#fs
fs.uri=hdfs://$name_node:8020
writer.fs.uri=${fs.uri}
state.store.fs.uri=${fs.uri}

data.publisher.final.dir=${env:GOBBLIN_WORK_DIR}/job-output
metrics.log.dir=${env:GOBBLIN_WORK_DIR}/metrics
state.store.dir=${env:GOBBLIN_WORK_DIR}/state-store
mr.job.root.dir=${env:GOBBLIN_WORK_DIR}/working
task.data.root.dir=${env:GOBBLIN_WORK_DIR}/task-data

Just replace $kafka_brokers, $kafka_topics, and $name_node with your own values.

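This configuration runs in standalone mode once an hour. Each run uses the time field in the data to build a time partition and stores the records under the corresponding directory on HDFS.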
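To make the partition layout concrete, here is a minimal standalone sketch of how a record timestamp maps to a directory under the configured pattern yyyyMMdd/HH and timezone Asia/Shanghai. It uses only java.time and is not Gobblin's partitioner; the class and method names are hypothetical, and any prefix or suffix Gobblin adds around the pattern is not modeled.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Standalone illustration only; this is not Gobblin's TimeBasedWriterPartitioner.
class PartitionPathSketch {
    // Mirrors writer.partition.pattern and writer.partition.timezone from the config.
    static final DateTimeFormatter PATTERN =
        DateTimeFormatter.ofPattern("yyyyMMdd/HH").withZone(ZoneId.of("Asia/Shanghai"));

    // Maps a record timestamp (epoch milliseconds) to its hourly partition directory.
    static String partitionFor(long epochMillis) {
        return PATTERN.format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        // 2019-01-01T00:00:00Z is 08:00 on the same day in Asia/Shanghai.
        System.out.println(partitionFor(1546300800000L));
        // Sixteen hours later it is already 2019-01-02 00:00 in Asia/Shanghai.
        System.out.println(partitionFor(1546358400000L));
    }
}
```

Because the timezone is applied before formatting, records near midnight UTC can land in a different day directory than a UTC-based reading of the timestamp would suggest.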

 

2 Startup

export GOBBLIN_JOB_CONFIG_DIR=/opt/gobblin/gobblin-dist/job_conf
export GOBBLIN_WORK_DIR=/opt/gobblin/gobblin-dist/work_dir

bin/gobblin-standalone.sh start

 

3 Customization

1) Partition by the current time (rather than the time in the data)

package gobblin.writer.partitioner;

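import gobblin.configuration.State;

// Partitions records by the wall-clock time at write time instead of a time field in the record.
public class DefaultTimeBasedWriterPartitioner extends TimeBasedWriterPartitioner {
    public DefaultTimeBasedWriterPartitioner(State state, int numBranches, int branchId) {
        super(state, numBranches, branchId);
    }

    // Ignores the record content and uses the current system time as its timestamp.
    public long getRecordTimestamp(Object record) {
        return System.currentTimeMillis();
    }
}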

Configuration:

writer.partitioner.class=gobblin.writer.partitioner.DefaultTimeBasedWriterPartitioner

2) Keep only JSON data and append a newline to each record

package gobblin.source.extractor.extract.kafka;

import gobblin.configuration.WorkUnitState;
import gobblin.source.extractor.Extractor;

import java.io.IOException;

// Source that returns JsonKafkaSimpleExtractor instead of the default extractor.
public class JsonKafkaSimpleSource extends KafkaSimpleSource {
    public JsonKafkaSimpleSource() {}

    @Override
    public Extractor<String, byte[]> getExtractor(WorkUnitState state) throws IOException {
        return new JsonKafkaSimpleExtractor(state);
    }
}

 

package gobblin.source.extractor.extract.kafka;

import gobblin.configuration.WorkUnitState;
import gobblin.kafka.client.ByteArrayBasedKafkaRecord;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class JsonKafkaSimpleExtractor extends KafkaSimpleExtractor {
    public JsonKafkaSimpleExtractor(WorkUnitState state) {
        super(state);
    }

    @Override
    protected byte[] decodeRecord(ByteArrayBasedKafkaRecord kafkaConsumerRecord) throws IOException {
        byte[] resultBytes = kafkaConsumerRecord.getMessageBytes();
        String result = new String(resultBytes, "UTF-8");
        // Shape check only: the message must start with '{' and end with '}'; it is not parsed as JSON.
        if (result != null && result.length() > 2
                && result.charAt(0) == '{' && result.charAt(result.length() - 1) == '}') {
            // Append a newline so that each record lands on its own line in the output file.
            return (result + "\n").getBytes("UTF-8");
        } else {
            // Log the rejected message with a timestamp and emit an empty payload for it.
            System.out.println("[" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())
                    + "]found invalid json : " + result);
            return "".getBytes();
        }
    }
}

 

Configuration:

source.class=gobblin.source.extractor.extract.kafka.JsonKafkaSimpleSource
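The filtering rule in decodeRecord can be tried out without a Gobblin classpath. The following sketch restates the same check as plain static methods; the class and method names are hypothetical and exist only for illustration. It also shows the limits of the check: it looks at the first and last characters only, so it accepts text that is not valid JSON and rejects JSON with surrounding whitespace.

```java
import java.nio.charset.StandardCharsets;

// Standalone restatement of the brace check; not part of Gobblin.
class JsonShapeCheckSketch {
    // Same rule as decodeRecord: longer than two characters, starts with '{', ends with '}'.
    static boolean looksLikeJsonObject(String s) {
        return s != null && s.length() > 2
            && s.charAt(0) == '{' && s.charAt(s.length() - 1) == '}';
    }

    // Returns the payload plus a trailing newline when it passes the check,
    // otherwise an empty byte array.
    static byte[] filter(byte[] payload) {
        String s = new String(payload, StandardCharsets.UTF_8);
        return looksLikeJsonObject(s)
            ? (s + "\n").getBytes(StandardCharsets.UTF_8)
            : new byte[0];
    }
}
```

If strict validation matters, parsing each message with a real JSON library would be the safer choice, at the cost of extra CPU per record.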

 

4 Docker image

https://hub.docker.com/r/gobblin/gobblin-standalone

docker run -d gobblin/gobblin-standalone:ubuntu-gobblin-0.10.0

 

References:

https://gobblin.readthedocs.io/en/latest/case-studies/Kafka-HDFS-Ingestion/

https://gobblin.readthedocs.io/en/latest/user-guide/Configuration-Properties-Glossary/

 

