Mastering Flink Reading Notes (2): Processing Data with the DataStream API


Source: https://blog.csdn.net/yyqq188/article/details/79527017

 

Flink provides Kafka connectors for reading data from and writing data to Kafka.

This article summarizes common questions that come up when integrating Flink with Kafka and clarifies a few points of confusion.

Question 1: Ways to read from Kafka
## Reading a single topic
FlinkKafkaConsumer010#FlinkKafkaConsumer010(String topic, DeserializationSchema<T> valueDeserializer, Properties props)
FlinkKafkaConsumer010#FlinkKafkaConsumer010(String topic, KafkaDeserializationSchema<T> deserializer, Properties props)
Example:
FlinkKafkaConsumer010<String> kafkaConsumer = new FlinkKafkaConsumer010<>("userActionLog1", new SimpleStringSchema(), kafkaProperties);
DataStream<String> source = env.addSource(kafkaConsumer).name("KafkaSource").uid("source-id");

## Reading multiple topics
FlinkKafkaConsumer010#FlinkKafkaConsumer010(List<String> topics, DeserializationSchema<T> deserializer, Properties props)
FlinkKafkaConsumer010#FlinkKafkaConsumer010(List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props)
Example:
FlinkKafkaConsumer010<String> kafkaConsumer = new FlinkKafkaConsumer010<>(Arrays.asList("userActionLog1","userActionLog2","userActionLog3"), new SimpleStringSchema(), kafkaProperties);
DataStream<String> source = env.addSource(kafkaConsumer).name("KafkaSource").uid("source-id");

## Reading topics matched by a regex pattern
FlinkKafkaConsumer010#FlinkKafkaConsumer010(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props)
FlinkKafkaConsumer010#FlinkKafkaConsumer010(Pattern subscriptionPattern, KafkaDeserializationSchema<T> deserializer, Properties props)
Example:
FlinkKafkaConsumer010<String> kafkaConsumer = new FlinkKafkaConsumer010<>(Pattern.compile("userActionLog[1-9]{1}"), new SimpleStringSchema(), kafkaProperties);
DataStream<String> source = env.addSource(kafkaConsumer).name("KafkaSource").uid("source-id");

## Starting consumption from a specified timestamp
kafkaConsumer.setStartFromTimestamp(long startupOffsetsTimestamp)

## Starting consumption from specified offsets (an offset can be set per partition)
kafkaConsumer.setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long> specificStartupOffsets)
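For example (a minimal sketch; the topic name, partition numbers, and offsets below are made up), the per-partition start offsets are passed as a Map keyed by KafkaTopicPartition (org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition):

// Resume partitions 0 and 1 of topic "userActionLog1" from explicitly chosen offsets (hypothetical values).
Map<KafkaTopicPartition, Long> specificStartupOffsets = new HashMap<>();
specificStartupOffsets.put(new KafkaTopicPartition("userActionLog1", 0), 23L);
specificStartupOffsets.put(new KafkaTopicPartition("userActionLog1", 1), 31L);
kafkaConsumer.setStartFromSpecificOffsets(specificStartupOffsets);

// Alternatively, start every partition from the first record whose timestamp is >= the given epoch milliseconds:
// kafkaConsumer.setStartFromTimestamp(1546300800000L); // hypothetical timestamp, 2019-01-01 00:00:00 UTC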


Question 2: Reading from Kafka and deserializers
The binary bytes read from Kafka can be deserialized into Java/Scala objects supported by Flink via org.apache.flink.api.common.serialization.DeserializationSchema or org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema.

Flink ships with the following two commonly used deserializers:

org.apache.flink.api.common.serialization.SimpleStringSchema: deserializes each record into a String.

org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema: deserializes each record into a Jackson ObjectNode.

To turn complex JSON from Kafka directly into the object you want, you can write a custom schema modeled on org.apache.flink.api.common.serialization.SimpleStringSchema; the main method to implement is deserialize.
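A minimal sketch of such a custom schema, assuming a hypothetical UserAction POJO whose fields match the JSON used in the example later in this article (userID, eventTime, eventType, productID) and using fastjson for parsing:

import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class UserActionDeserializationSchema implements DeserializationSchema<UserAction> {

    @Override
    public UserAction deserialize(byte[] message) throws IOException {
        // Interpret the raw Kafka bytes as UTF-8 JSON and map them onto the (hypothetical) UserAction POJO.
        return JSON.parseObject(new String(message, StandardCharsets.UTF_8), UserAction.class);
    }

    @Override
    public boolean isEndOfStream(UserAction nextElement) {
        // The Kafka stream is unbounded, so never signal end of stream.
        return false;
    }

    @Override
    public TypeInformation<UserAction> getProducedType() {
        return TypeInformation.of(UserAction.class);
    }
}

It would then replace SimpleStringSchema in the consumer, e.g. new FlinkKafkaConsumer010<>("userActionLog1", new UserActionDeserializationSchema(), kafkaProperties).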

Question 3: Dynamic discovery of topics and partitions when reading from Kafka
Previously with Spark Streaming, Spark 2.2.2 could not dynamically discover topics (subscribed via a regex) or partitions newly added in Kafka 0.10.1; whenever a topic or partition was added, the Spark Streaming job had to be restarted.

Flink can discover newly added Kafka topics and partitions, but this discovery is disabled by default and must be switched on explicitly:

kafkaProperties.put("flink.partition-discovery.interval-millis","10000");

flink.partition-discovery.interval-millis: the discovery interval, in milliseconds.
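For example, a minimal sketch that combines regex topic subscription (from Question 1) with partition discovery; the broker address and consumer group below are made-up values:

Properties kafkaProperties = new Properties();
kafkaProperties.put("bootstrap.servers", "broker1:9092");                  // hypothetical broker address
kafkaProperties.put("group.id", "pv-job");                                 // hypothetical consumer group
kafkaProperties.put("flink.partition-discovery.interval-millis", "10000"); // probe for new topics/partitions every 10s

// Topics created later that match the pattern are picked up automatically, as are new partitions of subscribed topics.
FlinkKafkaConsumer010<String> kafkaConsumer = new FlinkKafkaConsumer010<>(
        Pattern.compile("userActionLog[1-9]{1}"), new SimpleStringSchema(), kafkaProperties);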

Question 4: Reading from Kafka and exactly-once semantics
With checkpointing disabled, the consumer by default auto-commits offsets to external storage (the Kafka broker, or ZooKeeper for Kafka 0.8) at a 5-second interval, and the Flink Kafka consumer's fault tolerance relies on these auto-committed offsets.

With checkpointing enabled, the offsets stored in a checkpoint are by default committed to external storage (the Kafka broker, or ZooKeeper for 0.8) after the checkpoint completes. While the job is running, the Flink Kafka consumer's fault tolerance relies on the offsets in the checkpoint; when the job is restored, it may resume either from the offsets in a checkpoint or from the offsets in external storage such as the Kafka broker, depending on how the restore is done. Note: in this mode the offsets stored in the Kafka broker (or in ZooKeeper for 0.8) are only used to monitor consumption progress.

In summary, by combining Kafka's ability to re-consume data with Flink's checkpoint mechanism, the Flink Kafka consumer can provide exactly-once semantics.
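A minimal sketch of this setup (broker address, consumer group, and intervals are made-up values); the complete example at the end of this article uses the same pattern:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Take a checkpoint every 60 seconds; the consumer's offsets are stored in each checkpoint.
env.enableCheckpointing(60 * 1000, CheckpointingMode.EXACTLY_ONCE);

Properties kafkaProperties = new Properties();
kafkaProperties.put("bootstrap.servers", "broker1:9092"); // hypothetical broker address
kafkaProperties.put("group.id", "pv-job");                // hypothetical consumer group

FlinkKafkaConsumer010<String> kafkaConsumer =
        new FlinkKafkaConsumer010<>("userActionLog1", new SimpleStringSchema(), kafkaProperties);
// Commit offsets back to Kafka only when a checkpoint completes; those offsets are then used for monitoring only.
kafkaConsumer.setCommitOffsetsOnCheckpoints(true);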

Question 5: Writing to Kafka and exactly-once semantics
Kafka 0.8: Flink provides neither exactly-once nor at-least-once guarantees.

Kafka 0.9 and 0.10: with checkpointing enabled, FlinkKafkaProducer09 and FlinkKafkaProducer010 provide at-least-once semantics, provided the following settings are also applied (see the sketch after this list):

setLogFailuresOnly(false): if set to true, the producer merely logs a failure when it hits an exception and the streaming job keeps running. It should be set to false so that an exception fails the streaming job, the job is recovered, and the send is retried.

setFlushOnCheckpoint(true): when set to true, all records buffered in the Kafka producer are flushed to Kafka before a checkpoint is allowed to succeed, so the checkpoint covers the data sitting in the producer buffer.

retries: the Kafka producer's retry count; it defaults to 0, so a larger value is recommended.
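A minimal sketch of an at-least-once sink for Kafka 0.10 with these settings (topic name, broker address, and the resultStream variable are assumptions for illustration):

Properties producerProperties = new Properties();
producerProperties.setProperty("bootstrap.servers", "broker1:9092"); // hypothetical broker address
producerProperties.setProperty("retries", "3");                      // retry failed sends instead of dropping records

FlinkKafkaProducer010<String> kafkaProducer010 = new FlinkKafkaProducer010<>(
        "pvResult",                 // hypothetical target topic
        new SimpleStringSchema(),
        producerProperties);
kafkaProducer010.setLogFailuresOnly(false);  // fail the job on send errors instead of only logging them
kafkaProducer010.setFlushOnCheckpoint(true); // flush the producer buffer before a checkpoint completes

resultStream.addSink(kafkaProducer010).name("outputToKafka010"); // resultStream: some DataStream<String>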

Kafka 0.11 and 1.0.0+: with checkpointing enabled, FlinkKafkaProducer011 and FlinkKafkaProducer (for Kafka >= 1.0.0) can provide exactly-once semantics based on a two-phase commit protocol.
The guarantee is chosen via the Semantic parameter: Semantic.NONE (data may be lost or duplicated), Semantic.AT_LEAST_ONCE (data may be duplicated; this is the default), or Semantic.EXACTLY_ONCE.
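A minimal sketch of an exactly-once sink for Kafka 0.11 (the same configuration appears in the complete example below; topic name, broker address, and resultStream are assumptions). Note that transaction.timeout.ms must not exceed the broker's transaction.max.timeout.ms:

Properties producerProperties = new Properties();
producerProperties.setProperty("bootstrap.servers", "broker1:9092"); // hypothetical broker address
producerProperties.setProperty("transaction.timeout.ms", "60000");   // must be <= the broker's transaction.max.timeout.ms

FlinkKafkaProducer011<String> kafkaProducer011 = new FlinkKafkaProducer011<>(
        "pvResult",                                                   // hypothetical target topic
        new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
        producerProperties,
        FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);                 // two-phase commit driven by checkpoints

resultStream.addSink(kafkaProducer011).name("outputToKafka011");     // resultStream: some DataStream<String>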

Example: read data from Kafka 0.10.1, compute PV (page view) counts, and write the results to Kafka 0.11.0.3
Dependencies (partial)
<!-- Kafka 0.10 connector -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
    <version>1.8.0</version>
</dependency>

<!-- Kafka 0.11 connector -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>1.8.0</version>
</dependency>

Implementation
package com.bigdata.flink;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.util.Collector;

import java.text.Format;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;

/**
 * Author: Wang Pei
 * Summary: reads from and writes to Kafka
 */
public class ReadWriteKafka {
    public static void main(String[] args) throws Exception {

        /** Parse command-line arguments */
        ParameterTool fromArgs = ParameterTool.fromArgs(args);
        ParameterTool parameterTool = ParameterTool.fromPropertiesFile(fromArgs.getRequired("applicationProperties"));

        // Checkpoint parameters
        String checkpointDirectory = parameterTool.getRequired("checkpointDirectory");
        long checkpointSecondInterval = parameterTool.getLong("checkpointSecondInterval");

        // Source Kafka parameters
        String fromKafkaBootstrapServers = parameterTool.getRequired("fromKafka.bootstrap.servers");
        String fromKafkaGroupID = parameterTool.getRequired("fromKafka.group.id");
        String fromKafkaAutoOffsetReset = parameterTool.getRequired("fromKafka.auto.offset.reset");
        String fromKafkaTopic = parameterTool.getRequired("fromKafka.topic");

        // Sink Kafka parameters
        String toKafkaBootstrapServers = parameterTool.getRequired("toKafka.bootstrap.servers");
        String toKafkaTopic = parameterTool.getRequired("toKafka.topic");

        // Window parameters
        long tumblingWindowLength = parameterTool.getLong("tumblingWindowLength");
        long outOfOrdernessSeconds = parameterTool.getLong("outOfOrdernessSeconds");


        /** Configure the execution environment */

        // Start a local web server (Flink Web UI)
        Configuration config = new Configuration();
        config.setInteger(RestOptions.PORT, 8081);
        config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Set the state backend
        env.setStateBackend((StateBackend) new FsStateBackend(checkpointDirectory, true));

        // Configure checkpointing
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.setCheckpointInterval(checkpointSecondInterval * 1000);
        checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        /** Configure the source */
        Properties kafkaProperties = new Properties();
        kafkaProperties.put("bootstrap.servers", fromKafkaBootstrapServers);
        kafkaProperties.put("group.id", fromKafkaGroupID);
        kafkaProperties.put("auto.offset.reset", fromKafkaAutoOffsetReset);
        FlinkKafkaConsumer010<String> kafkaConsumer = new FlinkKafkaConsumer010<>(fromKafkaTopic, new SimpleStringSchema(), kafkaProperties);
        kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
        DataStream<String> source = env.addSource(kafkaConsumer).name("KafkaSource").uid("source-id");

        /** Extract and transform */
        SingleOutputStreamOperator<Tuple4<String, String, String, Integer>> sourceMap = source
                .map((MapFunction<String, Tuple4<String, String, String, Integer>>) value -> {
                    Tuple4<String, String, String, Integer> output = new Tuple4<>();
                    try {
                        JSONObject obj = JSON.parseObject(value);
                        output.f0 = obj.getString("userID");
                        output.f1 = obj.getString("eventTime");
                        output.f2 = obj.getString("eventType");
                        output.f3 = obj.getInteger("productID");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    return output;
                }).returns(new TypeHint<Tuple4<String, String, String, Integer>>() {})
                .name("Map: ExtractTransform").uid("map-id");

        /** Filter out malformed records (their fields remain null when JSON parsing fails above) */
        SingleOutputStreamOperator<Tuple4<String, String, String, Integer>> sourceFilter = sourceMap
                .filter((FilterFunction<Tuple4<String, String, String, Integer>>) value -> value.f1 != null && value.f2 != null)
                .name("Filter: FilterExceptionData").uid("filter-id");


        /** Assign timestamps and emit watermarks */
        SingleOutputStreamOperator<Tuple4<String, String, String, Integer>> assignTimestampsAndWatermarks = sourceFilter.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple4<String, String, String, Integer>>(Time.seconds(outOfOrdernessSeconds)) {

            SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

            @Override
            public long extractTimestamp(Tuple4<String, String, String, Integer> element) {

                long timestamp = 0L;

                try {
                    Date date = format.parse(element.f1);
                    timestamp = date.getTime();
                } catch (ParseException e) {
                    e.printStackTrace();
                }

                return timestamp;
            }
        }).uid("watermark-id");


        /** Windowed aggregation */
        SingleOutputStreamOperator<String> aggregate = assignTimestampsAndWatermarks
                // keyBy partitions the stream by key (hash partitioning by default)
                .keyBy((KeySelector<Tuple4<String, String, String, Integer>, String>) value -> value.f2)
                .window(TumblingEventTimeWindows.of(Time.seconds(tumblingWindowLength)))
                // Apply a WindowFunction (CustomWindowFunction) on each window.
                // CustomAggFunction performs incremental aggregation; per window, its result is fed into
                // the WindowFunction (CustomWindowFunction), which post-processes and emits it.
                // See the underlying AggregateApplyWindowFunction for details.
                .aggregate(new CustomAggFunction(), new CustomWindowFunction());

        //aggregate.print();

        /** Emit the results */
        Properties kafkaProducerProperties = new Properties();
        kafkaProducerProperties.setProperty("bootstrap.servers", toKafkaBootstrapServers);
        kafkaProducerProperties.setProperty("transaction.timeout.ms", "60000");
        FlinkKafkaProducer011<String> kafkaProducer011 = new FlinkKafkaProducer011<>(
                toKafkaTopic,
                new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
                kafkaProducerProperties,
                FlinkKafkaProducer011.Semantic.EXACTLY_ONCE
        );

        aggregate.addSink(kafkaProducer011).name("outputToKafka");

        env.execute();

    }

    /**
     * Custom AggregateFunction
     * Incremental aggregation; here it simply counts records.
     */
    static class CustomAggFunction implements AggregateFunction<Tuple4<String, String, String, Integer>, Long, Long> {

        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public Long add(Tuple4<String, String, String, Integer> value, Long accumulator) {
            return accumulator + 1;
        }

        @Override
        public Long getResult(Long accumulator) {
            return accumulator;
        }

        @Override
        public Long merge(Long accumulator1, Long accumulator2) {
            return accumulator1 + accumulator2;
        }
    }

    /**
     * Custom WindowFunction
     * Post-processes the incrementally aggregated result and emits it.
     */
    static class CustomWindowFunction implements WindowFunction<Long, String, String, TimeWindow> {

        @Override
        public void apply(String key, TimeWindow window, Iterable<Long> input, Collector<String> out) throws Exception {

            Format format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

            long windowStart = window.getStart();
            long windowEnd = window.getEnd();
            Long windowPV = input.iterator().next();

            String output = format.format(new Date(windowStart)) + "," + format.format(new Date(windowEnd)) + "," + key + "," + windowPV;

            out.collect(output);

        }
    }

}

 

Copyright notice: this is an original article by CSDN blogger "wangpei1949", published under the CC 4.0 BY-SA license; please include the original link and this notice when reposting.
Original link: https://blog.csdn.net/wangpei1949/article/details/98476354

