實時事件統計項目:優化flume:用file channel代替mem channel


背景:利用kafka+flume+morphline+solr做實時統計

solr從12月23號開始一直沒有數據。查看日志發現,因為有一個同事加了一條格式錯誤的埋點數據,導致大量error。

據推斷,是因為使用mem channel占滿,消息來不及處理,導致新來的數據都丟失了。

image

修改flume使用file channel:

kafka2solr.sources = source_from_kafka
kafka2solr.channels = file_channel
kafka2solr.sinks = solrSink

# For each one of the sources, the type is defined  
kafka2solr.sources.source_from_kafka.type = org.apache.flume.source.kafka.KafkaSource
kafka2solr.sources.source_from_kafka.channels = file_channel
kafka2solr.sources.source_from_kafka.batchSize = 100
kafka2solr.sources.source_from_kafka.useFlumeEventFormat=false
kafka2solr.sources.source_from_kafka.kafka.bootstrap.servers= kafkanode0:9092,kafkanode1:9092,kafkanode2:9092
kafka2solr.sources.source_from_kafka.kafka.topics = eventCount
kafka2solr.sources.source_from_kafka.kafka.consumer.group.id = flume_solr_caller
kafka2solr.sources.source_from_kafka.kafka.consumer.auto.offset.reset=latest

# file channel  
kafka2solr.channels.file_channel.type = file
kafka2solr.channels.file_channel.checkpointDir = /var/log/flume-ng/checkpoint
kafka2solr.channels.file_channel.dataDirs = /var/log/flume-ng/data


kafka2solr.sinks.solrSink.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
kafka2solr.sinks.solrSink.channel = file_channel
#kafka2solr.sinks.solrSink.batchSize = 1000
#kafka2solr.sinks.solrSink.batchDurationMillis = 1000
kafka2solr.sinks.solrSink.morphlineFile = morphlines.conf
kafka2solr.sinks.solrSink.morphlineId=morphline1
kafka2solr.sinks.solrSink.isIgnoringRecoverableExceptions=true

使得數據持久化到磁盤不會丟失。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM