RabbitMq解決分布式事物


一、RabbitMQ解決分布式事務思路:

案例: 經典案例,以目前流行點外賣的案例,用戶下單后,調用訂單服務,讓后訂單服務調用派單系統通知送外賣人員送單,這時候訂單系統與派單系統采用MQ異步通訊。

二、RabbitMQ解決分布式事務原理采用最終一致性原理

需要保證以下三要素

1、確認生產者一定要將數據投遞到MQ服務器中(采用MQ消息確認機制)

2、MQ消費者消息能夠正確消費消息,采用手動ACK模式(注意重試冪等性問題)

3、如何保證第一個事務先執行,采用補償機制,在創建一個補單消費者進行監聽,如果訂單沒有創建成功,進行補單。

三、如果生產者投遞消息到MQ服務器成功

  場景1:如果消費者消費消息失敗了,生產者是不需要回滾事務的。

  解決方案:消費者采用手動ack應答模式,采用MQ進行補償重試機制,注意MQ補償冪等性問題。

 問題:如何確保生產者投遞消息到MQ服務器一定能成功?

 解決方案:confirm機制(確認應答機制)。

場景2 如果生產者投遞消息到MQ服務器失敗,如何解決?

 解決方案:使用生產者重試機制進行發消息,注意冪等性問題。

場景3  如何保證一個事務先執行,生產者投遞消息到MQ服務器成功,消費者消費成功了,但是訂單卻回滾了。 

 解決方案:補單機制。

傳統解決方式:

RabbitMq解決方案:

MQ解決分布式事務一致性 

案例中 訂單表 和 派單表必須一致!

用MQ 可以做流量削峰值

MQ解決分布式事務最終一致性思想

 1.   確保生產者消息 一定要投遞到MQ服務器端成功

如果生產者投遞消息到MQ服務器成功

  場景1  如果消費者消費消息失敗了 

        生產者是不需要回滾事務。 消費者采用手動ack應答方式  進行補償機制,補償的過程中注意 冪等性 問題。

       分布式事務中遵循base理論 遵循cpa理論

 如何確保生產者發送消息一定發送到MQ消息服務器端成功? confirm機制 確認應答機制

 場景2    如果生產者發送消息到MQ服務器端失敗 

      使用生產者重試機制進行發消息

四、代碼實現

1、派單表

create TABLE platoon( id INT PRIMARY KEY AUTO_INCREMENT, orderId VARCHAR(255), takeout_userId int )

2、訂單表

create TABLE order_info( id INT PRIMARY KEY AUTO_INCREMENT, name VARCHAR(30), order_money INT, orderId VARCHAR(255) );

3、生產者

1.實現接口 implements RabbitTemplate.ConfirmCallback

 2. 重寫回調方法   成功 失敗的  調用  

