Flume sources, channels, and sinks: explanations and example usage


Only the commonly used components are covered here; see the official documentation for the full list.


I. Sources:

avro

Commonly used to chain agents for replication (a1.sources.r1.selector.type = replicating), multiplexing (a1.sources.r1.selector.type = multiplexing), load balancing, and failover (a1.sinkgroups.g1.processor.type = failover); an upstream agent's avro sink feeds a downstream agent's avro source.

exec

Runs a command such as tail -F and captures its output; typically used to follow a single log file, e.g. Hive's hive.log.

netcat

Listens on a port and turns each received line into an event.

spooling directory

Watches a directory for new files; once a file has been fully ingested it is renamed with the COMPLETED suffix. A file must not be modified after it has been placed in the directory, otherwise the source throws an error.

taildir

Description:

Tails multiple files in near real time; the files may keep being appended to, and the file groups are specified with regular expressions.

The Taildir Source maintains a JSON-format position file in which it periodically records the latest read offset of each tailed file, so it can resume from the last position after a restart (breakpoint resume). An illustrative position file is shown below.
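
As a rough illustration (the inode numbers, offsets, and paths below are made up), the position file is a JSON array with one entry per tailed file, recording how far each file has been read:

[{"inode":2496272,"pos":12,"file":"/opt/module/flume/files/file1.txt"},
 {"inode":2496275,"pos":30,"file":"/opt/module/flume/files/log1.log"}]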

II. Channels:

Memory Channel

Kafka Channel (writes events straight into Kafka, so no separate sink is needed; see the sketch after this list)

File Channel
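
A minimal Kafka Channel sketch, assuming a broker at Linux201:9092 and a topic named flume-channel (both values are illustrative):

a1.channels = c1
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = Linux201:9092
a1.channels.c1.kafka.topic = flume-channel
# keep the raw event body when Kafka consumers read the topic directly
a1.channels.c1.parseAsFlumeEvent = false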

III. Sinks:

avro

Sends events to a downstream agent's avro source; used to chain agents together (replication, aggregation, failover, and so on).

File Roll

Stores events on the local filesystem.

HBase

hdfs

Writes events to HDFS.

logger

Writes events to the log/console (mainly used for testing).

IV. Examples

1. Monitoring port data

Requirement: use Flume to listen on a port, collect the data arriving on that port, and print it to the console.

(source: netcat, channel: memory, sink: logger)

# Name the components on this agent
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
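
To run this example (the job file name is illustrative), start the agent with the console logger enabled and send a few lines with netcat:

bin/flume-ng agent -c conf/ -n a1 -f job/netcat-logger.conf -Dflume.root.logger=INFO,console
nc localhost 44444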

2. Real-time monitoring of a single appended file

Requirement: monitor the Hive log in real time and upload it to HDFS.

(source: exec, channel: memory, sink: hdfs)

a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive-3.1.2/logs/hive.log

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000

a1.sinks.k1.type = hdfs

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
#HDFS path the files are written to
a1.sinks.k1.hdfs.path = hdfs://Linux201:8020/flume/hive-events/%y-%m-%d/%H
#prefix of the generated files
a1.sinks.k1.hdfs.filePrefix = logs-
#roll to a new file every 60 s
a1.sinks.k1.hdfs.rollInterval = 60
#roll to a new file when it reaches 128 MB
a1.sinks.k1.hdfs.rollSize = 134217728
#roll by event count; 0 means do not roll by event count
a1.sinks.k1.hdfs.rollCount = 0
#number of events written to HDFS in one batch (unrelated to file rolling)
a1.sinks.k1.hdfs.batchSize = 100
#file type; DataStream writes plain text without compression
#(use CompressedStream together with hdfs.codeC for compressed output)
a1.sinks.k1.hdfs.fileType = DataStream
#round the event timestamp down when building the directory path
a1.sinks.k1.hdfs.round = true
#size of the rounding interval, i.e. how often a new directory is created
a1.sinks.k1.hdfs.roundValue = 1
#unit of the rounding interval
a1.sinks.k1.hdfs.roundUnit = hour
#use the local time instead of the timestamp from the event header
a1.sinks.k1.hdfs.useLocalTimeStamp = true
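
To run this example (the job file name is illustrative), start the agent and then check the output directory on HDFS:

bin/flume-ng agent -c conf/ -n a1 -f job/exec-hdfs.conf
hdfs dfs -ls /flume/hive-events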

3. Real-time monitoring of new files in a directory

Requirement: use Flume to watch an entire directory for new files and upload them to HDFS.

(source: spooldir, channel: memory, sink: hdfs)

a3.sources = r3
a3.sinks = k3
a3.channels = c3

# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/module/flume/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
#ignore (do not upload) any file ending in .tmp
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)

# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://Linux201:8020/flume/upload/%Y%m%d/%H
#prefix of the uploaded files
a3.sinks.k3.hdfs.filePrefix = upload-
#round the event timestamp down when building the directory path
a3.sinks.k3.hdfs.round = true
#size of the rounding interval, i.e. how often a new directory is created
a3.sinks.k3.hdfs.roundValue = 1
#unit of the rounding interval
a3.sinks.k3.hdfs.roundUnit = hour
#use the local time instead of the timestamp from the event header
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#number of events flushed to HDFS in one batch
a3.sinks.k3.hdfs.batchSize = 100
#file type; DataStream writes plain text (use CompressedStream for compression)
a3.sinks.k3.hdfs.fileType = DataStream
#roll to a new file every 60 s
a3.sinks.k3.hdfs.rollInterval = 60
#roll to a new file at roughly 128 MB (set just below 134217728 bytes to leave headroom)
a3.sinks.k3.hdfs.rollSize = 134217700
#do not roll by event count
a3.sinks.k3.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
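
To test this (the job file name and the copied file are illustrative), create the spooling directory, start the agent, and drop a file into the directory; once the file has been ingested it is renamed with the .COMPLETED suffix:

mkdir -p /opt/module/flume/upload
bin/flume-ng agent -c conf/ -n a3 -f job/spooldir-hdfs.conf
cp /opt/module/datas/test.txt /opt/module/flume/upload/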

4. Real-time monitoring of multiple appended files in a directory

Requirement: use Flume to watch a whole directory for files that keep being appended to and upload them to HDFS.

(source: taildir, channel: memory, sink: hdfs)

a3.sources = r3
a3.sinks = k3
a3.channels = c3

# Describe/configure the source
a3.sources.r3.type = TAILDIR
a3.sources.r3.positionFile = /opt/module/flume/tail_dir.json
a3.sources.r3.filegroups = f1 f2
a3.sources.r3.filegroups.f1 = /opt/module/flume/files/.*file.*
a3.sources.r3.filegroups.f2 = /opt/module/flume/files/.*log.*

# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://Linux201:8020/flume/upload2/%Y%m%d/%H
#prefix of the uploaded files
a3.sinks.k3.hdfs.filePrefix = upload-
#round the event timestamp down when building the directory path
a3.sinks.k3.hdfs.round = true
#size of the rounding interval, i.e. how often a new directory is created
a3.sinks.k3.hdfs.roundValue = 1
#unit of the rounding interval
a3.sinks.k3.hdfs.roundUnit = hour
#use the local time instead of the timestamp from the event header
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#number of events flushed to HDFS in one batch
a3.sinks.k3.hdfs.batchSize = 100
#file type; DataStream writes plain text (use CompressedStream for compression)
a3.sinks.k3.hdfs.fileType = DataStream
#roll to a new file every 60 s
a3.sinks.k3.hdfs.rollInterval = 60
#roll to a new file at roughly 128 MB (set just below 134217728 bytes to leave headroom)
a3.sinks.k3.hdfs.rollSize = 134217700
#do not roll by event count
a3.sinks.k3.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
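
To test this (the job file name is illustrative), start the agent, append to one of the monitored files, and inspect the position file to see the recorded offsets advance:

bin/flume-ng agent -c conf/ -n a3 -f job/taildir-hdfs.conf
echo hello >> /opt/module/flume/files/file1.txt
cat /opt/module/flume/tail_dir.json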

5. Replicating and multiplexing

Requirement:

Flume-1 monitors file changes and passes them to Flume-2, which stores them on HDFS. At the same time, Flume-1 passes the changes to Flume-3, which writes them to the local filesystem.

flume1: (source: exec, channel: memory, sink: avro)

flume2: (source: avro, channel: memory, sink: hdfs)

flume3: (source: avro, channel: memory, sink: File Roll)

flume1:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# replicate the data flow to all channels
a1.sources.r1.selector.type = replicating

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log
a1.sources.r1.shell = /bin/bash -c

# Describe the sink
# on the sink side, avro acts as a data sender
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = Linux201 
a1.sinks.k1.port = 4141

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = Linux201
a1.sinks.k2.port = 4142

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

flume2:

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
# on the source side, avro acts as a data-receiving server
a2.sources.r1.type = avro
a2.sources.r1.bind = Linux201
a2.sources.r1.port = 4141

# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://Linux201:8020/flume2/%Y%m%d/%H
#prefix of the uploaded files
a2.sinks.k1.hdfs.filePrefix = flume2-
#round the event timestamp down when building the directory path
a2.sinks.k1.hdfs.round = true
#size of the rounding interval, i.e. how often a new directory is created
a2.sinks.k1.hdfs.roundValue = 1
#unit of the rounding interval
a2.sinks.k1.hdfs.roundUnit = hour
#use the local time instead of the timestamp from the event header
a2.sinks.k1.hdfs.useLocalTimeStamp = true
#number of events flushed to HDFS in one batch
a2.sinks.k1.hdfs.batchSize = 100
#file type; DataStream writes plain text (use CompressedStream for compression)
a2.sinks.k1.hdfs.fileType = DataStream
#roll to a new file every 600 s
a2.sinks.k1.hdfs.rollInterval = 600
#roll to a new file at roughly 128 MB
a2.sinks.k1.hdfs.rollSize = 134217700
#do not roll by event count
a2.sinks.k1.hdfs.rollCount = 0

# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

flume3:

Note: the local output directory must already exist; the file_roll sink will not create it if it is missing.

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = Linux201
a3.sources.r1.port = 4142

# Describe the sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/datas/flume3

# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2
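
Start the downstream agents before the upstream one, because the avro sinks in flume1 need the avro sources of flume2 and flume3 to already be listening (the job file names are illustrative):

bin/flume-ng agent -c conf/ -n a3 -f job/flume3-fileroll.conf
bin/flume-ng agent -c conf/ -n a2 -f job/flume2-hdfs.conf
bin/flume-ng agent -c conf/ -n a1 -f job/flume1-replicating.conf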

6. Load balancing and failover

Requirement: Flume1 monitors a port; the two sinks in its sink group connect to Flume2 and Flume3 respectively, using a FailoverSinkProcessor to provide failover.

flume1: (source: netcat, channel: memory, sink: avro)

flume2: (source: avro, channel: memory, sink: logger)

flume3: (source: avro, channel: memory, sink: logger)

flume1:

# Name the components on this agent
a1.sources = r1
a1.channels = c1
a1.sinkgroups = g1
a1.sinks = k1 k2

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = Linux201
a1.sinks.k1.port = 4141

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = Linux201
a1.sinks.k2.port = 4142

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
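
To get load balancing instead of failover, the sink processor block above can be swapped for something like the following sketch (round_robin could also be random):

a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin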

flume2:

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = Linux201
a2.sources.r1.port = 4141

# Describe the sink
a2.sinks.k1.type = logger

# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

flume3:

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = Linux201
a3.sources.r1.port = 4142

# Describe the sink
a3.sinks.k1.type = logger

# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2

Use jps -ml to check the Flume processes.
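
One way to exercise the failover (the job file names are illustrative): start flume3 and flume2 first, then flume1, send data with netcat, and kill the agent holding the higher-priority sink (k2 points at port 4142, i.e. flume3) to watch events switch over to flume2:

bin/flume-ng agent -c conf/ -n a3 -f job/flume3-logger.conf -Dflume.root.logger=INFO,console
bin/flume-ng agent -c conf/ -n a2 -f job/flume2-logger.conf -Dflume.root.logger=INFO,console
bin/flume-ng agent -c conf/ -n a1 -f job/flume1-failover.conf
nc localhost 44444
kill -9 <pid of the a3 agent, found with jps -ml>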

7. Aggregation

Requirement:

Flume-1 on Linux201 monitors the file /opt/module/group.log.

Flume-2 on Linux202 monitors a data stream on a port.

Flume-1 and Flume-2 send their data to Flume-3 on Linux203, which prints the final data to the console.

flume1: (source: exec, channel: memory, sink: avro)

flume2: (source: netcat, channel: memory, sink: avro)

flume3: (source: avro, channel: memory, sink: logger)

flume1:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/group.log
a1.sources.r1.shell = /bin/bash -c

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = Linux203
a1.sinks.k1.port = 4141

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

flume2:

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = netcat
a2.sources.r1.bind = Linux202
a2.sources.r1.port = 44444

# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = Linux203
a2.sinks.k1.port = 4141

# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

flume3: 

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = Linux203
a3.sources.r1.port = 4141

# Describe the sink
a3.sinks.k1.type = logger

# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

V. Custom Interceptor

Requirement

Collect local server logs with Flume and route different kinds of logs to different analysis systems according to their type. In this example, lines starting with a letter and lines starting with a digit are sent to different consoles.

Analysis

In real projects, a single server can produce many types of logs, and different types may need to go to different analysis systems. This calls for Flume's multiplexing topology: based on the value of a certain key in the event headers, each event is routed to a different channel, so we need a custom Interceptor that assigns different header values to different types of events.

In this example we simulate logs with port data, using single digits and single letters to stand in for different log types. The custom interceptor distinguishes digits from letters and routes them to different analysis systems (channels).

Implementation steps

(1) Create a Maven project and add the following dependency

<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.9.0</version>
</dependency>

(2) Define a custom interceptor class (MyInterceptor) that implements the Interceptor interface

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.Map;

/**
 * Adds a different header value depending on whether the first character
 * of the event body is a letter or a digit.
 */
public class MyInterceptor implements Interceptor {

    /**
     * Initialization method
     */
    public void initialize() {

    }

    /**
     * Modify a single event by inserting the routing header
     *
     * @param event the event to modify
     * @return the modified event
     */
    public Event intercept(Event event) {
        //get the event's headers and body
        Map<String, String> headers = event.getHeaders();
        byte[] body = event.getBody();//the body is a byte array

        //handle differently depending on the first character of the body
        String line = null;
        try {
            line = new String(body, "utf-8");//decode the byte array as UTF-8; without an explicit charset the platform default encoding would be used
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        char first = line.charAt(0);
        if ((first >= 'a' && first <= 'z') || (first >= 'A' && first <= 'Z')) {
            //letter
            headers.put("AAA", "XXX");
        } else if (first >= '0' && first <= '9') {
            //digit
            headers.put("AAA", "YYY");
        } else {
            //neither letter nor digit
            headers.put("AAA", "ZZZ");
        }


        return event;
    }

    /**
     * Modify a batch of events
     *
     * @param events the events to modify
     * @return the modified events
     */
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    /**
     * Method for releasing resources
     */
    public void close() {

    }

    /**
     * Builder class used to construct the Interceptor instance
     */
    public static class MyBuilder implements Interceptor.Builder {

        //build method
        public Interceptor build() {
            return new MyInterceptor();
        }

        /**
         * Configuration method
         *
         * @param context the configuration context
         */
        public void configure(Context context) {

        }
    }
}

Package the project and put the resulting jar into Flume's lib directory.
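
A minimal sketch of the packaging step, assuming a standard Maven layout (the jar name and the Flume home directory are illustrative):

mvn clean package
cp target/flume-interceptor-1.0.jar /opt/module/flume/lib/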

(3) Edit the Flume configuration files (a1.sources.r1.interceptors.i1.type must be the fully qualified class name of the builder; MyInterceptor$MyBuilder works here only because the class has no package declaration)

On Linux201, configure Flume1 with one netcat source, three memory channels, and three avro sinks (one sink per channel), together with the corresponding channel selector and the interceptor

a1.sources = r1
a1.sinks = k1 k2 k3
a1.channels = c1 c2 c3

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = MyInterceptor$MyBuilder
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = AAA
a1.sources.r1.selector.mapping.XXX = c1
a1.sources.r1.selector.mapping.YYY = c2
a1.sources.r1.selector.mapping.ZZZ = c3

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = Linux201
a1.sinks.k1.port = 4141

a1.sinks.k2.type=avro
a1.sinks.k2.hostname = Linux202
a1.sinks.k2.port = 4242

a1.sinks.k3.type=avro
a1.sinks.k3.hostname = Linux203
a1.sinks.k3.port = 4343


a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100


a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

a1.channels.c3.type = memory
a1.channels.c3.capacity = 1000
a1.channels.c3.transactionCapacity = 100


a1.sources.r1.channels = c1 c2 c3
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
a1.sinks.k3.channel = c3

Configure Flume2 on Linux201 with one avro source and one logger sink

a2.sources = r1
a2.sinks = k1
a2.channels = c1

a2.sources.r1.type = avro
a2.sources.r1.bind = Linux201
a2.sources.r1.port = 4141

a2.sinks.k1.type = logger

a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

a2.sinks.k1.channel = c1
a2.sources.r1.channels = c1

Configure Flume3 on Linux202 with one avro source and one logger sink

a3.sources = r1
a3.sinks = k1
a3.channels = c1

a3.sources.r1.type = avro
a3.sources.r1.bind = Linux202
a3.sources.r1.port = 4242

a3.sinks.k1.type = logger

a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

a3.sinks.k1.channel = c1
a3.sources.r1.channels = c1

Configure Flume4 on Linux203 with one avro source and one logger sink

a4.sources = r1
a4.sinks = k1
a4.channels = c1

a4.sources.r1.type = avro
a4.sources.r1.bind = Linux203
a4.sources.r1.port = 4343

a4.sinks.k1.type = logger

a4.channels.c1.type = memory
a4.channels.c1.capacity = 1000
a4.channels.c1.transactionCapacity = 100

a4.sinks.k1.channel = c1
a4.sources.r1.channels = c1

(4) Start the Flume processes on Linux201 (which runs two agents), Linux202, and Linux203, paying attention to the startup order

Linux201: 
bin/flume-ng agent -c conf/ -n a1 -f job/flume-file-flume
bin/flume-ng agent -c conf/ -n a2 -f job/flume-flume-console1 -Dflume.root.logger=INFO,console

Linux202: 
bin/flume-ng agent -c conf/ -n a3 -f job/flume-flume-console2 -Dflume.root.logger=INFO,console

Linux203: 
bin/flume-ng agent -c conf/ -n a4 -f job/flume-flume-console3 -Dflume.root.logger=INFO,console

(5) On Linux201, use netcat to send letters and digits to localhost:44444

(6) Observe the logs printed on Linux201, Linux202, and Linux203

VI. Custom Source

The official documentation also describes the interface for writing a custom source:

https://flume.apache.org/FlumeDeveloperGuide.html#source

According to the official guide, a custom source (MySource) needs to extend the AbstractSource class and implement the Configurable and PollableSource interfaces.

Typical use case: reading data from MySQL or from other file systems.

1. Requirement:

Use Flume to receive data, add a prefix to every record, and print it to the console. The prefix is configurable in the Flume configuration file.

2. Methods to implement:

getBackOffSleepIncrement()    // how much the back-off sleep time grows after each failure

getMaxBackOffSleepInterval()  // the upper bound on the back-off sleep time

configure(Context context)    // initialization from the configuration (reads the config file contents)

process()                     // fetch data, wrap it into events, and write them to the channel; called in a loop

3. Coding

(1) Add the pom dependency

<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.9.0</version>
    </dependency>
</dependencies>

(2) Write the code

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;

/**
 * @version 1.0
 * @Author: zls
 * @Date: 2020/10/10 12:40
 * @Desc:
 */
public class MySource extends AbstractSource implements Configurable, PollableSource {

    //prefix added to every event body
    private String prefix;
    //interval (ms) between generated events
    private Long interval;

    /**
     * Called by the framework in a loop to pull data and process it
     *
     * @return the status of event processing
     * @throws EventDeliveryException
     */
    public Status process() throws EventDeliveryException {
        Status status = null;
        //get the ChannelProcessor that hands events to the channel
        ChannelProcessor channelProcessor = getChannelProcessor();

        try {
            //build an event and hand it to the channel processor
            Event e = getSomeData();

            channelProcessor.processEvent(e);

            status = Status.READY;
        } catch (Exception e) {
            //on failure, report BACKOFF so the framework backs off before the next poll
            e.printStackTrace();
            status = Status.BACKOFF;
        }

        return status;
    }

    /**
     * How this custom source obtains its data
     *
     * @return the generated event
     */
    private Event getSomeData() throws InterruptedException {
        Event event = new SimpleEvent();
        event.setBody((prefix + "Test content").getBytes());
        Thread.sleep(interval);
        return event;
    }

    /**
     * Increment added to the back-off sleep time after a failure
     *
     * @return the increment in milliseconds
     */
    public long getBackOffSleepIncrement() {
        return 1000;
    }

    /**
     * Upper bound on the back-off sleep time
     *
     * @return the maximum back-off in milliseconds
     */
    public long getMaxBackOffSleepInterval() {
        return 10000;
    }

    /**
     * Configures the custom source from the agent configuration
     *
     * @param context the configuration context
     */
    public void configure(Context context) {
        prefix = context.getString("XXX", "DD");      // property "XXX": prefix, default "DD"
        interval = context.getLong("YYY", 500L);      // property "YYY": interval in ms, default 500
    }
}

4. Test

(1) Package

Package the code and put the resulting jar into Flume's lib directory.

(2) Write the configuration file (a1.sources.r1.type must be the fully qualified class name of the source; MySource works here only because the class has no package declaration)

a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = MySource
a1.sources.r1.XXX = Myprefix
a1.sources.r1.YYY = 1000

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(3) Start Flume and test it

[zls@Linux201 flume]$ bin/flume-ng agent -c conf/ -f job/mysource.conf -n a1 -Dflume.root.logger=INFO,console

Result: every second an event is printed to the console, its body consisting of the configured prefix followed by the test content (e.g. MyprefixTest content).

VII. Custom Sink

The official documentation describes the interface for writing a custom sink:

https://flume.apache.org/FlumeDeveloperGuide.html#sink

According to the official guide, a custom sink (MySink) needs to extend the AbstractSink class and implement the Configurable interface.

Methods to implement:

configure(Context context)  // initialization from the configuration (reads the config file contents)

process()                   // read events from the channel and handle them; called in a loop

Typical use case: reading data from the channel and writing it to MySQL or other file systems.

Requirement:

Use Flume to receive data, add a prefix and a suffix to every record in the sink, and print it to the console. The prefix and suffix are configurable in the Flume job configuration file.

Coding

import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

import java.io.IOException;

/**
 * @version 1.0
 * @Author: zls
 * @Date: 2020/10/10 15:07
 * @Desc: print every received event to the console
 */
public class MySink extends AbstractSink implements Configurable {
    private String prefix;
    private String suffix;

    /**
     * Called in a loop; the sink pulls data from the channel and processes it
     *
     * @return the status of event processing
     * @throws EventDeliveryException
     */
    public Status process() throws EventDeliveryException {
        Status status = null;

        //get the channel bound to this sink
        Channel channel = getChannel();

        //channel transaction: take and commit/rollback must happen inside it
        Transaction transaction = channel.getTransaction();
        transaction.begin();

        try {
            //1. take an event from the channel
            Event take = channel.take();
            //2. write the event out (here: print it to the console)
            storeSomeData(take);

            status = Status.READY;
            transaction.commit();
        } catch (Exception e) {
            //on failure, roll back the transaction and report BACKOFF
            status = Status.BACKOFF;
            transaction.rollback();
        } finally {
            transaction.close();
        }
        return status;
    }

    /**
     * Process the event (store or consume it); here it is printed to the console
     *
     * @param take the event taken from the channel (may be null if the channel was empty)
     */
    private void storeSomeData(Event take) throws IOException, InterruptedException {
        if (take != null) {
            System.out.print(prefix);
            System.out.write(take.getBody());
            System.out.println(suffix);
        } else {
            Thread.sleep(5000L);
        }
    }

    /**
     * Configuration method: configures the sink from the agent configuration
     *
     * @param context the configuration context
     */
    public void configure(Context context) {
        prefix = context.getString("XXX", "DPre");   // property "XXX": prefix, default "DPre"
        suffix = context.getString("YYY", "DSuf");   // property "YYY": suffix, default "DSuf"
    }
}

 

Test

(1) Package

Package the code and put the resulting jar into Flume's lib directory.

(2) Configuration file

a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444

a1.sinks.k1.type = MySink
a1.sinks.k1.XXX = zls:
a1.sinks.k1.YYY = :zls

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(3) Start the job

[zls@Linux201 flume]$ bin/flume-ng agent -c conf/ -f job/mysink.conf -n a1 -Dflume.root.logger=INFO,console
[zls@Linux201 ~]$ nc localhost 44444
1
OK
hello
OK

Result: each line sent through netcat is printed to the console wrapped with the configured prefix and suffix, e.g. zls:hello:zls.