kafka的事務指的是2個點 ① 生產者到kafka服務端的事務保障 ②消費者從kafka拉取數據的事務
kafka提供的事務機制是 第①點, 對於第②點來說 只能自己在消費端實現冪等性。
我們來介紹第①點, 因為生產者producer寫到kafka可能會出現消息重復,比如 設置ack=all,寫入到kafka的leader時,leader掛掉了,
沒有及時反饋ack,導致生產者再次發送消息就會出現重復消息落盤。這種情況可以設置kafka的屬性用來開啟冪等。但是這種冪等
只能保證 producer沒有掛掉的情況下,因為冪等的原理是 kafka緩存了一份 pid,partition,seqnumber 的數據,如果命中則說明之前緩存了,
但是如果producer掛掉了重啟后,它的pid就會變化,partition也有可能變化,就會導致消息會出現重復狀況。所以kafka 0.11版本加入了事務機制
開啟時事務后,會存在 transaction_id , 封裝成( transaction_id, pid,partition,seqnumber, 消費到哪條記錄等等) 保存在kafka上,如果producer 掛了重新
啟動的時候,會自動尋找kafka中的這個 transaction_id,找到的話就會恢復到掛掉之前的狀態 ,然后進行消費。kafka事務保證了 要么全部成功,要么全部失敗。
還有一個很重要的點是 要在consumer端 設置 isolation.level 為 read_committed狀態,它默認是read_uncommitted狀態,這是什么意思呢? 接下來詳細說明一下:
目前producer是雙線程設計,后台的Sender線程負責實際的消息發送。當Sender線程構造消息batch發送時,它會嘗試去讀取事務狀態,如果發現已經abort,則立即將未發送的batch全部fail掉——這就是為什么你注釋Thread.sleep后則不能發送的原因。當你加入了Thread.sleep之后batch發送時主線程在休眠,尚未執行到abortTransaction,故Sender線程成功地發送了消息到Kafka broker。
另外,你需要為consumer端配置isolation.level = read_committed。這樣不管哪種情況你都不會讀取到任何未提交的消息。默認是read_uncommitted,即使abort的消息,只要是成功發送到Kafka了,consumer就能讀取到。
1、也就是開啟事務之后,生產者調用send發送數據時他就會直接向kafka插入數據,只不過是這個數據后面追加了一個狀態,這個狀態是read_uncommited代表未提交,只有producer調用了commitTransaction時候 這些數據在kafka中才會都標記為read_commited。
producer實現代碼如下:
public class producer { public static void main(String[] args) throws ExecutionException, InterruptedException { Properties props = new Properties(); props.put("bootstrap.servers","hadoop102:9092,hadoop103:9092,hadoop104:9092"); props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); props.put("acks","all"); props.put("retries","2"); props.put("batch.size","16384"); props.put("transactional.id","tran-wb2"); //事務ID,開啟事務下面冪等也要開啟 props.put("enable.idempotence", "true"); //開啟冪等 // 一定要在消費者方設置 isolation.level為 read_committed,表示只讀取已提交事務狀態的記錄 Producer<Object, Object> producer = new KafkaProducer<>(props); producer.initTransactions(); producer.beginTransaction(); try { for (int i = 0; i <100 ; i++) { Future<RecordMetadata> first = producer.send(new ProducerRecord<>("first", i + "sad ", i + 1 + "s d")); //first.get(); 加上get可以實現同步發送操作 if (i==20){ throw new RuntimeException("測試異常回滾"); } } } catch (RuntimeException e){ System.out.println(e.toString()); producer.abortTransaction(); //出現異常,就進行回滾,這樣所有消息都會失敗 producer.close(); return; } producer.commitTransaction(); //沒有異常就 事務提交 producer.close(); } }
消費者代碼
public class consumer { public static void main(String[] args) throws InterruptedException { Properties properties = new Properties(); properties.put("bootstrap.servers", "hadoop102:9092,hadoop103:9092,hadoop104:9092"); properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put("group.id", "wangbingsaa"); properties.put("isolation.level", "read_committed"); //一定要設置 只拉取 已提交事務狀態的記錄,這樣無論什么條件都可以 // properties.put("auto.offset.reset","earliest"); //設置拉取的位置 properties.put("enable.auto.commit", "false"); //關閉自動提交 properties.put("auto.commit.interval.ms", "1000"); //自動提交間隔 Consumer<String, String> consumer = new KafkaConsumer<>(properties); consumer.subscribe(Collections.singletonList("first")); ConsumerRecords<String, String> records = consumer.poll(4000); //如果拉取時長超過4000毫秒 就不拉取 for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, partitoon = %d key = %s, value = %s%n", record.offset(), record.partition(),record.key(), record.value()); } consumer.commitSync(); //手動提交 } }