flume 單機問題解決與架構更改


引言

今天針對線上生產環境下單機 flume 拉取kafka數據並存儲數據入Hdfs 出現大批量數據延遲. 在網上官網各種搜索數據,並結合官網數據,現進行以下總結

1. 線上單機存在問題簡述

當前flume拉取kafa數據量並不大 ,根據flume客戶端日志 ,每半分鍾hdfs文件寫入一次數據生成文件
發現問題:
拉取kafka數據過慢

2. 解決思路

  1. 加大kafka拉取數據量
  2. 加大flume中channel,source,sink 各通道的單條數據量
  3. 將flume拉取數據單機版本改成多數據拉取,通過flume-avore-sink-> flume-avore-source 進行數據多數據采取並合並

3 加大kafka拉取數據量

3.1 kafka-source簡述

  • flume 輸入單線程拉取數據並將數據發送內置channel並通過sink組件進行數據轉發和處理,故對於kafka集群多副本方式拉取數據的時候,應適當考慮多個flume節點拉取kafka多副本數據,以避免flume節點在多個kafka集群副本中輪詢。加大flume拉取kafka數據的速率。
  • flume-kafka-source 是flume內置的kafka source數據組件,是為了拉取kafka數據,配置如下:
agent.sources = r1
 agent.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.r1.batchSize = 50000
agent.sources.r1.batchDurationMillis = 2000
agent.sources.r1.kafka.bootstrap.servers = test-hadoop01:9092
agent.sources.r1.kafka.topics = topicTest
agent.sources.r1.kafka.consumer.group.id = groupTest
  • flume-kafka-source 的offset是交由zk集群去維護offset

3.2 kafka-source配置詳解

Kafka Source是一個Apache Kafka消費者,它從Kafka主題中讀取消息。 如果您正在運行多個Kafka源,則可以使用相同的使用者組配置它們,以便每個源都讀取一組唯一的主題分區。

