flume 的配置總結


 

flume的配置無非就是四步:1、創建一個配置文件 2、在其中配置source,sink,Channel 的各項參數 3、連接各個組件   4、調用啟動命令

配置參考官網http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html

1、針對NetCat的配置

1.1選用NetCat TCP Source

  這個source可以打開一個端口號,監聽端口號收到的消息!將消息的每行,封裝為一個event!

配置必要的參數

 

 

 1.2選用Logger Sink

采用 logger以info級別將event輸出到(文件或控制台)默認輸出到日志文件中,可在啟動命令中加入

-Dflume.root.logger=DEBUG,console

這樣就可以打印到控制台,方便測試。

配置必要參數

 1.3 選用Memory Channel

  將event存儲在內存中的隊列中!一般適用於高吞吐量的場景,但是如果agent故障,會損失階段性的數據!

配置必要參數

 

 

 1.4編寫配置文件

# 命名每個組件 a1代表agent的名稱 
#a1.sources代表a1中配置的source,多個使用空格間隔
#a1.sinks代表a1中配置的sink,多個使用空格間隔
#a1.channels代表a1中配置的channel,多個使用空格間隔
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 配置source
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop103
a1.sources.r1.port = 44444

# 配置sink
a1.sinks.k1.type = logger
a1.sinks.k1.maxBytesToLog=100

# 配置channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000

# 綁定和連接組件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

1.5啟動命令

bin/flume-ng agent -c conf/ -n a1 -f flumeagents/netcatSource-loggersink.conf -Dflume.root.logger=DEBUG,console

#參數說明:
    --conf/-c:表示配置文件存儲在conf/目錄
    --name/-n:表示給agent起名為a1
    --conf-file/-f:flume本次啟動讀取的配置文件是在job文件夾下的flume-telnet.conf文件。
    -Dflume.root.logger=INFO,console :-D表示flume運行時動態修改flume.root.logger參數屬性值,並將控制台日志打印級別設置為INFO級別。
    日志級別包括:log、info、warn、error。console是將結果打印到控制台,方便測試

2、針對讀取日志文件的配置

2.1.1選用Exec Source(因為在異常情況下,Exec Source無法把從客戶端讀取的event進行緩存,有丟失數據的風險的,建議使用 Spooling Directory Source, Taildir Source來替換ExecSource!)

Exec Source在啟動后執行一個linux命令

配置必要參數

 

示例:

# 配置source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -f /opt/module/hive/logs/hive.log

 

 2.1.2 Spooling Directory Source(自動收集文件)

SpoolingDirSource在監控一個目錄中新放入的文件的數據,一旦發現,就數據封裝為event! 在目錄中,已經傳輸完成的數據,會使用重命名或刪除來標識這些文件已經傳輸完成!

適用於:已經在一個目錄中生成了大量的離線日志,且日志不會再進行寫入和修改

必要配置

 

 

可選配置

示例

# 配置 spoolsource
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/sun9/work

2.1.3 TailDirSource(實時更新文件)

  TailDirSource以接近實時的速度監控文件中寫入的新行!

  TailDirSource檢測文件中寫入的新行,並且將每個文件tail的位置記錄在一個JSON的文件中!即便agent掛掉,重啟后,source從上次記錄的位置繼續執行tail操作!

  用戶可以通過修改Position文件的參數,來改變source繼續讀取的位置!如果postion文件丟失了,那么source會重新從每個文件的第一行開始讀取(重復讀)!

必須配置

 

 

 可選配置

 

 

 示例

# 配置source
a1.sources.r1.type = TAILDIR
#多個文件用空格分開
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /home/atguigu/a.txt
a1.sources.r1.filegroups.f2 = /home/atguigu/b.txt
a1.sources.r1.positionFile=/home/atguigu/taildir_position.json

 

 2.2選用HDFS Sink

 HDFS Sink負責將數據寫到HDFS。

  • 目前支持創建 text和SequnceFile文件!

  • 以上兩種文件格式,都可以使用壓縮

  • 文件可以基於時間周期性滾動或基於文件大小滾動或基於events的數量滾動

  • 可以根據數據產生的時間戳或主機名對數據進行分桶或分區

  • 上傳的路徑名可以包含 格式化的轉義序列,轉義序列會在文件/目錄真正上傳時被替換,如:hdfs://hadoop102:9000/flume/%Y%m%d/%H%M

  • 如果要使用這個sink,必須已經安裝了hadoop,這樣flume才能使用Jar包和hdfs通信

必要配置

 

 

 推薦配置

文件滾動策略

 

 

 文件的類型和壓縮類型:

 

 

 目錄的滾動策略:

 

 

 時間戳設置:

注意: 所有和時間相關的轉義序列,都要求event的header中有timestamp的屬性名,值為時間戳

 

 

 示例:

# 配置sink
a1.sinks.k1.type = hdfs
    #轉義序列
a1.sinks.k1.hdfs.path = hdfs://hadoop102:9000/flume/%Y%m%d/%H%M
#上傳文件的前綴
a1.sinks.k1.hdfs.filePrefix = logs-
#滾動目錄 一分鍾滾動一次目錄
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 1
a1.sinks.k1.hdfs.roundUnit = minute
#是否使用本地時間戳
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#配置文件滾動
a1.sinks.k1.hdfs.rollInterval = 30
a1.sinks.k1.hdfs.rollSize = 134217700
a1.sinks.k1.hdfs.rollCount = 0
#使用文件格式存儲數據
a1.sinks.k1.hdfs.fileType=DataStream 

