Spring實現異步的方式
引入MQ后的架構演進
MQ的使用場景
- 異步處理
- 流量削峰填谷
- 解耦微服務
MQ的選擇
MQ對比詳情
搭建MQ
搭建RocketMQ控制台
RocketMQ的術語與概念
RocketMQ進階
看官方RocketMQ指導
消息編程模型01-編寫生產者
pom.xml
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.3</version>
</dependency>
application.yml
代碼實現
private final RocketMQTemplate rocketMQTemplate;
消息編程模型02-編寫消費者
pom.xml
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.3</version>
</dependency>
application.yml
代碼實現
分布式事務01-流程剖析,概念術語,事務消息狀態
分布式事務02-編碼實現
表創建
-- Log table written together with the local business update; one row per transaction id.
-- NOTE(review): rows are looked up by transaction id during the transaction check;
-- consider adding an index on that column - confirm against expected volume.
CREATE TABLE `rocketmq_transaction_log` (
    `id`             int(11)     NOT NULL AUTO_INCREMENT COMMENT '主鍵' ,  -- primary key
    `transaction_Id` varchar(45) NOT NULL COMMENT '事務' ,                 -- transaction id
    `log`            varchar(45) NOT NULL COMMENT '日志' ,                 -- free-form log text
    PRIMARY KEY (`id`)
);
ShareService
/**
 * Audits the share identified by {@code id}.
 *
 * <p>When the audit result is PASS, a transactional (half) message is sent to RocketMQ and the
 * database update is left to the transaction listener; for any other result the database is
 * updated directly.
 *
 * @param id       primary key of the share to audit
 * @param auditDTO audit result submitted by the reviewer
 * @return the share as it was loaded before the audit was applied
 * @throws IllegalArgumentException if the share does not exist or its status is not NOT_YET
 */
public Share auditById(Integer id, ShareAuditDTO auditDTO) {
    // 1. Load the share; reject when it is missing or its audit_status is not NOT_YET.
    Share share = this.shareMapper.selectByPrimaryKey(id);
    if (share == null) {
        throw new IllegalArgumentException("參數非法!該分享不存在!");
    }
    boolean pendingAudit = Objects.equals("NOT_YET", share.getAuditStatus());
    if (!pendingAudit) {
        throw new IllegalArgumentException("參數非法!該分享已審核通過或審核不通過!");
    }

    // Any result other than PASS only needs the local database update.
    if (!AuditStatusEnum.PASS.equals(auditDTO.getAuditStatusEnum())) {
        this.auditByIdInDB(id, auditDTO);
        return share;
    }

    // 3. PASS: send a message to RocketMQ so that the user center can consume it
    //    and add bonus points for the publisher. This is sent as a half message.
    String transactionId = UUID.randomUUID().toString();
    UserAddBonusMsgDTO bonusPayload = UserAddBonusMsgDTO.builder()
        .userId(share.getUserId())
        .bonus(50)
        .build();
    this.rocketMQTemplate.sendMessageInTransaction(
        "tx-add-bonus-group",
        "add-bonus",
        MessageBuilder
            .withPayload(bonusPayload)
            // Headers carry the context the transaction listener needs.
            .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
            .setHeader("share_id", id)
            .setHeader("dto", JSON.toJSONString(auditDTO))
            .build(),
        // Passed through to the listener as its "arg" parameter.
        auditDTO
    );
    return share;
}
/**
 * Writes the audit status and reason from {@code auditDTO} onto the share row with the given id.
 *
 * @param id       primary key of the share to update
 * @param auditDTO source of the new audit status and reason
 */
@Transactional(rollbackFor = Exception.class)
public void auditByIdInDB(Integer id, ShareAuditDTO auditDTO) {
    String auditStatus = auditDTO.getAuditStatusEnum().toString();
    Share changes = Share.builder()
        .id(id)
        .auditStatus(auditStatus)
        .reason(auditDTO.getReason())
        .build();
    // Only id, auditStatus and reason are populated on the object handed to the mapper.
    this.shareMapper.updateByPrimaryKeySelective(changes);
    // 4. Write the share to the cache (not implemented here).
}
/**
 * Applies the audit result and records a transaction-log row under {@code transactionId},
 * both inside the method annotated as transactional.
 *
 * @param id            primary key of the share to update
 * @param auditDTO      audit result to persist
 * @param transactionId transaction id stored in the log row
 */
