flume的配置無非就是四步:1、創建一個配置文件 2、在其中配置source,sink,Channel 的各項參數 3、連接各個組件 4、調用啟動命令
配置參考官網http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html
1、針對NetCat的配置
1.1選用NetCat TCP Source
這個source可以打開一個端口號,監聽端口號收到的消息!將消息的每行,封裝為一個event!
配置必要的參數
1.2選用Logger Sink
采用 logger以info級別將event輸出到(文件或控制台)默認輸出到日志文件中,可在啟動命令中加入
-Dflume.root.logger=DEBUG,console
這樣就可以打印到控制台,方便測試。
配置必要參數
1.3 選用Memory Channel
將event存儲在內存中的隊列中!一般適用於高吞吐量的場景,但是如果agent故障,會損失階段性的數據!
配置必要參數
1.4編寫配置文件
# 命名每個組件 a1代表agent的名稱 #a1.sources代表a1中配置的source,多個使用空格間隔 #a1.sinks代表a1中配置的sink,多個使用空格間隔 #a1.channels代表a1中配置的channel,多個使用空格間隔 a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 配置source a1.sources.r1.type = netcat a1.sources.r1.bind = hadoop103 a1.sources.r1.port = 44444 # 配置sink a1.sinks.k1.type = logger a1.sinks.k1.maxBytesToLog=100 # 配置channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 # 綁定和連接組件 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
1.5啟動命令
bin/flume-ng agent -c conf/ -n a1 -f flumeagents/netcatSource-loggersink.conf -Dflume.root.logger=DEBUG,console #參數說明: --conf/-c:表示配置文件存儲在conf/目錄 --name/-n:表示給agent起名為a1 --conf-file/-f:flume本次啟動讀取的配置文件是在job文件夾下的flume-telnet.conf文件。 -Dflume.root.logger=INFO,console :-D表示flume運行時動態修改flume.root.logger參數屬性值,並將控制台日志打印級別設置為INFO級別。
日志級別包括:log、info、warn、error。console是將結果打印到控制台,方便測試
2、針對讀取日志文件的配置
2.1.1選用
示例:
# 配置source a1.sources.r1.type = exec a1.sources.r1.command = tail -f /opt/module/hive/logs/hive.log
2.1.2
可選配置
示例
# 配置 spoolsource a1.sources.r1.type = spooldir a1.sources.r1.spoolDir = /home/sun9/work
2.1.3
TailDirSource以接近實時的速度監控文件中寫入的新行!
TailDirSource檢測文件中寫入的新行,並且將每個文件tail的位置記錄在一個JSON的文件中!即便agent掛掉,重啟后,source從上次記錄的位置繼續執行tail操作!
用戶可以通過修改Position文件的參數,來改變source繼續讀取的位置!如果postion文件丟失了,那么source會重新從每個文件的第一行開始讀取(重復讀)!
必須配置
可選配置
示例
# 配置source a1.sources.r1.type = TAILDIR #多個文件用空格分開 a1.sources.r1.filegroups = f1 f2 a1.sources.r1.filegroups.f1 = /home/atguigu/a.txt a1.sources.r1.filegroups.f2 = /home/atguigu/b.txt a1.sources.r1.positionFile=/home/atguigu/taildir_position.json
2.2選用HDFS Sink
HDFS Sink負責將數據寫到HDFS。
-
目前支持創建 text和SequnceFile文件!
-
-
文件可以基於時間周期性滾動或基於文件大小滾動或基於events的數量滾動
-
可以根據數據產生的時間戳或主機名對數據進行分桶或分區
-
上傳的路徑名可以包含 格式化的轉義序列,轉義序列會在文件/目錄真正上傳時被替換,如:hdfs://hadoop102:9000/flume/%Y%m%d/%H%M
-
如果要使用這個sink,必須已經安裝了hadoop,這樣flume才能使用Jar包和hdfs通信
必要配置
推薦配置
文件滾動策略
文件的類型和壓縮類型:
目錄的滾動策略:
時間戳設置:
注意: 所有和時間相關的轉義序列,都要求event的header中有timestamp的屬性名,值為時間戳
示例:
# 配置sink a1.sinks.k1.type = hdfs #轉義序列 a1.sinks.k1.hdfs.path = hdfs://hadoop102:9000/flume/%Y%m%d/%H%M #上傳文件的前綴 a1.sinks.k1.hdfs.filePrefix = logs- #滾動目錄 一分鍾滾動一次目錄 a1.sinks.k1.hdfs.round = true a1.sinks.k1.hdfs.roundValue = 1 a1.sinks.k1.hdfs.roundUnit = minute #是否使用本地時間戳 a1.sinks.k1.hdfs.useLocalTimeStamp = true #配置文件滾動 a1.sinks.k1.hdfs.rollInterval = 30 a1.sinks.k1.hdfs.rollSize = 134217700 a1.sinks.k1.hdfs.rollCount = 0 #使用文件格式存儲數據 a1.sinks.k1.hdfs.fileType=DataStream
3、針對多路復用的配置
需求如下,同一個數據,既要上傳到HDFS上,也要保存到本地,這就涉及到了多個flume串聯的問題
3.1 flume之間傳輸需選用Avro Sink (Avro Sink和Avro Source是搭配使用的!)
sink將event以avro序列化的格式發送到另外一台機器的指定進程!
必要配置
# 配置sink a1.sinks.k1.type = avro a1.sinks.k1.hostname=hadoop102 a1.sinks.k1.port=12345
3.2
示例
# 配置source a1.sources.r1.type = avro a1.sources.r1.bind = hadoop102 a1.sources.r1.port = 12345
注意:在接收數據的Agent里有倆個Channel,這時還需要配置使用復制的channel選擇器,此選擇器會選中所有的channel,每個channel復制一個event(可以省略,默認)
a1.sources.r1.selector.type = replicating
3.3寫入本地
可選配置
示例
# 配置sink a1.sinks.k1.type = file_roll a1.sinks.k1.sink.directory=/home/atguigu/flume a1.sinks.k1.sink.rollInterval=600
四、針對故障轉移及負載均衡的配置
實質是一個Channel對應倆個Sink的配置,這里當然也會用到Avro Source ,Avro Sink ,但最關鍵的是Sink選擇器
注意:啟動時要先啟動從機,再啟動主機
4.1
故障轉移的sink處理器! 這個sink處理器會維護一組有優先級的sink!默認挑選優先級最高(數值越大)的sink來處理數據!故障的sink會放入池中冷卻一段時間,恢復后,重新加入到存活的池中,此時在live pool(存活的池)中選優先級最高的,接替工作!
必要配置
可選配置
示例:
a1.sinkgroups = g1 a1.sinkgroups.g1.sinks = k1 k2 a1.sinkgroups.g1.processor.type = failover a1.sinkgroups.g1.processor.priority.k1 = 5 a1.sinkgroups.g1.processor.priority.k2 = 10 a1.sinkgroups.g1.processor.maxpenalty = 10000
4.2
必要配置
可選配置
示例:
a1.sinkgroups = g1 a1.sinkgroups.g1.sinks = k1 k2 a1.sinkgroups.g1.processor.type = load_balance a1.sinkgroups.g1.processor.selector = random
五、讀取event header中的值來決定Channel
5.1
根據配置讀取event header中指定key的value,根據value的映射,分配到不同的channel!
示例
a1.sources = r1 a1.channels = c1 c2 c3 c4 a1.sources.r1.selector.type = multiplexing a1.sources.r1.selector.header = state a1.sources.r1.selector.mapping.CZ = c1 a1.sources.r1.selector.mapping.US = c2 c3 a1.sources.r1.selector.default = c4
5.3
示例
a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = static a1.sources.r1.interceptors.i1.key = mykey a1.sources.r1.interceptors.i1.value = agent2
六、與Kafka對接
6.1 Kafka Sink
必要配置
可選配置
示例:
#如果要實現自動分區,往往會在攔截器處設置topic的值
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = Interceptor.MyInterceptor$Builder
# 配置sink a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092 a1.sinks.k1.kafka.topic=test3 a1.sinks.k1.useFlumeEventFormat=false
自定義攔截器代碼
public class MyInterceptor implements Interceptor { @Override public void initialize() { } @Override public Event intercept(Event event) { byte[] body = event.getBody(); String s = new String(body); Map<String, String> headers = event.getHeaders(); if(s.contains("hello")){ headers.put("topic","hello"); }else{ headers.put("topic","other"); } event.setHeaders(headers); return event; } @Override public List<Event> intercept(List<Event> list) { for (Event event : list) { intercept(event); } return list; } @Override public void close() { } public static class Builder implements org.apache.flume.interceptor.Interceptor.Builder{ @Override public Interceptor build() { return new MyInterceptor(); } @Override public void configure(Context context) { } } }
6.2 Kafka Channel
為什么選擇Kafkachannel: 因為kafka集群有高可用和副本機制!這樣即便agent掛掉,或某個broker宕機,sink也可以立刻從新的leader上繼續拉取event!
必要配置
可選配置
示例:
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092 a1.channels.c1.kafka.topic = topic_start a1.channels.c1.parseAsFlumeEvent = false
6.3Kafka Source
KafkaSource本質是一個可以從kafka集群的主題上消費數據的消費者!如果希望提高消費者速率,可以配置多個KafkaSource,指定多個KafkaSource有相同的組id!
必要配置
可選配置
示例:
# 配置source a1.sources.r1.type =org.apache.flume.source.kafka.KafkaSource a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092 a1.sources.r1.kafka.topics = topic_start a1.sources.r1.kafka.consumer.group.id=CG_start #kafka消費者默認從分區的最后一個位置消費,當前分區中已經有170條數據,如果不配置,只會從170之后消費 #控制kafka消費者從主題的最早的位置消費,此參數只會在一個從未提交過offset的組中生效 a1.sources.r1.kafka.consumer.auto.offset.reset=earliest
6.4File Channel
FileChannel將event存儲在文件中!與memory channel 相比,不易丟數據,但效率低!
可選配置
示例:
a1.channels.c1.type = file a1.channels.c1.dataDirs = /opt/module/flumedata/c1 a1.channels.c1.checkpointDir=/opt/module/flumedata/c1checkpoint a1.channels.c1.useDualCheckpoints=true a1.channels.c1.backupCheckpointDir=/opt/module/flumedata/c1backupcheckpoint
優化
正如上文提到的,
通過配置dataDirs指向多個路徑,每個路徑對應不同的硬盤,增大Flume吞吐量。,
checkpointDir和backupCheckpointDir也盡量配置在不同硬盤對應的目錄中,保證checkpoint壞掉后,可以快速使用backupCheckpointDir恢復數據