flume參數:
#example.conf:單節點Flume配置 #命名此代理上的組件 a1.sources = r1 a1.sinks = k1 a1.channels = c1 #描述/配置源 a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 #描述接收器 a1.sinks.k1.type = logger #使用緩沖內存中事件的通道 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 #將源和接收器綁定到通道 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
此配置定義名為a1的單個代理。a1有一個偵聽端口44444上的數據的源,一個緩沖內存中事件數據的通道,以及一個將事件數據記錄到控制台的接收器。
根據scource、channel、sink划分
1、Sources
Flume中常用的Source有NetCat,Avro,Exec,Spooling Directory,Taildir,也可以根據業務場景的需要自定義Source,具體介紹如下。
1)NetCat Source
NetCat Source可以使用TCP和UDP兩種協議方式,使用方法基本相同,通過監聽指定的IP和端口來傳輸數據,它會將監聽到的每一行數據轉化成一個Event寫入到Channel中。(必須參數以@標示,下類同)
Property Name Default Description channels@ – type@ – 類型指定為:netcat bind@ – 綁定機器名或IP地址 port@ – 端口號 max-line-length 512 一行的最大字節數 ack-every-event true 對成功接受的Event返回OK selector.type replicating 選擇器類型replicating or multiplexing selector.* 選擇器相關參數 interceptors – 攔截器列表,多個以空格分隔 interceptors.* 攔截器相關參數
2)Avro Source
不同主機上的Agent通過網絡傳輸數據可使用的Source,一般是接受Avro client的數據,或和是上一級Agent的Avro Sink成對存在。
Property Name Default Description channels@ – type@ – 類型指定為:avro bind@ – 監聽的主機名或IP地址 port@ – 端口號 threads – 傳輸可使用的最大線程數 selector.type selector.* interceptors – 攔截器列表 interceptors.* compression-type none 可設置為“none” 或 “deflate”. 壓縮類型需要和AvroSource匹配
3)Exec Source
-
Exec source通過執行給定的Unix命令的傳輸結果數據,如,cat,tail -F等,實時性比較高,但是一旦Agent進程出現問題,可能會導致數據的丟失。
Property Name Default Description channels@ – type@ – 類型指定為:exec command@ – 需要去執行的命令 shell – 運行命令的shell腳本文件 restartThrottle 10000 嘗試重啟的超時時間 restart false 如果命令執行失敗,是否重啟 logStdErr false 是否記錄錯誤日志 batchSize 20 批次寫入channel的最大日志數量 batchTimeout 3000 批次寫入數據的最大等待時間(毫秒) selector.type replicating 選擇器類型replicating or multiplexing selector.* 選擇器其他參數 interceptors – 攔截器列表,多個空格分隔 interceptors.*
4)Spooling Directory Source
通過監控一個文件夾將新增文件內容轉換成Event傳輸數據,特點是不會丟失數據,使用Spooling Directory Source需要注意的兩點是,
1)不能對被監控的文件夾下的新增的文件做出任何更改,
2)新增到監控文件夾的文件名稱必須是唯一的。由於是對整個新增文件的監控,Spooling Directory Source的實時性相對較低,不過可以采用對文件高粒度分割達到近似實時。
Property Name Default Description channels@ – type@ – 類型指定:spooldir. spoolDir@ – 被監控的文件夾目錄 fileSuffix .COMPLETED 完成數據傳輸的文件后綴標志 deletePolicy never 刪除已經完成數據傳輸的文件時間:never or immediate fileHeader false 是否在header中添加文件的完整路徑信息 fileHeaderKey file 如果header中添加文件的完整路徑信息時key的名稱 basenameHeader false 是否在header中添加文件的基本名稱信息 basenameHeaderKey basename 如果header中添加文件的基本名稱信息時key的名稱 includePattern ^.*$ 使用正則來匹配新增文件需要被傳輸數據的文件 ignorePattern ^$ 使用正則來忽略新增的文件 trackerDir .flumespool 存儲元數據信息目錄 consumeOrder oldest 文件消費順序:oldest, youngest and random. maxBackoff 4000 如果channel容量不足,嘗試寫入的超時時間,如果仍然不能寫入,則會拋出ChannelException batchSize 100 批次處理粒度 inputCharset UTF-8 輸入碼表格式 decodeErrorPolicy FAIL 遇到不可解碼字符后的處理方式:FAIL,REPLACE,IGNORE selector.type replicating 選擇器類型:replicating or multiplexing selector.* 選擇器其他參數 interceptors – 攔截器列表,空格分隔 interceptors.*
5)Taildir Source
可以實時的監控指定一個或多個文件中的新增內容,由於該方式將數據的偏移量保存在一個指定的json文件中,即使在Agent掛掉或被kill也不會有數據的丟失,需要注意的是,該Source不能在Windows上使用。
Property Name Default Description channels@ – type@ – 指定類型:TAILDIR. filegroups@ – 文件組的名稱,多個空格分隔 filegroups.<filegroupName>@ – 被監控文件的絕對路徑 positionFile ~/.flume/taildir_position.json 存儲數據偏移量路徑 headers.<filegroupName>.<headerKey> – Header key的名稱 byteOffsetHeader false 是否添加字節偏移量到key為‘byteoffset’值中 skipToEnd false 當偏移量不能寫入到文件時是否跳到文件結尾 idleTimeout 120000 關閉沒有新增內容的文件超時時間(毫秒) writePosInterval 3000 在positionfile 寫入每一個文件lastposition的時間間隔 batchSize 100 批次處理行數 fileHeader false 是否添加header存儲文件絕對路徑 fileHeaderKey file fileHeader啟用時,使用的key
2、Channels
官網提供的Channel有多種類型可供選擇,這里介紹Memory Channel和File Channel。
1)Memory Channel
Memory Channel是使用內存來存儲Event,使用內存的意味着數據傳輸速率會很快,但是當Agent掛掉后,存儲在Channel中的數據將會丟失。
Property Name Default Description type@ – 類型指定為:memory capacity 100 存儲在channel中的最大容量 transactionCapacity 100 從一個source中去或者給一個sink,每個事務中最大的事件數 keep-alive 3 對於添加或者刪除一個事件的超時的秒鍾 byteCapacityBufferPercentage 20 定義緩存百分比 byteCapacity see description Channel中允許存儲的最大字節總數
2)File Channel
File Channel使用磁盤來存儲Event,速率相對於Memory Channel較慢,但數據不會丟失。
Property Name Default Description type@ – 類型指定:file. checkpointDir ~/.flume/file-channel/checkpoint checkpoint目錄 useDualCheckpoints false 備份checkpoint,為True,backupCheckpointDir必須設置 backupCheckpointDir – 備份checkpoint目錄 dataDirs ~/.flume/file-channel/data 數據存儲所在的目錄設置 transactionCapacity 10000 Event存儲最大值 checkpointInterval 30000 checkpoint間隔時間 maxFileSize 2146435071 單一日志最大設置字節數 minimumRequiredSpace 524288000 最小的請求閑置空間(以字節為單位) capacity 1000000 Channel最大容量 keep-alive 3 一個存放操作的等待時間值(秒) use-log-replay-v1 false Expert: 使用老的回復邏輯 use-fast-replay false Expert: 回復不需要隊列 checkpointOnClose true
3、Sinks
Flume常用Sinks有Log Sink,HDFS Sink,Avro Sink,Kafka Sink,當然也可以自定義Sink。
1)Logger Sink
Logger Sink以INFO 級別的日志記錄到log日志中,這種方式通常用於測試。
Property Name Default Description
channel@ –
type@ – 類型指定:logger
maxBytesToLog 16 能夠記錄的最大Event Body字節數
2)HDFS Sink
-
Sink數據到HDFS,目前支持text 和 sequence files兩種文件格式,支持壓縮,並可以對數據進行分區,分桶存儲。
Name Default Description channel@ – type@ – 指定類型:hdfs hdfs.path@ – HDFS的路徑,eg hdfs://namenode/flume/webdata/ hdfs.filePrefix FlumeData 保存數據文件的前綴名 hdfs.fileSuffix – 保存數據文件的后綴名 hdfs.inUsePrefix – 臨時寫入的文件前綴名 hdfs.inUseSuffix .tmp 臨時寫入的文件后綴名 hdfs.rollInterval 30 間隔多長將臨時文件滾動成最終目標文件,單位:秒, 如果設置成0,則表示不根據時間來滾動文件 hdfs.rollSize 1024 當臨時文件達到多少(單位:bytes)時,滾動成目標文件, 如果設置成0,則表示不根據臨時文件大小來滾動文件 hdfs.rollCount 10 當 events 數據達到該數量時候,將臨時文件滾動成目標文件, 如果設置成0,則表示不根據events數據來滾動文件 hdfs.idleTimeout 0 當目前被打開的臨時文件在該參數指定的時間(秒)內, 沒有任何數據寫入,則將該臨時文件關閉並重命名成目標文件 hdfs.batchSize 100 每個批次刷新到 HDFS 上的 events 數量 hdfs.codeC – 文件壓縮格式,包括:gzip, bzip2, lzo, lzop, snappy hdfs.fileType SequenceFile 文件格式,包括:SequenceFile, DataStream,CompressedStre, 當使用DataStream時候,文件不會被壓縮,不需要設置hdfs.codeC; 當使用CompressedStream時候,必須設置一個正確的hdfs.codeC值; hdfs.maxOpenFiles 5000 最大允許打開的HDFS文件數,當打開的文件數達到該值, 最早打開的文件將會被關閉 hdfs.minBlockReplicas – HDFS副本數,寫入 HDFS 文件塊的最小副本數。 該參數會影響文件的滾動配置,一般將該參數配置成1,才可以按照配置正確滾動文件 hdfs.writeFormat Writable 寫 sequence 文件的格式。包含:Text, Writable(默認) hdfs.callTimeout 10000 執行HDFS操作的超時時間(單位:毫秒) hdfs.threadsPoolSize 10 hdfs sink 啟動的操作HDFS的線程數 hdfs.rollTimerPoolSize 1 hdfs sink 啟動的根據時間滾動文件的線程數 hdfs.kerberosPrincipal – HDFS安全認證kerberos配置 hdfs.kerberosKeytab – HDFS安全認證kerberos配置 hdfs.proxyUser 代理用戶 hdfs.round false 是否啟用時間上的”舍棄” hdfs.roundValue 1 時間上進行“舍棄”的值 hdfs.roundUnit second 時間上進行”舍棄”的單位,包含:second,minute,hour hdfs.timeZone Local Time 時區。 hdfs.useLocalTimeStamp false 是否使用當地時間 hdfs.closeTries 0 Number hdfs sink 關閉文件的嘗試次數; 如果設置為1,當一次關閉文件失敗后,hdfs sink將不會再次嘗試關閉文件, 這個未關閉的文件將會一直留在那,並且是打開狀態; 設置為0,當一次關閉失敗后,hdfs sink會繼續嘗試下一次關閉,直到成功 hdfs.retryInterval 180 hdfs sink 嘗試關閉文件的時間間隔, 如果設置為0,表示不嘗試,相當於於將hdfs.closeTries設置成1 serializer TEXT 序列化類型 serializer.*
3)Avro Sink
Property Name Default Description channel@ – type@ – 指定類型:avro. hostname@ – 主機名或IP port@ – 端口號 batch-size 100 批次處理Event數 connect-timeout 20000 連接超時時間 request-timeout 20000 請求超時時間 compression-type none 壓縮類型,“none” or “deflate”. compression-level 6 壓縮級別,0表示不壓縮,1-9數字越大,壓縮比越高 ssl false 使用ssl加密
4)Kafka Sink
-
傳輸數據到Kafka中,需要注意的是Flume版本和Kafka版本的兼容性
Property Name Default Description type – 指定類型:org.apache.flume.sink.kafka.KafkaSink kafka.bootstrap.servers – kafka服務地址 kafka.topic default-flume-topic kafka Topic flumeBatchSize 100 批次寫入kafka Event數 kafka.producer.acks 1 多少個副本確認后才能確定消息傳遞成功,0表示不需要確認 1表示只需要首要的副本得到確認,-1表示等待所有確認。
啟動參數:
命令
bin / flume-ng agent -conf conf -z zkhost:2181,zkhost1:2181 -p / flume -name a1 -Dflume.root.logger = INFO,console
1、flume-ng agent 運行一個Flume Agent
2、-conf 指定配置文件,這個配置文件必須在全局選項的–conf參數定義的目錄下。
3、-z Zookeeper連接字符串。以逗號分隔的主機名列表:port
4、-p Zookeeper中的基本路徑,用於存儲代理配置
5、-name a1 Agent的名稱a1
6、-Dflume.root.logger=INFO,console 該參數將會把flume的日志輸出到console,為了將其輸出到日志文件(默認在$FLUME_HOME/logs),可以將console改為LOGFILE形式
具體的配置可以修改$FLUME_HOME/conf/log4j.properties
-Dflume.log.file=./wchatAgent.logs 該參數直接輸出日志到目標文件
具體:
https://blog.csdn.net/realoyou/article/details/81514128
參考:
https://www.jianshu.com/p/4252fbcdce79