Source: https://blog.csdn.net/lukabruce/article/details/100737292
Background
Recently a project required consuming Kafka messages with Flink and writing them to MySQL. It looks like a very simple requirement, and there are plenty of Flink-reads-Kafka examples online, but none of them address the duplicate-consumption problem. Searching the Flink documentation for this scenario, I found no ready-made example of exactly-once from Flink to MySQL either, yet there is a similar example that solves end-to-end exactly-once delivery: the FlinkKafkaProducer011 class. It guarantees that messages sent to Kafka through FlinkKafkaProducer011 are written exactly once, mainly by extending TwoPhaseCommitSinkFunction. For background on what TwoPhaseCommitSinkFunction does, see the previous post: https://blog.51cto.com/simplelife/2401411.
Implementation idea
Briefly, the idea is to implement this class's methods: beginTransaction, preCommit, commit and abort. After an element has gone through its own processing logic, a pre-commit (preCommit) is performed; only if the pre-commit succeeds does the real commit happen, and if the pre-commit fails, abort is called to roll the work back. This is combined with Flink's checkpoint mechanism, which stores the partition offsets of the topic.
Here is an example of the intended effect. Suppose a checkpoint is taken every 10 s, and FlinkKafkaConsumer011 is consuming Kafka messages in real time. After a message has been consumed and processed, a pre-commit against the database is performed. If the pre-commit is fine, the real database insert is done 10 s later; if the insert succeeds, a checkpoint is taken and Flink automatically records the consumed offsets (the checkpoint data can be stored in HDFS). If the pre-commit fails, say at the 5 s mark, the Flink job goes into a restart loop (the restart strategy is configurable), the next checkpoint is not taken, and the checkpoint still records the offsets of the last successful consumption. The data consumed during this checkpoint interval was read successfully but failed during pre-commit, so note that it is never actually inserted: because the pre-commit (preCommit) failed, the commit never happens. Once the problematic data has been dealt with and the Flink job is restarted, it automatically resumes consuming from the last successful checkpoint, which gives us exactly-once from Kafka to MySQL.
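To make the ordering concrete, here is a rough outline (a sketch of my own, not the actual Flink source) of how the TwoPhaseCommitSinkFunction hooks interleave with checkpointing; the method names are the real ones, the commentary is illustrative:

// For a subclass of TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT>, per checkpoint interval:
//
//   TXN txn = beginTransaction();     // open a fresh transaction (here: a JDBC connection with autocommit off)
//   invoke(txn, record, context);     // called once per element; write into the open transaction
//   preCommit(txn);                   // called when the checkpoint barrier arrives; flush, but do not commit yet
//   beginTransaction();               // a new transaction is opened for the records that follow
//   // ... checkpoint completes on the JobManager ...
//   commit(txn);                      // called on the checkpoint-complete notification; make the writes visible
//   abort(txn);                       // called instead of commit if the pre-commit/checkpoint failed; roll back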
The concrete implementation consists of three classes.
1. StreamDemoKafka2Mysql.java

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;

/**
 * Created with IntelliJ IDEA.
 * User: zzy
 * Date: 2019/5/28
 * Time: 8:40 PM
 *
 * Consumes Kafka messages and sinks them (with a custom sink) into MySQL,
 * guaranteeing exactly-once from Kafka to MySQL.
 */
public class StreamDemoKafka2Mysql {

    private static final String topic_ExactlyOnce = "mysql-exactly-Once-4";

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Set the parallelism to 1 so the message order is easy to follow during testing;
        // it can be increased for real workloads.
        env.setParallelism(1);

        // Checkpoint settings
        // Start a checkpoint every 10 s (checkpoint interval)
        env.enableCheckpointing(10000);
        // Use exactly-once semantics
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Keep at least 1 s between checkpoints (minimum pause between checkpoints)
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
        // A checkpoint must finish within 10 s or it is discarded (checkpoint timeout)
        env.getCheckpointConfig().setCheckpointTimeout(10000);
        // Allow only one checkpoint at a time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // Retain checkpoint data after the job is cancelled, so the job can later be restored from a chosen checkpoint
        //env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // Configure a state backend to store checkpoints on HDFS (by default they are kept in memory);
        // here a local path is used first.
        // env.setStateBackend(new FsStateBackend("file:///Users/temp/cp/"));

        // Kafka consumer properties
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "zzy:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "flink-consumer-group2");
        // Interval for automatic Kafka partition discovery
        props.put(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "3000");

        /* SimpleStringSchema only yields the message payload; JSONKeyValueDeserializationSchema also
           exposes the key, the value and the metadata (topic, partition, offset, ...). */
        FlinkKafkaConsumer011<ObjectNode> kafkaConsumer011 =
                new FlinkKafkaConsumer011<>(topic_ExactlyOnce, new JSONKeyValueDeserializationSchema(true), props);

        // Add the Kafka source
        DataStreamSource<ObjectNode> streamSource = env.addSource(kafkaConsumer011);
        streamSource.print();
        // Send the data downstream to the two-phase-commit sink
        streamSource.addSink(new MySqlTwoPhaseCommitSink()).name("MySqlTwoPhaseCommitSink");
        // Trigger execution
        env.execute(StreamDemoKafka2Mysql.class.getName());
    }
}
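The explanation above mentions that the restart strategy is configurable. This example relies on the environment defaults; if you want to pin the strategy down explicitly, something along these lines could be added before the source is created. This is a hedged sketch, not part of the original job, and the attempt count and delay are made-up values:

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import java.util.concurrent.TimeUnit;

// Restart the job at most 3 times, pausing 10 s between attempts, before failing it permanently.
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));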
2. MySqlTwoPhaseCommitSink.java

import com.alibaba.fastjson.JSONObject; // fastjson, used below to re-parse the "value" field
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * Created with IntelliJ IDEA.
 * User: zzy
 * Date: 2019/5/28
 * Time: 8:47 PM
 *
 * Custom Kafka-to-MySQL sink: extends TwoPhaseCommitSinkFunction to implement two-phase commit.
 */
public class MySqlTwoPhaseCommitSink extends TwoPhaseCommitSinkFunction<ObjectNode, Connection, Void> {

    private static final Logger log = LoggerFactory.getLogger(MySqlTwoPhaseCommitSink.class);

    public MySqlTwoPhaseCommitSink() {
        super(new KryoSerializer<>(Connection.class, new ExecutionConfig()), VoidSerializer.INSTANCE);
    }

    /**
     * Writes a record into the database within the current transaction.
     *
     * @param connection the open transaction (a JDBC connection)
     * @param objectNode the Kafka record as parsed by JSONKeyValueDeserializationSchema
     * @param context    sink context
     * @throws Exception
     */
    @Override
    protected void invoke(Connection connection, ObjectNode objectNode, Context context) throws Exception {
        log.info("start invoke...");
        String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        log.info("===>date:" + date + " " + objectNode);
        log.info("===>date:{} --{}", date, objectNode);
        String value = objectNode.get("value").toString();
        log.info("objectNode-value:" + value);
        JSONObject valueJson = JSONObject.parseObject(value);
        String value_str = (String) valueJson.get("value");
        String sql = "insert into `mysqlExactlyOnce_test` (`value`,`insert_time`) values (?,?)";
        PreparedStatement ps = connection.prepareStatement(sql);
        ps.setString(1, value_str);
        Timestamp value_time = new Timestamp(System.currentTimeMillis());
        ps.setTimestamp(2, value_time);
        log.info("data to insert: {}--{}", value_str, value_time);
        // Execute the insert (still uncommitted at this point)
        ps.execute();
        // Deliberately trigger an exception to test the rollback path
        if (Integer.parseInt(value_str) == 15) {
            System.out.println(1 / 0);
        }
    }

    /**
     * Obtains a connection and switches it to manual commit (inside getConnection),
     * which starts a new transaction.
     *
     * @return
     * @throws Exception
     */
    @Override
    protected Connection beginTransaction() throws Exception {
        log.info("start beginTransaction.......");
        String url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&useSSL=false&autoReconnect=true";
        Connection connection = DBConnectUtil.getConnection(url, "root", "123456");
        return connection;
    }

    /**
     * Pre-commit; the actual pre-commit work happens in invoke.
     *
     * @param connection
     * @throws Exception
     */
    @Override
    protected void preCommit(Connection connection) throws Exception {
        log.info("start preCommit...");
    }

    /**
     * Commits the transaction if invoke completed normally.
     *
     * @param connection
     */
    @Override
    protected void commit(Connection connection) {
        log.info("start commit...");
        DBConnectUtil.commit(connection);
    }

    /**
     * Rolls the transaction back if invoke threw an exception; the next checkpoint is not taken either.
     *
     * @param connection
     */
    @Override
    protected void abort(Connection connection) {
        log.info("start abort rollback...");
        DBConnectUtil.rollback(connection);
    }
}
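For reference, JSONKeyValueDeserializationSchema(true) hands invoke() an ObjectNode of the shape {"value": {...}, "metadata": {"offset": ..., "topic": ..., "partition": ...}}, as the test log below shows. The sink re-parses the value field with fastjson; the same fields could also be read straight from the Jackson node, roughly like this (an illustrative sketch, not part of the original code):

// Assuming objectNode is the record passed to invoke():
String payload = objectNode.get("value").get("value").asText();        // the payload field, e.g. "15"
long offset    = objectNode.get("metadata").get("offset").asLong();
String topic   = objectNode.get("metadata").get("topic").asText();
int partition  = objectNode.get("metadata").get("partition").asInt();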
3. DBConnectUtil.java

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

/**
 * Created with IntelliJ IDEA.
 * User: zzy
 * Date: 2019/5/28
 * Time: 8:58 PM
 */
public class DBConnectUtil {

    private static final Logger log = LoggerFactory.getLogger(DBConnectUtil.class);

    /**
     * Obtains a connection with auto-commit disabled.
     *
     * @param url
     * @param user
     * @param password
     * @return
     * @throws SQLException
     */
    public static Connection getConnection(String url, String user, String password) throws SQLException {
        Connection conn = null;
        try {
            Class.forName("com.mysql.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            log.error("failed to load com.mysql.jdbc.Driver");
            e.printStackTrace();
        }
        try {
            conn = DriverManager.getConnection(url, user, password);
            log.info("obtained connection {} successfully...", conn);
            // Switch to manual commit so the sink controls the transaction boundaries
            conn.setAutoCommit(false);
        } catch (Exception e) {
            log.error("failed to obtain connection, url:" + url + ",user:" + user);
        }
        return conn;
    }

    /**
     * Commits the transaction.
     */
    public static void commit(Connection conn) {
        if (conn != null) {
            try {
                conn.commit();
            } catch (SQLException e) {
                log.error("failed to commit transaction, Connection: " + conn);
                e.printStackTrace();
            } finally {
                close(conn);
            }
        }
    }

    /**
     * Rolls the transaction back.
     *
     * @param conn
     */
    public static void rollback(Connection conn) {
        if (conn != null) {
            try {
                conn.rollback();
            } catch (SQLException e) {
                log.error("failed to roll back transaction, Connection: " + conn);
                e.printStackTrace();
            } finally {
                close(conn);
            }
        }
    }

    /**
     * Closes the connection.
     *
     * @param conn
     */
    public static void close(Connection conn) {
        if (conn != null) {
            try {
                conn.close();
            } catch (SQLException e) {
                log.error("failed to close connection, Connection: " + conn);
                e.printStackTrace();
            }
        }
    }
}
4. Code test
To make it easy to send messages, I use a small scheduled producer that writes the numbers 1 through 20 to Kafka, one per second:
import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class KafkaUtils {
    // private static final String broker_list = "localhost:9092";
    private static final String broker_list = "zzy:9092";
    // Topic for the Flink read-Kafka-write-MySQL exactly-once test
    private static final String topic_ExactlyOnce = "mysql-exactly-Once-4";

    public static void writeToKafka2() throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", broker_list);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // KafkaProducer producer = new KafkaProducer<String, String>(props); // the old producer API is deprecated
        Producer<String, String> producer = new KafkaProducer<>(props);

        try {
            for (int i = 1; i <= 20; i++) {
                MysqlExactlyOncePOJO mysqlExactlyOnce = new MysqlExactlyOncePOJO(String.valueOf(i));
                ProducerRecord<String, String> record =
                        new ProducerRecord<>(topic_ExactlyOnce, null, null, JSON.toJSONString(mysqlExactlyOnce));
                producer.send(record);
                System.out.println("sent: " + JSON.toJSONString(mysqlExactlyOnce));
                Thread.sleep(1000);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

        producer.flush();
    }

    public static void main(String[] args) throws InterruptedException {
        writeToKafka2();
    }
}

public class MysqlExactlyOncePOJO {
    private String value;

    public MysqlExactlyOncePOJO(String value) {
        this.value = value;
    }

    public String getValue() {
        return value;
    }
}
By the time the number 15 is sent, one checkpoint should already have completed and the second one should be about to fire; the data consumed before the first checkpoint has been committed and inserted into the database. When the number 15 is consumed, the hand-made exception is thrown, so the database should only contain the data committed after the first checkpoint. The data of the second checkpoint interval is never inserted, because its pre-commit failed and the real commit therefore never happens. The log from my test run:
19/06/01 14:52:07 INFO TypeExtractor: Class class org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
19/06/01 14:52:07 INFO FlinkKafkaConsumerBase: Setting restore state in the FlinkKafkaConsumer: {KafkaTopicPartition{topic='mysql-exactly-Once-4', partition=0}=10}
19/06/01 14:52:07 INFO ConsumerConfig: ConsumerConfig values:
    auto.commit.interval.ms = 5000
    auto.offset.reset = latest
    bootstrap.servers = [zzy:9092]
    check.crcs = true
    client.id =
    connections.max.idle.ms = 540000
    enable.auto.commit = true
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = flink-consumer-group2
    heartbeat.interval.ms = 3000
    interceptor.classes = null
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 305000
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

19/06/01 14:52:07 WARN ConsumerConfig: The configuration 'flink.partition-discovery.interval-millis' was supplied but isn't a known config.
19/06/01 14:52:07 INFO AppInfoParser: Kafka version : 0.11.0.0
19/06/01 14:52:07 INFO AppInfoParser: Kafka commitId : cb8625948210849f
19/06/01 14:52:07 INFO FlinkKafkaConsumerBase: Consumer subtask 0 will start reading 1 partitions with offsets in restored state: {KafkaTopicPartition{topic='mysql-exactly-Once-4', partition=0}=10}
19/06/01 14:52:07 INFO ConsumerConfig: ConsumerConfig values:
    auto.commit.interval.ms = 5000
    auto.offset.reset = latest
    bootstrap.servers = [zzy:9092]
    check.crcs = true
    client.id =
    connections.max.idle.ms = 540000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = flink-consumer-group2
    heartbeat.interval.ms = 3000
    interceptor.classes = null
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 305000
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

19/06/01 14:52:07 WARN ConsumerConfig: The configuration 'flink.partition-discovery.interval-millis' was supplied but isn't a known config.
19/06/01 14:52:07 INFO AppInfoParser: Kafka version : 0.11.0.0
19/06/01 14:52:07 INFO AppInfoParser: Kafka commitId : cb8625948210849f
{"value":{"value":"12"},"metadata":{"offset":11,"topic":"mysql-exactly-Once-4","partition":0}}
-
19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: start invoke...
-
19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: ===>date:2019-06-01 14:52:07 {"value":{"value":"12"},"metadata":{"offset":11,"topic":"mysql-exactly-Once-4","partition":0}}
-
19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: ===>date:2019-06-01 14:52:07 --{"value":{"value":"12"},"metadata":{"offset":11,"topic":"mysql-exactly-Once-4","partition":0}}
-
19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: objectNode-value:{"value":"12"}
-
19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: 要插入的數據:12--2019-06-01 14:52:07.616
-
19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: start invoke...
-
19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: ===>date:2019-06-01 14:52:07 {"value":{"value":"13"},"metadata":{"offset":12,"topic":"mysql-exactly-Once-4","partition":0}}
-
19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: ===>date:2019-06-01 14:52:07 --{"value":{"value":"13"},"metadata":{"offset":12,"topic":"mysql-exactly-Once-4","partition":0}}
-
19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: objectNode-value:{"value":"13"}
-
19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: 要插入的數據:13--2019-06-01 14:52:07.617
-
19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: start invoke...
-
19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: ===>date:2019-06-01 14:52:07 {"value":{"value":"14"},"metadata":{"offset":13,"topic":"mysql-exactly-Once-4","partition":0}}
-
19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: ===>date:2019-06-01 14:52:07 --{"value":{"value":"14"},"metadata":{"offset":13,"topic":"mysql-exactly-Once-4","partition":0}}
-
19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: objectNode-value:{"value":"14"}
-
19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: 要插入的數據:14--2019-06-01 14:52:07.618
-
19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: start invoke...
-
19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: ===>date:2019-06-01 14:52:07 {"value":{"value":"15"},"metadata":{"offset":14,"topic":"mysql-exactly-Once-4","partition":0}}
-
19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: ===>date:2019-06-01 14:52:07 --{"value":{"value":"15"},"metadata":{"offset":14,"topic":"mysql-exactly-Once-4","partition":0}}
-
19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: objectNode-value:{"value":"15"}
-
19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: 要插入的數據:15--2019-06-01 14:52:07.619
-
{ "value":{"value":"13"},"metadata":{"offset":12,"topic":"mysql-exactly-Once-4","partition":0}}
-
{ "value":{"value":"14"},"metadata":{"offset":13,"topic":"mysql-exactly-Once-4","partition":0}}
-
{ "value":{"value":"15"},"metadata":{"offset":14,"topic":"mysql-exactly-Once-4","partition":0}}
-
19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: start abort rollback...
19/06/01 14:52:07 INFO Task: Source: Custom Source -> (Sink: Print to Std. Out, Sink: MySqlTwoPhaseCommitSink) (1/1) (c284f48cd0b113da4f68fd835e643903) switched from RUNNING to FAILED.
java.lang.ArithmeticException: / by zero
    at com.zzy.bigdata.flink.streaming.MySqlTwoPhaseCommitSink.invoke(MySqlTwoPhaseCommitSink.java:68)
    at com.zzy.bigdata.flink.streaming.MySqlTwoPhaseCommitSink.invoke(MySqlTwoPhaseCommitSink.java:30)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:230)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:649)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:602)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:89)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:154)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.runWithPartitionDiscovery(FlinkKafkaConsumerBase.java:675)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:667)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
    at java.lang.Thread.run(Thread.java:748)
19/06/01 14:52:07 INFO Task: Freeing task resources for Source: Custom Source -> (Sink: Print to Std. Out, Sink: MySqlTwoPhaseCommitSink) (1/1) (c284f48cd0b113da4f68fd835e643903).
19/06/01 14:52:07 INFO Task: Ensuring all FileSystem streams are closed for task Source: Custom Source -> (Sink: Print to Std. Out, Sink: MySqlTwoPhaseCommitSink) (1/1) (c284f48cd0b113da4f68fd835e643903) [FAILED]
19/06/01 14:52:07 INFO TaskExecutor: Un-registering task and sending final execution state FAILED to JobManager for task Source: Custom Source -> (Sink: Print to Std. Out, Sink: MySqlTwoPhaseCommitSink) c284f48cd0b113da4f68fd835e643903.
19/06/01 14:52:07 INFO ExecutionGraph: Source: Custom Source -> (Sink: Print to Std. Out, Sink: MySqlTwoPhaseCommitSink) (1/1) (c284f48cd0b113da4f68fd835e643903) switched from RUNNING to FAILED.
java.lang.ArithmeticException: / by zero
    at com.zzy.bigdata.flink.streaming.MySqlTwoPhaseCommitSink.invoke(MySqlTwoPhaseCommitSink.java:68)
    at com.zzy.bigdata.flink.streaming.MySqlTwoPhaseCommitSink.invoke(MySqlTwoPhaseCommitSink.java:30)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:230)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:649)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:602)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:89)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:154)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.runWithPartitionDiscovery(FlinkKafkaConsumerBase.java:675)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:667)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
    at java.lang.Thread.run(Thread.java:748)
19/06/01 14:52:07 INFO ExecutionGraph: Job com.zzy.bigdata.flink.streaming.StreamDemoKafka2Mysql (a7188181ec45ab397d21bb1f928c7b89) switched from state RUNNING to FAILING.
-
...
-
19/06/01 14:52:07 INFO TaskExecutor: Discarding the results produced by task execution c284f48cd0b113da4f68fd835e643903.
-
19/06/01 14:52:07 INFO ExecutionGraph: Try to restart or fail the job com.zzy.bigdata.flink.streaming.StreamDemoKafka2Mysql (a7188181ec45ab397d21bb1f928c7b89) if no longer possible.
-
19/06/01 14:52:07 INFO ExecutionGraph: Job com.zzy.bigdata.flink.streaming.StreamDemoKafka2Mysql (a7188181ec45ab397d21bb1f928c7b89) switched from state FAILING to RESTARTING.
-
19/06/01 14:52:07 INFO ExecutionGraph: Restarting the job com.zzy.bigdata.flink.streaming.StreamDemoKafka2Mysql (a7188181ec45ab397d21bb1f928c7b89).
-
19/06/01 14:52:07 INFO ExecutionGraph: Job com.zzy.bigdata.flink.streaming.StreamDemoKafka2Mysql (a7188181ec45ab397d21bb1f928c7b89) switched from state RESTARTING to CREATED.
-
19/06/01 14:52:07 INFO CheckpointCoordinator: Restoring job a7188181ec45ab397d21bb1f928c7b89 from latest valid checkpoint: Checkpoint 3 @ 1559371921807 for a7188181ec45ab397d21bb1f928c7b89.
-
19/06/01 14:52:07 INFO CheckpointCoordinator: No master state to restore
-
19/06/01 14:52:07 INFO ExecutionGraph: Job com.zzy.bigdata.flink.streaming.StreamDemoKafka2Mysql (a7188181ec45ab397d21bb1f928c7b89) switched from state CREATED to RUNNING.
-
19/06/01 14:52:07 INFO ExecutionGraph: Source: Custom Source -> (Sink: Print to Std. Out, Sink: MySqlTwoPhaseCommitSink) (1/1) (b406c6534c19b26ab0ae3b6056f926cc) switched from CREATED to SCHEDULED.
-
19/06/01 14:52:07 INFO ExecutionGraph: Source: Custom Source -> (Sink: Print to Std. Out, Sink: MySqlTwoPhaseCommitSink) (1/1) (b406c6534c19b26ab0ae3b6056f926cc) switched from SCHEDULED to DEPLOYING.
-
19/06/01 14:52:07 INFO ExecutionGraph: Deploying Source: Custom Source -> (Sink: Print to Std. Out, Sink: MySqlTwoPhaseCommitSink) (1/1) (attempt #33) to localhost
-
19/06/01 14:52:07 INFO TaskExecutor: Received task Source: Custom Source -> (Sink: Print to Std. Out, Sink: MySqlTwoPhaseCommitSink) (1/1).
-
19/06/01 14:52:07 INFO Task: Source: Custom Source -> (Sink: Print to Std. Out, Sink: MySqlTwoPhaseCommitSink) (1/1) (b406c6534c19b26ab0ae3b6056f926cc) switched from CREATED to DEPLOYING.
-
19/06/01 14:52:07 INFO Task: Creating FileSystem stream leak safety net for task Source: Custom Source -> (Sink: Print to Std. Out, Sink: MySqlTwoPhaseCommitSink) (1/1) (b406c6534c19b26ab0ae3b6056f926cc) [DEPLOYING]
-
19/06/01 14:52:07 INFO Task: Loading JAR files for task Source: Custom Source -> (Sink: Print to Std. Out, Sink: MySqlTwoPhaseCommitSink) (1/1) (b406c6534c19b26ab0ae3b6056f926cc) [DEPLOYING].
-
19/06/01 14:52:07 INFO Task: Registering task at network: Source: Custom Source -> (Sink: Print to Std. Out, Sink: MySqlTwoPhaseCommitSink) (1/1) (b406c6534c19b26ab0ae3b6056f926cc) [DEPLOYING].
-
19/06/01 14:52:07 INFO Task: Source: Custom Source -> (Sink: Print to Std. Out, Sink: MySqlTwoPhaseCommitSink) (1/1) (b406c6534c19b26ab0ae3b6056f926cc) switched from DEPLOYING to RUNNING.
-
19/06/01 14:52:07 INFO ExecutionGraph: Source: Custom Source -> (Sink: Print to Std. Out, Sink: MySqlTwoPhaseCommitSink) (1/1) (b406c6534c19b26ab0ae3b6056f926cc) switched from DEPLOYING to RUNNING.
-
19/06/01 14:52:07 INFO StreamTask: No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880)
-
19/06/01 14:52:07 INFO TwoPhaseCommitSinkFunction: MySqlTwoPhaseCommitSink 0/1 - restoring state
-
19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: start commit...
-
19/06/01 14:52:07 ERROR DBConnectUtil: 提交事物失敗,Connection:com.mysql.jdbc.JDBC4Connection@69ae3a8c
-
java.sql.SQLException: Unexpected exception encountered during query.
-
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:965)
-
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:898)
-
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:887)
-
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:861)
-
at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2523)
-
at com.mysql.jdbc.ConnectionImpl.commit(ConnectionImpl.java:1547)
-
at com.zzy.bigdata.flink.streaming.DBConnectUtil.commit(DBConnectUtil.java:56)
-
at com.zzy.bigdata.flink.streaming.MySqlTwoPhaseCommitSink.commit(MySqlTwoPhaseCommitSink.java:103)
-
at com.zzy.bigdata.flink.streaming.MySqlTwoPhaseCommitSink.commit(MySqlTwoPhaseCommitSink.java:30)
-
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.recoverAndCommit(TwoPhaseCommitSinkFunction.java:200)
-
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.recoverAndCommitInternal(TwoPhaseCommitSinkFunction.java:395)
-
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:353)
-
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
-
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
-
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
-
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
-
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
-
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
-
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
-
at java.lang.Thread.run(Thread.java:748)
The log shows that values 1 through 11 were successfully written to the database. When the number 15 was consumed, the job failed and the log shows the rollback, which closed the connection; the commit attempted during recovery therefore failed as well. The data 12-15 is never inserted into the database, no checkpoint is taken at that point, and the checkpoint still holds the offsets of the last successful consumption.
Database table: mysqlExactlyOnce_test

CREATE TABLE `mysqlExactlyOnce_test` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `value` varchar(255) DEFAULT NULL,
  `insert_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4

Data in the table: