使用 Flume 采集服務器本地日志,需要按照日志類型的不同,將不同種類的日志發往不同的分析系統。
在實際的開發中,一台服務器產生的日志類型可能有很多種,不同類型的日志可能需要發送到不同的分析系統。
此時會用到 Flume 拓撲結構中的 Multiplexing 結構,Multiplexing的原理是,根據 event 中 Header 的某個 key 的值,將不同的 event 發送到不同的 Channel中,所以我們需要自定義一個 Interceptor,為不同類型的 event 的 Header 中的 key 賦予 不同的值。
這里以端口數據模擬日志,以數字(單個)和字母(單個)模擬不同類型的日志,需要自定義 interceptor 區分數字和字母,將其分別發往不同的分析系統(Channel)。
一、創建自定義攔截器
https://flume.apache.org/FlumeUserGuide.html#flume-interceptors
1.引入 pom 依賴
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com</groupId> <artifactId>flume</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>org.apache.flume</groupId> <artifactId>flume-ng-core</artifactId> <version>1.9.0</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.8</source> <target>1.8</target> <encoding>UTF-8</encoding> </configuration> </plugin> </plugins> </build> </project>
2.編寫攔截器類
package interceptor; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor; import java.util.List; public class CustomInterceptor implements Interceptor { @Override public void initialize() { } // 單個事件攔截 @Override public Event intercept(Event event) { byte[] body = event.getBody(); if (body[0] < 'z' && body[0] > 'a') { // 自定義頭信息 event.getHeaders().put("type", "letter"); } else if (body[0] > '0' && body[0] < '9') { // 自定義頭信息 event.getHeaders().put("type", "number"); } return event; } // 批量事件攔截 @Override public List<Event> intercept(List<Event> list) { for (Event event : list) { intercept(event); } return list; } @Override public void close() { } public static class Builder implements Interceptor.Builder { @Override public Interceptor build() { return new CustomInterceptor(); } @Override public void configure(Context context) { } } }
二、打包測試
1.打包上傳
將項目打包。
上傳到 flume 的 lib 目錄下。
2.編寫 flume 配置文件
1.flume1
配置 1 個 netcat source,1 個 sink group(2 個 avro sink),並配置相應的 ChannelSelector 和 interceptor。

# Name the components on this agent a1.sources = r1 a1.sinks = k1 k2 a1.channels = c1 c2 # Describe/configure the source a1.sources.r1.type = netcat a1.sources.r1.bind = 127.0.0.1 a1.sources.r1.port = 4444 # 攔截器 a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = interceptor.CustomInterceptor$Builder # 選擇器 a1.sources.r1.selector.type = multiplexing a1.sources.r1.selector.header = type # 與自定義攔截器中設置的頭信息對應 a1.sources.r1.selector.mapping.letter = c1 a1.sources.r1.selector.mapping.number = c2 # Describe the sink a1.sinks.k1.type = avro a1.sinks.k1.hostname = 127.0.0.1 a1.sinks.k1.port = 4141 a1.sinks.k2.type=avro a1.sinks.k2.hostname = 127.0.0.1 a1.sinks.k2.port = 4242 # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.channels.c2.type = memory a1.channels.c2.capacity = 1000 a1.channels.c2.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 c2 a1.sinks.k1.channel = c1 a1.sinks.k2.channel = c2
2.flume2
配置一個 avro source 和一個 logger sink。

a2.sources = r1 a2.sinks = k1 a2.channels = c1 a2.sources.r1.type = avro a2.sources.r1.bind = 127.0.0.1 a2.sources.r1.port = 4141 a2.sinks.k1.type = logger a2.channels.c1.type = memory a2.channels.c1.capacity = 1000 a2.channels.c1.transactionCapacity = 100 a2.sinks.k1.channel = c1 a2.sources.r1.channels = c1
3.flume3
配置一個 avro source 和一個 logger sink。

a3.sources = r1 a3.sinks = k1 a3.channels = c1 a3.sources.r1.type = avro a3.sources.r1.bind = 127.0.0.1 a3.sources.r1.port = 4242 a3.sinks.k1.type = logger a3.channels.c1.type = memory a3.channels.c1.capacity = 1000 a3.channels.c1.transactionCapacity = 100 a3.sinks.k1.channel = c1 a3.sources.r1.channels = c1
3.測試
flume2 和 flume3 需要先啟動,flume1 需要連接 flume2 和 flume3,若先啟動 flume1 會報連接不上(也可以無視錯誤日志,先啟動)
cd /opt/apache-flume-1.9.0-bin bin/flume-ng agent --conf conf/ --name a3 --conf-file /tmp/flume-job/interceptor/flume3 -Dflume.root.logger=INFO,console bin/flume-ng agent --conf conf/ --name a2 --conf-file /tmp/flume-job/interceptor/flume2 -Dflume.root.logger=INFO,console bin/flume-ng agent --conf conf/ --name a1 --conf-file /tmp/flume-job/interceptor/flume1 -Dflume.root.logger=INFO,console
向監控端口發送數據。
nc 127.0.0.1 4444 qwer 1234
可以看到不同的內容被發送到不同的 flume 了,攔截器代碼中只定義數字和小寫字母,發送其它的內容不會被 flume1 轉發。