1. Source
NetCat Source:綁定的端口(tcp、udp),將流經端口的每一個文本行數據作為Event輸入;
type:source的類型,必須是netcat。
bind:要監聽的(本機的)主機名或者ip。此監聽不是過濾發送方。一台電腦不是說只有一個IP。有多網卡的電腦,對應多個IP。
port:綁定的本地的端口。
Avro Source:監聽一個avro服務端口,采集Avro數據序列化后的數據;
type:avrosource的類型,必須是avro。
bind:要監聽的(本機的)主機名或者ip。此監聽不是過濾發送方。一台電腦不是說只有一個IP。有多網卡的電腦,對應多個IP。
port:綁定的本地的端口。
Exec Source:於Unix的command在標准輸出上采集數據;
type:source的類型:必須是exec。
command:要執行命令。
Spooling Directory Source:監聽一個文件夾里的文件的新增,如果有則采集作為source。
type:source 的類型:必須是spooldir
spoolDir:監聽的文件夾 【提前創建目錄】
fileSuffix:上傳完畢后文件的重命名后綴,默認為.COMPLETED
deletePolicy:上傳后的文件的刪除策略never和immediate,默認為never。
fileHeader:是否要加上該文件的絕對路徑在header里,默認是false。
basenameHeader:是否要加上該文件的名稱在header里,默認是false。
2. Sink
HDFS Sink:將數據傳輸到hdfs集群中。
type:sink的類型 必須是hdfs。
hdfs.path:hdfs的上傳路徑。
hdfs.filePrefix:hdfs文件的前綴。默認是:FlumeData
hdfs.rollInterval:間隔多久產生新文件,默認是:30(秒) 0表示不以時間間隔為准。
hdfs.rollSize:文件到達多大再產生一個新文件,默認是:1024(bytes)0表示不以文件大小為准。
hdfs.rollCount:event達到多大再產生一個新文件,默認是:10(個)0表示不以event數目為准。
hdfs.batchSize:每次往hdfs里提交多少個event,默認為100
hdfs.fileType:hdfs文件的格式主要包括:SequenceFile, DataStream ,CompressedStream,如果使用了CompressedStream就要設置壓縮方式。
hdfs.codeC:壓縮方式:gzip, bzip2, lzo, lzop, snappy
注:%{host}可以使用header的key。以及%Y%m%d來表示時間,但關於時間的表示需要在header里有timestamp這個key。
Logger Sink將數據作為日志處理(根據flume中的設置的日志方式來顯示)
要在控制台顯示在運行agent的時候加入:-Dflume.root.logger=INFO,console 。
type:sink的類型:必須是 logger。
maxBytesToLog:打印body的最長的字節數 默認為16
Avro Sink:數據被轉換成Avro Event,然后發送到指定的服務端口上。
type:sink的類型:必須是 avro。
hostname:指定發送數據的主機名或者ip
port:指定發送數據的端口
File Roll Sink:數據發送到本地文件。
type:sink的類型:必須是 file_roll。
sink.directory:存儲文件的目錄【提前創建目錄】
batchSize:一次發送多少個event。默認為100
sink.rollInterval:多久產生一個新文件,默認為30s。單位是s。0為不產生新文件。【即使沒有數據也會產生文件】
3.Channel
Memory Channel使用內存作為數據的存儲。
Type channel的類型:必須為memory
capacity:channel中的最大event數目
transactionCapacity:channel中允許事務的最大event數目
File Channel 使用文件作為數據的存儲
Type channel的類型:必須為 file
checkpointDir :檢查點的數據存儲目錄【提前創建目錄】
dataDirs :數據的存儲目錄【提前創建目錄】
transactionCapacity:channel中允許事務的最大event數目
Spillable Memory Channel 使用內存作為channel超過了閥值就存在文件中
Type channel的類型:必須為SPILLABLEMEMORY
memoryCapacity:內存的容量event數
overflowCapacity:數據存到文件的event閥值數
checkpointDir:檢查點的數據存儲目錄
dataDirs:數據的存儲目錄
4. Interceptor
Timestamp Interceptor 時間戳攔截器 在header里加入key為timestamp,value為當前時間。
type:攔截器的類型,必須為timestamp
preserveExisting:如果此攔截器增加的key已經存在,如果這個值設置為true則保持原來的值,否則覆蓋原來的值。默認為false
Host Interceptor 主機名或者ip攔截器,在header里加入ip或者主機名
type:攔截器的類型,必須為host
preserveExisting:如果此攔截器增加的key已經存在,如果這個值設置為true則保持原來的值,否則覆蓋原來的值。默認為false
useIP:如果設置為true則使用ip地址,否則使用主機名,默認為true
hostHeader:使用的header的key名字,默認為host
Static Interceptor 靜態攔截器,是在header里加入固定的key和value。
type:avrosource的類型,必須是static。
preserveExisting:如果此攔截器增加的key已經存在,如果這個值設置為true則保持原來的值,否則覆蓋原來的值。默認為false
key:靜態攔截器添加的key的名字
value:靜態攔截器添加的key對應的value值
5. Channel Selector
Multiplexing Channel Selector 根據header的key的值分配channel
selector.type 默認為replicating
selector.header:選擇作為判斷的key
selector.default:默認的channel配置
selector.mapping.*:匹配到的channel的配置
6. Sink Processor
負載均衡
a1.sinkgroups=g1
a1.sinkgroups.g1.sinks=k1 k2
a1.sinkgroups.g1.processor.type=load_balance
a1.sinkgroups.g1.processor.backoff=true
a1.sinkgroups.g1.processor.selector=round_robin
a1.sinkgroups.g1.processor.selector.maxTimeOut=30000
backoff:開啟后,故障的節點會列入黑名單,過一定時間再次發送,如果還失敗,則等待是指數增長;直到達到最大的時間。
如果不開啟,故障的節點每次都會被重試。
selector.maxTimeOut:最大的黑名單時間(單位為毫秒)。
故障轉移
a1.sinkgroups=g1
a1.sinkgroups.g1.sinks=k1 k2
a1.sinkgroups.g1.processor.type=failover
a1.sinkgroups.g1.processor.priority.k1=10
a1.sinkgroups.g1.processor.priority.k2=5
a1.sinkgroups.g1.processor.maxpenalty=10000
#maxpenalty 對於故障的節點最大的黑名單時間 (in millis 毫秒)