flume配置kafka channle的實戰案例
作者:尹正傑
版權聲明:原創作品,謝絕轉載!否則將追究法律責任。
最近在新公司負責大數據平台的建設,平台搭建完畢后,需要將雲平台(我們公司使用的Ucloud的雲服務器,大概320多台,還在擴容中),公司每個月光大數據服務費用就接近50萬人民幣。老板考慮成本問題,花了接近200萬的前采購了50台服務器用於大數據平台的建設。我已經將集群部署好了,正准備將雲上的環境原樣搬到我的新平台上時,遇到了一系列的坑,我已經填了不少的坑。這不,關於flume的一個channel的選擇也是一個坑。
我們知道常用的channel如下:
file channel的特點是:速度慢,支持容災;
memory channel的特點是:速度快,斷電丟數據,我們在Ucloud上使用的就是它;
kafka channel的特點是:高速緩存;
一.flume報錯OOM
計划將kafka的數據通過flume抽取到hdfs上,真是flume略我千百變,我帶flume如初戀啊。經過各種測試,我已經將flume的內存提升到12G,下面是我啟動flume看到的信息,如下圖:
啟動后,我開啟了一個終端,查看JVM的內存使用情況,發現分分鍾就給我吃滿了!如下圖:
接下來,不到3分鍾時間,flume就崩潰了,頻繁拋出OOM的異常信息:
接下來,我們看一下hdfs集群中是否有數據,答案是肯定的,hdfs的確是有數據:
咋解決呢?大家可能說,你繼續加大內存唄,12G不夠,就給24G試試看!OK,我就JVM的堆內存調試到24G,啟動程序:
在后端查看flume的進程ID:
查看JVM的運行情況,如下:
經過上面的改造后,我一致在等OOM,可惜一個小時過去了,始終沒有拋出OOM異常,我有點小失落,也有點小開心。開心的是終於不崩潰了,失落的是還剩下4個G,那我原來打算在這台服務器上開啟8個flume進程的計划是要泡湯了,因為是總大小總共才32G,有上面解決方案呢?查看官網,據說是有種基於Kafka channel的模式。也是本篇博客的想要說的主角。
二.分析OOM的解決方案
1>.分析為什么會拋OOM溢出
其實想象大家也知道,source是kafka,而sink是hdfs,他們兩個的吞吐量閉着眼睛大家都知道誰是快誰慢。
hdfs集群的工作原理可知,它在寫數據和讀取數據時都會和NameNode這個服務器進行交互,需要一系列驗證操作,最后讀操作或者寫操作依然不是和NameNode進行交互,而是client直接跟DataNode進行交互。
kafka則是基於partition來進行消費的,網上有些文章說partition的數越多,意味着kafka的吞吐量就越大,其實這個說法並不嚴謹,parition的數量應該小於集群的core總數,因為每個消費者基於paritition進行消費時,服務器都會開啟一個線程去應酬,如果你一台服務器paritrion響應的特別多,設計到上下文切換反而不理想了。一個消費者可以去集群同時對多個partition進行消費。
以上的觀點僅是我個人對Kafka和hdfs的理解,如果哪里有說的不對的話,歡迎各位大神之路!綜上所述,我們都說kafka的速度要遠遠大於hdfs,kafka是順序寫入磁盤的,他的速度可達到300M/s,我們可以毫不客氣的說,順序寫入磁盤相比隨機寫入內存的速度有過之而無不及。
好了,上面說了一堆的廢話,咱們回歸正題,為什么會OOM呢?原因很簡單,我們形象的說:想必大家都喝過可樂吧,可樂的汽水瓶形狀大家也應該清楚吧,我們將可樂瓶的瓶蓋去掉,然后把可樂瓶的平底削去,我們假設我們在粗的一端(原來的瓶底)注水進去,然后水會送細的一端(原來的瓶蓋)出去沒毛病吧?理論上來說,如果粗的一端源源不斷的網可樂瓶中注水,水也會遠遠不斷的從小瓶蓋中出去,但是當粗的一端流入端的速度遠遠高於流出端的速度,那么可樂瓶容器很快就會積累很多的水,知道把可能瓶注滿水,當注滿水以后,這個時候還要往里面注水的話,可樂瓶容器可能會變形,設置可能會將可能瓶撐爆。
如果你看懂了我上面的描述,那么你我們在結合kafka和hdfs,說一說,誰是瓶底,誰是瓶蓋,誰是可樂瓶呢?我的理解是此時我們的瓶底的一端就是kafka,可樂瓶本身就是memory channel,瓶口那一端就是hdfs。那么瓶子被水撐滿最終爆裂就好比咱們的OOM。
2>.編寫flume的配置文件(該配置文件我是根據生產環境稍作改動,可供參考)
[root@flume120 ~]# cat /soft/flume/conf/job/flume-conf-01.properties #定義別名 agent.sources = kafkaSource agent.channels = fileChannel agent.sinks = hdfsSink #綁定關系 agent.sources.kafkaSource.channels = fileChannel agent.sinks.hdfsSink.channel = fileChannel #指定source源為kafka source agent.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource agent.sources.kafkaSource.kafka.bootstrap.servers = 10.1.3.116:9092,10.1.3.117:9092,10.1.3.118:9092,10.1.3.119:9092,10.1.3.120:9092 agent.sources.kafkaSource.topic = against_cheating_01 agent.sources.kafkaSource.groupId = 1 agent.sources.kafkaSource.kafka.consumer.timeout.ms = 70000 agent.sources.kafkaSource.kafka.consumer.request.timeout.ms = 80000 agent.sources.kafkaSource.kafka.consumer.fetch.max.wait.ms=7000 agent.sources.kafkaSource.kafka.consumer.offset.flush.interval.ms = 50000 agent.sources.kafkaSource.kafka.consumer.session.timeout.ms = 70000 agent.sources.kafkaSource.kafka.consumer.heartbeat.interval.ms = 60000 agent.sources.kafkaSource.kafka.consumer.enable.auto.commit = false agent.sources.kafkaSource.interceptors = i1 agent.sources.kafkaSource.interceptors.i1.userIp = true agent.sources.kafkaSource.interceptors.i1.type = host #指定channel類型為kafka agent.channels.fileChannel.type = org.apache.flume.channel.kafka.KafkaChannel agent.channels.fileChannel.kafka.bootstrap.servers = 10.1.3.116:9092,10.1.3.117:9092,10.1.3.118:9092,10.1.3.119:9092,10.1.3.120:9092 agent.channels.fileChannel.kafka.topic = channel_against_cheating_01 agent.channels.fileChannel.kafka.consumer.group.id = flume-consumer-against_cheating_01 agent.channels.fileChannel.kafka.consumer.timeout.ms = 70000 agent.channels.fileChannel.kafka.consumer.request.timeout.ms = 80000 agent.channels.fileChannel.kafka.consumer.fetch.max.wait.ms=7000 agent.channels.fileChannel.kafka.consumer.offset.flush.interval.ms = 50000 agent.channels.fileChannel.kafka.consumer.session.timeout.ms = 70000 agent.channels.fileChannel.kafka.consumer.heartbeat.interval.ms = 60000 agent.channels.fileChannel.kafka.consumer.enable.auto.commit = false #指定sink的類型為hdfs agent.sinks.hdfsSink.type = hdfs agent.sinks.hdfsSink.hdfs.path = hdfs://hdfs-ha/user/against_cheating/%Y%m%d agent.sinks.hdfsSink.hdfs.filePrefix = 10-1-2-120_01_%Y%m%d_%H agent.sinks.hdfsSink.hdfs.fileSuffix = .txt agent.sinks.hdfsSink.hdfs.useLocalTimeStamp = true agent.sinks.hdfsSink.hdfs.writeFormat = Text agent.sinks.hdfsSink.hdfs.fileType=DataStream agent.sinks.hdfsSink.hdfs.rollCount = 0 agent.sinks.hdfsSink.hdfs.rollSize = 0 agent.sinks.hdfsSink.hdfs.rollInterval = 180 agent.sinks.hdfsSink.hdfs.batchSize = 100 agent.sinks.hdfsSink.hdfs.threadsPoolSize = 50 agent.sinks.hdfsSink.hdfs.idleTimeout = 0 agent.sinks.hdfsSink.hdfs.minBlockReplicas = 1 agent.sinks.hdfsSink.hdfs.callTimeout=100000 agent.sinks.hdfsSink.hdfs.request-timeout=100000 agent.sinks.hdfsSink.hdfs.connect-timeout=80000 [root@flume120 ~]#
3>.啟動flume
[root@flume120 ~]# cd /soft/flume/shell/ [root@flume120 shell]# [root@flume120 shell]# ll total 20 -rwxr-xr-x 1 root root 850 Oct 18 14:44 start_flume_against_cheating_01.sh -rwxr-xr-x 1 root root 849 Oct 18 14:43 start_flume_against_cheating_02.sh -rwxr-xr-x 1 root root 850 Oct 18 14:42 start_flume_against_cheating_03.sh -rwxr-xr-x 1 root root 850 Oct 18 14:42 start_flume_against_cheating_04.sh -rwxr-xr-x 1 root root 850 Oct 18 14:58 start_flume_against_cheating_05.sh [root@flume120 shell]# [root@flume120 shell]# cat start_flume_against_cheating_01.sh #我們直接執行這個腳本就行,默認就可以執行啦! #!/bin/bash #@author :yinzhengjie #blog:http://www.cnblogs.com/yinzhengjie #EMAIL:y1053419035@qq.com #Data:Thu Oct 18 11:26:06 CST 2018 #將監控數據發送給ganglia,需要指定ganglia服務器地址,使用請確認是否部署好ganglia服務! #nohup flume-ng agent -c /soft/flume/conf/job/ --conf-file=/soft/flume/conf/job/flume-conf-01.properties --name agent -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=10.1.2.120:8649 -Dflume.root.logger=INFO,console >> /soft/flume/logs/flume-ganglia-01.log 2>&1 & #啟動flume自身的監控參數,默認執行以下腳本 nohup flume-ng agent -c /soft/flume/conf/job/ --conf-file=/soft/flume/conf/job/flume-conf-01.properties --name agent -Dflume.monitoring.type=http -Dflume.monitoring.port=5201 -Dflume.root.logger=INFO,console >> /soft/flume/logs/flume-http-01.log 2>&1 & [root@flume120 shell]#
4>.查看日志([root@flume120 shell]# tail -10f /soft/flume/logs/flume-http-01.log)