Property Name Default Description
channels 配置的channels 可配置多個channels 后續文章會說到
type org.apache.flume.source.kafka.KafkaSource
kafka.bootstrap.servers 配置kafka集群地址
kafka.consumer.group.id flume 唯一確定的消費者群體。 在多個源或代理中設置相同的ID表示它們是同一個使用者組的一部分
kafka.topics 你需要消費的topic
kafka.topics.regex 正則表達式,用於定義源訂閱的主題集。 此屬性的優先級高於kafka.topics,如果存在則覆蓋kafka.topics
batchSize 1000 一批中寫入Channel的最大消息數 (優化項)
batchDurationMillis 1000 將批次寫入通道之前的最長時間(以毫秒為單位)只要達到第一個大小和時間,就會寫入批次。(優化項)
backoffSleepIncrement 1000 Kafka主題顯示為空時觸發的初始和增量等待時間。 等待時間將減少對空kafka主題的激進ping操作。 一秒鍾是攝取用例的理想選擇,但使用攔截器的低延遲操作可能需要較低的值。
maxBackoffSleep 5000 Kafka主題顯示為空時觸發的最長等待時間。 5秒是攝取用例的理想選擇,但使用攔截器的低延遲操作可能需要較低的值。
useFlumeEventFormat false 默認情況下,事件從Kafka主題直接作為字節直接進入事件主體。 設置為true以將事件讀取為Flume Avro二進制格式。 與KafkaSink上的相同屬性或Kafka Channel上的parseAsFlumeEvent屬性一起使用時,這將保留在生成端發送的任何Flume標頭。
setTopicHeader true 設置為true時,將檢索到的消息的主題存儲到標題中,該標題由topicHeader屬性定義。
topicHeader topic 如果setTopicHeader屬性設置為true,則定義用於存儲接收消息主題名稱的標題的名稱。 如果與Kafka SinktopicHeader屬性結合使用,應該小心,以避免在循環中將消息發送回同一主題。
migrateZookeeperOffsets true 如果找不到Kafka存儲的偏移量,請在Zookeeper中查找偏移量並將它們提交給Kafka。 這應該是支持從舊版本的Flume無縫Kafka客戶端遷移。 遷移后,可以將其設置為false,但通常不需要這樣做。 如果未找到Zookeeper偏移量,則Kafka配置kafka.consumer.auto.offset.reset定義如何處理偏移量。 查看[Kafka文檔](http://kafka.apache.org/documentation.html#newconsumerconfigs)了解詳細信息
kafka.consumer.security.protocol PLAINTEXT 如果使用某種級別的安全性寫入Kafka,則設置為SASL_PLAINTEXT,SASL_SSL或SSL。
Other Kafka Consumer Properties 這些屬性用於配置Kafka Consumer。 可以使用Kafka支持的任何消費者財產。 唯一的要求是在前綴為“kafka.consumer”的前綴中添加屬性名稱。 例如:kafka.consumer.auto.offset.reset

注意:
Kafka Source會覆蓋兩個Kafka使用者參數:source.com將auto.commit.enable設置為“false”,並提交每個批處理。 Kafka源至少保證一次消息檢索策略。 源啟動時可以存在重復項。 Kafka Source還提供了key.deserializer(org.apache.kafka.common.serialization.StringSerializer)和value.deserializer(org.apache.kafka.common.serialization.ByteArraySerializer)的默認值。 不建議修改這些參數。
官方配置示例:

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.batchSize = 5000
tier1.sources.source1.batchDurationMillis = 2000
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics = test1, test2
tier1.sources.source1.kafka.consumer.group.id = custom.g.id
Example for topic subscription by regex

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics.regex = ^topic[0-9]$
# the default kafka.consumer.group.id=flume is used

本案例kafka-source配置

agent.sources = r1
agent.sources.r1.channels=c1
 agent.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.r1.batchSize = 50000
agent.sources.r1.batchDurationMillis = 2000
agent.sources.r1.kafka.bootstrap.servers = test-hadoop01:9092
agent.sources.r1.kafka.topics = topicTest
agent.sources.r1.kafka.consumer.group.id = groupTest

官網配置文件地址kafka-source

3.3 配置優化

主要是在放入flume-channels 的批量數據加大
更改參數:
agent.sources.r1.batchSize = 50000
agent.sources.r1.batchDurationMillis = 2000
更改解釋:
即每2秒鍾拉取 kafka 一批數據 批數據大小為50000 放入到flume-channels 中 。即flume該節點 flume-channels 輸入端數據已放大

更改依據:

  • 需要配置kafka單條數據 broker.conf 中配置 message.max.bytes
  • 當前flume channel sink 組件最大消費能力如何?

4. 加大flume中channel,source,sink 各通道的單條數據量

4.1 source 發送至channels 數據量大小已配置 見 3.3

4.2 channel 配置

Property Name Default Description
type The component type name, needs to be memory
capacity 100 通道中存儲的最大事件數 (優化項)
transactionCapacity 100 每個事務通道從源或提供給接收器的最大事件數 (優化項)
keep-alive 3 添加或刪除事件的超時(以秒為單位)
byteCapacityBufferPercentage 20 定義byteCapacity與通道中所有事件的估計總大小之間的緩沖區百分比,以計算標頭中的數據。 見下文。
byteCapacity see description 允許的最大總字節作為此通道中所有事件的總和。 實現只計算Eventbody,這也是提供byteCapacityBufferPercentage配置參數的原因。 默認為計算值,等於JVM可用的最大內存的80%(即命令行傳遞的-Xmx值的80%)。 請注意,如果在單個JVM上有多個內存通道,並且它們碰巧保持相同的物理事件(即,如果您使用來自單個源的復制通道選擇器),那么這些事件大小可能會因為通道byteCapacity目的而被重復計算。 將此值設置為“0”將導致此值回退到大約200 GB的內部硬限制。

配置 capacity 和 transactionCapacity 值 。默認配置規則為:
$$
channels.capacity >= channels.transactionCapacity >= source.batchSize
$$
官方channels配置示例

a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000

本案例修改之后的channels 配置

agent.channels.c1.type = memory
agent.channels.c1.capacity=550000
agent.channels.c1.transactionCapacity=520000

5. 將flume拉取數據單機版本改成多數據拉取,通過flume-avore-sink-> flume-avore-source 進行數據多數據采取並合並

5.1 存在問題

通過上續修改會發現單機版本的flume會在多副本kafka輪詢造成效率浪費
單機版本flume處理數據時會存在單機瓶頸,單機channels可能最多只能處理最大數據無法擴充
單機flume配置多個數據源不方便,不能適合后續多需求開發

5.2 修改架構

多數據聚合架構

5.3采集節點配置文件

收集節點配置(3台):

agent.sources = r1
agent.channels = c1
agent.sinks = k1

agent.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.r1.batchSize = 50000
agent.sources.r1.batchDurationMillis = 2000
agent.sources.r1.kafka.bootstrap.servers = qcloud-test-hadoop03:9092
agent.sources.r1.kafka.topics = topicTest
agent.sources.r1.kafka.consumer.group.id = groupTest

agent.channels.c1.type = memory
agent.channels.c1.capacity=550000
agent.channels.c1.transactionCapacity=520000

agent.sinks.k1.type = avro
agent.sinks.k1.hostname = test-hadoop03
agent.sinks.k1.port=4545

agent.sources.r1.channels = c1
agent.sinks.k1.channel = c1

匯總節點配置(1台):

agent.sources = r1
agent.channels = memoryChannel
agent.sinks = hdfsSink


agent.sources.r1.type = avro
agent.sources.r1.bind = ip
agent.sources.r1.port = 4545
agent.sources.r1.batchSize = 100000
agent.sources.r1.batchDurationMillis = 1000


agent.channels.memoryChannel.type=memory
agent.channels.memoryChannel.keep-alive=30
agent.channels.memoryChannel.capacity=120000
agent.channels.memoryChannel.transactionCapacity=100000


agent.sinks.hdfsSink.type=hdfs
agent.sinks.hdfsSink.hdfs.path=hdfs://nameser/data/hm2/%Y-%m-%d-%H
agent.sinks.hdfsSink.hdfs.writeFormat=Text

agent.sinks.hdfsSink.hdfs.rollCount = 0

agent.sinks.hdfsSink.hdfs.rollSize = 134217728

agent.sinks.hdfsSink.hdfs.rollInterval = 60
agent.sinks.hdfsSink.hdfs.fileType=DataStream
agent.sinks.hdfsSink.hdfs.idleTimeout=65
agent.sinks.hdfsSink.hdfs.callTimeout=65000
agent.sinks.hdfsSink.hdfs.threadsPoolSize=300

agent.sinks.hdfsSink.channel = memoryChannel
agent.sources.r1.channels = memoryChannel

5.4 架構注意點

  • 當前架構需要保證聚合節點機器的性能
  • 當前架構新的瓶頸可能會存在存儲Hdfs數據時過慢 ,導致聚合節點Channels 占用率居高不下,導致堵塞 。
  • 需要關注avro 自定義source sink 的發送效率

6.flume 監控工具(http)

flume 監控工具總共有三種方式 ,我們這里為方便簡單,使用內置http接口監控方式進行操作

6.1 配置

在啟動腳本處設置 參數 -Dflume.monitoring.type=http -Dflume.monitoring.port=34545 即可

6.2 訪問 地址 :

http://flumeIp:34545

6.3 返回結果示例 和字段解釋 :

{
	"CHANNEL.memoryChannel": {
		"ChannelCapacity": "550000",
		"ChannelFillPercentage": "0.18181818181818182",
		"Type": "CHANNEL",
		"ChannelSize": "1000",
		"EventTakeSuccessCount": "33541400",
		"EventTakeAttemptCount": "33541527",
		"StartTime": "1536572886273",
		"EventPutAttemptCount": "33542500",
		"EventPutSuccessCount": "33542500",
		"StopTime": "0"
	},
	"SINK.hdfsSink": {
		"ConnectionCreatedCount": "649",
		"ConnectionClosedCount": "648",
		"Type": "SINK",
		"BatchCompleteCount": "335414",
		"BatchEmptyCount": "27",
		"EventDrainAttemptCount": "33541500",
		"StartTime": "1536572886275",
		"EventDrainSuccessCount": "33541400",
		"BatchUnderflowCount": "0",
		"StopTime": "0",
		"ConnectionFailedCount": "0"
	},
	"SOURCE.avroSource": {
		"EventReceivedCount": "33542500",
		"AppendBatchAcceptedCount": "335425",
		"Type": "SOURCE",
		"EventAcceptedCount": "33542500",
		"AppendReceivedCount": "0",
		"StartTime": "1536572886465",
		"AppendAcceptedCount": "0",
		"OpenConnectionCount": "3",
		"AppendBatchReceivedCount": "335425",
		"StopTime": "0"
	}
}

參數定義:

字段名稱 含義 備注
SOURCE.OpenConnectionCount 打開的連接數
SOURCE.TYPE 組件類型
SOURCE.AppendBatchAcceptedCount 追加到channel中的批數量
SOURCE.AppendBatchReceivedCount source端剛剛追加的批數量
SOURCE.EventAcceptedCount 成功放入channel的event數量
SOURCE.AppendReceivedCount source追加目前收到的數量
SOURCE.StartTime(StopTIme) 組件開始時間、結束時間
SOURCE.EventReceivedCount source端成功收到的event數量
SOURCE.AppendAcceptedCount source追加目前放入channel的數量
CHANNEL.EventPutSuccessCount 成功放入channel的event數量
CHANNEL.ChannelFillPercentage 通道使用比例
CHANNEL.EventPutAttemptCount 嘗試放入將event放入channel的次數
CHANNEL.ChannelSize 目前在channel中的event數量
CHANNEL.EventTakeSuccessCount 從channel中成功取走的event數量
CHANNEL.ChannelCapacity 通道容量
CHANNEL.EventTakeAttemptCount 嘗試從channel中取走event的次數
SINK.BatchCompleteCount 完成的批數量
SINK.ConnectionFailedCount 連接失敗數
SINK.EventDrainAttemptCount 嘗試提交的event數量
SINK.ConnectionCreatedCount 創建連接數
SINK.Type 組件類型
SINK.BatchEmptyCount 批量取空的數量
SINK.ConnectionClosedCount 關閉連接數量
SINK.EventDrainSuccessCount 成功發送event的數量
SINK.BatchUnderflowCount 正處於批量處理的batch數

參考地址
flume-document : http://flume.apache.org/FlumeUserGuide.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM