Kafka + Flink Integration Example


Source: https://blog.csdn.net/weixin_44575542/article/details/88594773

1. Purpose
1.1 Introduction to Flink
Apache Flink is a distributed open-source computing framework for both data-stream processing and batch processing. Built on a single streaming execution model, it supports both stream-processing and batch-processing applications.
Flink features:

Supports both batch and data-stream programs
Elegant Java and Scala APIs
High throughput and low latency at the same time
Event-time and out-of-order processing via the DataStream API, based on the Dataflow model
Flexible windowing (time, count, session, custom triggers) under different time semantics (event time, processing time)
Exactly-once fault-tolerance guarantees
Automatic backpressure
Graph processing (batch), machine learning (batch), and complex event processing (streaming) libraries
Built-in support for iterative programs (BSP) in the DataSet (batch) API
Efficient custom memory management with robust switching between in-memory and out-of-core processing
Compatibility layers for Hadoop MapReduce and Storm


1.2 Flink Application Scenarios
Event-driven applications
Data analytics applications
Data pipeline applications
In more detail:
Many (sometimes unreliable) data sources: when data is generated by millions of different users or devices, it is safe to assume that some events will arrive out of the order in which they actually occurred, and that in the case of upstream failures some events may arrive hours late. This late data must still be processed so that the results remain accurate.
Application state management: once programs become more complex than simple filtering or data enrichment, managing their state (e.g., counters, windows over past data, state machines, embedded databases) becomes hard. Flink provides tools that keep this state efficient, fault tolerant, and manageable, so you do not have to build these capabilities yourself.
Fast data processing: in real-time and near-real-time use cases, data should be available for processing from the moment it is generated. Where necessary, Flink is fully capable of meeting such latency requirements.
Processing data at scale: such programs have to run distributed across many nodes to reach the required scale. Flink runs on large clusters just as seamlessly as on small ones.

2. Environment
Item       Version
OS         Windows 10
JDK        1.8.0_201
Kafka      2.12-2.1.1
ZooKeeper  3.4.13
Flink      1.7.2
3. Installation and Startup
For the Kafka and ZooKeeper setup, first refer to《windows下kafka的搭建及配置》(setting up and configuring Kafka on Windows).
Flink installation:
Official download page: https://flink.apache.org/zh/downloads.html#section
Unpack the archive to F:\bigdata.
Start the cluster from a command window:

F:\bigdata\flink-1.7.2\bin\start-cluster.bat
Open a browser and go to
http://localhost:8081
If the Flink dashboard loads, the installation succeeded.
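
The sample job below consumes a topic named demo. Assuming the Kafka 2.12-2.1.1 installation from section 2 is also unpacked under F:\bigdata (the path is illustrative), the topic can be created with the script bundled with Kafka:

F:\bigdata\kafka_2.12-2.1.1\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic demo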

4. Creating the Flink Project
This example uses IntelliJ IDEA as the IDE. First create a Maven project with group 'com.zuoan', artifact id 'flink-kafka-sample', and version '1.0-SNAPSHOT'.


5. Adding Dependencies
The POM file is as follows:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.zuoan</groupId>
    <artifactId>flink-kafka-sample</artifactId>
    <version>1.0-SNAPSHOT</version>
    <description>Flink + Kafka sample</description>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.0</version>
                <configuration>
                    <createDependencyReducedPom>false</createDependencyReducedPom>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <!-- when packaging, set this to your own main class -->
                                    <mainClass>com.zuoan.KafkaFlinkStream</mainClass>
                                </transformer>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                            </transformers>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

6. Code
The code consists of two parts:

1. MessageSplitter, MessageWaterEmitter, and KafkaFlinkStream: the Flink streaming classes that process Kafka messages in real time
2. KafkaProducerTest and MemoryUsageExtrator: classes that generate the Kafka test messages


In this example, each Kafka message has a fixed format: timestamp, hostname, currently available memory in bytes. The hostname is fixed to machine-1, while the timestamp and available memory are obtained dynamically. Since only one Kafka producer is started to simulate messages coming from a single machine, the final statistics cover only the memory of machine-1. Let's look at the Flink side of the code first.
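
A produced message therefore looks like this (the timestamp and byte count are illustrative):

1553850000000,machine-1,8462336000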

KafkaFlinkStream (the Flink entry-point class, encapsulating the processing logic for the Kafka messages; this example aggregates the results every 10 seconds and writes them to a local file):

package com.zuoan;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.util.Collector;

import java.util.Properties;

public class KafkaFlinkStream {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000); // crucial: checkpointing must be enabled!
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "demo");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");   // key deserializer
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // value deserializer
        props.setProperty("auto.offset.reset", "latest");

        FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<String>(
                "demo",                   // Kafka topic; change this to the topic you need
                new SimpleStringSchema(), // String deserialization schema
                props);
        // assign timestamps and watermarks
        consumer.assignTimestampsAndWatermarks(new MessageWaterEmitter());

        DataStream<Tuple2<String, Long>> keyedStream = env
                .addSource(consumer)
                .flatMap(new MessageSplitter())
                .keyBy(0)
                .timeWindow(Time.seconds(10))
                // aggregate over 10-second windows and compute the average
                .apply(new WindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple, TimeWindow>() {
                    @Override
                    public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<Tuple2<String, Long>> out) throws Exception {
                        long sum = 0L;
                        int count = 0;
                        for (Tuple2<String, Long> record : input) {
                            sum += record.f1;
                            count++;
                        }
                        Tuple2<String, Long> result = input.iterator().next();
                        result.f1 = sum / count;
                        out.collect(result);
                    }
                });
        // write the result stream to a text file; the output path is taken from args[0]
        keyedStream.writeAsText(args[0]);
        env.execute("Flink-Kafka sample");
    }
}
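
Note that because the environment uses TimeCharacteristic.EventTime, the 10-second windows above are driven by the timestamps embedded in the messages (and the watermarks derived from them, see MessageWaterEmitter below), not by the wall clock.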

MessageSplitter (splits each received Kafka message on "," and extracts the hostname and memory fields as a (hostname, freeMemBytes) tuple):

package com.zuoan;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class MessageSplitter implements FlatMapFunction<String, Tuple2<String, Long>> {

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Long>> out) throws Exception {
        // expected message format: timestamp,hostname,freeMemoryBytes
        if (value != null && value.contains(",")) {
            String[] parts = value.split(",");
            out.collect(new Tuple2<String, Long>(parts[1], Long.parseLong(parts[2])));
        }
    }
}


MessageWaterEmitter (derives Flink watermarks from the Kafka messages; since every message carries its own timestamp in the first field, a punctuated watermark can be emitted per record):

package com.zuoan;

import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

import javax.annotation.Nullable;

public class MessageWaterEmitter implements AssignerWithPunctuatedWatermarks<String> {
    @Nullable
    @Override
    public Watermark checkAndGetNextWatermark(String lastElement, long extractedTimestamp) {
        // every record carries its own timestamp, so emit a watermark per record
        if (lastElement != null && lastElement.contains(",")) {
            String[] parts = lastElement.split(",");
            return new Watermark(Long.parseLong(parts[0]));
        }
        return null;
    }

    @Override
    public long extractTimestamp(String element, long previousElementTimestamp) {
        // the event timestamp is the first field of the message
        if (element != null && element.contains(",")) {
            String[] parts = element.split(",");
            return Long.parseLong(parts[0]);
        }
        return 0L;
    }
}


With these classes implemented we could already package and deploy, but before that let's look at the Kafka message producer test class, which sends one message in the above format every second for the downstream Flink job to consume.
KafkaProducerTest (sends the Kafka messages):

package com.zuoan;

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class KafkaProducerTest {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<Object, String> producer = new KafkaProducer<Object, String>(props);
        int totalMessageCount = 10000;
        for (int i = 0; i < totalMessageCount; i++) {
            // message format: timestamp,hostname,freeMemoryBytes
            String value = String.format("%d,%s,%d", System.currentTimeMillis(), "machine-1", currentMemSize());
            System.out.println("Sending --> " + value);
            producer.send(new ProducerRecord<Object, String>("demo", value), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        System.out.println("Failed to send message with exception " + exception);
                    }
                }
            });
            Thread.sleep(1000); // one message per second
        }
        producer.close();
    }

    private static long currentMemSize() {
        return MemoryUsageExtrator.currentFreeMemorySizeInBytes();
    }
}
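
To confirm that messages are actually reaching the topic, you can tail it with the console consumer bundled with Kafka (same illustrative install path as in section 3):

F:\bigdata\kafka_2.12-2.1.1\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic demo --from-beginning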


MemoryUsageExtrator (a simple utility class that returns the currently free physical memory in bytes; OperatingSystemMXBean here is the com.sun.management extension, which exposes getFreePhysicalMemorySize()):

package com.zuoan;

import com.sun.management.OperatingSystemMXBean;
import java.lang.management.ManagementFactory;

public class MemoryUsageExtrator {
    private static final OperatingSystemMXBean mxBean =
            ManagementFactory.getPlatformMXBean(OperatingSystemMXBean.class);

    /**
     * Get current free physical memory size in bytes
     * @return free RAM size
     */
    public static long currentFreeMemorySizeInBytes() {
        return mxBean.getFreePhysicalMemorySize();
    }
}


7. Packaging and Deploying the Jar
Build the jar:

E:\gitsource\flink-kafka-sample>mvn clean package

The generated jar is under the project's target directory:
E:\gitsource\flink-kafka-sample\target\flink-kafka-sample-1.0-SNAPSHOT.jar
Deploy the jar, passing the output file path as the program argument (it is read as args[0]):

bin\flink.bat run -c com.zuoan.KafkaFlinkStream E:\gitsource\flink-kafka-sample\target\flink-kafka-sample-1.0-SNAPSHOT.jar F:\result.txt

On each subsequent run, delete the previous output file first; otherwise the job fails to start with:

java.io.IOException: File or directory F:/result.txt already exists. Existing files and directories are not overwritten in NO_OVERWRITE mode. Use OVERWRITE mode to overwrite existing files and directories.
at org.apache.flink.core.fs.FileSystem.initOutPathLocalFS(FileSystem.java:773)
at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.initOutPathLocalFS(SafetyNetWrapperFileSystem.java:137)
at org.apache.flink.api.common.io.FileOutputFormat.open(FileOutputFormat.java:227)
at org.apache.flink.api.java.io.TextOutputFormat.open(TextOutputFormat.java:88)
at org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.open(OutputFormatSinkFunction.java:64)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Unknown Source)
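
Alternatively, instead of deleting the file, the sink can be switched to overwrite mode. This is a small change to KafkaFlinkStream, not part of the original code: writeAsText also accepts an explicit write mode:

// in KafkaFlinkStream; requires: import org.apache.flink.core.fs.FileSystem;
keyedStream.writeAsText(args[0], FileSystem.WriteMode.OVERWRITE);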


8. Running the Test
Run the Kafka producer from the IDE to create input data for the Flink job, then open a terminal and watch the output file change.
Opening F:\result.txt shows that every 10 seconds Flink appends a new record to the file, containing the average available memory (in bytes) of the machine named machine-1 over the past 10 seconds.
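
Each record in result.txt is a printed Tuple2; a line looks like this (the byte count is illustrative):

(machine-1,8453240832)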

9. Summary
This article presented a runnable Flink + Kafka project configuration and implementation, which can be adapted as needed for specific use cases.


 

