分布式事務


1. 引言

  事務大家都知道,就是相當於一個原子操作,要么全部執行,要么發生異常全部回滾。但事務只限於本地事務,即各個數據庫操作必須在同一數據庫下執行。拿我最近的接手的項目來說,各個模塊全部部署於不同的服務器,都有自己獨立的數據庫。前端想要刪除一個用戶,先調用用戶平台的刪除用戶接口,再調用權限平台的刪除權限接口。起初覺得這樣操作沒什么問題,后來有幾次數據異常后,發現有的用戶信息沒有,但權限信息還存在,導致數據不一致。此時,就想到了用分布式事物來解決。所謂分布式事物,我個人理解是為了解決數據一致性的問題。

2. kafka+本地事物表解決分布式事務

  消息隊列的產生是為了解決各系統間通信問題,因為Kafka用的比較多,此處就想到用Kafka+本地事物表去解決分布式事務問題。關於Kafka+zookeeper的搭建此處不做詳解。

 

  上圖是自己基於Kafka+本地事物表實現的基本流程(圖自己畫的,可能不太清楚)代碼后文貼出,(上圖箭頭只代表流程,和下文的1.2.3無關)此處講一下自己的思路。先申明,kafka只能保證最終一致性,並不是強一致性。我們最終目的是保證上圖2個藍色方塊的任務執行。方便說明,假定2個系統A,B 分別對應的2個數據庫A庫和B庫。其中A庫中的事務表叫做A事務表,B庫中的事務表叫做B事務表。要執行的藍色方塊叫A業務和B業務。

  1. 在A系統中,啟用A庫的事物,執行如下2步操作。

    1)A系統執行A業務

    2)A系統在A庫的A事物表中寫一條狀態為NEW的數據(此處數據的ID唯一)

    此處啟用A庫的事務,即2步操作要木全部執行,要木不執行。

  2. A系統中啟用一個定時任務,5s中執行一次,輪訓A庫的A事物表,看是否有狀態為NEW的數據,如果有,將此記錄發送到Kafka消息隊列中,並修改此條數據的狀態為Published。此時A系統的操作全部執行完畢。

  3. B系統啟用進程拉取kafka數據,如果發現有從A系統來的數據,將此數據記錄到B系統的B事務表中,更新此數據在B系統的B事務表狀態為NEW(因為ID唯一,此條數據的ID和存放在A庫中的數據的ID相同,如果出現網絡異常導致B系統重復收到數據,但看到自己庫中已有此ID的數據,便會將重復消息棄用,此處是保證只執行一次),更新完成后,Kafka確認提交(此處要關閉Kafka的自動提交)

  4. B系統啟用定時任務,5s執行一次,輪訓B庫的B事物表,看是否有狀態為NEW的數據,如果有,執行如下2步操作。

    1)B系統執行B業務

    2)B系統更新B庫的B事物表,將此條狀態為New的數據改為狀態為Published

    此處啟用B庫的事務,即2步操作要木全部執行,要木不執行。

3. 實現代碼 

  相對於Kafka來說,A系統相當於消息生產者,B系統相當於消息消費者。下面為SQL建表語句。

-- A系統事務表 
CREATE TABLE `kafka_event_publish` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
  `payload` varchar(2000) NOT NULL,
  `eventType` varchar(30) NOT NULL,
  `status` varchar(30) NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8;

-- B系統事務表
CREATE TABLE `kafka_event_process` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
  `payload` varchar(2000) NOT NULL,
  `eventType` varchar(30) NOT NULL,
  `status` varchar(30) NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8;

 

  Kafka用來發送消息,接收消息,下面為Kafka的配置類。

package com.boot.util;

// 消費者消息狀態
public enum EventProcessStatus {
    NEW,
    PROCESSED;

    private EventProcessStatus() {
    }
}
--------------------------------------
package com.boot.util;

// 生產者消息狀態
public enum EventPublishStatus {
    NEW,
    PUBLISHED;

    private EventPublishStatus() {
    }
}
---------------------------------------
package com.boot.util;

// Kafka主題
public enum EventType {
    USER_CREATED;

    private EventType() {
    }
}
package com.boot.util;

import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

// kafka工具類
public class KafkaUtil {
    private static Producer<String, String> producer;
    private static KafkaConsumer<String, String> consumer;

    public KafkaUtil() {
    }
    // Kafka發送消息,topic為主題,value為具體消息
    public static void sendSync(String topic, String value) throws ExecutionException, InterruptedException {
        producer.send(new ProducerRecord(topic, value)).get();
    }
    // Kafka接收消息
    public static void consume(Consumer<String> c) {
    // 訂閱主題為USER_CREATED的消息
        consumer.subscribe(Arrays.asList(EventType.USER_CREATED.name()));

        while(true) {
            ConsumerRecords<String, String> records = consumer.poll(100L);
            Iterator var2 = records.iterator();

            while(var2.hasNext()) {
                ConsumerRecord<String, String> record = (ConsumerRecord)var2.next();
                System.out.println(record);
                c.accept(record.value());
            }

            try {
                consumer.commitSync();
            } catch (CommitFailedException var4) {
                System.out.println("Kafka消費者提交offset失敗");
            }
        }
    }
    // kafka基礎配置
    static {
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "10.113.56.68:9093,10.113.56.68:9094,10.113.56.68:9092");
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer(producerProps);
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "10.113.56.68:9093,10.113.56.68:9094,10.113.56.68:9092");
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("group.id", "VoucherGroup");
        consumerProps.put("enable.auto.commit", "false");
        consumer = new KafkaConsumer(consumerProps);
    }
}

  A系統主要執行的操作有 1)執行業務操作,2)插入New消息到數據庫,3)定時任務輪訓數據庫為New的數據,4)發送到Kafka中,5)修改數據庫消息狀態為Published。此處1),2)步操作不貼代碼。下面為A系統中(即生產者)代碼。

import com.boot.kafka.transaction.EventPublishService;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * @Author  xiabing5
 * @Create  2019/8/2 10:13
 * @Desc    spring定時器,定時向kafka中發送事物消息
 **/
@Component
public class EventPublishSchedule {

    @Resource
    private EventPublishService eventPublishService;

    /*
    * 每N毫秒執行一次*/
    @Scheduled(fixedRate = 5000)
    private void publish() {
        eventPublishService.publish();
    }
}
import com.boot.mapper.KafkaEventPublishMapper;
import com.boot.pojo.KafkaEventPublish;
import com.boot.util.EventPublishStatus;
import com.boot.util.KafkaUtil;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;

import javax.annotation.Resource;
import java.util.*;

/**
 * @Author  xiabing5
 * @Create  2019/8/2 9:34
 * @Desc    kafka解決分布式事物(消息發送端)
 **/
@Service
public class EventPublishService {
    
    @Resource
    private KafkaEventPublishMapper eventPublishMapper; // 事務表的Mapper

    @Transactional(rollbackFor = Exception.class)
    public void publish() {
        // 查詢所有狀態為NEW的事件
        Map<String,Object> params = new HashMap<String,Object>();
        params.put("status", EventPublishStatus.NEW.name());
        List<KafkaEventPublish> eventPublishList = eventPublishMapper.selectEventPublish(params);

        if(!CollectionUtils.isEmpty(eventPublishList)) {
            // 發送消息隊列
            List<Long> ids = sendEventPublish(eventPublishList);

            if (!CollectionUtils.isEmpty(ids)) {
                //更新數據庫狀態為PUBLISHED
                eventPublishMapper.updateEventStatus(ids, EventPublishStatus.PUBLISHED.name());
            }
        }
    }

