Solving Duplicate Consumption When Flink Consumes Kafka Messages and Stores the Results in MySQL


Source: https://blog.csdn.net/lukabruce/article/details/100737292

 

Background

In a recent project I used Flink to consume Kafka messages and store them in MySQL. It looks like a simple requirement, and there are plenty of "Flink consuming Kafka" examples online, but none of them addressed the duplicate-consumption problem. Searching the Flink documentation for this scenario, I found no ready-made exactly-once example for Flink-to-MySQL either, but there is an analogous end-to-end exactly-once implementation: FlinkKafkaProducer011. It guarantees that messages sent to Kafka through FlinkKafkaProducer011 are delivered exactly once, and it achieves this by extending TwoPhaseCommitSinkFunction. For background on what TwoPhaseCommitSinkFunction does, see the previous article: https://blog.51cto.com/simplelife/2401411

Implementation idea

In short, the idea is to implement this class's methods — beginTransaction, preCommit, commit, and abort — to obtain pre-commit semantics: after an element has been processed, it is pre-committed (preCommit); only if the pre-commit succeeds is the real commit performed, and if the pre-commit fails, abort rolls the transaction back. Combined with Flink's checkpoint mechanism, which stores the partition offsets of the topic, this gives exactly-once behaviour.

An example of the resulting behaviour: suppose a checkpoint is taken every 10 s and FlinkKafkaConsumer011 consumes Kafka messages in real time. After a message is consumed and processed, a pre-commit against the database is performed. If the pre-commit is fine, the real database insert happens 10 s later; if the insert succeeds, a checkpoint is taken and Flink automatically records the consumed offsets (the checkpoint data can be stored in HDFS). If the pre-commit fails — say at the 5 s mark — the Flink job goes into continuous restarts (the restart strategy is configurable), the next checkpoint is not taken, and the checkpoint still records the offsets of the last successful consumption. The records consumed during that interval were read successfully, but note that they were never actually inserted: because the pre-commit (preCommit) failed, the commit step never happens. Once the problematic data has been dealt with and the Flink job is restarted, it automatically resumes from the last successful checkpoint, which gives exactly-once delivery from Kafka to MySQL.
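The restart strategy and checkpoint retention mentioned above are not shown in the original code, so the following is only a minimal sketch of how they might be configured with the same streaming API used in the example below; the retry count, delay, and HDFS path are illustrative assumptions, not values from the post.

    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointConfigSketch {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Checkpoint every 10 s, as in the example below.
            env.enableCheckpointing(10000);
            // Restart the job a limited number of times with a delay instead of looping forever
            // (the values here are illustrative).
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)));
            // Keep the last checkpoint when the job is cancelled so it can be restored manually.
            env.getCheckpointConfig().enableExternalizedCheckpoints(
                    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
            // Store checkpoints on HDFS instead of in memory (the path is an assumption).
            env.setStateBackend(new FsStateBackend("hdfs://namenode:8020/flink/checkpoints"));
        }
    }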

The implementation consists of three classes:

1. StreamDemoKafka2Mysql.java

    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
    import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
    import org.apache.kafka.clients.consumer.ConsumerConfig;

    import java.util.Properties;

    /**
     * Created with IntelliJ IDEA.
     * User: zzy
     * Date: 2019/5/28
     * Time: 8:40 PM
     *
     * Consumes Kafka messages and sinks them (via a custom sink) into MySQL,
     * guaranteeing exactly-once semantics from Kafka to MySQL.
     */
    @SuppressWarnings("all")
    public class StreamDemoKafka2Mysql {

        private static final String topic_ExactlyOnce = "mysql-exactly-Once-4";

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Parallelism is set to 1 so the message order is easy to observe during testing;
            // it can be increased for real workloads.
            env.setParallelism(1);

            // Checkpoint settings
            // Start a checkpoint every 10 s (checkpoint interval).
            env.enableCheckpointing(10000);
            // Use exactly-once checkpointing mode.
            env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
            // Ensure at least 1 s between two checkpoints (minimum pause).
            env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
            // A checkpoint must complete within 10 s or it is discarded (checkpoint timeout).
            env.getCheckpointConfig().setCheckpointTimeout(10000);
            // Allow only one checkpoint at a time.
            env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
            // Retain checkpoint data after the job is cancelled, so the job can be restored
            // from a chosen checkpoint if needed.
            //env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
            // Set a state backend to store checkpoints on HDFS (by default they are kept in memory);
            // a local path is used here for testing.
            //env.setStateBackend(new FsStateBackend("file:///Users/temp/cp/"));

            // Kafka consumer settings
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "zzy:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "flink-consumer-group2");
            // Kafka partition auto-discovery interval
            props.put(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "3000");

            // SimpleStringSchema would return only the message value; JSONKeyValueDeserializationSchema
            // also exposes the key, value and metadata (topic, partition, offset, ...).
            FlinkKafkaConsumer011<ObjectNode> kafkaConsumer011 =
                    new FlinkKafkaConsumer011<>(topic_ExactlyOnce, new JSONKeyValueDeserializationSchema(true), props);

            // Add the Kafka source.
            DataStreamSource<ObjectNode> streamSource = env.addSource(kafkaConsumer011);
            streamSource.print();
            // Send the data downstream to the two-phase-commit MySQL sink.
            streamSource.addSink(new MySqlTwoPhaseCommitSink()).name("MySqlTwoPhaseCommitSink");
            // Trigger execution.
            env.execute(StreamDemoKafka2Mysql.class.getName());
        }
    }

2. MySqlTwoPhaseCommitSink.java

    import com.alibaba.fastjson.JSONObject;  // fastjson is assumed here; the original listing omitted this import
    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.common.typeutils.TypeSerializer;
    import org.apache.flink.api.common.typeutils.base.VoidSerializer;
    import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
    import org.apache.flink.streaming.api.functions.sink.SinkFunction;
    import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.Timestamp;
    import java.text.SimpleDateFormat;
    import java.util.Date;

    /**
     * Created with IntelliJ IDEA.
     * User: zzy
     * Date: 2019/5/28
     * Time: 8:47 PM
     *
     * Custom Kafka-to-MySQL sink that extends TwoPhaseCommitSinkFunction to implement
     * a two-phase commit.
     */
    public class MySqlTwoPhaseCommitSink extends TwoPhaseCommitSinkFunction<ObjectNode, Connection, Void> {

        private static final Logger log = LoggerFactory.getLogger(MySqlTwoPhaseCommitSink.class);

        public MySqlTwoPhaseCommitSink() {
            super(new KryoSerializer<>(Connection.class, new ExecutionConfig()), VoidSerializer.INSTANCE);
        }

        /**
         * Writes one record to the database; called for every element that reaches the sink.
         * @param connection the transaction handle (a JDBC connection with auto-commit disabled)
         * @param objectNode the deserialized Kafka record
         * @param context sink context
         * @throws Exception on any failure, which aborts the current transaction
         */
        @Override
        protected void invoke(Connection connection, ObjectNode objectNode, Context context) throws Exception {
            log.info("start invoke...");
            String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
            log.info("===>date:" + date + " " + objectNode);
            log.info("===>date:{} --{}", date, objectNode);
            String value = objectNode.get("value").toString();
            log.info("objectNode-value:" + value);
            JSONObject valueJson = JSONObject.parseObject(value);
            String value_str = (String) valueJson.get("value");
            String sql = "insert into `mysqlExactlyOnce_test` (`value`,`insert_time`) values (?,?)";
            PreparedStatement ps = connection.prepareStatement(sql);
            ps.setString(1, value_str);
            Timestamp value_time = new Timestamp(System.currentTimeMillis());
            ps.setTimestamp(2, value_time);
            log.info("要插入的數據:{}--{}", value_str, value_time);
            // Execute the insert (not committed yet).
            ps.execute();
            // Deliberately trigger an exception when the value 15 arrives.
            if (Integer.parseInt(value_str) == 15) {
                System.out.println(1 / 0);
            }
        }

        /**
         * Begins a transaction: obtains a connection and switches it to manual commit
         * (done inside DBConnectUtil.getConnection).
         */
        @Override
        protected Connection beginTransaction() throws Exception {
            log.info("start beginTransaction.......");
            String url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&useSSL=false&autoReconnect=true";
            return DBConnectUtil.getConnection(url, "root", "123456");
        }

        /**
         * Pre-commit. The pre-commit work has already been done in invoke(), so nothing happens here.
         */
        @Override
        protected void preCommit(Connection connection) throws Exception {
            log.info("start preCommit...");
        }

        /**
         * Commits the transaction if invoke() completed normally.
         */
        @Override
        protected void commit(Connection connection) {
            log.info("start commit...");
            DBConnectUtil.commit(connection);
        }

        /**
         * Rolls back the transaction if invoke() threw an exception; the next checkpoint
         * will not be taken in that case.
         */
        @Override
        protected void abort(Connection connection) {
            log.info("start abort rollback...");
            DBConnectUtil.rollback(connection);
        }
    }
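A note on the design: preCommit is left empty on purpose. Each record is already written inside invoke on a connection whose auto-commit is disabled, so the open, uncommitted JDBC transaction itself plays the role of the pre-committed state; commit then only issues the final COMMIT, and abort rolls the open transaction back.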

3. DBConnectUtil.java

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.SQLException;

    /**
     * Created with IntelliJ IDEA.
     * User: zzy
     * Date: 2019/5/28
     * Time: 8:58 PM
     */
    public class DBConnectUtil {

        private static final Logger log = LoggerFactory.getLogger(DBConnectUtil.class);

        /**
         * Obtains a JDBC connection with auto-commit disabled.
         *
         * @param url JDBC URL
         * @param user database user
         * @param password database password
         * @return the connection
         * @throws SQLException on failure
         */
        public static Connection getConnection(String url, String user, String password) throws SQLException {
            Connection conn = null;
            try {
                Class.forName("com.mysql.jdbc.Driver");
            } catch (ClassNotFoundException e) {
                log.error("獲取mysql.jdbc.Driver失敗");
                e.printStackTrace();
            }
            try {
                conn = DriverManager.getConnection(url, user, password);
                log.info("獲取連接:{} 成功...", conn);
            } catch (Exception e) {
                log.error("獲取連接失敗,url:" + url + ",user:" + user);
            }

            // Switch to manual commit so the transaction is only committed in commit().
            conn.setAutoCommit(false);
            return conn;
        }

        /**
         * Commits the transaction and then closes the connection.
         */
        public static void commit(Connection conn) {
            if (conn != null) {
                try {
                    conn.commit();
                } catch (SQLException e) {
                    log.error("提交事物失敗,Connection:" + conn);
                    e.printStackTrace();
                } finally {
                    close(conn);
                }
            }
        }

        /**
         * Rolls back the transaction and then closes the connection.
         *
         * @param conn the connection
         */
        public static void rollback(Connection conn) {
            if (conn != null) {
                try {
                    conn.rollback();
                } catch (SQLException e) {
                    log.error("事物回滾失敗,Connection:" + conn);
                    e.printStackTrace();
                } finally {
                    close(conn);
                }
            }
        }

        /**
         * Closes the connection.
         *
         * @param conn the connection
         */
        public static void close(Connection conn) {
            if (conn != null) {
                try {
                    conn.close();
                } catch (SQLException e) {
                    log.error("關閉連接失敗,Connection:" + conn);
                    e.printStackTrace();
                }
            }
        }
    }

4. Testing the code

To make it easy to produce test messages, I use a small program that writes the numbers 1 to 20 to Kafka, one per second:

    // Imports for the test producer; fastjson (com.alibaba.fastjson.JSON) and Lombok are
    // assumed, matching the JSON / @Data usage below.
    import com.alibaba.fastjson.JSON;
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    import java.util.Properties;

    public class KafkaUtils {
        // private static final String broker_list = "localhost:9092";
        private static final String broker_list = "zzy:9092";
        // Topic for the Flink-reads-Kafka-writes-MySQL exactly-once test.
        private static final String topic_ExactlyOnce = "mysql-exactly-Once-4";

        public static void writeToKafka2() throws InterruptedException {
            Properties props = new Properties();
            props.put("bootstrap.servers", broker_list);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            // KafkaProducer producer = new KafkaProducer<String, String>(props); // old producer API, deprecated
            Producer<String, String> producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);

            try {
                for (int i = 1; i <= 20; i++) {
                    MysqlExactlyOncePOJO mysqlExactlyOnce = new MysqlExactlyOncePOJO(String.valueOf(i));
                    ProducerRecord<String, String> record =
                            new ProducerRecord<>(topic_ExactlyOnce, null, null, JSON.toJSONString(mysqlExactlyOnce));
                    producer.send(record);
                    System.out.println("發送數據: " + JSON.toJSONString(mysqlExactlyOnce));
                    Thread.sleep(1000);
                }
            } catch (Exception e) {
                // ignored in this simple test producer
            }

            producer.flush();
        }

        public static void main(String[] args) throws InterruptedException {
            writeToKafka2();
        }
    }

    // MysqlExactlyOncePOJO.java (kept in its own file)
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public class MysqlExactlyOncePOJO {
        private String value;
    }
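For reference, running KafkaUtils.main() should publish twenty JSON payloads of the form {"value":"1"} through {"value":"20"} to the mysql-exactly-Once-4 topic at one-second intervals, which is exactly the value format the sink above parses with fastjson.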

Before the number 15 is sent, one checkpoint should already have completed and the second one is almost due. The data consumed up to the first checkpoint was committed and inserted into the database successfully. When the number 15 is consumed, the hand-made exception is thrown, so at that point the database should contain only the data committed after the first checkpoint; the data covered by the second checkpoint is never inserted, because its pre-commit failed and the real commit therefore never happens. The log from my run:

    19/06/01 14:52:07 INFO TypeExtractor: Class class org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
    19/06/01 14:52:07 INFO FlinkKafkaConsumerBase: Setting restore state in the FlinkKafkaConsumer: {KafkaTopicPartition{topic='mysql-exactly-Once-4', partition=0}=10}
    19/06/01 14:52:07 INFO ConsumerConfig: ConsumerConfig values:
        auto.commit.interval.ms = 5000
        auto.offset.reset = latest
        bootstrap.servers = [zzy:9092]
        check.crcs = true
        client.id =
        connections.max.idle.ms = 540000
        enable.auto.commit = true
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = flink-consumer-group2
        heartbeat.interval.ms = 3000
        interceptor.classes = null
        internal.leave.group.on.close = true
        isolation.level = read_uncommitted
        key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 305000
        retry.backoff.ms = 100
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        session.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

    19/06/01 14:52:07 WARN ConsumerConfig: The configuration 'flink.partition-discovery.interval-millis' was supplied but isn't a known config.
    19/06/01 14:52:07 INFO AppInfoParser: Kafka version : 0.11.0.0
    19/06/01 14:52:07 INFO AppInfoParser: Kafka commitId : cb8625948210849f
    19/06/01 14:52:07 INFO FlinkKafkaConsumerBase: Consumer subtask 0 will start reading 1 partitions with offsets in restored state: {KafkaTopicPartition{topic='mysql-exactly-Once-4', partition=0}=10}
    19/06/01 14:52:07 INFO ConsumerConfig: ConsumerConfig values:
        auto.commit.interval.ms = 5000
        auto.offset.reset = latest
        bootstrap.servers = [zzy:9092]
        check.crcs = true
        client.id =
        connections.max.idle.ms = 540000
        enable.auto.commit = false
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = flink-consumer-group2
        heartbeat.interval.ms = 3000
        interceptor.classes = null
        internal.leave.group.on.close = true
        isolation.level = read_uncommitted
        key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 305000
        retry.backoff.ms = 100
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        session.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

    19/06/01 14:52:07 WARN ConsumerConfig: The configuration 'flink.partition-discovery.interval-millis' was supplied but isn't a known config.
    19/06/01 14:52:07 INFO AppInfoParser: Kafka version : 0.11.0.0
    19/06/01 14:52:07 INFO AppInfoParser: Kafka commitId : cb8625948210849f
    {"value":{"value":"12"},"metadata":{"offset":11,"topic":"mysql-exactly-Once-4","partition":0}}
    19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: start invoke...
    19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: ===>date:2019-06-01 14:52:07 {"value":{"value":"12"},"metadata":{"offset":11,"topic":"mysql-exactly-Once-4","partition":0}}
    19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: ===>date:2019-06-01 14:52:07 --{"value":{"value":"12"},"metadata":{"offset":11,"topic":"mysql-exactly-Once-4","partition":0}}
    19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: objectNode-value:{"value":"12"}
    19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: 要插入的數據:12--2019-06-01 14:52:07.616
    19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: start invoke...
    19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: ===>date:2019-06-01 14:52:07 {"value":{"value":"13"},"metadata":{"offset":12,"topic":"mysql-exactly-Once-4","partition":0}}
    19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: ===>date:2019-06-01 14:52:07 --{"value":{"value":"13"},"metadata":{"offset":12,"topic":"mysql-exactly-Once-4","partition":0}}
    19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: objectNode-value:{"value":"13"}
    19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: 要插入的數據:13--2019-06-01 14:52:07.617
    19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: start invoke...
    19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: ===>date:2019-06-01 14:52:07 {"value":{"value":"14"},"metadata":{"offset":13,"topic":"mysql-exactly-Once-4","partition":0}}
    19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: ===>date:2019-06-01 14:52:07 --{"value":{"value":"14"},"metadata":{"offset":13,"topic":"mysql-exactly-Once-4","partition":0}}
    19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: objectNode-value:{"value":"14"}
    19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: 要插入的數據:14--2019-06-01 14:52:07.618
    19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: start invoke...
    19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: ===>date:2019-06-01 14:52:07 {"value":{"value":"15"},"metadata":{"offset":14,"topic":"mysql-exactly-Once-4","partition":0}}
    19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: ===>date:2019-06-01 14:52:07 --{"value":{"value":"15"},"metadata":{"offset":14,"topic":"mysql-exactly-Once-4","partition":0}}
    19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: objectNode-value:{"value":"15"}
    19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: 要插入的數據:15--2019-06-01 14:52:07.619
    {"value":{"value":"13"},"metadata":{"offset":12,"topic":"mysql-exactly-Once-4","partition":0}}
    {"value":{"value":"14"},"metadata":{"offset":13,"topic":"mysql-exactly-Once-4","partition":0}}
    {"value":{"value":"15"},"metadata":{"offset":14,"topic":"mysql-exactly-Once-4","partition":0}}
    19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: start abort rollback...
    19/06/01 14:52:07 INFO Task: Source: Custom Source -> (Sink: Print to Std. Out, Sink: MySqlTwoPhaseCommitSink) (1/1) (c284f48cd0b113da4f68fd835e643903) switched from RUNNING to FAILED.
    java.lang.ArithmeticException: / by zero
        at com.zzy.bigdata.flink.streaming.MySqlTwoPhaseCommitSink.invoke(MySqlTwoPhaseCommitSink.java:68)
        at com.zzy.bigdata.flink.streaming.MySqlTwoPhaseCommitSink.invoke(MySqlTwoPhaseCommitSink.java:30)
        at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:230)
        at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:649)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:602)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
        at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
        at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:89)
        at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:154)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.runWithPartitionDiscovery(FlinkKafkaConsumerBase.java:675)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:667)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
        at java.lang.Thread.run(Thread.java:748)
    19/06/01 14:52:07 INFO Task: Freeing task resources for Source: Custom Source -> (Sink: Print to Std. Out, Sink: MySqlTwoPhaseCommitSink) (1/1) (c284f48cd0b113da4f68fd835e643903).
    19/06/01 14:52:07 INFO Task: Ensuring all FileSystem streams are closed for task Source: Custom Source -> (Sink: Print to Std. Out, Sink: MySqlTwoPhaseCommitSink) (1/1) (c284f48cd0b113da4f68fd835e643903) [FAILED]
    19/06/01 14:52:07 INFO TaskExecutor: Un-registering task and sending final execution state FAILED to JobManager for task Source: Custom Source -> (Sink: Print to Std. Out, Sink: MySqlTwoPhaseCommitSink) c284f48cd0b113da4f68fd835e643903.
    19/06/01 14:52:07 INFO ExecutionGraph: Source: Custom Source -> (Sink: Print to Std. Out, Sink: MySqlTwoPhaseCommitSink) (1/1) (c284f48cd0b113da4f68fd835e643903) switched from RUNNING to FAILED.
    java.lang.ArithmeticException: / by zero
        at com.zzy.bigdata.flink.streaming.MySqlTwoPhaseCommitSink.invoke(MySqlTwoPhaseCommitSink.java:68)
        at com.zzy.bigdata.flink.streaming.MySqlTwoPhaseCommitSink.invoke(MySqlTwoPhaseCommitSink.java:30)
        at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:230)
        at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:649)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:602)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
        at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
        at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:89)
        at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:154)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.runWithPartitionDiscovery(FlinkKafkaConsumerBase.java:675)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:667)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
        at java.lang.Thread.run(Thread.java:748)
    19/06/01 14:52:07 INFO ExecutionGraph: Job com.zzy.bigdata.flink.streaming.StreamDemoKafka2Mysql (a7188181ec45ab397d21bb1f928c7b89) switched from state RUNNING to FAILING.
    ...
    19/06/01 14:52:07 INFO TaskExecutor: Discarding the results produced by task execution c284f48cd0b113da4f68fd835e643903.
    19/06/01 14:52:07 INFO ExecutionGraph: Try to restart or fail the job com.zzy.bigdata.flink.streaming.StreamDemoKafka2Mysql (a7188181ec45ab397d21bb1f928c7b89) if no longer possible.
    19/06/01 14:52:07 INFO ExecutionGraph: Job com.zzy.bigdata.flink.streaming.StreamDemoKafka2Mysql (a7188181ec45ab397d21bb1f928c7b89) switched from state FAILING to RESTARTING.
    19/06/01 14:52:07 INFO ExecutionGraph: Restarting the job com.zzy.bigdata.flink.streaming.StreamDemoKafka2Mysql (a7188181ec45ab397d21bb1f928c7b89).
    19/06/01 14:52:07 INFO ExecutionGraph: Job com.zzy.bigdata.flink.streaming.StreamDemoKafka2Mysql (a7188181ec45ab397d21bb1f928c7b89) switched from state RESTARTING to CREATED.
    19/06/01 14:52:07 INFO CheckpointCoordinator: Restoring job a7188181ec45ab397d21bb1f928c7b89 from latest valid checkpoint: Checkpoint 3 @ 1559371921807 for a7188181ec45ab397d21bb1f928c7b89.
    19/06/01 14:52:07 INFO CheckpointCoordinator: No master state to restore
    19/06/01 14:52:07 INFO ExecutionGraph: Job com.zzy.bigdata.flink.streaming.StreamDemoKafka2Mysql (a7188181ec45ab397d21bb1f928c7b89) switched from state CREATED to RUNNING.
    19/06/01 14:52:07 INFO ExecutionGraph: Source: Custom Source -> (Sink: Print to Std. Out, Sink: MySqlTwoPhaseCommitSink) (1/1) (b406c6534c19b26ab0ae3b6056f926cc) switched from CREATED to SCHEDULED.
    19/06/01 14:52:07 INFO ExecutionGraph: Source: Custom Source -> (Sink: Print to Std. Out, Sink: MySqlTwoPhaseCommitSink) (1/1) (b406c6534c19b26ab0ae3b6056f926cc) switched from SCHEDULED to DEPLOYING.
    19/06/01 14:52:07 INFO ExecutionGraph: Deploying Source: Custom Source -> (Sink: Print to Std. Out, Sink: MySqlTwoPhaseCommitSink) (1/1) (attempt #33) to localhost
    19/06/01 14:52:07 INFO TaskExecutor: Received task Source: Custom Source -> (Sink: Print to Std. Out, Sink: MySqlTwoPhaseCommitSink) (1/1).
    19/06/01 14:52:07 INFO Task: Source: Custom Source -> (Sink: Print to Std. Out, Sink: MySqlTwoPhaseCommitSink) (1/1) (b406c6534c19b26ab0ae3b6056f926cc) switched from CREATED to DEPLOYING.
    19/06/01 14:52:07 INFO Task: Creating FileSystem stream leak safety net for task Source: Custom Source -> (Sink: Print to Std. Out, Sink: MySqlTwoPhaseCommitSink) (1/1) (b406c6534c19b26ab0ae3b6056f926cc) [DEPLOYING]
    19/06/01 14:52:07 INFO Task: Loading JAR files for task Source: Custom Source -> (Sink: Print to Std. Out, Sink: MySqlTwoPhaseCommitSink) (1/1) (b406c6534c19b26ab0ae3b6056f926cc) [DEPLOYING].
    19/06/01 14:52:07 INFO Task: Registering task at network: Source: Custom Source -> (Sink: Print to Std. Out, Sink: MySqlTwoPhaseCommitSink) (1/1) (b406c6534c19b26ab0ae3b6056f926cc) [DEPLOYING].
    19/06/01 14:52:07 INFO Task: Source: Custom Source -> (Sink: Print to Std. Out, Sink: MySqlTwoPhaseCommitSink) (1/1) (b406c6534c19b26ab0ae3b6056f926cc) switched from DEPLOYING to RUNNING.
    19/06/01 14:52:07 INFO ExecutionGraph: Source: Custom Source -> (Sink: Print to Std. Out, Sink: MySqlTwoPhaseCommitSink) (1/1) (b406c6534c19b26ab0ae3b6056f926cc) switched from DEPLOYING to RUNNING.
    19/06/01 14:52:07 INFO StreamTask: No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880)
    19/06/01 14:52:07 INFO TwoPhaseCommitSinkFunction: MySqlTwoPhaseCommitSink 0/1 - restoring state
    19/06/01 14:52:07 INFO MySqlTwoPhaseCommitSink: start commit...
    19/06/01 14:52:07 ERROR DBConnectUtil: 提交事物失敗,Connection:com.mysql.jdbc.JDBC4Connection@69ae3a8c
    java.sql.SQLException: Unexpected exception encountered during query.
        at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:965)
        at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:898)
        at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:887)
        at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:861)
        at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2523)
        at com.mysql.jdbc.ConnectionImpl.commit(ConnectionImpl.java:1547)
        at com.zzy.bigdata.flink.streaming.DBConnectUtil.commit(DBConnectUtil.java:56)
        at com.zzy.bigdata.flink.streaming.MySqlTwoPhaseCommitSink.commit(MySqlTwoPhaseCommitSink.java:103)
        at com.zzy.bigdata.flink.streaming.MySqlTwoPhaseCommitSink.commit(MySqlTwoPhaseCommitSink.java:30)
        at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.recoverAndCommit(TwoPhaseCommitSinkFunction.java:200)
        at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.recoverAndCommitInternal(TwoPhaseCommitSinkFunction.java:395)
        at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:353)
        at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
        at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
        at java.lang.Thread.run(Thread.java:748)

The log shows that values 1-11 were written to the database successfully. When the number 15 was consumed, the pre-commit failed: near the end of the log the transaction is rolled back, which also closes the connection, so the commit attempted afterwards during recovery fails as well. The records 12-15 are therefore never inserted into the database, no checkpoint is taken for them, and the checkpoint still holds the offsets of the last successful consumption.

Database table: mysqlExactlyOnce_test

    CREATE TABLE `mysqlExactlyOnce_test` (
      `id` bigint(20) NOT NULL AUTO_INCREMENT,
      `value` varchar(255) DEFAULT NULL,
      `insert_time` datetime DEFAULT NULL,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4

Data in the table:
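A quick way to inspect the result is a small ad-hoc JDBC query; the following sketch (the URL and credentials are assumed to match the sink's settings) prints the rows that were actually committed:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    // Ad-hoc check of mysqlExactlyOnce_test; URL, user and password mirror the sink's
    // configuration and are assumptions of this sketch.
    public class VerifyMysqlExactlyOnce {
        public static void main(String[] args) throws Exception {
            String url = "jdbc:mysql://localhost:3306/test?useSSL=false";
            try (Connection conn = DriverManager.getConnection(url, "root", "123456");
                 Statement st = conn.createStatement();
                 ResultSet rs = st.executeQuery(
                         "select `value`, `insert_time` from `mysqlExactlyOnce_test` order by `id`")) {
                while (rs.next()) {
                    // Each committed record prints as "<value> @ <insert_time>".
                    System.out.println(rs.getString("value") + " @ " + rs.getTimestamp("insert_time"));
                }
            }
        }
    }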

