Integrating Flume with Kafka


1. Simple Implementation

Requirement: use a Flume exec source to monitor data appended to a file, write it to the Kafka topic test-demo, then start a kafka-console-consumer to consume the test-demo topic.

Requirement analysis

1) Flume configuration file

Create the Flume configuration file (flume-exec-kafka.conf, referenced when the agent is started below) on hadoop102:

# define
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F  /opt/module/testdata/3.txt

# sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# Kafka broker hosts and ports
a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
# topic the Kafka sink sends to
a1.sinks.k1.kafka.topic = test-demo
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1

# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2) Start the Zookeeper and Kafka clusters

3) Create the test-demo topic

 bin/kafka-topics.sh --create --bootstrap-server hadoop102:9092 --topic test-demo --partitions 2 --replication-factor 2

4) Start a Kafka consumer to consume the test-demo topic

 bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic test-demo

5) Start Flume, then append data to 3.txt

 bin/flume-ng agent -c conf/ -f job/flume-kafka/flume-exec-kafka.conf -n a1
 
 echo hello >> /opt/module/testdata/3.txt

6) Observe the Kafka consumer: it should print the appended line (hello)

2. Custom Interceptor (using a Kafka sink)

Requirement: Flume collects incoming events (the demo below uses a netcat source) and routes each event to a different Kafka topic according to its content:

events whose body contains hello go to the Kafka topic first;

events whose body contains good go to the topic second;

all other events go to the topic third.

Requirement analysis

Write a custom Flume interceptor that adds a topic entry to each event's header based on the event body. The Kafka sink sends an event to the topic named in its header when one is present, and otherwise to the topic configured on the sink, so events end up in the intended topics.

1) Custom Flume interceptor

Create a Maven project with the following pom dependencies:
    <dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.9.0</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

Write the interceptor class below, package it, and upload the jar to /opt/module/flume/lib:

package com.bigdata.intercepter;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * @description: custom Flume interceptor that routes events via a topic header
 * @author: HaoWu
 * @create: 2020/7/7 20:32
 */
public class FlumeKafkaInterceptorDemo implements Interceptor {
    private List<Event> events;

    // initialization
    @Override
    public void initialize() {
        events = new ArrayList<>();
    }

    @Override
    public Event intercept(Event event) {
        // get the event's header map
        Map<String, String> header = event.getHeaders();
        // get the event body as a string
        String body = new String(event.getBody());
        // set a topic header based on the body content; events matching
        // neither rule keep no topic header and fall through to the
        // sink's default topic
        if (body.contains("hello")) {
            header.put("topic", "first");
        } else if (body.contains("good")) {
            header.put("topic", "second");
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        // clear the reusable buffer for each incoming batch; note the
        // parameter must not shadow the events field, or clear() would
        // empty the batch before it is processed
        events.clear();
        // run the single-event interceptor on every event in the batch
        for (Event event : list) {
            events.add(intercept(event));
        }
        return events;
    }

    @Override
    public void close() {

    }
    // static inner class that builds the custom interceptor
    public static class Builder implements Interceptor.Builder{
        @Override
        public Interceptor build() {
            return new FlumeKafkaInterceptorDemo();
        }

        @Override
        public void configure(Context context) {

        }
    }
}
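The routing rule above can be exercised in isolation. The sketch below (plain Java, no Flume dependency; the class and method names are made up for illustration) mirrors the interceptor's logic: hello goes to first, good goes to second, and anything else gets no topic header, so the Kafka sink falls back to its configured default topic (third in the configuration below).

```java
import java.util.HashMap;
import java.util.Map;

public class TopicRoutingSketch {
    // mirrors FlumeKafkaInterceptorDemo.intercept(Event):
    // sets a "topic" header based on the event body
    static Map<String, String> route(String body) {
        Map<String, String> header = new HashMap<>();
        if (body.contains("hello")) {
            header.put("topic", "first");
        } else if (body.contains("good")) {
            header.put("topic", "second");
        }
        // no entry: the Kafka sink uses its configured default topic
        return header;
    }

    public static void main(String[] args) {
        System.out.println(route("hello world"));    // {topic=first}
        System.out.println(route("good morning"));   // {topic=second}
        System.out.println(route("something else")); // {}
    }
}
```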

2) Write the Flume configuration file

flume-netstat-kafka.conf

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

#Interceptor
a1.sources.r1.interceptors = i1
# fully qualified class name of the custom interceptor + $Builder
a1.sources.r1.interceptors.i1.type = com.bigdata.intercepter.FlumeKafkaInterceptorDemo$Builder

# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# default topic for events without a topic header
a1.sinks.k1.kafka.topic = third
a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3) Create the topics

Create the topics first, second, and third in Kafka, then list the topics to verify:

[hadoop@hadoop102 kafka]$  bin/kafka-topics.sh --list --bootstrap-server hadoop102:9092
__consumer_offsets
first
second
test-demo
third

4) Start the components

Start three Kafka consumers to consume first, second, and third respectively:

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic second
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic third

5) Start Flume and send data to it with netcat

bin/flume-ng agent -c conf/ -f job/flume-kafka/flume-netstat-kafka.conf -n a1
nc localhost 44444

6) Check which consumer receives each line

3. Custom Interceptor (using a Kafka channel)

Requirement: use a taildir source to monitor the files under /opt/module/applog/log, use an interceptor to filter out non-JSON data, and use a Kafka channel to deliver the data directly to the target Kafka topic.

Requirement analysis

With a Kafka channel no sink is needed: the channel itself writes events to Kafka.

1) Custom interceptor

Create a Maven project.

pom file:

    <dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.9.0</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.62</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

Write the interceptor class ETLInterceptor.java:

package com.bigdata;
import com.alibaba.fastjson.JSON;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.util.Iterator;
import java.util.List;

/**
 * @description: custom interceptor performing a simple ETL cleanup
 * @author: HaoWu
 * @create: 2020/7/10 18:14
 */
public class ETLInterceptor implements Interceptor {
    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        String s = new String(event.getBody());
        try {
            // parseObject throws on invalid JSON; JSON.toJSON would not,
            // since it simply wraps an arbitrary string
            JSON.parseObject(s);
            return event;
        } catch (Exception e) {
            return null;
        }
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        Iterator<Event> iterator = events.iterator();
        while (iterator.hasNext()) {
            // run the single-event check and drop events that fail it
            if (intercept(iterator.next()) == null) {
                iterator.remove();
            }
        }
        return events;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder{
        @Override
        public Interceptor build() {
            return new ETLInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}
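The batch behavior above (drop events whose body is not valid JSON) can be sketched without the Flume and fastjson dependencies. The helper below is a deliberately simplified stand-in: it only checks the surrounding braces, whereas the real interceptor lets fastjson parse the whole body; the class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class EtlFilterSketch {
    // crude stand-in for JSON.parseObject: accept only bodies that
    // look like a JSON object; real code should use a JSON parser
    static boolean looksLikeJsonObject(String body) {
        String t = body.trim();
        return t.startsWith("{") && t.endsWith("}");
    }

    // mirrors ETLInterceptor.intercept(List<Event>): remove events
    // that fail the single-event check
    static List<String> filterBatch(List<String> batch) {
        Iterator<String> it = batch.iterator();
        while (it.hasNext()) {
            if (!looksLikeJsonObject(it.next())) {
                it.remove();
            }
        }
        return batch;
    }

    public static void main(String[] args) {
        List<String> batch = new ArrayList<>(
                Arrays.asList("{\"app\":\"demo\"}", "garbage line", "{\"ts\":1}"));
        System.out.println(filterBatch(batch)); // [{"app":"demo"}, {"ts":1}]
    }
}
```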

Package the project and upload the jar-with-dependencies to the $FLUME_HOME/lib directory.

2) Flume configuration

bigdata-applog-kafka.conf

# describe the agent
a1.sources = r1
a1.channels = c1

# describe the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*

# interceptor
a1.sources.r1.interceptors =  i1
a1.sources.r1.interceptors.i1.type = com.bigdata.ETLInterceptor$Builder

# describe the channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.channels.c1.kafka.topic = applog
a1.channels.c1.parseAsFlumeEvent = false

# bind the source to the channel (a Kafka channel needs no sink)
a1.sources.r1.channels = c1

3) Start the components

Start Zookeeper and Kafka -> start a consumer on the applog topic -> start Flume -> observe the consumer.

# consumer for the applog topic
kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic applog --from-beginning
# start flume
bin/flume-ng agent -n a1 -c conf/ -f job/bigdata-applog-kafka.conf

The consumer receives the data.

