引言
今天針對線上生產環境下單機 flume 拉取kafka數據並存儲數據入Hdfs 出現大批量數據延遲. 在網上官網各種搜索數據,並結合官網數據,現進行以下總結
1. 線上單機存在問題簡述
當前flume拉取kafa數據量並不大 ,根據flume客戶端日志 ,每半分鍾hdfs文件寫入一次數據生成文件
發現問題:
拉取kafka數據過慢
2. 解決思路
- 加大kafka拉取數據量
- 加大flume中channel,source,sink 各通道的單條數據量
- 將flume拉取數據單機版本改成多數據拉取,通過flume-avore-sink-> flume-avore-source 進行數據多數據采取並合並
3 加大kafka拉取數據量
3.1 kafka-source簡述
- flume 輸入單線程拉取數據並將數據發送內置channel並通過sink組件進行數據轉發和處理,故對於kafka集群多副本方式拉取數據的時候,應適當考慮多個flume節點拉取kafka多副本數據,以避免flume節點在多個kafka集群副本中輪詢。加大flume拉取kafka數據的速率。
- flume-kafka-source 是flume內置的kafka source數據組件,是為了拉取kafka數據,配置如下:
agent.sources = r1
agent.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.r1.batchSize = 50000
agent.sources.r1.batchDurationMillis = 2000
agent.sources.r1.kafka.bootstrap.servers = test-hadoop01:9092
agent.sources.r1.kafka.topics = topicTest
agent.sources.r1.kafka.consumer.group.id = groupTest
- flume-kafka-source 的offset是交由zk集群去維護offset
3.2 kafka-source配置詳解
Kafka Source是一個Apache Kafka消費者,它從Kafka主題中讀取消息。 如果您正在運行多個Kafka源,則可以使用相同的使用者組配置它們,以便每個源都讀取一組唯一的主題分區。
Property Name | Default | Description |
---|---|---|
channels | – | 配置的channels 可配置多個channels 后續文章會說到 |
type | – | org.apache.flume.source.kafka.KafkaSource |
kafka.bootstrap.servers | – | 配置kafka集群地址 |
kafka.consumer.group.id | flume | 唯一確定的消費者群體。 在多個源或代理中設置相同的ID表示它們是同一個使用者組的一部分 |
kafka.topics | – | 你需要消費的topic |
kafka.topics.regex | – | 正則表達式,用於定義源訂閱的主題集。 此屬性的優先級高於kafka.topics ,如果存在則覆蓋kafka.topics 。 |
batchSize | 1000 | 一批中寫入Channel的最大消息數 (優化項) |
batchDurationMillis | 1000 | 將批次寫入通道之前的最長時間(以毫秒為單位)只要達到第一個大小和時間,就會寫入批次。(優化項) |
backoffSleepIncrement | 1000 | Kafka主題顯示為空時觸發的初始和增量等待時間。 等待時間將減少對空kafka 主題的激進ping操作。 一秒鍾是攝取用例的理想選擇,但使用攔截器的低延遲操作可能需要較低的值。 |
maxBackoffSleep | 5000 | Kafka主題顯示為空時觸發的最長等待時間。 5秒是攝取用例的理想選擇,但使用攔截器的低延遲操作可能需要較低的值。 |
useFlumeEventFormat | false | 默認情況下,事件從Kafka主題直接作為字節直接進入事件主體。 設置為true以將事件讀取為Flume Avro二進制格式。 與KafkaSink上的相同屬性或Kafka Channel上的parseAsFlumeEvent屬性一起使用時,這將保留在生成端發送的任何Flume標頭。 |
setTopicHeader | true | 設置為true時,將檢索到的消息的主題存儲到標題中,該標題由topicHeader 屬性定義。 |
topicHeader | topic | 如果setTopicHeader 屬性設置為true ,則定義用於存儲接收消息主題名稱的標題的名稱。 如果與Kafka SinktopicHeader 屬性結合使用,應該小心,以避免在循環中將消息發送回同一主題。 |
migrateZookeeperOffsets | true | 如果找不到Kafka存儲的偏移量,請在Zookeeper中查找偏移量並將它們提交給Kafka。 這應該是支持從舊版本的Flume無縫Kafka客戶端遷移。 遷移后,可以將其設置為false,但通常不需要這樣做。 如果未找到Zookeeper偏移量,則Kafka配置kafka.consumer.auto.offset.reset定義如何處理偏移量。 查看[Kafka文檔](http://kafka.apache.org/documentation.html#newconsumerconfigs)了解詳細信息 |
kafka.consumer.security.protocol | PLAINTEXT | 如果使用某種級別的安全性寫入Kafka,則設置為SASL_PLAINTEXT,SASL_SSL或SSL。 |
Other Kafka Consumer Properties | – | 這些屬性用於配置Kafka Consumer。 可以使用Kafka支持的任何消費者財產。 唯一的要求是在前綴為“kafka.consumer”的前綴中添加屬性名稱。 例如:kafka.consumer.auto.offset.reset |
注意:
Kafka Source會覆蓋兩個Kafka使用者參數:source.com將auto.commit.enable設置為“false”,並提交每個批處理。 Kafka源至少保證一次消息檢索策略。 源啟動時可以存在重復項。 Kafka Source還提供了key.deserializer(org.apache.kafka.common.serialization.StringSerializer)和value.deserializer(org.apache.kafka.common.serialization.ByteArraySerializer)的默認值。 不建議修改這些參數。
官方配置示例:
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.batchSize = 5000
tier1.sources.source1.batchDurationMillis = 2000
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics = test1, test2
tier1.sources.source1.kafka.consumer.group.id = custom.g.id
Example for topic subscription by regex
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics.regex = ^topic[0-9]$
# the default kafka.consumer.group.id=flume is used
本案例kafka-source配置
agent.sources = r1
agent.sources.r1.channels=c1
agent.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.r1.batchSize = 50000
agent.sources.r1.batchDurationMillis = 2000
agent.sources.r1.kafka.bootstrap.servers = test-hadoop01:9092
agent.sources.r1.kafka.topics = topicTest
agent.sources.r1.kafka.consumer.group.id = groupTest
官網配置文件地址 : kafka-source
3.3 配置優化
主要是在放入flume-channels 的批量數據加大
更改參數:
agent.sources.r1.batchSize = 50000
agent.sources.r1.batchDurationMillis = 2000
更改解釋:
即每2秒鍾拉取 kafka 一批數據 批數據大小為50000 放入到flume-channels 中 。即flume該節點 flume-channels 輸入端數據已放大
更改依據:
- 需要配置kafka單條數據 broker.conf 中配置
message.max.bytes
- 當前flume channel sink 組件最大消費能力如何?
4. 加大flume中channel,source,sink 各通道的單條數據量
4.1 source 發送至channels 數據量大小已配置 見 3.3
4.2 channel 配置
Property Name | Default | Description |
---|---|---|
type | – | The component type name, needs to be memory |
capacity | 100 | 通道中存儲的最大事件數 (優化項) |
transactionCapacity | 100 | 每個事務通道從源或提供給接收器的最大事件數 (優化項) |
keep-alive | 3 | 添加或刪除事件的超時(以秒為單位) |
byteCapacityBufferPercentage | 20 | 定義byteCapacity與通道中所有事件的估計總大小之間的緩沖區百分比,以計算標頭中的數據。 見下文。 |
byteCapacity | see description | 允許的最大總字節作為此通道中所有事件的總和。 實現只計算Eventbody ,這也是提供byteCapacityBufferPercentage 配置參數的原因。 默認為計算值,等於JVM可用的最大內存的80%(即命令行傳遞的-Xmx值的80%)。 請注意,如果在單個JVM上有多個內存通道,並且它們碰巧保持相同的物理事件(即,如果您使用來自單個源的復制通道選擇器),那么這些事件大小可能會因為通道byteCapacity目的而被重復計算。 將此值設置為“0”將導致此值回退到大約200 GB的內部硬限制。 |
配置 capacity 和 transactionCapacity 值 。默認配置規則為:
$$
channels.capacity >= channels.transactionCapacity >= source.batchSize
$$
官方channels配置示例
a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000
本案例修改之后的channels 配置
agent.channels.c1.type = memory
agent.channels.c1.capacity=550000
agent.channels.c1.transactionCapacity=520000
5. 將flume拉取數據單機版本改成多數據拉取,通過flume-avore-sink-> flume-avore-source 進行數據多數據采取並合並
5.1 存在問題
通過上續修改會發現單機版本的flume會在多副本kafka輪詢造成效率浪費
單機版本flume處理數據時會存在單機瓶頸,單機channels可能最多只能處理最大數據無法擴充
單機flume配置多個數據源不方便,不能適合后續多需求開發
5.2 修改架構
5.3采集節點配置文件
收集節點配置(3台):
agent.sources = r1
agent.channels = c1
agent.sinks = k1
agent.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.r1.batchSize = 50000
agent.sources.r1.batchDurationMillis = 2000
agent.sources.r1.kafka.bootstrap.servers = qcloud-test-hadoop03:9092
agent.sources.r1.kafka.topics = topicTest
agent.sources.r1.kafka.consumer.group.id = groupTest
agent.channels.c1.type = memory
agent.channels.c1.capacity=550000
agent.channels.c1.transactionCapacity=520000
agent.sinks.k1.type = avro
agent.sinks.k1.hostname = test-hadoop03
agent.sinks.k1.port=4545
agent.sources.r1.channels = c1
agent.sinks.k1.channel = c1
匯總節點配置(1台):
agent.sources = r1
agent.channels = memoryChannel
agent.sinks = hdfsSink
agent.sources.r1.type = avro
agent.sources.r1.bind = ip
agent.sources.r1.port = 4545
agent.sources.r1.batchSize = 100000
agent.sources.r1.batchDurationMillis = 1000
agent.channels.memoryChannel.type=memory
agent.channels.memoryChannel.keep-alive=30
agent.channels.memoryChannel.capacity=120000
agent.channels.memoryChannel.transactionCapacity=100000
agent.sinks.hdfsSink.type=hdfs
agent.sinks.hdfsSink.hdfs.path=hdfs://nameser/data/hm2/%Y-%m-%d-%H
agent.sinks.hdfsSink.hdfs.writeFormat=Text
agent.sinks.hdfsSink.hdfs.rollCount = 0
agent.sinks.hdfsSink.hdfs.rollSize = 134217728
agent.sinks.hdfsSink.hdfs.rollInterval = 60
agent.sinks.hdfsSink.hdfs.fileType=DataStream
agent.sinks.hdfsSink.hdfs.idleTimeout=65
agent.sinks.hdfsSink.hdfs.callTimeout=65000
agent.sinks.hdfsSink.hdfs.threadsPoolSize=300
agent.sinks.hdfsSink.channel = memoryChannel
agent.sources.r1.channels = memoryChannel
5.4 架構注意點
- 當前架構需要保證聚合節點機器的性能
- 當前架構新的瓶頸可能會存在存儲Hdfs數據時過慢 ,導致聚合節點Channels 占用率居高不下,導致堵塞 。
- 需要關注avro 自定義source sink 的發送效率
6.flume 監控工具(http)
flume 監控工具總共有三種方式 ,我們這里為方便簡單,使用內置http接口監控方式進行操作
6.1 配置
在啟動腳本處設置 參數 -Dflume.monitoring.type=http -Dflume.monitoring.port=34545
即可
6.2 訪問 地址 :
6.3 返回結果示例 和字段解釋 :
{
"CHANNEL.memoryChannel": {
"ChannelCapacity": "550000",
"ChannelFillPercentage": "0.18181818181818182",
"Type": "CHANNEL",
"ChannelSize": "1000",
"EventTakeSuccessCount": "33541400",
"EventTakeAttemptCount": "33541527",
"StartTime": "1536572886273",
"EventPutAttemptCount": "33542500",
"EventPutSuccessCount": "33542500",
"StopTime": "0"
},
"SINK.hdfsSink": {
"ConnectionCreatedCount": "649",
"ConnectionClosedCount": "648",
"Type": "SINK",
"BatchCompleteCount": "335414",
"BatchEmptyCount": "27",
"EventDrainAttemptCount": "33541500",
"StartTime": "1536572886275",
"EventDrainSuccessCount": "33541400",
"BatchUnderflowCount": "0",
"StopTime": "0",
"ConnectionFailedCount": "0"
},
"SOURCE.avroSource": {
"EventReceivedCount": "33542500",
"AppendBatchAcceptedCount": "335425",
"Type": "SOURCE",
"EventAcceptedCount": "33542500",
"AppendReceivedCount": "0",
"StartTime": "1536572886465",
"AppendAcceptedCount": "0",
"OpenConnectionCount": "3",
"AppendBatchReceivedCount": "335425",
"StopTime": "0"
}
}
參數定義:
字段名稱 | 含義 | 備注 |
---|---|---|
SOURCE.OpenConnectionCount | 打開的連接數 | |
SOURCE.TYPE | 組件類型 | |
SOURCE.AppendBatchAcceptedCount | 追加到channel中的批數量 | |
SOURCE.AppendBatchReceivedCount | source端剛剛追加的批數量 | |
SOURCE.EventAcceptedCount | 成功放入channel的event數量 | |
SOURCE.AppendReceivedCount | source追加目前收到的數量 | |
SOURCE.StartTime(StopTIme) | 組件開始時間、結束時間 | |
SOURCE.EventReceivedCount | source端成功收到的event數量 | |
SOURCE.AppendAcceptedCount | source追加目前放入channel的數量 | |
CHANNEL.EventPutSuccessCount | 成功放入channel的event數量 | |
CHANNEL.ChannelFillPercentage | 通道使用比例 | |
CHANNEL.EventPutAttemptCount | 嘗試放入將event放入channel的次數 | |
CHANNEL.ChannelSize | 目前在channel中的event數量 | |
CHANNEL.EventTakeSuccessCount | 從channel中成功取走的event數量 | |
CHANNEL.ChannelCapacity | 通道容量 | |
CHANNEL.EventTakeAttemptCount | 嘗試從channel中取走event的次數 | |
SINK.BatchCompleteCount | 完成的批數量 | |
SINK.ConnectionFailedCount | 連接失敗數 | |
SINK.EventDrainAttemptCount | 嘗試提交的event數量 | |
SINK.ConnectionCreatedCount | 創建連接數 | |
SINK.Type | 組件類型 | |
SINK.BatchEmptyCount | 批量取空的數量 | |
SINK.ConnectionClosedCount | 關閉連接數量 | |
SINK.EventDrainSuccessCount | 成功發送event的數量 | |
SINK.BatchUnderflowCount | 正處於批量處理的batch數 |
參考地址
flume-document : http://flume.apache.org/FlumeUserGuide.html