@Transactional(rollbackFor = Exception.class)
public void auditByIdWithRocketMqLog(Integer id, ShareAuditDTO auditDTO, String transactionId) {
    this.auditByIdInDB(id, auditDTO);

    RocketmqTransactionLog logEntry = RocketmqTransactionLog.builder()
        .transactionId(transactionId)
        .log("審核分享...")
        .build();
    this.rocketmqTransactionLogMapper.insertSelective(logEntry);
}
AddBonusTransactionListener
/**
 * Transaction listener for the {@code tx-add-bonus-group} producer group.
 *
 * <p>{@link #executeLocalTransaction} performs the share audit together with a transaction-log
 * insert; {@link #checkLocalTransaction} decides the outcome from the presence of that log row.
 */
@RocketMQTransactionListener(txProducerGroup = "tx-add-bonus-group")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class AddBonusTransactionListener implements RocketMQLocalTransactionListener {
    private final ShareService shareService;
    private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper;

    /**
     * Runs the local transaction for a message.
     *
     * @param msg message carrying the transaction id and {@code share_id} headers
     * @param arg the {@link ShareAuditDTO} supplied by the sender
     * @return COMMIT when the audit and log insert succeed, ROLLBACK on any failure
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            // Header extraction happens inside the try so that a missing or malformed
            // header results in ROLLBACK instead of an exception escaping the listener.
            MessageHeaders headers = msg.getHeaders();
            String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
            // String.valueOf tolerates the header arriving as either a String or an Integer.
            Integer shareId = Integer.valueOf(String.valueOf(headers.get("share_id")));
            ShareAuditDTO auditDTO = (ShareAuditDTO) arg;

            this.shareService.auditByIdWithRocketMqLog(shareId, auditDTO, transactionId);
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            // NOTE(review): the failure is not logged; consider logging it before rolling back.
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    /**
     * Re-checks the outcome of a local transaction by looking up its transaction-log row.
     *
     * @param msg message carrying the transaction id header
     * @return COMMIT when a log row exists for the transaction id, otherwise ROLLBACK
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        MessageHeaders headers = msg.getHeaders();
        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);

        // select * from xxx where transaction_id = xxx
        RocketmqTransactionLog transactionLog = this.rocketmqTransactionLogMapper.selectOne(
            RocketmqTransactionLog.builder()
                .transactionId(transactionId)
                .build()
        );
        if (transactionLog != null) {
            return RocketMQLocalTransactionState.COMMIT;
        }
        return RocketMQLocalTransactionState.ROLLBACK;
    }
}
SpringCloudStream
SpringCloudStream-編寫生產者
pom.xml
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.3</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
application.yml
代碼編寫
@EnableBinding({Source.class})
// 掃描mybatis哪些包里面的接口
@MapperScan("com.itmuch.contentcenter.dao")
@SpringBootApplication
@EnableFeignClients// (defaultConfiguration = GlobalFeignConfiguration.class)
@EnableBinding({Source.class})
public class ContentCenterApplication {
修改com.alibaba.nacos日志級別
# Per-logger levels: debug output for the Feign client, errors only for Nacos.
logging:
  level:
    com.itmuch.contentcenter.feignclient.UserCenterFeignClient: debug
    com.alibaba.nacos: error
SpringCloudStream-編寫消費者
pom.xml
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.3</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
application.yml
代碼編寫
@EnableBinding({Sink.class})
/**
 * Application entry point; binds the {@code Sink} channel and registers the MyBatis mapper scan.
 */
@SpringBootApplication
// Package whose interfaces MyBatis scans as mappers.
@MapperScan("com.itmuch.usercenter.dao")
//@EnableDiscoveryClient
@EnableBinding(Sink.class)
public class UserCenterApplication {

    /**
     * Starts the Spring application.
     *
     * @param args command-line arguments forwarded to Spring
     */
    public static void main(String... args) {
        SpringApplication.run(UserCenterApplication.class, args);
    }
}
SpringCloudStream自定義接口-發送消息
SpringCloudStream自定義接口-消費消息
修改com.alibaba.nacos日志級別
# Only log errors from the Nacos client.
logging:
  level:
    com.alibaba.nacos: error
消息過濾
- condition
- Tags
- Sql 92
SpringCloudStream的監控
如下三個鏈接查看SpringCloudStream的監控
application.yml
# Expose every actuator endpoint over the web and always show health details.
management:
  endpoints:
    web:
      exposure:
        include: '*'
  endpoint:
    health:
      show-details: always
SpringCloudStream的異常處理
全局處理【通用】
/**
 * Listener on the {@code Processor.INPUT} channel that always fails, used to demonstrate
 * error handling.
 *
 * @param body message body (unused)
 * @throws RuntimeException always
 */
@StreamListener(value = Processor.INPUT)
public void handle(String body) {
    RuntimeException failure = new RuntimeException("x");
    throw failure;
}
/**
 * Global error handler: prints every message that arrives on {@code errorChannel}.
 *
 * @param message message delivered on the error channel; cast to {@code ErrorMessage}
 */
@StreamListener("errorChannel")
public void error(Message<?> message) {
    System.out.println("Handling ERROR: " + ((ErrorMessage) message));
}
SpringCloudStream+RocketMQ實現分布式事務01-重構生產者
application.yml
spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
        bindings:
          output:
            producer:
              transactional: true
              group: tx-add-bonus-group
      bindings:
        output:
          # Specifies the topic.
          destination: add-bonus
@EnableBinding({Source.class})
@MapperScan("com.itmuch.contentcenter.dao")
@SpringBootApplication
@EnableFeignClients// (defaultConfiguration = GlobalFeignConfiguration.class)
@EnableBinding({Source.class})
public class ContentCenterApplication {
ShareService
/**
 * Audits the share identified by {@code id}.
 *
 * <p>When the audit result is PASS, a message is sent through the stream {@code output}
 * channel and the database update is left to the transaction listener; for any other result
 * the database is updated directly.
 *
 * @param id       primary key of the share to audit
 * @param auditDTO audit result submitted by the reviewer
 * @return the share as it was loaded before the audit was applied
 * @throws IllegalArgumentException if the share does not exist or its status is not NOT_YET
 */
public Share auditById(Integer id, ShareAuditDTO auditDTO) {
    // 1. Load the share; reject when it is missing or its audit_status is not NOT_YET.
    Share share = this.shareMapper.selectByPrimaryKey(id);
    if (share == null) {
        throw new IllegalArgumentException("參數非法!該分享不存在!");
    }
    boolean pendingAudit = Objects.equals("NOT_YET", share.getAuditStatus());
    if (!pendingAudit) {
        throw new IllegalArgumentException("參數非法!該分享已審核通過或審核不通過!");
    }

    // Any result other than PASS only needs the local database update.
    if (!AuditStatusEnum.PASS.equals(auditDTO.getAuditStatusEnum())) {
        this.auditByIdInDB(id, auditDTO);
        return share;
    }

    // 3. PASS: send a message to RocketMQ so that the user center can consume it
    //    and add bonus points for the publisher. This is sent as a half message.
    String transactionId = UUID.randomUUID().toString();
    UserAddBonusMsgDTO bonusPayload = UserAddBonusMsgDTO.builder()
        .userId(share.getUserId())
        .bonus(50)
        .build();
    this.source.output().send(
        MessageBuilder
            .withPayload(bonusPayload)
            // Headers carry the context the transaction listener needs; the DTO travels
            // as JSON because no separate argument is passed on this path.
            .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
            .setHeader("share_id", id)
            .setHeader("dto", JSON.toJSONString(auditDTO))
            .build()
    );
    return share;
}
/**
 * Writes the audit status and reason from {@code auditDTO} onto the share row with the given id.
 *
 * @param id       primary key of the share to update
 * @param auditDTO source of the new audit status and reason
 */
@Transactional(rollbackFor = Exception.class)
public void auditByIdInDB(Integer id, ShareAuditDTO auditDTO) {
    String auditStatus = auditDTO.getAuditStatusEnum().toString();
    Share changes = Share.builder()
        .id(id)
        .auditStatus(auditStatus)
        .reason(auditDTO.getReason())
        .build();
    // Only id, auditStatus and reason are populated on the object handed to the mapper.
    this.shareMapper.updateByPrimaryKeySelective(changes);
    // 4. Write the share to the cache (not implemented here).
}
/**
 * Applies the audit result and records a transaction-log row under {@code transactionId},
 * both inside the method annotated as transactional.
 *
 * @param id            primary key of the share to update
 * @param auditDTO      audit result to persist
 * @param transactionId transaction id stored in the log row
 */
@Transactional(rollbackFor = Exception.class)
public void auditByIdWithRocketMqLog(Integer id, ShareAuditDTO auditDTO, String transactionId) {
    this.auditByIdInDB(id, auditDTO);

    RocketmqTransactionLog logEntry = RocketmqTransactionLog.builder()
        .transactionId(transactionId)
        .log("審核分享...")
        .build();
    this.rocketmqTransactionLogMapper.insertSelective(logEntry);
}
AddBonusTransactionListener
/**
 * Transaction listener for the {@code tx-add-bonus-group} producer group.
 *
 * <p>{@link #executeLocalTransaction} performs the share audit together with a transaction-log
 * insert; {@link #checkLocalTransaction} decides the outcome from the presence of that log row.
 * The audit DTO is read from the {@code dto} message header as JSON.
 */
@RocketMQTransactionListener(txProducerGroup = "tx-add-bonus-group")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class AddBonusTransactionListener implements RocketMQLocalTransactionListener {
    private final ShareService shareService;
    private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper;

    /**
     * Runs the local transaction for a message.
     *
     * @param msg message carrying the transaction id, {@code share_id} and {@code dto} headers
     * @param arg unused; the audit DTO is taken from the {@code dto} header instead
     * @return COMMIT when the audit and log insert succeed, ROLLBACK on any failure
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            // Header extraction and JSON parsing happen inside the try so that a missing or
            // malformed header results in ROLLBACK instead of an exception escaping the listener.
            MessageHeaders headers = msg.getHeaders();
            String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
            // String.valueOf tolerates the header arriving as either a String or an Integer.
            Integer shareId = Integer.valueOf(String.valueOf(headers.get("share_id")));
            String dtoString = (String) headers.get("dto");
            ShareAuditDTO auditDTO = JSON.parseObject(dtoString, ShareAuditDTO.class);

            this.shareService.auditByIdWithRocketMqLog(shareId, auditDTO, transactionId);
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            // NOTE(review): the failure is not logged; consider logging it before rolling back.
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    /**
     * Re-checks the outcome of a local transaction by looking up its transaction-log row.
     *
     * @param msg message carrying the transaction id header
     * @return COMMIT when a log row exists for the transaction id, otherwise ROLLBACK
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        MessageHeaders headers = msg.getHeaders();
        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);

        // select * from xxx where transaction_id = xxx
        RocketmqTransactionLog transactionLog = this.rocketmqTransactionLogMapper.selectOne(
            RocketmqTransactionLog.builder()
                .transactionId(transactionId)
                .build()
        );
        if (transactionLog != null) {
            return RocketMQLocalTransactionState.COMMIT;
        }
        return RocketMQLocalTransactionState.ROLLBACK;
    }
}
SpringCloudStream+RocketMQ實現分布式事務02-重構消費者
application.yml
spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
      bindings:
        input:
          destination: add-bonus
          group: binder-group
@EnableBinding({Sink.class})
/**
 * Application entry point; binds the {@code Sink} channel and registers the MyBatis mapper scan.
 */
@SpringBootApplication
// Package whose interfaces MyBatis scans as mappers.
@MapperScan("com.itmuch.usercenter.dao")
//@EnableDiscoveryClient
@EnableBinding(Sink.class)
public class UserCenterApplication {

    /**
     * Starts the Spring application.
     *
     * @param args command-line arguments forwarded to Spring
     */
    public static void main(String... args) {
        SpringApplication.run(UserCenterApplication.class, args);
    }
}
AddBonusStreamConsumer
/**
 * Stream consumer for the {@code Sink.INPUT} channel: tags each incoming bonus message with
 * its event and description, then hands it to {@link UserService#addBonus}.
 */
@Slf4j
@Service
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class AddBonusStreamConsumer {

    private final UserService userService;

    /**
     * Handles one bonus message.
     *
     * @param message incoming bonus message; its event and description are overwritten here
     */
    @StreamListener(Sink.INPUT)
    public void receive(UserAddBonusMsgDTO message) {
        // The event and description are fixed by this consumer, not by the sender.
        message.setEvent("CONTRIBUTE");
        message.setDescription("投稿加積分..");

        userService.addBonus(message);
    }
}
UserService
/**
 * Adds the bonus carried by {@code msgDTO} to the user and records a bonus event log row.
 *
 * @param msgDTO message providing the user id, bonus value, event and description
 * @throws IllegalArgumentException if no user exists for the message's user id
 */
@Transactional(rollbackFor = Exception.class)
public void addBonus(UserAddBonusMsgDTO msgDTO) {
    // 1. Add the bonus to the user.
    Integer userId = msgDTO.getUserId();
    Integer bonus = msgDTO.getBonus();
    User user = this.userMapper.selectByPrimaryKey(userId);
    if (user == null) {
        // Fail with a descriptive error instead of a bare NullPointerException below.
        throw new IllegalArgumentException("User not found for bonus message, userId = " + userId);
    }
    // NOTE(review): read-then-write of the balance; confirm whether concurrent messages
    // for the same user need an atomic update.
    user.setBonus(user.getBonus() + bonus);
    this.userMapper.updateByPrimaryKeySelective(user);

    // 2. Record the change in the bonus_event_log table.
    this.bonusEventLogMapper.insert(
        BonusEventLog.builder()
            .userId(userId)
            .value(bonus)
            .event(msgDTO.getEvent())
            .createTime(new Date())
            .description(msgDTO.getDescription())
            .build()
    );
    log.info("積分添加完畢...");
}