@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {

send方法里面調用回調函數:

this.rabbitTemplate.setMandatory(true);
this.rabbitTemplate.setConfirmCallback(this);

yml需要配置回調機制:

###開啟消息確認機制 confirms
publisher-confirms: true
publisher-returns: true

 重試也是有一定次數限制的 如果超過一定次數 就需要進行人工補償了

上面已經實現了確保消息發送給 消費者   此時的數據不一致問題 就是:

場景3.  如何保證第一個事務先執行,生產者投遞消息到MQ服務器成功,消費者消費消息成功了,但是訂單事務回滾了。   

(生產者投遞消息給消費者消費成功 然后 生產者回滾了) 

MQ解決分布式原理通過最終一致性解決總體框架圖:  交換機采用路由鍵模式  補單隊列和派但隊列都綁定同一個路由鍵 

支付服務和積分服務

下單

pom文件

  <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.6.RELEASE</version>
    </parent>
    <dependencies>

        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>1.1.1</version>
        </dependency>
        <!-- mysql 依賴 -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <!-- 阿里巴巴數據源 -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.0.14</version>
        </dependency>
        <!-- SpringBoot整合Web組件 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- springboot- 整個 lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!-- 添加springboot對amqp的支持 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
        <!--fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.49</version>
        </dependency>
    </dependencies>

基礎包:

@Component public class BaseApiService { public BasicResult setResultError(Integer code, String msg) { return setResult(code, msg, null); } // 返回錯誤,可以傳msg
    public BasicResult setResultError(String msg) { return setResult(ApiConstants.HTTP_RES_CODE_500, msg, null); } // 返回成功,可以傳data值
    public BasicResult setResultSuccess(Object data) { return setResult(ApiConstants.HTTP_RES_CODE_200, ApiConstants.HTTP_RES_CODE_200_VALUE, data); } // 返回成功,沒有data值
    public BasicResult setResultSuccess() { return setResult(ApiConstants.HTTP_RES_CODE_200, ApiConstants.HTTP_RES_CODE_200_VALUE, null); } // 返回成功,沒有data值
    public BasicResult setResultSuccess(String msg) { return setResult(ApiConstants.HTTP_RES_CODE_200, msg, null); } // 通用封裝
    public BasicResult setResult(Integer code, String msg, Object data) { return new BasicResult(code, msg, data); } }
@Data public class BasicResult { private Integer rtnCode; private String msg; private Object data; public BasicResult() { } public BasicResult(Integer rtnCode, String msg, Object data) { super(); this.rtnCode = rtnCode; this.msg = msg; this.data = data; } @Override public String toString() { return "ResponseBase [rtnCode=" + rtnCode + ", msg=" + msg + ", data=" + data + "]"; } }

常量

public interface ApiConstants { // 響應請求成功
    String HTTP_RES_CODE_200_VALUE = "success"; // 系統錯誤
    String HTTP_RES_CODE_500_VALUE = "fial"; // 響應請求成功code
    Integer HTTP_RES_CODE_200 = 200; // 系統錯誤
    Integer HTTP_RES_CODE_500 = 500; // 未關聯QQ賬號
    Integer HTTP_RES_CODE_201 = 201; }

config類

@Configuration public class RabbitmqConfig { // 下單並且派單存隊列
    public static final String ORDER_DIC_QUEUE = "order_dic_queue"; // 補單隊列,判斷訂單是否已經被創建
    public static final String ORDER_CREATE_QUEUE = "order_create_queue"; // 下單並且派單交換機
    private static final String ORDER_EXCHANGE_NAME = "order_exchange_name"; // 1.定義訂單隊列
 @Bean public Queue directOrderDicQueue() { return new Queue(ORDER_DIC_QUEUE); } // 2.定義補訂單隊列
 @Bean public Queue directCreateOrderQueue() { return new Queue(ORDER_CREATE_QUEUE); } // 2.定義交換機
 @Bean DirectExchange directOrderExchange() { return new DirectExchange(ORDER_EXCHANGE_NAME); } // 3.訂單隊列與交換機綁定
 @Bean Binding bindingExchangeOrderDicQueue() { return BindingBuilder.bind(directOrderDicQueue()).to(directOrderExchange()).with("orderRoutingKey"); } // 3.補單隊列與交換機綁定
 @Bean Binding bindingExchangeCreateOrder() { return BindingBuilder.bind(directCreateOrderQueue()).to(directOrderExchange()).with("orderRoutingKey"); } }

實體類

@Data public class OrderEntity { private Long id; // 訂單名稱
    private String name; // 下單金額
    private Double orderMoney; // 訂單id
    private String orderId; }

mapper接口

public interface OrderMapper { @Insert(value = "INSERT INTO `order_info` VALUES (#{id}, #{name}, #{orderMoney},#{orderId})") @Options(useGeneratedKeys = true, keyProperty = "id", keyColumn = "id") public int addOrder(OrderEntity orderEntity); @Select("SELECT id as id ,name as name , order_money as orderMoney,orderId as orderId from order_info where orderId=#{orderId};") public OrderEntity findOrderId(@Param("orderId") String orderId); }

service類

@Service public class OrderService extends BaseApiService implements RabbitTemplate.ConfirmCallback { @Autowired private OrderMapper orderMapper; @Autowired private RabbitTemplate rabbitTemplate; @Transactional public BasicResult addOrderAndDispatch(){ //先下單 訂單表插入數據
        OrderEntity orderEntity = new OrderEntity(); orderEntity.setName("黃燜雞米飯"); // 價格是300元
 orderEntity.setOrderMoney(300d); // 商品id
        String orderId = UUID.randomUUID().toString(); orderEntity.setOrderId(orderId); // 1.先下單,創建訂單 (往訂單數據庫中插入一條數據)
        int orderResult = orderMapper.addOrder(orderEntity); System.out.println("orderResult:" + orderResult); if (orderResult <= 0) { return setResultError("下單失敗!"); } // 2.訂單表插插入完數據后 訂單表發送 外賣小哥
 send(orderId); //出現異常的時候 //int i = 1/0;
        return setResultSuccess(); } /** * 發送消息 * @param orderId */
    private void send(String orderId) { JSONObject jsonObect = new JSONObject(); jsonObect.put("orderId", orderId); String msg = jsonObect.toJSONString(); System.out.println("msg:" + msg); //封裝消息
 MessageBuilder.withBody(msg.getBytes()) .setContentType(MessageProperties.CONTENT_TYPE_JSON) .setContentEncoding("utf-8") .setMessageId(orderId); //構建回調參數
        CorrelationData correlationData = new CorrelationData(orderId); //發送消息
        this.rabbitTemplate.setMandatory(true); this.rabbitTemplate.setConfirmCallback(this); rabbitTemplate.convertAndSend("order_exchange_name","orderRoutingKey" ,msg,correlationData); } // 生產消息確認機制 生產者往服務器端發送消息的時候 采用應答機制
 @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { //全局ID 都是相同的
        String orderId = correlationData.getId(); System.out.println("消息id:" + correlationData.getId()); if (ack) { //消息發送成功
            System.out.println("消息發送確認成功"); } else { //重試機制
 send(orderId); System.out.println("消息發送確認失敗:" + cause); } } }

cotroller類

@RestController public class OrderController extends BaseApiService { @Autowired private OrderService orderService; @RequestMapping("/addOrder") public BasicResult addOrder() { return orderService.addOrderAndDispatch(); } }

啟動類

@MapperScan("com.yehui.mapper") @SpringBootApplication public class AppOrder { public static void main(String[] args) { SpringApplication.run(AppOrder.class, args); } }

yml配置文件

spring: rabbitmq: ####連接地址 host: localhost ####端口號 port: 5672 ####賬號 username: guest ####密碼 password: guest ### 地址 virtual-host: / ###開啟消息確認機制 confirms publisher-confirms: true publisher-returns: true #數據庫連接信息 datasource: name: test url: jdbc:mysql://127.0.0.1:3306/project?useUnicode=true&characterEncoding=utf8&autoReconnect=true&rewriteBatchedStatements=TRUE
 username: root password: root # 使用druid數據源 type: com.alibaba.druid.pool.DruidDataSource driver-class-name: com.mysql.jdbc.Driver

派單

pom文件

 <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.6.RELEASE</version>
    </parent>
    <dependencies>

        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>1.1.1</version>
        </dependency>
        <!-- mysql 依賴 -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <!-- 阿里巴巴數據源 -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.0.14</version>
        </dependency>
        <!-- SpringBoot整合Web組件 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- springboot- 整個 lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!-- 添加springboot對amqp的支持 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
        <!--fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.49</version>
        </dependency>
    </dependencies>

常量類

public interface ApiConstants { // 響應請求成功
    String HTTP_RES_CODE_200_VALUE = "success"; // 系統錯誤
    String HTTP_RES_CODE_500_VALUE = "fial"; // 響應請求成功code
    Integer HTTP_RES_CODE_200 = 200; // 系統錯誤
    Integer HTTP_RES_CODE_500 = 500; // 未關聯QQ賬號
    Integer HTTP_RES_CODE_201 = 201; }

實體類

@Data public class DispatchEntity { private Long id; // 訂單號
    private String orderId; // 外賣員id
    private Long takeoutUserId; }

mapper接口

public interface DispatchMapper { /** * 新增派單任務 */ @Insert("INSERT into platoon values (null,#{orderId},#{takeoutUserId});") public int insertDistribute(DispatchEntity distributeEntity); /** * 查詢是否已經派單了 */ @Select("SELECT * FROM platoon WHERE orderid =#{OrderId}") public DispatchEntity findByOrderId(@Param("orderId") String OrderId); }

config類

@Configuration public class RabbitmqConfig { // 下單並且派單存隊列
    public static final String ORDER_DIC_QUEUE = "order_dic_queue"; // 補單隊列,判斷訂單是否已經被創建
    public static final String ORDER_CREATE_QUEUE = "order_create_queue"; // 下單並且派單交換機
    private static final String ORDER_EXCHANGE_NAME = "order_exchange_name"; // 1.定義派單隊列
 @Bean public Queue OrderDicQueue() { return new Queue(ORDER_DIC_QUEUE); } /* // 2.定義補訂單隊列 @Bean public Queue directCreateOrderQueue() { return new Queue(ORDER_CREATE_QUEUE); }*/

    // 2.定義交換機
 @Bean DirectExchange directOrderExchange() { return new DirectExchange(ORDER_EXCHANGE_NAME); } // 3.訂單隊列與交換機綁定
 @Bean Binding bindingExchangeOrderDicQueue() { return BindingBuilder.bind(OrderDicQueue()).to(directOrderExchange()).with("orderRoutingKey"); } }

派單服務

/** * 派單服務 */ @Component public class DispatchConsumer { @Autowired private DispatchMapper dispatchMapper; @RabbitListener(queues = "order_dic_queue") public void process(Message message, Channel channel) throws UnsupportedEncodingException { String messageId = message.getMessageProperties().getMessageId(); String msg = new String(message.getBody(), "UTF-8"); System.out.println("派單服務平台" + msg + ",消息id:" + messageId); JSONObject jsonObject = JSONObject.parseObject(msg); String orderId = jsonObject.getString("orderId"); if (StringUtils.isEmpty(orderId)) { // 日志記錄
            return; } DispatchEntity dispatchEntity = new DispatchEntity(); // 訂單id
 dispatchEntity.setOrderId(orderId); // 外賣員id
        dispatchEntity.setTakeoutUserId(12l); // 使用orderId查詢是否已經派單了 網絡重試間隔
        DispatchEntity byOrderId = dispatchMapper.findByOrderId(orderId); if (byOrderId == null) { // 手動簽收消息,通知mq服務器端刪除該消息 已經派過單了,通知MQ不要在繼續重試。
 basicNack(message, channel); return; } //插入數據庫
        int insertDistribute = dispatchMapper.insertDistribute(dispatchEntity); if (insertDistribute > 0) { // 手動簽收消息,通知mq服務器端刪除該消息
 basicNack(message, channel); } } // 消費者獲取到消息之后 手動簽收 通知MQ刪除該消息
    private void basicNack(Message message, Channel channel) { try { channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); } catch (IOException e) { e.printStackTrace(); } } }

啟動類

@MapperScan("com.yehui.mapper") @SpringBootApplication public class AppDispatch { public static void main(String[] args) { SpringApplication.run(AppDispatch.class, args); } }

yml文件

spring: rabbitmq: ####連接地址 host: localhost ####端口號 port: 5672 ####賬號 username: guest ####密碼 password: guest ### 地址 virtual-host: / listener: simple: retry: ####開啟消費者(程序出現異常的情況下會)進行重試 enabled: true ####最大重試次數 max-attempts: 5 ####重試間隔次數 initial-interval: 3000 ####開啟手動ack acknowledge-mode: manual #數據庫連接信息 datasource: name: test url: jdbc:mysql://127.0.0.1:3306/project?useUnicode=true&characterEncoding=utf8&autoReconnect=true&rewriteBatchedStatements=TRUE
 username: root password: root # 使用druid數據源 type: com.alibaba.druid.pool.DruidDataSource driver-class-name: com.mysql.jdbc.Driver server: port: 8081

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM