1. 引言
事務大家都知道,就是相當於一個原子操作,要么全部執行,要么發生異常全部回滾。但事務只限於本地事務,即各個數據庫操作必須在同一數據庫下執行。拿我最近的接手的項目來說,各個模塊全部部署於不同的服務器,都有自己獨立的數據庫。前端想要刪除一個用戶,先調用用戶平台的刪除用戶接口,再調用權限平台的刪除權限接口。起初覺得這樣操作沒什么問題,后來有幾次數據異常后,發現有的用戶信息沒有,但權限信息還存在,導致數據不一致。此時,就想到了用分布式事物來解決。所謂分布式事物,我個人理解是為了解決數據一致性的問題。
2. kafka+本地事物表解決分布式事務
消息隊列的產生是為了解決各系統間通信問題,因為Kafka用的比較多,此處就想到用Kafka+本地事物表去解決分布式事務問題。關於Kafka+zookeeper的搭建此處不做詳解。
上圖是自己基於Kafka+本地事物表實現的基本流程(圖自己畫的,可能不太清楚)代碼后文貼出,(上圖箭頭只代表流程,和下文的1.2.3無關)此處講一下自己的思路。先申明,kafka只能保證最終一致性,並不是強一致性。我們最終目的是保證上圖2個藍色方塊的任務執行。方便說明,假定2個系統A,B 分別對應的2個數據庫A庫和B庫。其中A庫中的事務表叫做A事務表,B庫中的事務表叫做B事務表。要執行的藍色方塊叫A業務和B業務。
1. 在A系統中,啟用A庫的事物,執行如下2步操作。
1)A系統執行A業務
2)A系統在A庫的A事物表中寫一條狀態為NEW的數據(此處數據的ID唯一)
此處啟用A庫的事務,即2步操作要木全部執行,要木不執行。
2. A系統中啟用一個定時任務,5s中執行一次,輪訓A庫的A事物表,看是否有狀態為NEW的數據,如果有,將此記錄發送到Kafka消息隊列中,並修改此條數據的狀態為Published。此時A系統的操作全部執行完畢。
3. B系統啟用進程拉取kafka數據,如果發現有從A系統來的數據,將此數據記錄到B系統的B事務表中,更新此數據在B系統的B事務表狀態為NEW(因為ID唯一,此條數據的ID和存放在A庫中的數據的ID相同,如果出現網絡異常導致B系統重復收到數據,但看到自己庫中已有此ID的數據,便會將重復消息棄用,此處是保證只執行一次),更新完成后,Kafka確認提交(此處要關閉Kafka的自動提交)
4. B系統啟用定時任務,5s執行一次,輪訓B庫的B事物表,看是否有狀態為NEW的數據,如果有,執行如下2步操作。
1)B系統執行B業務
2)B系統更新B庫的B事物表,將此條狀態為New的數據改為狀態為Published
此處啟用B庫的事務,即2步操作要木全部執行,要木不執行。
3. 實現代碼
相對於Kafka來說,A系統相當於消息生產者,B系統相當於消息消費者。下面為SQL建表語句。
-- A系統事務表 CREATE TABLE `kafka_event_publish` ( `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT, `payload` varchar(2000) NOT NULL, `eventType` varchar(30) NOT NULL, `status` varchar(30) NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8; -- B系統事務表 CREATE TABLE `kafka_event_process` ( `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT, `payload` varchar(2000) NOT NULL, `eventType` varchar(30) NOT NULL, `status` varchar(30) NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8;
Kafka用來發送消息,接收消息,下面為Kafka的配置類。
package com.boot.util; // 消費者消息狀態 public enum EventProcessStatus { NEW, PROCESSED; private EventProcessStatus() { } } -------------------------------------- package com.boot.util; // 生產者消息狀態 public enum EventPublishStatus { NEW, PUBLISHED; private EventPublishStatus() { } } --------------------------------------- package com.boot.util; // Kafka主題 public enum EventType { USER_CREATED; private EventType() { } }
package com.boot.util; import java.util.Arrays; import java.util.Iterator; import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.function.Consumer; import org.apache.kafka.clients.consumer.CommitFailedException; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; // kafka工具類 public class KafkaUtil { private static Producer<String, String> producer; private static KafkaConsumer<String, String> consumer; public KafkaUtil() { } // Kafka發送消息,topic為主題,value為具體消息 public static void sendSync(String topic, String value) throws ExecutionException, InterruptedException { producer.send(new ProducerRecord(topic, value)).get(); } // Kafka接收消息 public static void consume(Consumer<String> c) { // 訂閱主題為USER_CREATED的消息 consumer.subscribe(Arrays.asList(EventType.USER_CREATED.name())); while(true) { ConsumerRecords<String, String> records = consumer.poll(100L); Iterator var2 = records.iterator(); while(var2.hasNext()) { ConsumerRecord<String, String> record = (ConsumerRecord)var2.next(); System.out.println(record); c.accept(record.value()); } try { consumer.commitSync(); } catch (CommitFailedException var4) { System.out.println("Kafka消費者提交offset失敗"); } } } // kafka基礎配置 static { Properties producerProps = new Properties(); producerProps.put("bootstrap.servers", "10.113.56.68:9093,10.113.56.68:9094,10.113.56.68:9092"); producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); producer = new KafkaProducer(producerProps); Properties consumerProps = new Properties(); consumerProps.put("bootstrap.servers", "10.113.56.68:9093,10.113.56.68:9094,10.113.56.68:9092"); consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); consumerProps.put("group.id", "VoucherGroup"); consumerProps.put("enable.auto.commit", "false"); consumer = new KafkaConsumer(consumerProps); } }
A系統主要執行的操作有 1)執行業務操作,2)插入New消息到數據庫,3)定時任務輪訓數據庫為New的數據,4)發送到Kafka中,5)修改數據庫消息狀態為Published。此處1),2)步操作不貼代碼。下面為A系統中(即生產者)代碼。
import com.boot.kafka.transaction.EventPublishService; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import javax.annotation.Resource; /** * @Author xiabing5 * @Create 2019/8/2 10:13 * @Desc spring定時器,定時向kafka中發送事物消息 **/ @Component public class EventPublishSchedule { @Resource private EventPublishService eventPublishService; /* * 每N毫秒執行一次*/ @Scheduled(fixedRate = 5000) private void publish() { eventPublishService.publish(); } }
import com.boot.mapper.KafkaEventPublishMapper; import com.boot.pojo.KafkaEventPublish; import com.boot.util.EventPublishStatus; import com.boot.util.KafkaUtil; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.springframework.util.CollectionUtils; import javax.annotation.Resource; import java.util.*; /** * @Author xiabing5 * @Create 2019/8/2 9:34 * @Desc kafka解決分布式事物(消息發送端) **/ @Service public class EventPublishService { @Resource private KafkaEventPublishMapper eventPublishMapper; // 事務表的Mapper @Transactional(rollbackFor = Exception.class) public void publish() { // 查詢所有狀態為NEW的事件 Map<String,Object> params = new HashMap<String,Object>(); params.put("status", EventPublishStatus.NEW.name()); List<KafkaEventPublish> eventPublishList = eventPublishMapper.selectEventPublish(params); if(!CollectionUtils.isEmpty(eventPublishList)) { // 發送消息隊列 List<Long> ids = sendEventPublish(eventPublishList); if (!CollectionUtils.isEmpty(ids)) { //更新數據庫狀態為PUBLISHED eventPublishMapper.updateEventStatus(ids, EventPublishStatus.PUBLISHED.name()); } } } /** * @Author xiabing5 * @Create 2019/8/2 10:32 * @Desc 發送EventPublish對象集合 返回發送成功的EventPublish的ID集合 **/ private static List<Long> sendEventPublish(List<KafkaEventPublish> kafkaEventPublishes) { if(CollectionUtils.isEmpty(kafkaEventPublishes)) { return Collections.emptyList(); } List<Long> ids = new ArrayList<Long>(); for(KafkaEventPublish kafkaEventPublish : kafkaEventPublishes) { try { KafkaUtil.sendSync(kafkaEventPublish.getEventType().name(),kafkaEventPublish.getPayload()); ids.add(kafkaEventPublish.getId()); System.out.println("發送kafka消息成功"); } catch (Exception e) { System.out.println("發送kafka消息失敗 "+ kafkaEventPublish); } } return ids; } }
B系統主要執行的操作有,1)從kafka中拉取數據 ,2)將此數據放入數據庫事務表,更新狀態為New ,3) 定時任務輪詢狀態為New的數據,執行相應業務操作,4)更新New數據狀態為Complete 。下面為B系統中(即消費者)代碼。
import com.boot.kafka.transaction.EventProcessService; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import javax.annotation.Resource; // 消費者定時任務 @Component public class EventProcessSchedule { @Resource private EventProcessService eventProcessService; @Scheduled(fixedRate = 5000) private void process() { eventProcessService.process(); } }
import com.boot.mapper.KafkaEventProcessMapper; import com.boot.pojo.KafkaEventProcess; import com.boot.util.EventProcessStatus; import com.boot.util.EventType; import com.boot.util.KafkaUtil; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import javax.annotation.PostConstruct; import javax.annotation.Resource; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.stream.Collectors; /** * @Author xiabing5 * @Create 2019/8/2 13:37 * @Desc 接收kafka消息service類 **/ @Service public class EventProcessService { @Resource private KafkaEventProcessMapper kafkaEventProcessMapper; // 創建單一線程線程池 @PostConstruct public void init() { ThreadFactory threadFactory = new ThreadFactoryBuilder() .setNameFormat("MqMessageConsumerThread-%d") .setDaemon(true) .build(); ExecutorService executorService = Executors.newSingleThreadExecutor(threadFactory); executorService.execute(new MqMessageConsumerThread()); } // 自定義接收線程 private class MqMessageConsumerThread implements Runnable { @Override public void run() { KafkaUtil.consume(consumerRecord -> { KafkaEventProcess kafkaEventProcess = new KafkaEventProcess(); kafkaEventProcess .setPayload(consumerRecord); kafkaEventProcess .setEventType(EventType.USER_CREATED); kafkaEventProcess .setStatus(EventProcessStatus.NEW); kafkaEventProcessMapper.insertEventProcess(kafkaEventProcess); }); } } // 執行業務邏輯操作 @Transactional(rollbackFor = Exception.class) public void process() { // 查詢表中狀態為new的事件 Map<String,Object> params = new HashMap<String,Object>(); params.put("status",EventProcessStatus.NEW.name()); List<KafkaEventProcess> kafkaEventProcessList = kafkaEventProcessMapper.selectEventProcess(params); for(KafkaEventProcess kafkaEventProcess : kafkaEventProcessList) { // 執行業務操作 System.out.println("刪除你"); } List<Long> ids = kafkaEventProcessList.stream().map(item -> item.getId()).collect(Collectors.toList()); kafkaEventProcessMapper.updateEventStatus(ids,EventProcessStatus.PROCESSED.name()); } }
補充:此處沒有貼事務表的sql語句(即Mapper.xml)無非是添加數據庫記錄,更新記錄狀態語句。此代碼在我的實踐中能運行。
4. 總結
分布式問題一直是我最近比較棘手問題,如分布式鎖,定時任務在集群下重復執行等。自己也是個小白,希望通過每次實踐后,能總結出點東西,便於以后去遍歷。