flume配置kafka channle的實戰案例


                  flume配置kafka channle的實戰案例

                                               作者:尹正傑

版權聲明:原創作品,謝絕轉載!否則將追究法律責任。

 

  最近在新公司負責大數據平台的建設,平台搭建完畢后,需要將雲平台(我們公司使用的Ucloud的雲服務器,大概320多台,還在擴容中),公司每個月光大數據服務費用就接近50萬人民幣。老板考慮成本問題,花了接近200萬的前采購了50台服務器用於大數據平台的建設。我已經將集群部署好了,正准備將雲上的環境原樣搬到我的新平台上時,遇到了一系列的坑,我已經填了不少的坑。這不,關於flume的一個channel的選擇也是一個坑。

  我們知道常用的channel如下:

    file          channel的特點是:速度慢,支持容災;

    memory channel的特點是:速度快,斷電丟數據,我們在Ucloud上使用的就是它;

    kafka     channel的特點是:高速緩存;

 

一.flume報錯OOM

  計划將kafka的數據通過flume抽取到hdfs上,真是flume略我千百變,我帶flume如初戀啊。經過各種測試,我已經將flume的內存提升到12G,下面是我啟動flume看到的信息,如下圖:

  啟動后,我開啟了一個終端,查看JVM的內存使用情況,發現分分鍾就給我吃滿了!如下圖:

  接下來,不到3分鍾時間,flume就崩潰了,頻繁拋出OOM的異常信息:

  接下來,我們看一下hdfs集群中是否有數據,答案是肯定的,hdfs的確是有數據:

   咋解決呢?大家可能說,你繼續加大內存唄,12G不夠,就給24G試試看!OK,我就JVM的堆內存調試到24G,啟動程序:

  在后端查看flume的進程ID:

  查看JVM的運行情況,如下:

  經過上面的改造后,我一致在等OOM,可惜一個小時過去了,始終沒有拋出OOM異常,我有點小失落,也有點小開心。開心的是終於不崩潰了,失落的是還剩下4個G,那我原來打算在這台服務器上開啟8個flume進程的計划是要泡湯了,因為是總大小總共才32G,有上面解決方案呢?查看官網,據說是有種基於Kafka channel的模式。也是本篇博客的想要說的主角。

 

二.分析OOM的解決方案

1>.分析為什么會拋OOM溢出

  其實想象大家也知道,source是kafka,而sink是hdfs,他們兩個的吞吐量閉着眼睛大家都知道誰是快誰慢。

  hdfs集群的工作原理可知,它在寫數據和讀取數據時都會和NameNode這個服務器進行交互,需要一系列驗證操作,最后讀操作或者寫操作依然不是和NameNode進行交互,而是client直接跟DataNode進行交互。  

  kafka則是基於partition來進行消費的,網上有些文章說partition的數越多,意味着kafka的吞吐量就越大,其實這個說法並不嚴謹,parition的數量應該小於集群的core總數,因為每個消費者基於paritition進行消費時,服務器都會開啟一個線程去應酬,如果你一台服務器paritrion響應的特別多,設計到上下文切換反而不理想了。一個消費者可以去集群同時對多個partition進行消費。

  以上的觀點僅是我個人對Kafka和hdfs的理解,如果哪里有說的不對的話,歡迎各位大神之路!綜上所述,我們都說kafka的速度要遠遠大於hdfs,kafka是順序寫入磁盤的,他的速度可達到300M/s,我們可以毫不客氣的說,順序寫入磁盤相比隨機寫入內存的速度有過之而無不及。

  好了,上面說了一堆的廢話,咱們回歸正題,為什么會OOM呢?原因很簡單,我們形象的說:想必大家都喝過可樂吧,可樂的汽水瓶形狀大家也應該清楚吧,我們將可樂瓶的瓶蓋去掉,然后把可樂瓶的平底削去,我們假設我們在粗的一端(原來的瓶底)注水進去,然后水會送細的一端(原來的瓶蓋)出去沒毛病吧?理論上來說,如果粗的一端源源不斷的網可樂瓶中注水,水也會遠遠不斷的從小瓶蓋中出去,但是當粗的一端流入端的速度遠遠高於流出端的速度,那么可樂瓶容器很快就會積累很多的水,知道把可能瓶注滿水,當注滿水以后,這個時候還要往里面注水的話,可樂瓶容器可能會變形,設置可能會將可能瓶撐爆。

  如果你看懂了我上面的描述,那么你我們在結合kafka和hdfs,說一說,誰是瓶底,誰是瓶蓋,誰是可樂瓶呢?我的理解是此時我們的瓶底的一端就是kafka,可樂瓶本身就是memory channel,瓶口那一端就是hdfs。那么瓶子被水撐滿最終爆裂就好比咱們的OOM。

2>.編寫flume的配置文件(該配置文件我是根據生產環境稍作改動,可供參考)

[root@flume120 ~]# cat /soft/flume/conf/job/flume-conf-01.properties
#定義別名
agent.sources = kafkaSource
agent.channels = fileChannel
agent.sinks = hdfsSink

#綁定關系
agent.sources.kafkaSource.channels = fileChannel
agent.sinks.hdfsSink.channel = fileChannel


#指定source源為kafka source
agent.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.kafkaSource.kafka.bootstrap.servers = 10.1.3.116:9092,10.1.3.117:9092,10.1.3.118:9092,10.1.3.119:9092,10.1.3.120:9092
agent.sources.kafkaSource.topic = against_cheating_01
agent.sources.kafkaSource.groupId = 1
agent.sources.kafkaSource.kafka.consumer.timeout.ms = 70000
agent.sources.kafkaSource.kafka.consumer.request.timeout.ms = 80000
agent.sources.kafkaSource.kafka.consumer.fetch.max.wait.ms=7000
agent.sources.kafkaSource.kafka.consumer.offset.flush.interval.ms = 50000
agent.sources.kafkaSource.kafka.consumer.session.timeout.ms = 70000
agent.sources.kafkaSource.kafka.consumer.heartbeat.interval.ms = 60000
agent.sources.kafkaSource.kafka.consumer.enable.auto.commit = false
agent.sources.kafkaSource.interceptors = i1
agent.sources.kafkaSource.interceptors.i1.userIp = true
agent.sources.kafkaSource.interceptors.i1.type = host

#指定channel類型為kafka
agent.channels.fileChannel.type = org.apache.flume.channel.kafka.KafkaChannel
agent.channels.fileChannel.kafka.bootstrap.servers = 10.1.3.116:9092,10.1.3.117:9092,10.1.3.118:9092,10.1.3.119:9092,10.1.3.120:9092
agent.channels.fileChannel.kafka.topic = channel_against_cheating_01
agent.channels.fileChannel.kafka.consumer.group.id = flume-consumer-against_cheating_01
agent.channels.fileChannel.kafka.consumer.timeout.ms = 70000
agent.channels.fileChannel.kafka.consumer.request.timeout.ms = 80000
agent.channels.fileChannel.kafka.consumer.fetch.max.wait.ms=7000
agent.channels.fileChannel.kafka.consumer.offset.flush.interval.ms = 50000
agent.channels.fileChannel.kafka.consumer.session.timeout.ms = 70000
agent.channels.fileChannel.kafka.consumer.heartbeat.interval.ms = 60000
agent.channels.fileChannel.kafka.consumer.enable.auto.commit = false


#指定sink的類型為hdfs
agent.sinks.hdfsSink.type = hdfs
agent.sinks.hdfsSink.hdfs.path = hdfs://hdfs-ha/user/against_cheating/%Y%m%d
agent.sinks.hdfsSink.hdfs.filePrefix = 10-1-2-120_01_%Y%m%d_%H
agent.sinks.hdfsSink.hdfs.fileSuffix = .txt
agent.sinks.hdfsSink.hdfs.useLocalTimeStamp = true
agent.sinks.hdfsSink.hdfs.writeFormat = Text
agent.sinks.hdfsSink.hdfs.fileType=DataStream
agent.sinks.hdfsSink.hdfs.rollCount = 0
agent.sinks.hdfsSink.hdfs.rollSize = 0
agent.sinks.hdfsSink.hdfs.rollInterval = 180
agent.sinks.hdfsSink.hdfs.batchSize = 100
agent.sinks.hdfsSink.hdfs.threadsPoolSize = 50
agent.sinks.hdfsSink.hdfs.idleTimeout = 0
agent.sinks.hdfsSink.hdfs.minBlockReplicas = 1
agent.sinks.hdfsSink.hdfs.callTimeout=100000
agent.sinks.hdfsSink.hdfs.request-timeout=100000
agent.sinks.hdfsSink.hdfs.connect-timeout=80000

[root@flume120 ~]# 

 

3>.啟動flume 

[root@flume120 ~]# cd /soft/flume/shell/
[root@flume120 shell]# 
[root@flume120 shell]# ll
total 20
-rwxr-xr-x 1 root root 850 Oct 18 14:44 start_flume_against_cheating_01.sh
-rwxr-xr-x 1 root root 849 Oct 18 14:43 start_flume_against_cheating_02.sh
-rwxr-xr-x 1 root root 850 Oct 18 14:42 start_flume_against_cheating_03.sh
-rwxr-xr-x 1 root root 850 Oct 18 14:42 start_flume_against_cheating_04.sh
-rwxr-xr-x 1 root root 850 Oct 18 14:58 start_flume_against_cheating_05.sh
[root@flume120 shell]# 
[root@flume120 shell]# cat start_flume_against_cheating_01.sh                         #我們直接執行這個腳本就行,默認就可以執行啦!
#!/bin/bash
#@author :yinzhengjie
#blog:http://www.cnblogs.com/yinzhengjie
#EMAIL:y1053419035@qq.com
#Data:Thu Oct 18 11:26:06 CST 2018

#將監控數據發送給ganglia,需要指定ganglia服務器地址,使用請確認是否部署好ganglia服務!
#nohup flume-ng agent -c /soft/flume/conf/job/ --conf-file=/soft/flume/conf/job/flume-conf-01.properties --name agent -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=10.1.2.120:8649 -Dflume.root.logger=INFO,console >> /soft/flume/logs/flume-ganglia-01.log  2>&1 &


#啟動flume自身的監控參數,默認執行以下腳本
nohup flume-ng agent -c /soft/flume/conf/job/ --conf-file=/soft/flume/conf/job/flume-conf-01.properties --name agent -Dflume.monitoring.type=http  -Dflume.monitoring.port=5201 -Dflume.root.logger=INFO,console  >> /soft/flume/logs/flume-http-01.log  2>&1 &


[root@flume120 shell]# 

4>.查看日([root@flume120 shell]# tail -10f /soft/flume/logs/flume-http-01.log)

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM