Source: https://blog.csdn.net/weixin_44575542/article/details/88594773
Kafka + Flink Integration
1. Purpose
1.1 Flink Overview
Apache Flink is a distributed, open-source computing framework for both data stream processing and batch processing. Built on a single streaming execution model, it supports both stream and batch applications.
Flink features:
Supports both batch and data stream programs
Elegant, fluent Java and Scala APIs
High throughput and low latency at the same time
Supports event time and out-of-order processing in the DataStream API, based on the Dataflow model
Flexible windows (time, count, session, custom triggers) under different time semantics (event time, processing time)
Exactly-once fault-tolerance guarantees
Automatic backpressure
Graph processing (batch), machine learning (batch), complex event processing (streaming)
Built-in support for iterative programs (BSP) in the DataSet (batch) API
Efficient custom memory management with robust switching between in-memory and out-of-core processing
Compatibility with Hadoop MapReduce and Storm
1.2 Flink Use Cases
Event-driven applications
Data analytics applications
Data pipeline applications
In more detail:
Multiple (sometimes unreliable) data sources: when data is produced by millions of different users or devices, you cannot assume events arrive in the order they were generated; after upstream failures, some events may arrive hours late, and late data still has to be included for the results to be accurate.
Application state management: as applications grow beyond simple filtering or data enrichment, managing their state (e.g. counters, windows over past data, state machines, embedded databases) becomes hard. Flink provides tools that keep this state efficient, fault tolerant, and controllable, so you do not have to build these capabilities yourself.
Fast data processing: the focus is on real-time or near real-time use cases, where data should be usable from the moment it is generated. Flink is fully capable of meeting such latency requirements when necessary.
Massive data: these programs need to run distributed across many nodes to reach the required scale. Flink runs on large clusters as seamlessly as on small ones.
2. Environment
Item        Version
OS          Windows 10
JDK         1.8.0_201
Kafka       2.12-2.1.1
ZooKeeper   3.4.13
Flink       1.7.2
3. Installation and Startup
For the Kafka and ZooKeeper setup, first see the earlier article on setting up and configuring Kafka on Windows.
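The example below uses a Kafka topic named demo. If your broker does not auto-create topics, create it up front. A minimal sketch, assuming Kafka 2.12-2.1.1 is installed under F:\bigdata and ZooKeeper is listening on localhost:2181:
F:\bigdata\kafka_2.12-2.1.1\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic demo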
Flink installation:
Official download page: https://flink.apache.org/zh/downloads.html#section
Extract the archive to F:\bigdata.
Start the cluster from a command window:
F:\bigdata\flink-1.7.2\bin\start-cluster.bat
Open a browser and go to
http://localhost:8081
If the Flink dashboard appears, the installation succeeded.
4. Create the Flink Project
This example uses IntelliJ IDEA as the IDE. First create a Maven project with group 'com.zuoan', artifact id 'flink-kafka-sample', and version '1.0-SNAPSHOT'. The overall project structure looks roughly like this:
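A sketch of the expected Maven layout for the classes used in this article (your project view in IDEA may differ slightly):
flink-kafka-sample
├── pom.xml
└── src/main/java/com/zuoan/
        ├── KafkaFlinkStream.java
        ├── KafkaProducerTest.java
        ├── MemoryUsageExtrator.java
        ├── MessageSplitter.java
        └── MessageWaterEmitter.java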
5. Add Dependencies
The POM file is as follows. Note that the _2.11 suffix on the Flink artifacts is the Scala version and should match the Flink distribution you downloaded.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.zuoan</groupId>
<artifactId>flink-kafka-sample</artifactId>
<version>1.0-SNAPSHOT</version>
<description>flink+kafka example</description>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
<version>1.7.2</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.0</version>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<!-- when building the jar, change this to your own main class -->
<mainClass>com.zuoan.KafkaFlinkStream</mainClass>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
</transformers>
<filters>
<filter>
<artifact>*:*:*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
</project>
6. Code
The code consists of two parts:
1. The MessageSplitter, MessageWaterEmitter, and KafkaFlinkStream classes: the Flink streaming side, which processes the Kafka messages in real time.
2. The KafkaProducerTest and MemoryUsageExtrator classes: generate the Kafka test messages.
In this example the Kafka message format is fixed: timestamp, hostname, current free memory. The hostname is hard-coded to machine-1, while the timestamp and free memory are obtained dynamically. Since only a single Kafka producer is started to simulate messages from one machine, the final statistics only cover the memory of machine-1.
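For reference, a message in this format looks like the following line (the timestamp and byte count are illustrative values, not taken from a real run):
1552886400000,machine-1,8589934592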
Let's look at the Flink side of the code first.
KafkaFlinkStream class (the Flink entry point; it encapsulates the processing of the Kafka messages. In this example a result is computed every 10 seconds and written to a local file.)
package com.zuoan;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.util.Collector;

import java.util.Properties;

public class KafkaFlinkStream {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000); // very important: checkpointing must be enabled!
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "demo");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");   // key deserializer
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // value deserializer
        props.setProperty("auto.offset.reset", "latest");

        FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<String>(
                "demo",                   // Kafka topic; change this to the topic you need
                new SimpleStringSchema(), // String deserialization schema
                props);
        // assign timestamps and watermarks
        consumer.assignTimestampsAndWatermarks(new MessageWaterEmitter());

        DataStream<Tuple2<String, Long>> keyedStream = env
                .addSource(consumer)
                .flatMap(new MessageSplitter())
                .keyBy(0)
                .timeWindow(Time.seconds(10))
                // aggregate the data every 10 seconds and compute the average
                .apply(new WindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple, TimeWindow>() {
                    @Override
                    public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<Tuple2<String, Long>> out) throws Exception {
                        long sum = 0L;
                        int count = 0;
                        for (Tuple2<String, Long> record : input) {
                            sum += record.f1;
                            count++;
                        }
                        Tuple2<String, Long> result = input.iterator().next();
                        result.f1 = sum / count;
                        out.collect(result);
                    }
                });

        // write the keyed stream to a file; the path is the first program argument (args[0])
        keyedStream.writeAsText(args[0]);
        env.execute("Flink-Kafka sample");
    }
}
MessageSplitter class (splits each Kafka message on "," and extracts the hostname and free-memory value)
package com.zuoan;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class MessageSplitter implements FlatMapFunction<String, Tuple2<String, Long>> {
    @Override
    public void flatMap(String value, Collector<Tuple2<String, Long>> out) throws Exception {
        if (value != null && value.contains(",")) {
            String[] parts = value.split(",");
            // parts[0] is the timestamp, parts[1] the hostname, parts[2] the free memory in bytes
            out.collect(new Tuple2<String, Long>(parts[1], Long.parseLong(parts[2])));
        }
    }
}
MessageWaterEmitter class (derives the Flink watermark from the Kafka messages; as a punctuated assigner it can emit a watermark for every record, based on the message's timestamp field)
package com.zuoan;

import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

import javax.annotation.Nullable;

public class MessageWaterEmitter implements AssignerWithPunctuatedWatermarks<String> {
    @Nullable
    @Override
    public Watermark checkAndGetNextWatermark(String lastElement, long extractedTimestamp) {
        if (lastElement != null && lastElement.contains(",")) {
            String[] parts = lastElement.split(",");
            return new Watermark(Long.parseLong(parts[0]));
        }
        return null;
    }

    @Override
    public long extractTimestamp(String element, long previousElementTimestamp) {
        if (element != null && element.contains(",")) {
            String[] parts = element.split(",");
            return Long.parseLong(parts[0]);
        }
        return 0L;
    }
}
With these classes in place we could already package and deploy the job. Before doing that, let's look at the Kafka producer test class, which sends one message per second in the format above for the downstream Flink job to consume.
KafkaProducerTest class (sends the Kafka test messages)
package com.zuoan;

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class KafkaProducerTest {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<Object, String> producer = new KafkaProducer<Object, String>(props);
        int totalMessageCount = 10000;
        for (int i = 0; i < totalMessageCount; i++) {
            // message format: timestamp,hostname,free memory in bytes
            String value = String.format("%d,%s,%d", System.currentTimeMillis(), "machine-1", currentMemSize());
            System.out.println("Sending message --> " + value);
            producer.send(new ProducerRecord<Object, String>("demo", value), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        System.out.println("Failed to send message with exception " + exception);
                    }
                }
            });
            Thread.sleep(1000);
        }
        producer.close();
    }

    private static long currentMemSize() {
        return MemoryUsageExtrator.currentFreeMemorySizeInBytes();
    }
}
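To check that messages are actually reaching the topic, you can optionally watch it with the console consumer that ships with Kafka (the installation path is assumed to match the setup above):
F:\bigdata\kafka_2.12-2.1.1\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic demo --from-beginning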
MemoryUsageExtrator class (a trivial utility class that returns the current free physical memory in bytes)
package com.zuoan;

import com.sun.management.OperatingSystemMXBean;

import java.lang.management.ManagementFactory;

public class MemoryUsageExtrator {

    private static OperatingSystemMXBean mxBean =
            (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();

    /**
     * Get current free memory size in bytes
     * @return free RAM size
     */
    public static long currentFreeMemorySizeInBytes() {
        return mxBean.getFreePhysicalMemorySize();
    }
}
7. Deploy the JAR
Package the project into a JAR:
E:\gitsource\flink-kafka-sample>mvn clean package
The generated JAR is under the project's target directory:
E:\gitsource\flink-kafka-sample\target\flink-kafka-sample-1.0-SNAPSHOT.jar
Submit the JAR to the Flink cluster (the single program argument is the output file path read as args[0]):
.\bin\flink.bat run -c com.zuoan.KafkaFlinkStream E:\gitsource\flink-kafka-sample\target\flink-kafka-sample-1.0-SNAPSHOT.jar F:\result.txt
On every rerun, delete the previous output file first; otherwise the job fails to start with:
java.io.IOException: File or directory F:/result.txt already exists. Existing files and directories are not overwritten in NO_OVERWRITE mode. Use OVERWRITE mode to overwrite existing files and directories.
at org.apache.flink.core.fs.FileSystem.initOutPathLocalFS(FileSystem.java:773)
at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.initOutPathLocalFS(SafetyNetWrapperFileSystem.java:137)
at org.apache.flink.api.common.io.FileOutputFormat.open(FileOutputFormat.java:227)
at org.apache.flink.api.java.io.TextOutputFormat.open(TextOutputFormat.java:88)
at org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.open(OutputFormatSinkFunction.java:64)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Unknown Source)
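As the exception message suggests, an alternative to deleting the file by hand (a sketch, not part of the original code) is to open the sink in OVERWRITE mode so an existing file is replaced on each run:

import org.apache.flink.core.fs.FileSystem;

// replace the existing output file instead of failing when it already exists
keyedStream.writeAsText(args[0], FileSystem.WriteMode.OVERWRITE);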
8. Run the Test
Run the Kafka producer in the IDE to generate input data for the Flink job, then open a terminal and watch the output file for changes.
Open F:\result.txt and you will see that Flink writes a new record roughly every 10 seconds; each record holds the average free memory, in bytes, of the machine-1 host over the preceding 10 seconds.
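Each record is a Tuple2 printed as text, so an output line looks like the following (the byte count is illustrative):
(machine-1,8421567488)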
9. Summary
This article presented a runnable Flink + Kafka project, including the configuration and full code. It can be adapted to the needs of your specific use case.