5. [Spring Cloud Alibaba] Message-Driven Microservices: Spring Cloud Alibaba RocketMQ


Ways to implement asynchronous processing in Spring

image

Architecture evolution after introducing MQ

image

MQ use cases

  • Asynchronous processing
  • Traffic peak shaving and valley filling
  • Decoupling microservices
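The peak-shaving use case can be illustrated without a real broker. The sketch below is only an in-memory analogy: a `LinkedBlockingQueue` stands in for the MQ, and the class name `PeakShavingSketch` and the method `drainInBatches` are hypothetical names chosen for this example. A burst of requests is buffered at once, and the consumer then drains it in batches no larger than it can handle.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class PeakShavingSketch {

    /**
     * Buffers a burst of requests in a queue, then drains the queue in
     * batches of at most batchSize. Returns the size of each batch.
     */
    public static List<Integer> drainInBatches(int burstSize, int batchSize) {
        // Assumption: an in-memory queue plays the role of the message broker.
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        // The producer side: the whole burst arrives at once and is only enqueued.
        for (int i = 0; i < burstSize; i++) {
            queue.offer("request-" + i);
        }

        // The consumer side: work is pulled at the consumer's own pace.
        List<Integer> batchSizes = new ArrayList<>();
        List<String> batch = new ArrayList<>();
        while (!queue.isEmpty()) {
            batch.clear();
            queue.drainTo(batch, batchSize);
            batchSizes.add(batch.size());
        }
        return batchSizes;
    }

    public static void main(String[] args) {
        // A burst of 10 requests handled 4 at a time.
        System.out.println(drainInBatches(10, 4));
    }
}
```

The producer never waits for the consumer here, which is the same property that makes MQ useful for asynchronous processing and for decoupling services.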

Choosing an MQ

image

MQ comparison details


image

Setting up MQ

Setup tutorial

Setting up the RocketMQ console

RocketMQ console installation tutorial

RocketMQ terminology and concepts

image

image

Advanced RocketMQ

See the official RocketMQ guide

Message programming model 01: writing the producer

image

pom.xml
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.3</version>
</dependency>
application.yml

image

Code implementation
private final RocketMQTemplate rocketMQTemplate;

image

Message programming model 02: writing the consumer

image

pom.xml
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.3</version>
</dependency>
application.yml

image

Code implementation

image

Distributed transactions 01: flow analysis, terminology, and transactional message states

image

image

image
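The transactional message flow can be sketched with a toy model. Everything below is an assumption made for illustration: `ToyBroker` is a hypothetical in-memory stand-in, not RocketMQ's implementation, and only the three state names (COMMIT, ROLLBACK, UNKNOWN) mirror the values of `RocketMQLocalTransactionState`. A half message is stored but not deliverable; the producer then reports the outcome of its local transaction, and any message still undecided is resolved later by a check-back.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class TransactionalMessageSketch {

    /** The three outcomes a producer can report for a half message. */
    public enum State { COMMIT, ROLLBACK, UNKNOWN }

    /** Hypothetical in-memory broker that holds half messages until their fate is known. */
    public static class ToyBroker {
        private final Map<String, String> halfMessages = new HashMap<>();
        private final List<String> deliverable = new ArrayList<>();

        /** Step 1: store the half message; consumers cannot see it yet. */
        public void sendHalf(String transactionId, String body) {
            halfMessages.put(transactionId, body);
        }

        /** Step 2: apply the state reported after the local transaction ran. */
        public void resolve(String transactionId, State state) {
            if (state == State.UNKNOWN) {
                return; // stays a half message and waits for a check-back
            }
            String body = halfMessages.remove(transactionId);
            if (body != null && state == State.COMMIT) {
                deliverable.add(body); // now visible to consumers
            }
            // ROLLBACK: the half message is simply discarded
        }

        /** Step 3: check-back, asking the producer about every pending half message. */
        public void checkPending(Function<String, State> checker) {
            for (String transactionId : new ArrayList<>(halfMessages.keySet())) {
                resolve(transactionId, checker.apply(transactionId));
            }
        }

        public List<String> deliverable() { return deliverable; }

        public int pendingCount() { return halfMessages.size(); }
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        broker.sendHalf("tx-1", "add 50 bonus points");
        broker.resolve("tx-1", State.UNKNOWN);          // e.g. the producer crashed before reporting
        broker.checkPending(txId -> State.COMMIT);      // the check-back finds the local work was done
        System.out.println(broker.deliverable());
    }
}
```

In a real setup the check-back needs a durable record of whether the local transaction succeeded, which is the role a transaction log table can play.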

Distributed transactions 02: implementation

Table creation
CREATE TABLE `rocketmq_transaction_log` (
    `id`  int(11) NOT NULL AUTO_INCREMENT COMMENT 'primary key' ,
    `transaction_id`  varchar(45) NOT NULL COMMENT 'transaction ID' ,
    `log`  varchar(45) NOT NULL COMMENT 'log' ,
    PRIMARY KEY (`id`)
);
ShareService
public Share auditById(Integer id, ShareAuditDTO auditDTO) {
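    // 1. Check whether the share exists; if it does not, or its audit_status != NOT_YET, throw an exception
    Share share = this.shareMapper.selectByPrimaryKey(id);
    if (share == null) {
        throw new IllegalArgumentException("Invalid argument! The share does not exist!");
    }
    if (!Objects.equals("NOT_YET", share.getAuditStatus())) {
        throw new IllegalArgumentException("Invalid argument! The share has already been approved or rejected!");
    }

    // 3. If the result is PASS, send a message to RocketMQ so that the user center consumes it and adds bonus points for the publisher
    if (AuditStatusEnum.PASS.equals(auditDTO.getAuditStatusEnum())) {
        // Send the half message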
        String transactionId = UUID.randomUUID().toString();

        this.rocketMQTemplate.sendMessageInTransaction(
                "tx-add-bonus-group",
                "add-bonus",
                MessageBuilder
                    .withPayload(
                        UserAddBonusMsgDTO.builder()
                            .userId(share.getUserId())
                            .bonus(50)
                            .build()
                    )
                    // Headers are useful too: they carry the data the transaction listener needs
                    .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
                    .setHeader("share_id", id)
                    .setHeader("dto", JSON.toJSONString(auditDTO))
                    .build(),
                auditDTO
            );
    }
    else {
        this.auditByIdInDB(id, auditDTO);
    }
    return share;
}

@Transactional(rollbackFor = Exception.class)
public void auditByIdInDB(Integer id, ShareAuditDTO auditDTO) {
    Share share = Share.builder()
        .id(id)
        .auditStatus(auditDTO.getAuditStatusEnum().toString())
        .reason(auditDTO.getReason())
        .build();
    this.shareMapper.updateByPrimaryKeySelective(share);

    // 4. Write the share to the cache
}

@Transactional(rollbackFor = Exception.class)
public void auditByIdWithRocketMqLog(Integer id, ShareAuditDTO auditDTO, String transactionId) {
    this.auditByIdInDB(id, auditDTO);

    this.rocketmqTransactionLogMapper.insertSelective(
        RocketmqTransactionLog.builder()
            .transactionId(transactionId)
            .log("Audit share...")
            .build()
    );
}
AddBonusTransactionListener
@RocketMQTransactionListener(txProducerGroup = "tx-add-bonus-group")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class AddBonusTransactionListener implements RocketMQLocalTransactionListener {
    private final ShareService shareService;
    private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper;

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        MessageHeaders headers = msg.getHeaders();

        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
        Integer shareId = Integer.valueOf((String) headers.get("share_id"));

        ShareAuditDTO auditDTO = (ShareAuditDTO)arg;

        try {
            this.shareService.auditByIdWithRocketMqLog(shareId, auditDTO, transactionId);
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        MessageHeaders headers = msg.getHeaders();
        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);

        // select * from xxx where transaction_id = xxx
        RocketmqTransactionLog transactionLog = this.rocketmqTransactionLogMapper.selectOne(
            RocketmqTransactionLog.builder()
                .transactionId(transactionId)
                .build()
        );
        if (transactionLog != null) {
            return RocketMQLocalTransactionState.COMMIT;
        }
        return RocketMQLocalTransactionState.ROLLBACK;
    }
}

Spring Cloud Stream

image

image

image

Spring Cloud Stream: writing the producer

pom.xml
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.3</version>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
application.yml

image

Code

@EnableBinding({Source.class})

// Which packages to scan for MyBatis mapper interfaces
@MapperScan("com.itmuch.contentcenter.dao")
@SpringBootApplication
@EnableFeignClients// (defaultConfiguration = GlobalFeignConfiguration.class)
@EnableBinding({Source.class})
public class ContentCenterApplication {

image

Changing the com.alibaba.nacos log level

logging:
  level:
    com.itmuch.contentcenter.feignclient.UserCenterFeignClient: debug
    com.alibaba.nacos: error

Spring Cloud Stream: writing the consumer

pom.xml
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.3</version>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
application.yml

image

Code

@EnableBinding({Sink.class})

// Which packages to scan for MyBatis mapper interfaces
@MapperScan("com.itmuch.usercenter.dao")
@SpringBootApplication
//@EnableDiscoveryClient
@EnableBinding({Sink.class})
public class UserCenterApplication {

    public static void main(String[] args) {
        SpringApplication.run(UserCenterApplication.class, args);
    }

}

image

Spring Cloud Stream custom interfaces: sending messages

image

image

image

image

Spring Cloud Stream custom interfaces: consuming messages

image

image

image

image

Changing the com.alibaba.nacos log level

logging:
  level:
    com.alibaba.nacos: error

Message filtering

Filtered message consumption with Spring Cloud Stream

  • condition
  • Tags
  • SQL92
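Of the three options listed, tag-based filtering is the simplest to picture. The sketch below is a simplified, stdlib-only model of how a subscription expression such as `tagA || tagB` selects messages; it assumes the RocketMQ convention that `||` separates alternatives and `*` matches everything. The class `TagFilterSketch` and the method `matches` are hypothetical names, and the real broker and client perform this check differently.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

public class TagFilterSketch {

    /**
     * Returns true when a message carrying messageTag would be accepted by
     * a subscription expression such as "tagA || tagB" or "*".
     */
    public static boolean matches(String subscription, String messageTag) {
        // Assumption: a missing or "*" expression subscribes to every tag.
        if (subscription == null
                || subscription.trim().isEmpty()
                || "*".equals(subscription.trim())) {
            return true;
        }
        // Split on "||" and ignore surrounding whitespace around each tag.
        Set<String> wanted = Arrays.stream(subscription.split("\\|\\|"))
                .map(String::trim)
                .filter(tag -> !tag.isEmpty())
                .collect(Collectors.toSet());
        return wanted.contains(messageTag);
    }

    public static void main(String[] args) {
        System.out.println(matches("tagA || tagB", "tagB"));
        System.out.println(matches("tagA", "tagC"));
    }
}
```

Tag filtering only looks at a single label per message. Filtering on arbitrary header values or ranges is what the `condition` and SQL92 options are for.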

Monitoring Spring Cloud Stream

Use the following three links to view Spring Cloud Stream monitoring information
image

application.yml
management:
  endpoints:
    web:
      exposure:
        include: '*'
  endpoint:
    health:
      show-details: always

Exception handling in Spring Cloud Stream

Global handling (generic)
@StreamListener(value = Processor.INPUT)
public void handle(String body) {
    throw new RuntimeException("x");
}

@StreamListener("errorChannel")
public void error(Message<?> message) {
    ErrorMessage errorMessage = (ErrorMessage) message;
    System.out.println("Handling ERROR: " + errorMessage);
}

Distributed transactions with Spring Cloud Stream + RocketMQ 01: refactoring the producer

application.yml
spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
        bindings:
          output:
            producer:
              transactional: true
              group: tx-add-bonus-group
      bindings:
        output:
          # Specifies the topic
          destination: add-bonus

@EnableBinding({Source.class})

@MapperScan("com.itmuch.contentcenter.dao")
@SpringBootApplication
@EnableFeignClients// (defaultConfiguration = GlobalFeignConfiguration.class)
@EnableBinding({Source.class})
public class ContentCenterApplication {
ShareService
public Share auditById(Integer id, ShareAuditDTO auditDTO) {
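    // 1. Check whether the share exists; if it does not, or its audit_status != NOT_YET, throw an exception
    Share share = this.shareMapper.selectByPrimaryKey(id);
    if (share == null) {
        throw new IllegalArgumentException("Invalid argument! The share does not exist!");
    }
    if (!Objects.equals("NOT_YET", share.getAuditStatus())) {
        throw new IllegalArgumentException("Invalid argument! The share has already been approved or rejected!");
    }

    // 3. If the result is PASS, send a message to RocketMQ so that the user center consumes it and adds bonus points for the publisher
    if (AuditStatusEnum.PASS.equals(auditDTO.getAuditStatusEnum())) {
        // Send the half message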
        String transactionId = UUID.randomUUID().toString();

        this.source.output()
            .send(
                MessageBuilder
                    .withPayload(
                        UserAddBonusMsgDTO.builder()
                            .userId(share.getUserId())
                            .bonus(50)
                            .build()
                    )
                    // Headers are useful too: they carry the data the transaction listener needs
                    .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
                    .setHeader("share_id", id)
                    .setHeader("dto", JSON.toJSONString(auditDTO))
                    .build()
            );
    }
    else {
        this.auditByIdInDB(id, auditDTO);
    }
    return share;
}

@Transactional(rollbackFor = Exception.class)
public void auditByIdInDB(Integer id, ShareAuditDTO auditDTO) {
    Share share = Share.builder()
        .id(id)
        .auditStatus(auditDTO.getAuditStatusEnum().toString())
        .reason(auditDTO.getReason())
        .build();
    this.shareMapper.updateByPrimaryKeySelective(share);

    // 4. Write the share to the cache
}

@Transactional(rollbackFor = Exception.class)
public void auditByIdWithRocketMqLog(Integer id, ShareAuditDTO auditDTO, String transactionId) {
    this.auditByIdInDB(id, auditDTO);

    this.rocketmqTransactionLogMapper.insertSelective(
        RocketmqTransactionLog.builder()
            .transactionId(transactionId)
            .log("Audit share...")
            .build()
    );
}
AddBonusTransactionListener
@RocketMQTransactionListener(txProducerGroup = "tx-add-bonus-group")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class AddBonusTransactionListener implements RocketMQLocalTransactionListener {
    private final ShareService shareService;
    private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper;

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        MessageHeaders headers = msg.getHeaders();

        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
        Integer shareId = Integer.valueOf((String) headers.get("share_id"));

        String dtoString = (String) headers.get("dto");
        ShareAuditDTO auditDTO = JSON.parseObject(dtoString, ShareAuditDTO.class);

        try {
            this.shareService.auditByIdWithRocketMqLog(shareId, auditDTO, transactionId);
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        MessageHeaders headers = msg.getHeaders();
        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);

        // select * from xxx where transaction_id = xxx
        RocketmqTransactionLog transactionLog = this.rocketmqTransactionLogMapper.selectOne(
            RocketmqTransactionLog.builder()
                .transactionId(transactionId)
                .build()
        );
        if (transactionLog != null) {
            return RocketMQLocalTransactionState.COMMIT;
        }
        return RocketMQLocalTransactionState.ROLLBACK;
    }
}

Distributed transactions with Spring Cloud Stream + RocketMQ 02: refactoring the consumer

application.yml
spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
      bindings:
        input:
          destination: add-bonus
          group: binder-group

@EnableBinding({Sink.class})

// Which packages to scan for MyBatis mapper interfaces
@MapperScan("com.itmuch.usercenter.dao")
@SpringBootApplication
//@EnableDiscoveryClient
@EnableBinding({Sink.class})
public class UserCenterApplication {

    public static void main(String[] args) {
        SpringApplication.run(UserCenterApplication.class, args);
    }

}
AddBonusStreamConsumer
@Service
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
@Slf4j
public class AddBonusStreamConsumer {
    private final UserService userService;

    @StreamListener(Sink.INPUT)
    public void receive(UserAddBonusMsgDTO message) {
        message.setEvent("CONTRIBUTE");
        message.setDescription("Bonus points for contributing a share..");
        this.userService.addBonus(message);
    }
}
UserService
@Transactional(rollbackFor = Exception.class)
public void addBonus(UserAddBonusMsgDTO msgDTO) {
    // 1. Add bonus points to the user
    Integer userId = msgDTO.getUserId();
    Integer bonus = msgDTO.getBonus();
    User user = this.userMapper.selectByPrimaryKey(userId);

    user.setBonus(user.getBonus() + bonus);
    this.userMapper.updateByPrimaryKeySelective(user);

    // 2. Record the event in the bonus_event_log table
    this.bonusEventLogMapper.insert(
        BonusEventLog.builder()
            .userId(userId)
            .value(bonus)
            .event(msgDTO.getEvent())
            .createTime(new Date())
            .description(msgDTO.getDescription())
            .build()
    );
    log.info("Bonus points added...");
}

Spring Cloud Stream knowledge recap

