Flume-自定義 Interceptor(攔截器)


使用 Flume 采集服務器本地日志,需要按照日志類型的不同,將不同種類的日志發往不同的分析系統。

 

在實際的開發中,一台服務器產生的日志類型可能有很多種,不同類型的日志可能需要發送到不同的分析系統。

此時會用到 Flume 拓撲結構中的 Multiplexing 結構,Multiplexing的原理是,根據 event 中 Header 的某個 key 的值,將不同的 event 發送到不同的 Channel中,所以我們需要自定義一個 Interceptor,為不同類型的 event 的 Header 中的 key 賦予 不同的值。

這里以端口數據模擬日志,以數字(單個)和字母(單個)模擬不同類型的日志,需要自定義 interceptor 區分數字和字母,將其分別發往不同的分析系統(Channel)。

一、創建自定義攔截器

https://flume.apache.org/FlumeUserGuide.html#flume-interceptors

1.引入 pom 依賴

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com</groupId>
    <artifactId>flume</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.9.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

2.編寫攔截器類

package interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.List;

public class CustomInterceptor implements Interceptor {
    @Override
    public void initialize() {

    }

    // 單個事件攔截
    @Override
    public Event intercept(Event event) {
        byte[] body = event.getBody();
        if (body[0] < 'z' && body[0] > 'a') {
            // 自定義頭信息
            event.getHeaders().put("type", "letter");
        } else if (body[0] > '0' && body[0] < '9') {
            // 自定義頭信息
            event.getHeaders().put("type", "number");
        }
        return event;
    }

    // 批量事件攔截
    @Override
    public List<Event> intercept(List<Event> list) {
        for (Event event : list) {
            intercept(event);
        }
        return list;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new CustomInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}

 

二、打包測試

1.打包上傳

將項目打包。

上傳到 flume 的 lib 目錄下。

 

2.編寫 flume 配置文件

1.flume1

配置 1 個 netcat source,1 個 sink group(2 個 avro sink),並配置相應的 ChannelSelector 和 interceptor。

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 127.0.0.1
a1.sources.r1.port = 4444

# 攔截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = interceptor.CustomInterceptor$Builder

# 選擇器
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
# 與自定義攔截器中設置的頭信息對應
a1.sources.r1.selector.mapping.letter = c1
a1.sources.r1.selector.mapping.number = c2

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 127.0.0.1
a1.sinks.k1.port = 4141

a1.sinks.k2.type=avro
a1.sinks.k2.hostname = 127.0.0.1
a1.sinks.k2.port = 4242

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
View Code

2.flume2

配置一個 avro source 和一個 logger sink。

a2.sources = r1
a2.sinks = k1
a2.channels = c1

a2.sources.r1.type = avro
a2.sources.r1.bind = 127.0.0.1
a2.sources.r1.port = 4141

a2.sinks.k1.type = logger

a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

a2.sinks.k1.channel = c1
a2.sources.r1.channels = c1
View Code

3.flume3

配置一個 avro source 和一個 logger sink。

a3.sources = r1
a3.sinks = k1
a3.channels = c1

a3.sources.r1.type = avro
a3.sources.r1.bind = 127.0.0.1
a3.sources.r1.port = 4242

a3.sinks.k1.type = logger

a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

a3.sinks.k1.channel = c1
a3.sources.r1.channels = c1
View Code

 

3.測試

flume2 和 flume3 需要先啟動,flume1 需要連接 flume2 和 flume3,若先啟動 flume1 會報連接不上(也可以無視錯誤日志,先啟動)

cd /opt/apache-flume-1.9.0-bin 

bin/flume-ng agent --conf conf/ --name a3 --conf-file /tmp/flume-job/interceptor/flume3 -Dflume.root.logger=INFO,console
bin/flume-ng agent --conf conf/ --name a2 --conf-file /tmp/flume-job/interceptor/flume2 -Dflume.root.logger=INFO,console
bin/flume-ng agent --conf conf/ --name a1 --conf-file /tmp/flume-job/interceptor/flume1 -Dflume.root.logger=INFO,console

向監控端口發送數據。

nc 127.0.0.1 4444

qwer
1234

可以看到不同的內容被發送到不同的 flume 了,攔截器代碼中只定義數字和小寫字母,發送其它的內容不會被 flume1 轉發。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM