原文:https://www.cnblogs.com/yanliang12138/p/12554756.html
目錄
為什么要使用Kafka事務
在日常開發中,數據庫的事務幾乎是必須用到的,事務回滾不一定在於數據增刪改異常,可能系統出現特定邏輯判斷的時候也需要進行數據回滾,Kafka亦是如此,
我們並不希望消息監聽器接收到一些錯誤的或者不需要的消息。
SpringBoot使用數據庫事務非常簡單,只需要在方法上加上@Transactional注解即可,那Kafka如果需要使用事務也可以如此,不過還需修改其他配置才能生效。
Kafka使用事務的兩種方式
配置Kafka事務管理器並使用@Transactional注解
使用KafkaTemplate的executeInTransaction方法
使用@Transactional注解方式
使用注解方式開啟事務還是比較方便的,不過首先需要我們配置KafkaTransactionManager,這個類就是Kafka提供給我們的事務管理類,
我們需要使用生產者工廠來創建這個事務管理類。
需要注意的是,我們需要在producerFactory中開啟事務功能,並設置TransactionIdPrefix,TransactionIdPrefix是用來生成Transactional.id的前綴。
@Configuration public class KafkaProducerConfig { @Value("${spring.kafka.bootstrap-servers}") private String servers; @Value("${spring.kafka.producer.retries}") private int retries; @Value("${spring.kafka.producer.acks}") private String acks; @Value("${spring.kafka.producer.batch-size}") private int batchSize; @Value("${spring.kafka.producer.properties.linger.ms}") private int linger; @Value("${spring.kafka.producer.buffer-memory}") private int bufferMemory; public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); props.put(ProducerConfig.RETRIES_CONFIG, retries); props.put(ProducerConfig.ACKS_CONFIG, acks); //需要 all props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize); props.put(ProducerConfig.LINGER_MS_CONFIG, linger); props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } public ProducerFactory<String, Object> producerFactory() { DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory<>(producerConfigs()); factory.transactionCapable(); factory.setTransactionIdPrefix("tran-"); return factory; } @Bean public KafkaTransactionManager transactionManager() { KafkaTransactionManager manager = new KafkaTransactionManager(producerFactory()); return manager; } }
配置Kafka事務還是非常簡單的,接下來我們測試一下事務是否能正常使用,在DemoTest類中創建該方法
@Test @Transactional public void testTransactionalAnnotation() throws InterruptedException { kafkaTemplate.send("topic.quick.tran", "test transactional annotation"); throw new RuntimeException("fail"); }
運行測試方法后我們可以看到控制台中輸出了如下日志,這就代表我們使用事務成功,
或者你可以打開Kafka Tool 2工具查看一下測試前后的數據是否有變化
org.apache.kafka.common.KafkaException: Failing batch since transaction was aborted at org.apache.kafka.clients.producer.internals.Sender.maybeSendTransactionalRequest(Sender.java:317) [kafka-clients-1.0.2.jar:na] at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:214) [kafka-clients-1.0.2.jar:na] at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163) [kafka-clients-1.0.2.jar:na] at java.lang.Thread.run(Thread.java:748) [na:1.8.0_161]
使用KafkaTemplate.executeInTransaction開啟事務
這種方式開啟事務是不需要配置事務管理器的,也可以稱為本地事務。直接編寫測試方法
public void test() { // 聲明事務:后面報錯消息不會發出去 kafkaTemplate.executeInTransaction(operations -> { operations.send("topic1","test executeInTransaction"); throw new RuntimeException("fail"); }); }
@Test public void testExecuteInTransaction() throws InterruptedException { kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback() { @Override public Object doInOperations(KafkaOperations kafkaOperations) { kafkaOperations.send("topic.quick.tran", "test executeInTransaction"); throw new RuntimeException("fail"); //return true; } }); }
運行測試方法后控制台同樣打印出了事務終止的異常,代表可以正常使用事務
org.apache.kafka.common.KafkaException: Failing batch since transaction was aborted at org.apache.kafka.clients.producer.internals.Sender.maybeSendTransactionalRequest(Sender.java:317) [kafka-clients-1.0.2.jar:na] at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:214) [kafka-clients-1.0.2.jar:na] at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163) [kafka-clients-1.0.2.jar:na] at java.lang.Thread.run(Thread.java:748) [na:1.8.0_161]