    /**
     * @Author  xiabing5
     * @Create  2019/8/2 10:32
     * @Desc    發送EventPublish對象集合 返回發送成功的EventPublish的ID集合
     **/
    private static List<Long> sendEventPublish(List<KafkaEventPublish> kafkaEventPublishes) {

        if(CollectionUtils.isEmpty(kafkaEventPublishes)) {
            return Collections.emptyList();
        }
        List<Long> ids = new ArrayList<Long>();
        for(KafkaEventPublish kafkaEventPublish : kafkaEventPublishes) {
            try {
                KafkaUtil.sendSync(kafkaEventPublish.getEventType().name(),kafkaEventPublish.getPayload());
                ids.add(kafkaEventPublish.getId());
                System.out.println("發送kafka消息成功");
            }
            catch (Exception e) {
                System.out.println("發送kafka消息失敗 "+ kafkaEventPublish);
            }
        }
        return ids;
    }
}

  B系統主要執行的操作有,1)從kafka中拉取數據 ,2)將此數據放入數據庫事務表,更新狀態為New ,3) 定時任務輪詢狀態為New的數據,執行相應業務操作,4)更新New數據狀態為Complete 。下面為B系統中(即消費者)代碼。

import com.boot.kafka.transaction.EventProcessService;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

// 消費者定時任務
@Component
public class EventProcessSchedule {

    @Resource
    private EventProcessService eventProcessService;


    @Scheduled(fixedRate = 5000)
    private void process() {
        eventProcessService.process();
    }

}
import com.boot.mapper.KafkaEventProcessMapper;
import com.boot.pojo.KafkaEventProcess;
import com.boot.util.EventProcessStatus;
import com.boot.util.EventType;
import com.boot.util.KafkaUtil;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.stream.Collectors;

/**
 * @Author  xiabing5
 * @Create  2019/8/2 13:37
 * @Desc    接收kafka消息service類
 **/
@Service
public class EventProcessService {

    @Resource
    private KafkaEventProcessMapper kafkaEventProcessMapper;

    // 創建單一線程線程池
    @PostConstruct
    public void init() {
        ThreadFactory threadFactory = new ThreadFactoryBuilder()
                .setNameFormat("MqMessageConsumerThread-%d")
                .setDaemon(true)
                .build();
        ExecutorService executorService = Executors.newSingleThreadExecutor(threadFactory);
        executorService.execute(new MqMessageConsumerThread());
    }

    // 自定義接收線程
    private class MqMessageConsumerThread implements Runnable {

        @Override
        public void run() {
            KafkaUtil.consume(consumerRecord -> {
                KafkaEventProcess kafkaEventProcess = new KafkaEventProcess();
                kafkaEventProcess .setPayload(consumerRecord);
                kafkaEventProcess .setEventType(EventType.USER_CREATED);
                kafkaEventProcess .setStatus(EventProcessStatus.NEW);
                kafkaEventProcessMapper.insertEventProcess(kafkaEventProcess);
            });
        }
    }

    // 執行業務邏輯操作
   @Transactional(rollbackFor = Exception.class)
    public void process() {

        // 查詢表中狀態為new的事件
        Map<String,Object> params = new HashMap<String,Object>();
        params.put("status",EventProcessStatus.NEW.name());

        List<KafkaEventProcess> kafkaEventProcessList = kafkaEventProcessMapper.selectEventProcess(params);

       for(KafkaEventProcess kafkaEventProcess : kafkaEventProcessList) {
           // 執行業務操作
           System.out.println("刪除你");
       }

        List<Long> ids = kafkaEventProcessList.stream().map(item -> item.getId()).collect(Collectors.toList());
       kafkaEventProcessMapper.updateEventStatus(ids,EventProcessStatus.PROCESSED.name());

    }

}

  補充:此處沒有貼事務表的sql語句(即Mapper.xml)無非是添加數據庫記錄,更新記錄狀態語句。此代碼在我的實踐中能運行。

4. 總結

  分布式問題一直是我最近比較棘手問題,如分布式鎖,定時任務在集群下重復執行等。自己也是個小白,希望通過每次實踐后,能總結出點東西,便於以后去遍歷。

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM