一.簡單實現
需求:根據 flume 監控 exec 文件的追加數據,寫入 kafka 的 test-demo 分區,然后啟用 kafka-consumer 消費 test-demo 分區數據。
需求分析

1)flume的配置文件
在hadoop102上創建flume的配置文件
# define
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/testdata/3.txt
# sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
#kafka的broker主機和端口
a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
#kafka sink發送數據的topic
a1.sinks.k1.kafka.topic = test-demo
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2)啟動 zk、kafka集群
3)創建 test-demo 主題
bin/kafka-topics.sh --create --bootstrap-server hadoop102:9092 --topic test-demo --partitions 2 --replication-factor 2
4)啟動 kafka consumer 去消費 test-demo 主題
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic test-demo
aaa
5)啟動 flume,並且往 3.txt 中追加數據
bin/flume-ng agent -c conf/ -f job/flume-kafka/flume-exec-kafka.conf -n a1
echo hello >> /opt/module/testdata/3.txt

6)觀察 kafka consumer 的消費情況

二.自定義interceptor(使用kafka sink)
需求:flume監控 exec 文件的追加數據,將flume采集的數據按照不同的類型輸入到不同的topic中
將日志數據中帶有的 hello 的,輸入到kafka的 first 主題中,
將日志數據中帶有 good 的,輸入到kafka的 second 主題中,
其他的數據輸入到kafka的 third 主題中
需求分析
通過自定義 flume 的攔截器,往 header 增加 topic 信息 ,配置文件中 kafka sink 增加 topic 配置,實現將數據按照指定 topic 發送。

1)自定義 flume 攔截器
創建工程,pom依賴
<dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.9.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
自定義攔截器類,並打包上傳至/opt/module/flume/lib包下
package com.bigdata.intercepter;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* @description: TODO 自定義flume攔截器
* @author: HaoWu
* @create: 2020/7/7 20:32
*/
public class FlumeKafkaInterceptorDemo implements Interceptor {
private List<Event> events;
//初始化
@Override
public void initialize() {
events = new ArrayList<>();
}
@Override
public Event intercept(Event event) {
// 獲取event的header
Map<String, String> header = event.getHeaders();
// 獲取event的boby
String body = new String(event.getBody());
// 根據body中的數據設置header
if (body.contains("hello")) {
header.put("topic", "first");
} else if (body.contains("good")) {
header.put("topic", "second");
}
return event;
}
@Override
public List<Event> intercept(List<Event> events) {
// 對每次批數據進來清空events
events.clear();
// 循環處理單個event
for (Event event : events) {
events.add(intercept(event));
}
return events;
}
@Override
public void close() {
}
// 靜態內部類創建自定義攔截器對象
public static class Builder implements Interceptor.Builder{
@Override
public Interceptor build() {
return new FlumeKafkaInterceptorDemo();
}
@Override
public void configure(Context context) {
}
}
}
2)編寫 flume 的配置文件
flume-netstat-kafka.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
#Interceptor
a1.sources.r1.interceptors = i1
#自定義攔截器全類名+$Builder
a1.sources.r1.interceptors.i1.type = com.bigdata.intercepter.FlumeKafkaInterceptorDemo$Builder
# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
#默認發往的topic
a1.sinks.k1.kafka.topic = third
a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
# # Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
3)創建topic
在kafka中創建 first , second , third 這3個topic
[hadoop@hadoop102 kafka]$ bin/kafka-topics.sh --list --bootstrap-server hadoop102:9092
__consumer_offsets
first
second
test-demo
third
4)啟動各組件
啟動3個 kafka consumer 分別消費 first , second , third 中的數據
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic second
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic third

5)啟動 flume,通過netstat發送數據到flume
bin/flume-ng agent -c conf/ -f job/flume-kafka/flume-netstat-kafka.conf -n a1
nc localhost 44444
6)觀察消費者的消費情況

三.自定義interceptor(使用kafka channel)
需求:使用taildir source監控/opt/module/applog/log文件夾下的文件,使用攔截器過濾非json的數據,使用kafka channel對接 kafka,將數據發往指定topic。
需求分析
使用kafka channel不需要sink

1)自定義攔截器
創建maven工程
pom文件
<dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.9.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.62</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
編寫攔截器類:ETLInterceptor.java
package com.bigdata;
import com.alibaba.fastjson.JSON;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.util.Iterator;
import java.util.List;
/**
* @description: TODO 自定義攔截器,簡單的ETL清洗
* @author: HaoWu
* @create: 2020/7/10 18:14
*/
public class ETLInterceptor implements Interceptor {
@Override
public void initialize() {
}
@Override
public Event intercept(Event event) {
String s = new String(event.getBody());
try {
JSON.toJSON(s);
return event;
} catch (Exception e) {
return null;
}
}
@Override
public List<Event> intercept(List<Event> events) {
Iterator<Event> iterator = events.iterator();
while (iterator.hasNext()){
Event e = iterator.next();
if(e==null){
iterator.remove();
}
}
return events;
}
@Override
public void close() {
}
public static class Builder implements Interceptor.Builder{
@Override
public Interceptor build() {
return new ETLInterceptor();
}
@Override
public void configure(Context context) {
}
}
}
打包,將有依賴的包上傳到%Flume_HOME%/lib目錄下

2)flume配置
bigdata-applog-kafka.conf
#描述agent
a1.sources = r1
a1.channels = c1
#描述source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
#攔截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.bigdata.ETLInterceptor$Builder
#描述channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.channels.c1.kafka.topic = applog
a1.channels.c1.parseAsFlumeEvent = false
#關聯source->channel->sink
a1.sources.r1.channels = c1
3)啟動各組件
啟動zookeeper、kafka-->啟動消費者消費applog主題-->啟動flume-->觀察消費者
#消費者消費applog
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic applog --from-beginning
#啟動flume
bin/flume-ng agent -n a1 -c conf/ -f job/bigdata-applog-kafka.conf
consumer消費到數據