3、針對多路復用的配置

需求如下,同一個數據,既要上傳到HDFS上,也要保存到本地,這就涉及到了多個flume串聯的問題

3.1 flume之間傳輸需選用Avro Sink  (Avro Sink和Avro Source是搭配使用的!)

sink將event以avro序列化的格式發送到另外一台機器的指定進程!

必要配置

 

# 配置sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname=hadoop102
a1.sinks.k1.port=12345

 

 

 3.2Avro Source

source讀取avro格式的數據,反序列化為event對象!

必要配置

 

示例

# 配置source
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop102
a1.sources.r1.port = 12345

 

 

 

注意:在接收數據的Agent里有倆個Channel,這時還需要配置使用復制的channel選擇器,此選擇器會選中所有的channel,每個channel復制一個event(可以省略,默認)

a1.sources.r1.selector.type = replicating

3.3寫入本地   File Roll Sink

將event寫入到本地磁盤!

必要配置

 

 可選配置

示例

# 配置sink
a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.directory=/home/atguigu/flume
a1.sinks.k1.sink.rollInterval=600

 

四、針對故障轉移及負載均衡的配置

實質是一個Channel對應倆個Sink的配置,這里當然也會用到Avro Source ,Avro Sink ,但最關鍵的是Sink選擇器

注意:啟動時要先啟動從機,再啟動主機

4.1Failover Sink Processor(故障轉移)

 故障轉移的sink處理器! 這個sink處理器會維護一組有優先級的sink!默認挑選優先級最高(數值越大)的sink來處理數據!故障的sink會放入池中冷卻一段時間,恢復后,重新加入到存活的池中,此時在live pool(存活的池)中選優先級最高的,接替工作!

必要配置

 

可選配置

 示例:

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000

4.2Load balancing Sink Processor(負載均衡)

使用round_robinorrandom兩種算法,讓多個激活的sink間的負載均衡(多個sink輪流干活)!

 

必要配置

 

 可選配置

 

 示例:

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.selector = random

 五、讀取event header中的值來決定Channel

 

 5.1  Multiplexing Channel Selector

根據配置讀取event header中指定key的value,根據value的映射,分配到不同的channel!

必要配置

 

 示例

a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2 c3
a1.sources.r1.selector.default = c4

5.3  Static Interceptor

允許用戶向event添加一個靜態的key-value!

必要配置

 

 示例

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = mykey
a1.sources.r1.interceptors.i1.value = agent2

 六、與Kafka對接

6.1  Kafka Sink

必要配置

 

 

 可選配置

 

 示例:

#如果要實現自動分區,往往會在攔截器處設置topic的值

  a1.sources.r1.interceptors = i1
  a1.sources.r1.interceptors.i1.type = Interceptor.MyInterceptor$Builder

# 配置sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sinks.k1.kafka.topic=test3
a1.sinks.k1.useFlumeEventFormat=false

自定義攔截器代碼

public class MyInterceptor implements Interceptor {
    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {
        byte[] body = event.getBody();
        String s = new String(body);
        Map<String, String> headers = event.getHeaders();
        if(s.contains("hello")){

            headers.put("topic","hello");

        }else{
            headers.put("topic","other");
        }
        event.setHeaders(headers);
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        for (Event event : list) {
            intercept(event);
        }

        return list;
    }

    @Override
    public void close() {

    }
    public static class Builder implements org.apache.flume.interceptor.Interceptor.Builder{

        @Override
        public Interceptor build() {
            return new MyInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}

 6.2 Kafka Channel

為什么選擇Kafkachannel: 因為kafka集群有高可用和副本機制!這樣即便agent掛掉,或某個broker宕機,sink也可以立刻從新的leader上繼續拉取event!

必要配置

 

 可選配置

 

 示例:

a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.channels.c1.kafka.topic = topic_start
a1.channels.c1.parseAsFlumeEvent = false

 6.3Kafka Source

KafkaSource本質是一個可以從kafka集群的主題上消費數據的消費者!如果希望提高消費者速率,可以配置多個KafkaSource,指定多個KafkaSource有相同的組id!

必要配置

 

 可選配置

 

 示例:

# 配置source
a1.sources.r1.type =org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics = topic_start
a1.sources.r1.kafka.consumer.group.id=CG_start
#kafka消費者默認從分區的最后一個位置消費,當前分區中已經有170條數據,如果不配置,只會從170之后消費
#控制kafka消費者從主題的最早的位置消費,此參數只會在一個從未提交過offset的組中生效
a1.sources.r1.kafka.consumer.auto.offset.reset=earliest

6.4File Channel

FileChannel將event存儲在文件中!與memory channel 相比,不易丟數據,但效率低!

可選配置

 

 示例:

a1.channels.c1.type = file
a1.channels.c1.dataDirs = /opt/module/flumedata/c1
a1.channels.c1.checkpointDir=/opt/module/flumedata/c1checkpoint
a1.channels.c1.useDualCheckpoints=true
a1.channels.c1.backupCheckpointDir=/opt/module/flumedata/c1backupcheckpoint

優化

正如上文提到的

通過配置dataDirs指向多個路徑,每個路徑對應不同的硬盤,增大Flume吞吐量。,

checkpointDir和backupCheckpointDir也盡量配置在不同硬盤對應的目錄中,保證checkpoint壞掉后,可以快速使用backupCheckpointDir恢復數據


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM