為什么要使用Kafka事務
在日常開發中,數據庫的事務幾乎是必須用到的,事務回滾不一定在於數據增刪改異常,可能系統出現特定邏輯判斷的時候也需要進行數據回滾,Kafka亦是如此,
我們並不希望消息監聽器接收到一些錯誤的或者不需要的消息。
SpringBoot使用數據庫事務非常簡單,只需要在方法上加上@Transactional注解即可,那Kafka如果需要使用事務也可以如此,不過還需修改其他配置才能生效。
Kafka使用事務的兩種方式
配置Kafka事務管理器並使用@Transactional注解
使用KafkaTemplate的executeInTransaction方法
使用@Transactional注解方式
使用注解方式開啟事務還是比較方便的,不過首先需要我們配置KafkaTransactionManager,這個類就是Kafka提供給我們的事務管理類,
我們需要使用生產者工廠來創建這個事務管理類。
需要注意的是,我們需要在producerFactory中開啟事務功能,並設置TransactionIdPrefix,TransactionIdPrefix是用來生成Transactional.id的前綴。

@Bean public ProducerFactory<Integer, String> producerFactory() { DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory<>(senderProps()); factory.transactionCapable(); factory.setTransactionIdPrefix("tran-"); return factory; } @Bean public KafkaTransactionManager transactionManager(ProducerFactory producerFactory) { KafkaTransactionManager manager = new KafkaTransactionManager(producerFactory); return manager; }
配置Kafka事務還是非常簡單的,接下來我們測試一下事務是否能正常使用,在DemoTest類中創建該方法

@Test @Transactional public void testTransactionalAnnotation() throws InterruptedException { kafkaTemplate.send("topic.quick.tran", "test transactional annotation"); throw new RuntimeException("fail"); }
運行測試方法后我們可以看到控制台中輸出了如下日志,這就代表我們使用事務成功,
或者你可以打開Kafka Tool 2工具查看一下測試前后的數據是否有變化

org.apache.kafka.common.KafkaException: Failing batch since transaction was aborted at org.apache.kafka.clients.producer.internals.Sender.maybeSendTransactionalRequest(Sender.java:317) [kafka-clients-1.0.2.jar:na] at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:214) [kafka-clients-1.0.2.jar:na] at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163) [kafka-clients-1.0.2.jar:na] at java.lang.Thread.run(Thread.java:748) [na:1.8.0_161]
使用KafkaTemplate.executeInTransaction開啟事務
這種方式開啟事務是不需要配置事務管理器的,也可以稱為本地事務。直接編寫測試方法

@Test public void testExecuteInTransaction() throws InterruptedException { kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback() { @Override public Object doInOperations(KafkaOperations kafkaOperations) { kafkaOperations.send("topic.quick.tran", "test executeInTransaction"); throw new RuntimeException("fail"); //return true;
} }); }
運行測試方法后控制台同樣打印出了事務終止的異常,代表可以正常使用事務

org.apache.kafka.common.KafkaException: Failing batch since transaction was aborted at org.apache.kafka.clients.producer.internals.Sender.maybeSendTransactionalRequest(Sender.java:317) [kafka-clients-1.0.2.jar:na] at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:214) [kafka-clients-1.0.2.jar:na] at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163) [kafka-clients-1.0.2.jar:na] at java.lang.Thread.run(Thread.java:748) [na:1.8.0_161]