This section covers several common log sources: spooling-directory monitoring, incremental file-content monitoring (exec), TCP, and HTTP.
Spool
Monitors a specified directory for new data; when a new file appears, its contents are read and uploaded.
A full walkthrough of this case is given at the end of the earlier article 教你一步搭建Flume分布式日志系統 (a step-by-step guide to building a distributed Flume logging system).
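As a quick reminder, a minimal spooling-directory source configuration looks roughly like this sketch (the spoolDir path below is hypothetical; adjust it to your environment):
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# spooldir source: watch a directory and ingest any new file dropped into it
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /usr/local/flume170/spool
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = c1

# memory channel and logger sink, as in the examples below
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
Keep in mind that files placed in the spool directory must not be modified afterwards; once a file is fully read, Flume renames it with a .COMPLETED suffix.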
Exec
The exec source runs a given command and takes the command's output as its data. If you use the tail command, make sure the file accumulates enough content, otherwise you will not see any output.
Create the agent configuration file:
# vi /usr/local/flume170/conf/exec_tail.conf
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.channels = c1 c2
a1.sources.r1.command = tail -F /usr/local/flume170/log_exec_tail

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = file
a1.channels.c2.checkpointDir = /usr/local/flume170/checkpoint
a1.channels.c2.dataDirs = /usr/local/flume170/data

# Describe the sink
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
a1.sinks.k2.type = FILE_ROLL
a1.sinks.k2.channel = c2
a1.sinks.k2.sink.directory = /usr/local/flume170/files
a1.sinks.k2.sink.rollInterval = 0
Start Flume agent a1:
# /usr/local/flume170/bin/flume-ng agent -c . -f /usr/local/flume170/conf/exec_tail.conf -n a1 -Dflume.root.logger=INFO,console
Generate enough content in the file:
# for i in {1..100};do echo "exec tail$i" >> /usr/local/flume170/log_exec_tail;echo $i;sleep 0.1;done
On the H32 console, you can see the appended lines arriving in the logger sink's output.


HTTP
JSONHandler
A source that accepts events over HTTP POST or GET; it supports JSON (via the default JSONHandler) and BLOB representations.
Create the agent configuration file:
# vi /usr/local/flume170/conf/post_json.conf
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.http.HTTPSource
a1.sources.r1.port = 5142
a1.sources.r1.channels = c1

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Describe the sink
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
Start Flume agent a1:
# /usr/local/flume170/bin/flume-ng agent -c . -f /usr/local/flume170/conf/post_json.conf -n a1 -Dflume.root.logger=INFO,console
Send a JSON-formatted POST request (to the port configured above):
# curl -X POST -d '[{ "headers" :{"a" : "a1","b" : "b1"},"body" : "idoall.org_body"}]' http://localhost:5142
On the H32 console, you can see the posted event in the logger output.
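Because the JSONHandler body is a JSON array of events, several events can be submitted in a single request; for example:
# curl -X POST -d '[{"headers":{"a":"a1"},"body":"event one"},{"headers":{"b":"b1"},"body":"event two"}]' http://localhost:5142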


TCP
The syslogtcp source listens on a TCP port and takes syslog data as its source.
Create the agent configuration file:
# vi /usr/local/flume170/conf/syslog_tcp.conf
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = H32
a1.sources.r1.channels = c1

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Describe the sink
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
Start Flume agent a1:
# /usr/local/flume170/bin/flume-ng agent -c . -f /usr/local/flume170/conf/syslog_tcp.conf -n a1 -Dflume.root.logger=INFO,console
Generate a test syslog message:
# echo "hello idoall.org syslog" | nc localhost 5140
On the H32 console, you can see the syslog message in the logger output.


Flume Sink Processors and Avro
Avro can be used to send a given file to Flume; the Avro source receives it over the Avro RPC mechanism.
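For example, once an Avro source is listening (one is configured below on port 5141), a file can be pushed to it with the bundled avro-client tool (the file path here is only an example):
# /usr/local/flume170/bin/flume-ng avro-client -c . -H H32 -p 5141 -F /usr/local/flume170/log.00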
With a failover processor, events are always sent to a single sink; only when that sink becomes unavailable are they automatically sent to the next one. Note that a channel's transactionCapacity must not be smaller than the sink's batchSize.
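As a concrete illustration of that constraint, here is a sketch using an Avro sink, whose batch size is set with the batch-size property (the values are illustrative):
# the channel must accept a whole sink batch in a single transaction
a1.channels.c1.transactionCapacity = 100
# so batch-size here must not be larger than 100
a1.sinks.k1.type = avro
a1.sinks.k1.batch-size = 100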
On H32, create the Flume_Sink_Processors configuration file:
# vi /usr/local/flume170/conf/Flume_Sink_Processors.conf
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1 c2
a1.sources.r1.selector.type = replicating

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = H32
a1.sinks.k1.port = 5141
a1.sinks.k2.type = avro
a1.sinks.k2.channel = c2
a1.sinks.k2.hostname = H33
a1.sinks.k2.port = 5141

# The key to configuring failover: a sink group is required
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
# The processor type is failover
a1.sinkgroups.g1.processor.type = failover
# Priorities: the larger the number, the higher the priority; every sink must have a distinct priority
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
# Set to 10 seconds here; adjust faster or slower to match your environment
a1.sinkgroups.g1.processor.maxpenalty = 10000
On H32, create the Flume_Sink_Processors_avro configuration file:
# vi /usr/local/flume170/conf/Flume_Sink_Processors_avro.conf
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5141

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Describe the sink
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
Copy the two configuration files over to H33:
# scp /usr/local/flume170/conf/Flume_Sink_Processors.conf H33:/usr/local/flume170/conf/Flume_Sink_Processors.conf
# scp /usr/local/flume170/conf/Flume_Sink_Processors_avro.conf H33:/usr/local/flume170/conf/Flume_Sink_Processors_avro.conf
Open four windows and start the two Flume agents on H32 and H33 at the same time:
# /usr/local/flume170/bin/flume-ng agent -c . -f /usr/local/flume170/conf/Flume_Sink_Processors_avro.conf -n a1 -Dflume.root.logger=INFO,console
# /usr/local/flume170/bin/flume-ng agent -c . -f /usr/local/flume170/conf/Flume_Sink_Processors.conf -n a1 -Dflume.root.logger=INFO,console
Then, on either H32 or H33, generate a test log message:
# echo "idoall.org test1 failover" | nc H32 5140
Because H33 has the higher priority, the message appears in H33's sink window, while H32's window shows nothing.

Now stop the sink on H33 (Ctrl+C) and send another test message:
# echo "idoall.org test2 failover" | nc localhost 5140
In H32's sink window, you can see that both test messages sent so far have now been read.

Next, restart the sink in H33's window:
# /usr/local/flume170/bin/flume-ng agent -c . -f /usr/local/flume170/conf/Flume_Sink_Processors_avro.conf -n a1 -Dflume.root.logger=INFO,console
Send two more test messages:
# echo "idoall.org test3 failover" | nc localhost 5140 && echo "idoall.org test4 failover" | nc localhost 5140
In H33's sink window, you can see that, because of its higher priority, the log messages land on H33 once again.

Load balancing Sink Processor
Where the load_balance type differs from failover is that it offers two selection strategies: round-robin and random. In both cases, if the selected sink is unavailable, the processor automatically tries the next available sink.
On H32, create the Load_balancing_Sink_Processors configuration file:
# vi /usr/local/flume170/conf/Load_balancing_Sink_Processors.conf
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = H32
a1.sinks.k1.port = 5141
a1.sinks.k2.type = avro
a1.sinks.k2.channel = c1
a1.sinks.k2.hostname = H33
a1.sinks.k2.port = 5141

# A sink group is required here as well
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
# The processor type is load_balance
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin
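To use random selection instead of round-robin, only the selector property changes; a minimal variant of the sink group block above:
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random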
On H32, create the Load_balancing_Sink_Processors_avro configuration file:
# vi /usr/local/flume170/conf/Load_balancing_Sink_Processors_avro.conf
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5141

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Describe the sink
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
Copy the two configuration files over to H33:
# scp /usr/local/flume170/conf/Load_balancing_Sink_Processors.conf H33:/usr/local/flume170/conf/Load_balancing_Sink_Processors.conf
# scp /usr/local/flume170/conf/Load_balancing_Sink_Processors_avro.conf H33:/usr/local/flume170/conf/Load_balancing_Sink_Processors_avro.conf
Open four windows and start the two Flume agents on H32 and H33 at the same time:
# /usr/local/flume170/bin/flume-ng agent -c . -f /usr/local/flume170/conf/Load_balancing_Sink_Processors_avro.conf -n a1 -Dflume.root.logger=INFO,console
# /usr/local/flume170/bin/flume-ng agent -c . -f /usr/local/flume170/conf/Load_balancing_Sink_Processors.conf -n a1 -Dflume.root.logger=INFO,console
Then, on either H32 or H33, generate test log messages. Enter them one line at a time; if you send them too quickly, they tend to be batched and land on a single machine:
# echo "idoall.org test1" | nc H32 5140
# echo "idoall.org test2" | nc H32 5140
# echo "idoall.org test3" | nc H32 5140
# echo "idoall.org test4" | nc H32 5140
In H32's sink window, you can see the following:
14/08/10 15:35:29 INFO sink.LoggerSink: Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 32 idoall.org test2 }
14/08/10 15:35:33 INFO sink.LoggerSink: Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 34 idoall.org test4 }
In H33's sink window, you can see the following:
14/08/10 15:35:27 INFO sink.LoggerSink: Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 31 idoall.org test1 }
14/08/10 15:35:29 INFO sink.LoggerSink: Event: { headers:{Severity=0, flume.syslog.status=Invalid, Facility=0} body: 69 64 6F 61 6C 6C 2E 6F 72 67 20 74 65 73 74 33 idoall.org test3 }
This shows that round-robin mode is working: the events alternate between the two machines.


All of the above assumes that H32 and H33 can reach each other and that Flume is configured correctly, and these are all very simple example scenarios. One point worth noting: although Flume is described as a log-collection tool, a "log" here can be understood broadly as any stream of information, not just logs in the conventional sense.
