Spring Kafka --- 事務的兩種方式


原文:https://www.cnblogs.com/yanliang12138/p/12554756.html

目錄

為什么要使用Kafka事務

在日常開發中,數據庫的事務幾乎是必須用到的,事務回滾不一定在於數據增刪改異常,可能系統出現特定邏輯判斷的時候也需要進行數據回滾,Kafka亦是如此,

我們並不希望消息監聽器接收到一些錯誤的或者不需要的消息。

SpringBoot使用數據庫事務非常簡單,只需要在方法上加上@Transactional注解即可,那Kafka如果需要使用事務也可以如此,不過還需修改其他配置才能生效。

Kafka使用事務的兩種方式

配置Kafka事務管理器並使用@Transactional注解

使用KafkaTemplate的executeInTransaction方法

使用@Transactional注解方式

使用注解方式開啟事務還是比較方便的,不過首先需要我們配置KafkaTransactionManager,這個類就是Kafka提供給我們的事務管理類,

我們需要使用生產者工廠來創建這個事務管理類。

需要注意的是,我們需要在producerFactory中開啟事務功能,並設置TransactionIdPrefix,TransactionIdPrefix是用來生成Transactional.id的前綴。

 

@Configuration
public class KafkaProducerConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String servers;
    @Value("${spring.kafka.producer.retries}")
    private int retries;
    @Value("${spring.kafka.producer.acks}")
    private String acks;
    @Value("${spring.kafka.producer.batch-size}")
    private int batchSize;
    @Value("${spring.kafka.producer.properties.linger.ms}")
    private int linger;
    @Value("${spring.kafka.producer.buffer-memory}")
    private int bufferMemory;
 
 
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.ACKS_CONFIG, acks); //需要 all
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }
 
    public ProducerFactory<String, Object> producerFactory() {
        DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory<>(producerConfigs());
        factory.transactionCapable(); factory.setTransactionIdPrefix("tran-"); return factory;
    }
 
    @Bean
    public KafkaTransactionManager transactionManager() { KafkaTransactionManager manager = new KafkaTransactionManager(producerFactory()); return manager;
    }
}

配置Kafka事務還是非常簡單的,接下來我們測試一下事務是否能正常使用,在DemoTest類中創建該方法

@Test
    @Transactional
    public void testTransactionalAnnotation() throws InterruptedException {
        kafkaTemplate.send("topic.quick.tran", "test transactional annotation");
        throw new RuntimeException("fail");
    }

運行測試方法后我們可以看到控制台中輸出了如下日志,這就代表我們使用事務成功,

或者你可以打開Kafka Tool 2工具查看一下測試前后的數據是否有變化

org.apache.kafka.common.KafkaException: Failing batch since transaction was aborted
    at org.apache.kafka.clients.producer.internals.Sender.maybeSendTransactionalRequest(Sender.java:317) [kafka-clients-1.0.2.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:214) [kafka-clients-1.0.2.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163) [kafka-clients-1.0.2.jar:na]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_161]

 

 

使用KafkaTemplate.executeInTransaction開啟事務

這種方式開啟事務是不需要配置事務管理器的,也可以稱為本地事務。直接編寫測試方法

 

 

 public void test() {
        // 聲明事務:后面報錯消息不會發出去
        kafkaTemplate.executeInTransaction(operations -> {
            operations.send("topic1","test executeInTransaction");
            throw new RuntimeException("fail");
        });
    }

 

@Test
    public void testExecuteInTransaction() throws InterruptedException {
        kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback() {
            @Override
            public Object doInOperations(KafkaOperations kafkaOperations) {
                kafkaOperations.send("topic.quick.tran", "test executeInTransaction");
                throw new RuntimeException("fail");
                //return true;
            }
        });
    }

運行測試方法后控制台同樣打印出了事務終止的異常,代表可以正常使用事務

org.apache.kafka.common.KafkaException: Failing batch since transaction was aborted
    at org.apache.kafka.clients.producer.internals.Sender.maybeSendTransactionalRequest(Sender.java:317) [kafka-clients-1.0.2.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:214) [kafka-clients-1.0.2.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163) [kafka-clients-1.0.2.jar:na]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_161]

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM