1. How exactly-once is implemented in Flink
Both reading data from Kafka (the source) and writing data back to Kafka (the sink) need to guarantee exactly-once. Currently Flink has few sources that support exactly-once — the Kafka source is one of them — and few sinks that can achieve it, such as the Kafka sink and StreamingFileSink, and all of them require checkpointing to be enabled. Below we take FlinkKafkaProducer as an example and dig into its source code to understand how Flink implements exactly-once semantics.
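Before going into the source, here is a minimal sketch of what "checkpointing + exactly-once Kafka sink" looks like from the user's side. The broker addresses, topic name and the socket source are placeholders for illustration only:

package cn._51doit.flink.day11;

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.nio.charset.StandardCharsets;
import java.util.Properties;

public class ExactlyOnceKafkaSinkSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // exactly-once only works with checkpointing enabled
        env.enableCheckpointing(30000, CheckpointingMode.EXACTLY_ONCE);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "feng05:9092,feng06:9092,feng07:9092");
        // must not exceed the broker's transaction.max.timeout.ms (15 minutes by default)
        props.setProperty("transaction.timeout.ms", "600000");

        FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>(
            "result-topic",
            new KafkaSerializationSchema<String>() {
                @Override
                public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
                    return new ProducerRecord<>("result-topic", element.getBytes(StandardCharsets.UTF_8));
                }
            },
            props,
            FlinkKafkaProducer.Semantic.EXACTLY_ONCE);   // two-phase commit against Kafka transactions

        env.socketTextStream("localhost", 8888).addSink(kafkaSink);
        env.execute("exactly-once-sketch");
    }
}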
1.1 The overall flow (i.e. the two-phase commit protocol)
1. The JobManager periodically (via the CheckpointCoordinator) sends checkpoint requests to every subTask that holds state.
2. Each subTask writes its own state to the configured state backend (one file per slot; the states of the subTasks running in that slot are written into that file).
3. Each subTask reports back to the JobManager that its part of the checkpoint succeeded.
4. Once every subTask has reported success, the JobManager notifies all subTasks that implement the checkpoint-complete callback that the checkpoint succeeded.
5. The subTask writes its data to Kafka and commits the Kafka transaction.
Note: to keep the operator state and keyed state along one pipeline consistent, Flink uses the barrier mechanism: when the JobManager triggers a checkpoint, barriers are injected into the stream at the sources and flow downstream together with the records, and each operator snapshots its state only once it has received the barrier on all of its inputs, so all snapshots of one checkpoint describe the same point in the stream.
Question: with Kafka there is a transaction on the producer side when writing the data, and the consumer side when reading it — how are the two related?
1.2 Source code walkthrough
(1) Start with the FlinkKafkaProducer class: it extends TwoPhaseCommitSinkFunction.
(2) TwoPhaseCommitSinkFunction is the recommended base class for every SinkFunction that intends to implement exactly-once semantics. It implements two important interfaces: CheckpointedFunction and CheckpointListener.
- The CheckpointedFunction interface
This interface declares two methods, snapshotState and initializeState; its source is:

public interface CheckpointedFunction {

    /**
     * This method is called when a snapshot for a checkpoint is requested. This acts as a hook to the function to
     * ensure that all state is exposed by means previously offered through {@link FunctionInitializationContext} when
     * the Function was initialized, or offered now by {@link FunctionSnapshotContext} itself.
     *
     * @param context the context for drawing a snapshot of the operator
     * @throws Exception
     */
    void snapshotState(FunctionSnapshotContext context) throws Exception;

    /**
     * This method is called when the parallel function instance is created during distributed
     * execution. Functions typically set up their state storing data structures in this method.
     *
     * @param context the context for initializing the operator
     * @throws Exception
     */
    void initializeState(FunctionInitializationContext context) throws Exception;
}
snapshotState is invoked when a checkpoint is taken: it snapshots the state and persists it to the state backend. For FlinkKafkaProducer this state contains, among other things, the transaction ID, the subTask index and the Kafka connection information used for writing. If the checkpoint succeeded but the subTask did not manage to get its data committed to Kafka, the most recent snapshot can later be restored and processing continues from there.
initializeState is used to restore that state: if state was persisted to the state backend but the data was not successfully committed to Kafka, this method restores the most recent state so the write to Kafka and the transaction commit can be redone.
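To make the two methods concrete, here is a simplified, heavily condensed paraphrase of the pattern TwoPhaseCommitSinkFunction implements with them. The class, method and field names below are illustrative only, not Flink's actual source; committing of the pending transactions happens in notifyCheckpointComplete, which is covered next:

import java.util.LinkedHashMap;
import java.util.Map;

public abstract class TwoPhaseCommitSketch<TXN> {

    // the transaction currently collecting records
    protected TXN currentTransaction;
    // transactions pre-committed at a checkpoint but not yet committed, keyed by checkpoint id
    protected final Map<Long, TXN> pendingCommitTransactions = new LinkedHashMap<>();

    protected abstract TXN beginTransaction() throws Exception;
    protected abstract void preCommit(TXN txn) throws Exception;
    protected abstract void recoverAndCommit(TXN txn);

    // roughly what snapshotState does: phase 1 of the two-phase commit
    public void onSnapshot(long checkpointId) throws Exception {
        preCommit(currentTransaction);                         // flush / pre-commit the open transaction
        pendingCommitTransactions.put(checkpointId, currentTransaction);
        currentTransaction = beginTransaction();               // records after the barrier go into a new transaction
        // currentTransaction and pendingCommitTransactions are then written into operator state,
        // so they can be restored after a failure
    }

    // roughly what initializeState does after a restart: finish what the failed run left behind
    public void onRestore(Map<Long, TXN> restoredPendingTransactions) throws Exception {
        for (TXN txn : restoredPendingTransactions.values()) {
            recoverAndCommit(txn);                             // re-commit transactions of completed checkpoints
        }
        currentTransaction = beginTransaction();               // and start fresh
    }
}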
- The CheckpointListener interface
This interface declares a single method, notifyCheckpointComplete.
Its source is:

/**
 * This interface must be implemented by functions/operations that want to receive
 * a commit notification once a checkpoint has been completely acknowledged by all
 * participants.
 */
@PublicEvolving
public interface CheckpointListener {

    /**
     * This method is called as a notification once a distributed checkpoint has been completed.
     *
     * Note that any exception during this method will not cause the checkpoint to
     * fail any more.
     *
     * @param checkpointId The ID of the checkpoint that has been completed.
     * @throws Exception
     */
    void notifyCheckpointComplete(long checkpointId) throws Exception;
}
When is notifyCheckpointComplete called? Only after the subTasks of all partitions have acknowledged the checkpoint to the JobManager; the call tells each subTask that this checkpoint succeeded and the next step can proceed. The implementation in TwoPhaseCommitSinkFunction is:

@Override
public final void notifyCheckpointComplete(long checkpointId) throws Exception {
    // the following scenarios are possible here
    //
    //  (1) there is exactly one transaction from the latest checkpoint that
    //      was triggered and completed. That should be the common case.
    //      Simply commit that transaction in that case.
    //
    //  (2) there are multiple pending transactions because one previous
    //      checkpoint was skipped. That is a rare case, but can happen
    //      for example when:
    //
    //        - the master cannot persist the metadata of the last
    //          checkpoint (temporary outage in the storage system) but
    //          could persist a successive checkpoint (the one notified here)
    //
    //        - other tasks could not persist their status during
    //          the previous checkpoint, but did not trigger a failure because they
    //          could hold onto their state and could successfully persist it in
    //          a successive checkpoint (the one notified here)
    //
    //      In both cases, the prior checkpoint never reach a committed state, but
    //      this checkpoint is always expected to subsume the prior one and cover all
    //      changes since the last successful one. As a consequence, we need to commit
    //      all pending transactions.
    //
    //  (3) Multiple transactions are pending, but the checkpoint complete notification
    //      relates not to the latest. That is possible, because notification messages
    //      can be delayed (in an extreme case till arrive after a succeeding checkpoint
    //      was triggered) and because there can be concurrent overlapping checkpoints
    //      (a new one is started before the previous fully finished).
    //
    // ==> There should never be a case where we have no pending transaction here
    //

    Iterator<Map.Entry<Long, TransactionHolder<TXN>>> pendingTransactionIterator = pendingCommitTransactions.entrySet().iterator();
    Throwable firstError = null;

    while (pendingTransactionIterator.hasNext()) {
        Map.Entry<Long, TransactionHolder<TXN>> entry = pendingTransactionIterator.next();
        Long pendingTransactionCheckpointId = entry.getKey();
        TransactionHolder<TXN> pendingTransaction = entry.getValue();
        if (pendingTransactionCheckpointId > checkpointId) {
            continue;
        }

        LOG.info("{} - checkpoint {} complete, committing transaction {} from checkpoint {}",
            name(), checkpointId, pendingTransaction, pendingTransactionCheckpointId);

        logWarningIfTimeoutAlmostReached(pendingTransaction);
        try {
            commit(pendingTransaction.handle);
        } catch (Throwable t) {
            if (firstError == null) {
                firstError = t;
            }
        }

        LOG.debug("{} - committed checkpoint transaction {}", name(), pendingTransaction);

        pendingTransactionIterator.remove();
    }

    if (firstError != null) {
        throw new FlinkRuntimeException("Committing one of transactions failed, logging first encountered failure",
            firstError);
    }
}
Note that besides signalling each subTask that this checkpoint succeeded, the method also commits the pending transactions — see the commit(pendingTransaction.handle) call above.
The commit method in FlinkKafkaProducer:

@Override
protected void commit(FlinkKafkaProducer.KafkaTransactionState transaction) {
    if (transaction.isTransactional()) {
        try {
            transaction.producer.commitTransaction();
        } finally {
            recycleTransactionalProducer(transaction.producer);
        }
    }
}
What if the transaction commit fails? That is fine: after a failed commit the job restarts according to the configured restart strategy, initializeState restores the most recent state, and the data is written to Kafka and the transaction committed again. On this recovery path the method called is no longer commit but FlinkKafkaProducer's recoverAndCommit (preCommit may also be involved here — the author has not fully worked through this part of the source), which first resumes the transaction and then commits it. Its source:

@Override
protected void recoverAndCommit(FlinkKafkaProducer.KafkaTransactionState transaction) {
    if (transaction.isTransactional()) {
        try (
            FlinkKafkaInternalProducer<byte[], byte[]> producer =
                initTransactionalProducer(transaction.transactionalId, false)) {
            producer.resumeTransaction(transaction.producerId, transaction.epoch);
            producer.commitTransaction();
        } catch (InvalidTxnStateException | ProducerFencedException ex) {
            // That means we have committed this transaction before.
            LOG.warn("Encountered error {} while recovering transaction {}. " +
                "Presumably this transaction has been already committed before", ex, transaction);
        }
    }
}
Note: this guarantees that the checkpoint succeeds and that the transaction commit succeeds, but it cannot guarantee that the two succeed atomically as one unit. That is still fine: if the checkpoint succeeds but the transaction commit does not, the transaction is rolled back, the state is restored from the state backend, and the data is written to Kafka and the transaction committed again.
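This also partially answers the question raised in 1.1: the data written under a transaction only becomes visible downstream once the transaction is committed at checkpoint completion — and only if the downstream Kafka consumer reads with isolation.level=read_committed. A minimal sketch of the consumer side (topic, group id and broker addresses are placeholders):

package cn._51doit.flink.day11;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class ReadCommittedConsumerSketch {

    public static FlinkKafkaConsumer<String> build() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "feng05:9092,feng06:9092,feng07:9092");
        props.setProperty("group.id", "downstream-group");
        // only return records of committed transactions; the default read_uncommitted would also
        // expose records written by transactions that are still open or were later aborted
        props.setProperty("isolation.level", "read_committed");
        return new FlinkKafkaConsumer<>("result-topic", new SimpleStringSchema(), props);
    }
}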
2 A custom two-phase-commit sink example
The storage system targeted by a custom two-phase-commit sink must support transactions, e.g. MySQL or Kafka 0.11+. In short, a custom two-phase-commit sink extends TwoPhaseCommitSinkFunction and overrides its methods; see the example below.
A two-phase-commit sink for MySQL
The Druid connection pool

package cn._51doit.flink.day11;

import com.alibaba.druid.pool.DruidDataSourceFactory;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Properties;

public class DruidConnectionPool {

    private transient static DataSource dataSource = null;

    private transient static Properties props = new Properties();

    static {
        props.put("driverClassName", "com.mysql.jdbc.Driver");
        props.put("url", "jdbc:mysql://172.16.200.101:3306/bigdata?characterEncoding=UTF-8");
        props.put("username", "root");
        props.put("password", "123456");
        try {
            dataSource = DruidDataSourceFactory.createDataSource(props);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private DruidConnectionPool() {
    }

    public static Connection getConnection() throws SQLException {
        return dataSource.getConnection();
    }
}
MySqlTwoPhaseCommitSink

package cn._51doit.flink.day11;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class MySqlTwoPhaseCommitSink extends TwoPhaseCommitSinkFunction<Tuple2<String, Integer>, MySqlTwoPhaseCommitSink.ConnectionState, Void> {

    public MySqlTwoPhaseCommitSink() {
        super(new KryoSerializer<>(MySqlTwoPhaseCommitSink.ConnectionState.class, new ExecutionConfig()),
            VoidSerializer.INSTANCE);
    }

    @Override
    protected MySqlTwoPhaseCommitSink.ConnectionState beginTransaction() throws Exception {
        System.out.println("=====> beginTransaction... ");
        //Class.forName("com.mysql.jdbc.Driver");
        //Connection conn = DriverManager.getConnection("jdbc:mysql://172.16.200.101:3306/bigdata?characterEncoding=UTF-8", "root", "123456");
        Connection connection = DruidConnectionPool.getConnection();
        connection.setAutoCommit(false);
        return new ConnectionState(connection);
    }

    @Override
    protected void invoke(MySqlTwoPhaseCommitSink.ConnectionState connectionState, Tuple2<String, Integer> value, Context context) throws Exception {
        Connection connection = connectionState.connection;
        PreparedStatement pstm = connection.prepareStatement(
            "INSERT INTO t_wordcount (word, counts) VALUES (?, ?) ON DUPLICATE KEY UPDATE counts = ?");
        pstm.setString(1, value.f0);
        pstm.setInt(2, value.f1);
        pstm.setInt(3, value.f1);
        pstm.executeUpdate();
        pstm.close();
    }

    @Override
    protected void preCommit(MySqlTwoPhaseCommitSink.ConnectionState connectionState) throws Exception {
        System.out.println("=====> preCommit... " + connectionState);
    }

    @Override
    protected void commit(MySqlTwoPhaseCommitSink.ConnectionState connectionState) {
        System.out.println("=====> commit... ");
        Connection connection = connectionState.connection;
        try {
            connection.commit();
            connection.close();
        } catch (SQLException e) {
            throw new RuntimeException("Failed to commit the transaction", e);
        }
    }

    @Override
    protected void abort(MySqlTwoPhaseCommitSink.ConnectionState connectionState) {
        System.out.println("=====> abort... ");
        Connection connection = connectionState.connection;
        try {
            connection.rollback();
            connection.close();
        } catch (SQLException e) {
            throw new RuntimeException("Failed to roll back the transaction", e);
        }
    }

    static class ConnectionState {

        private final transient Connection connection;

        ConnectionState(Connection connection) {
            this.connection = connection;
        }
    }
}
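For completeness, here is a sketch of how this sink could be wired into a word-count job. The socket source, host/port and the table contents are illustrative assumptions; the important part is that checkpointing is enabled, because the transaction is only committed when a checkpoint completes:

package cn._51doit.flink.day11;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class MySqlTwoPhaseCommitSinkDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // the sink only commits its MySQL transaction when a checkpoint completes
        env.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE);

        env.socketTextStream("localhost", 8888)
            .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
                    for (String word : line.split(" ")) {
                        out.collect(Tuple2.of(word, 1));
                    }
                }
            })
            .keyBy(t -> t.f0)
            .sum(1)
            .addSink(new MySqlTwoPhaseCommitSink());

        env.execute("mysql-two-phase-commit-demo");
    }
}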
3 Writing data to HBase
Here exactly-once results are achieved by combining HBase's idempotent writes (puts to the same rowkey simply overwrite each other) with at-least-once processing (Flink can restore state, but data between two checkpoints may be read and processed again after a failure).
HBaseUtil

package cn._51doit.flink.day11;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

/**
 * HBase utility class used to create an HBase Connection.
 */
public class HBaseUtil {

    /**
     * @param zkQuorum ZooKeeper address; separate multiple hosts with commas
     * @param port     ZooKeeper port
     * @return
     */
    public static Connection getConnection(String zkQuorum, int port) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", zkQuorum);
        conf.set("hbase.zookeeper.property.clientPort", port + "");
        Connection connection = ConnectionFactory.createConnection(conf);
        return connection;
    }
}
MyHbaseSink

package cn._51doit.flink.day11;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;

import java.util.ArrayList;
import java.util.List;

public class MyHbaseSink extends RichSinkFunction<Tuple2<String, Double>> {

    private transient Connection connection;
    // plain configuration values; they must not be transient, otherwise they would be lost
    // when the sink instance is serialized and shipped to the TaskManagers
    private Integer maxSize = 1000;
    private Long delayTime = 5000L;
    private transient Long lastInvokeTime;
    private transient List<Put> puts;

    public MyHbaseSink() {}

    public MyHbaseSink(Integer maxSize, Long delayTime) {
        this.maxSize = maxSize;
        this.delayTime = delayTime;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        ParameterTool params = (ParameterTool) getRuntimeContext()
            .getExecutionConfig().getGlobalJobParameters();
        // create the HBase connection
        connection = HBaseUtil.getConnection(
            params.getRequired("hbase.zookeeper.quorum"),
            params.getInt("hbase.zookeeper.property.clientPort", 2181)
        );
        puts = new ArrayList<>(maxSize);
        lastInvokeTime = System.currentTimeMillis();
    }

    @Override
    public void invoke(Tuple2<String, Double> value, Context context) throws Exception {
        String rk = value.f0;
        Put put = new Put(rk.getBytes());
        put.addColumn("data".getBytes(), "order".getBytes(), value.f1.toString().getBytes());
        // buffer the Put; use processing time to decide when to flush
        puts.add(put);
        long currentTime = System.currentTimeMillis();
        // flush when the batch is full or the delay has elapsed
        if (puts.size() == maxSize || currentTime - lastInvokeTime >= delayTime) {
            // get the HBase table
            Table table = connection.getTable(TableName.valueOf("myorder"));
            table.put(puts);
            puts.clear();
            lastInvokeTime = currentTime;
            table.close();
        }
    }

    @Override
    public void close() throws Exception {
        connection.close();
    }
}
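Since MyHbaseSink reads the ZooKeeper address from the global job parameters in open(), the job has to register a ParameterTool as global job parameters before the sink is used. A usage sketch — the properties file path, the socket source and the "rowkey,amount" parsing are illustrative assumptions:

package cn._51doit.flink.day11;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MyHbaseSinkDemo {

    public static void main(String[] args) throws Exception {
        // the properties file is expected to contain hbase.zookeeper.quorum
        // (and optionally hbase.zookeeper.property.clientPort)
        ParameterTool params = ParameterTool.fromPropertiesFile(args[0]);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // at-least-once is enough here; HBase overwrites the same rowkey idempotently
        env.enableCheckpointing(10000);
        // make the parameters visible to MyHbaseSink.open() via getGlobalJobParameters()
        env.getConfig().setGlobalJobParameters(params);

        env.socketTextStream("localhost", 8888)
            // input lines like "order001,99.9" are assumed for illustration
            .map(new MapFunction<String, Tuple2<String, Double>>() {
                @Override
                public Tuple2<String, Double> map(String line) {
                    String[] fields = line.split(",");
                    return Tuple2.of(fields[0], Double.parseDouble(fields[1]));
                }
            })
            .addSink(new MyHbaseSink(500, 3000L));

        env.execute("hbase-sink-demo");
    }
}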
4 ProtoBuf
ProtoBuf is a serialization mechanism whose wire format is binary. Its strengths are fast serialization and deserialization, a small footprint (roughly one third of the equivalent JSON), and being cross-platform and cross-language.
4.1 Trying out protobuf
(1) Create a Maven project.
(2) Add the POM shown below.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>protobuf-bean</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>3.7.1</version>
        </dependency>
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java-util</artifactId>
            <version>3.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.4.0</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <extensions>
            <extension>
                <groupId>kr.motd.maven</groupId>
                <artifactId>os-maven-plugin</artifactId>
                <version>1.6.2</version>
            </extension>
        </extensions>
        <plugins>
            <plugin>
                <groupId>org.xolstice.maven.plugins</groupId>
                <artifactId>protobuf-maven-plugin</artifactId>
                <version>0.6.1</version>
                <configuration>
                    <protocArtifact>
                        com.google.protobuf:protoc:3.7.1:exe:${os.detected.classifier}
                    </protocArtifact>
                    <pluginId>grpc-java</pluginId>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>compile-custom</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
(3) Create a proto folder under the main directory and put the xxx.proto file in it, for example:

syntax = "proto3";
option java_package = "cn._51doit.proto";
option java_outer_classname = "OrderProto";
message Order {
int32 id = 1;
string time = 2;
double money = 3;
}
(4) Maven's plugins list now contains a protobuf plugin; run its protobuf:compile goal and the corresponding protobuf bean classes are generated under the project's target directory (bindings can be generated for several languages from the same schema).
(5) Move the generated proto bean classes into whatever package you want.
The test below converts JSON data into the protobuf bean, serializes the bean to binary, and then deserializes the binary back into a bean and prints it.
OrderProtoTest

package cn._51doit.test;

import cn._51doit.proto.OrderProto;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;

import java.util.Arrays;

public class OrderProtoTest {

    public static void main(String[] args) throws InvalidProtocolBufferException {
        String json = "{\"id\": 100, \"time\": \"2020-07-01\", \"money\": 66.66}";
        // create a builder for the generated class
        OrderProto.Order.Builder bean = OrderProto.Order.newBuilder();
        // copy the JSON fields into the builder
        JsonFormat.parser().merge(json, bean);
        bean.setId(666);
        bean.setTime("2019-10-18");
        bean.setMoney(888.88);
        // serialize: bean -> byte array
        byte[] bytes = bean.build().toByteArray();
        System.out.println("binary: " + Arrays.toString(bytes));
        // deserialize: byte array -> bean
        OrderProto.Order order = OrderProto.Order.parseFrom(bytes);
        System.out.println("as object: " + order);
    }
}
4.2 Sending data to Kafka as ProtoBuf binary
DataToKafka

package cn._51doit.test;

import cn._51doit.proto.DataBeanProto;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class DataToKafka {

    public static void main(String[] args) {
        // 1. configuration
        Properties props = new Properties();
        // Kafka brokers
        props.setProperty("bootstrap.servers", "feng05:9092,feng06:9092,feng07:9092");
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        String topic = "dataproto";
        // 2. the Kafka producer
        KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(props);
        DataBeanProto.DataBean.Builder bean = DataBeanProto.DataBean.newBuilder();
        DataBeanProto.DataBeans.Builder list = DataBeanProto.DataBeans.newBuilder();
        for (int i = 1; i <= 100; i++) {
            // set the fields of the bean
            bean.setId(i);
            bean.setTitle("doit-" + i);
            bean.setUrl("www.51doit.cn");
            // append the bean to the list
            list.addDataBean(bean);
            // clear the builder for the next record
            bean.clear();
            if (list.getDataBeanCount() == 10) {
                // convert the batch of beans into protobuf binary
                byte[] bytes = list.build().toByteArray();
                ProducerRecord<String, byte[]> record = new ProducerRecord<>(topic, bytes);
                producer.send(record);
                // send 10 records at a time
                producer.flush();
                list.clear();
            }
        }
        System.out.println("message send success");
        // release resources
        producer.close();
    }
}
4.3 Integrating a Kafka serializer with Flume's KafkaChannel
Requirements: (1) define a Kafka serializer so the data is converted into the corresponding binary format before it is written into Kafka;
(2) Flink then pulls that binary data from Kafka and turns it back into the ProtoBuf bean.
(1) Implementing the Kafka serializer
The idea: generate the protoBuf bean first, then define a serializer by implementing the Serializer interface and overriding its serialize method; the logic is in the code below. Package the code into a jar and put it into Flume's lib directory (a sketch of the corresponding Flume channel configuration follows the code). Note that the protobuf-java-2.5.0.jar shipped in Flume's lib must be removed or excluded, otherwise it conflicts with protobuf 3.
KafkaProtoBufSerializer

package cn._51doit.test;

import cn._51doit.proto.UserProto;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serializer;

import java.util.Map;

public class KafkaProtoBufSerializer implements Serializer<byte[]> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public byte[] serialize(String topic, byte[] data) {
        // convert the data the source handed to the channel into ProtoBuf binary;
        // each record is a JSON string
        String line = new String(data);
        UserProto.User.Builder bean = UserProto.User.newBuilder();
        // use the JsonFormat utility to set the JSON fields onto the bean
        try {
            JsonFormat.parser().merge(line, bean);
        } catch (InvalidProtocolBufferException e) {
            return null;
        }
        // return the ProtoBuf binary
        return bean.build().toByteArray();
    }

    @Override
    public byte[] serialize(String topic, Headers headers, byte[] data) {
        // delegate to the two-argument variant; returning an empty array here would make the
        // producer write empty messages, because this is the overload Kafka actually calls
        return serialize(topic, data);
    }

    @Override
    public void close() {
    }
}
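Once the jar is in Flume's lib directory, the custom serializer can be passed to the KafkaChannel through the kafka.producer.* pass-through properties. A sketch of the channel section of the agent configuration — the agent/channel names, brokers and topic are placeholders, and the exact property keys and the parseAsFlumeEvent behaviour should be verified against the Flume version in use:

a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = feng05:9092,feng06:9092,feng07:9092
a1.channels.c1.kafka.topic = user
a1.channels.c1.parseAsFlumeEvent = false
# Kafka producer properties are passed through with the kafka.producer. prefix,
# so the custom ProtoBuf serializer is plugged in here
a1.channels.c1.kafka.producer.value.serializer = cn._51doit.test.KafkaProtoBufSerializer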
(2) Implementing the Flink Kafka deserializer
Note: besides the deserialization itself — turning the protoBuf binary read from the Kafka topic back into the protoBuf bean — you also have to tell Flink how to serialize that bean for network transfer (register a custom Kryo serializer for it); only then can Flink ship the records between tasks while processing them.
DataBeanProto (the generated bean; cross-language)
Generated with the procedure from section 4.1.
DataBeansDeserializer — the deserializer

package cn._51doit.flink.day11;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.io.IOException;

/**
 * A custom Flink deserialization schema.
 */
public class DataBeansDeserializer implements DeserializationSchema<DataBeanProto.DataBeans> {

    // deserialize the protobuf binary into the generated bean
    @Override
    public DataBeanProto.DataBeans deserialize(byte[] message) throws IOException {
        return DataBeanProto.DataBeans.parseFrom(message);
    }

    @Override
    public boolean isEndOfStream(DataBeanProto.DataBeans nextElement) {
        return false;
    }

    @Override
    public TypeInformation<DataBeanProto.DataBeans> getProducedType() {
        return TypeInformation.of(DataBeanProto.DataBeans.class);
    }
}
PBSerializer — the Kryo serializer

package cn._51doit.flink.day11;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.google.protobuf.Message;

import java.lang.reflect.Method;
import java.util.HashMap;

public class PBSerializer extends Serializer<Message> {

    /* This cache never clears, but only scales like the number of
     * classes in play, which should not be very large.
     * We can replace with a LRU if we start to see any issues. */
    final protected HashMap<Class, Method> methodCache = new HashMap<Class, Method>();

    /**
     * This is slow, so we should cache to avoid killing perf:
     * See: http://www.jguru.com/faq/view.jsp?EID=246569
     */
    protected Method getParse(Class cls) throws Exception {
        Method meth = methodCache.get(cls);
        if (null == meth) {
            meth = cls.getMethod("parseFrom", new Class[]{ byte[].class });
            methodCache.put(cls, meth);
        }
        return meth;
    }

    // serialize
    @Override
    public void write(Kryo kryo, Output output, Message mes) {
        byte[] ser = mes.toByteArray();
        output.writeInt(ser.length, true);
        output.writeBytes(ser);
    }

    // deserialize
    @Override
    public Message read(Kryo kryo, Input input, Class<Message> pbClass) {
        try {
            int size = input.readInt(true);
            byte[] barr = new byte[size];
            input.readBytes(barr);
            return (Message) getParse(pbClass).invoke(null, barr);
        } catch (Exception e) {
            throw new RuntimeException("Could not create " + pbClass, e);
        }
    }
}
Test class
ProtoBufDemo

package cn._51doit.flink.day11;

import cn._51doit.flink.day10.FlinkUtilsV2;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.util.Collector;

public class ProtoBufDemo {

    public static void main(String[] args) throws Exception {
        ParameterTool parameters = ParameterTool.fromPropertiesFile(args[0]);
        DataStream<DataBeanProto.DataBeans> dataBeansStream = FlinkUtilsV2.createKafkaDataStream(
            parameters, "dataproto", "gid", DataBeansDeserializer.class);

        // register the custom Kryo serializer for the protobuf types
        FlinkUtilsV2.getEnv().getConfig().registerTypeWithKryoSerializer(DataBeanProto.DataBeans.class, PBSerializer.class);
        FlinkUtilsV2.getEnv().getConfig().registerTypeWithKryoSerializer(DataBeanProto.DataBean.class, PBSerializer.class);

        SingleOutputStreamOperator<DataBeanProto.DataBean> dataBeanStream = dataBeansStream.flatMap(
            new FlatMapFunction<DataBeanProto.DataBeans, DataBeanProto.DataBean>() {
                @Override
                public void flatMap(DataBeanProto.DataBeans list, Collector<DataBeanProto.DataBean> out) throws Exception {
                    for (DataBeanProto.DataBean dataBean : list.getDataBeanList()) {
                        out.collect(dataBean);
                    }
                }
            });

        dataBeanStream.print();

        FlinkUtilsV2.getEnv().execute();
    }
}