kafka 事務代碼實現(生產者到server端的事務)


kafka的事務指的是2個點   ① 生產者到kafka服務端的事務保障    ②消費者從kafka拉取數據的事務

kafka提供的事務機制是 第①點,  對於第②點來說 只能自己在消費端實現冪等性。

 

我們來介紹第①點, 因為生產者producer寫到kafka可能會出現消息重復,比如 設置ack=all,寫入到kafka的leader時,leader掛掉了,

沒有及時反饋ack,導致生產者再次發送消息就會出現重復消息落盤。這種情況可以設置kafka的屬性用來開啟冪等。但是這種冪等

只能保證 producer沒有掛掉的情況下,因為冪等的原理是 kafka緩存了一份 pid,partition,seqnumber 的數據,如果命中則說明之前緩存了,

但是如果producer掛掉了重啟后,它的pid就會變化,partition也有可能變化,就會導致消息會出現重復狀況。所以kafka 0.11版本加入了事務機制

開啟時事務后,會存在 transaction_id , 封裝成( transaction_id, pid,partition,seqnumber, 消費到哪條記錄等等) 保存在kafka上,如果producer 掛了重新

啟動的時候,會自動尋找kafka中的這個 transaction_id,找到的話就會恢復到掛掉之前的狀態 ,然后進行消費。kafka事務保證了  要么全部成功,要么全部失敗。

 

還有一個很重要的點是 要在consumer端 設置   isolation.level 為 read_committed狀態,它默認是read_uncommitted狀態,這是什么意思呢? 接下來詳細說明一下:

目前producer是雙線程設計,后台的Sender線程負責實際的消息發送。當Sender線程構造消息batch發送時,它會嘗試去讀取事務狀態,如果發現已經abort,則立即將未發送的batch全部fail掉——這就是為什么你注釋Thread.sleep后則不能發送的原因。當你加入了Thread.sleep之后batch發送時主線程在休眠,尚未執行到abortTransaction,故Sender線程成功地發送了消息到Kafka broker。

另外,你需要為consumer端配置isolation.level = read_committed。這樣不管哪種情況你都不會讀取到任何未提交的消息。默認是read_uncommitted,即使abort的消息,只要是成功發送到Kafka了,consumer就能讀取到。


1、也就是開啟事務之后,生產者調用send發送數據時他就會直接向kafka插入數據,只不過是這個數據后面追加了一個狀態,這個狀態是read_uncommited代表未提交,只有producer調用了commitTransaction時候 這些數據在kafka中才會都標記為read_commited。
所以 如果在 consumer消費方沒有設置 isolation.level 為 read_committed狀態(默認是read_uncommited),那么當producer 出現 異常或者宕機或者在事務提交之前發送的數據也依然能讀取到,因為前面說了 這些數據是send的時候依然插入到kafka中只不過狀態標記為uncommited而已,所以要想實現事務,還得在consumer方設置 隔離級別 isolation.level 為 read_committed,表示只能讀取 提交狀態的記錄 ,這樣不管在任何條件下都不會讀取到任何未提交的消息
 
2、還有一個情況是,當生產者中代碼捕獲到了異常,並進行abortTransaction,而消費者並沒有設置隔離級別為read_committed,但卻讀不到消息呢,那我們可以 想象成當生產者調用abortTransaction時接下來的消息肯定不會發送到服務器,並且已發送到服務器上的
消息還會直接刪掉,這樣理解就可以了
 

 

producer實現代碼如下:

public class producer {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers","hadoop102:9092,hadoop103:9092,hadoop104:9092");
        props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks","all");
        props.put("retries","2");
        props.put("batch.size","16384");
        props.put("transactional.id","tran-wb2");  //事務ID,開啟事務下面冪等也要開啟
        props.put("enable.idempotence", "true");  //開啟冪等
        // 一定要在消費者方設置 isolation.level為 read_committed,表示只讀取已提交事務狀態的記錄
        Producer<Object, Object> producer = new KafkaProducer<>(props);

        producer.initTransactions();
        producer.beginTransaction();
        try {
            for (int i = 0; i <100 ; i++) {
                Future<RecordMetadata> first = producer.send(new ProducerRecord<>("first", i + "sad ", i + 1 + "s d"));
                //first.get();  加上get可以實現同步發送操作
                if (i==20){
                    throw new RuntimeException("測試異常回滾");
                }
            }
        } catch (RuntimeException e){
            System.out.println(e.toString());
            producer.abortTransaction();  //出現異常,就進行回滾,這樣所有消息都會失敗
            producer.close();
            return;
        }

        producer.commitTransaction(); //沒有異常就 事務提交
        producer.close();
    }
}

消費者代碼

public class consumer {

    public static void main(String[] args) throws InterruptedException {

        Properties properties = new Properties();
        properties.put("bootstrap.servers", "hadoop102:9092,hadoop103:9092,hadoop104:9092");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("group.id", "wangbingsaa");
        properties.put("isolation.level", "read_committed"); //一定要設置 只拉取 已提交事務狀態的記錄,這樣無論什么條件都可以
        // properties.put("auto.offset.reset","earliest"); //設置拉取的位置
        properties.put("enable.auto.commit", "false"); //關閉自動提交
        properties.put("auto.commit.interval.ms", "1000"); //自動提交間隔


        Consumer<String, String> consumer = new KafkaConsumer<>(properties);

        consumer.subscribe(Collections.singletonList("first"));

        ConsumerRecords<String, String> records = consumer.poll(4000); //如果拉取時長超過4000毫秒 就不拉取
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, partitoon = %d key = %s, value = %s%n", record.offset(), record.partition(),record.key(), record.value());
        }
        consumer.commitSync(); //手動提交
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM