I. Reliable Message Delivery
1. Create a service in the rabbit-core-producer project
@Service
public class MessageStoreService {
@Autowired
private BrokerMessageMapper brokerMessageMapper;
public int insert(BrokerMessage brokerMessage) {
return this.brokerMessageMapper.insert(brokerMessage);
}
public BrokerMessage selectByMessageId(String messageId) {
return this.brokerMessageMapper.selectByPrimaryKey(messageId);
}
public void succuess(String messageId) {
this.brokerMessageMapper.changeBrokerMessageStatus(messageId,
BrokerMessageStatus.SEND_OK.getCode(),
new Date());
}
public void failure(String messageId) {
this.brokerMessageMapper.changeBrokerMessageStatus(messageId,
BrokerMessageStatus.SEND_FAIL.getCode(),
new Date());
}
public List<BrokerMessage> fetchTimeOutMessage4Retry(BrokerMessageStatus brokerMessageStatus){
return this.brokerMessageMapper.queryBrokerMessageStatus4Timeout(brokerMessageStatus.getCode());
}
public int updateTryCount(String brokerMessageId) {
return this.brokerMessageMapper.update4TryCount(brokerMessageId, new Date());
}
}
2. Define the message send statuses
public enum BrokerMessageStatus {
SENDING("0"),
SEND_OK("1"),
SEND_FAIL("2"),
SEND_FAIL_A_MOMENT("3");
private String code;
private BrokerMessageStatus(String code) {
this.code = code;
}
public String getCode() {
return this.code;
}
}
Constants
public interface BrokerMessageConst {
// Timeout is 1 minute
int TIMEOUT = 1;
}
Add a reliable-delivery send method to RabbitBrokerImpl
@Override
public void reliantSend(Message message) {
message.setMessageType(MessageType.RELIANT);
BrokerMessage bm = messageStoreService.selectByMessageId(message.getMessageId());
if(bm == null) {
// 1. Record the send log in the database first
Date now = new Date();
BrokerMessage brokerMessage = new BrokerMessage();
brokerMessage.setMessageId(message.getMessageId());
brokerMessage.setStatus(BrokerMessageStatus.SENDING.getCode());
// tryCount does not need to be set on the first send
brokerMessage.setNextRetry(DateUtils.addMinutes(now, BrokerMessageConst.TIMEOUT));
brokerMessage.setCreateTime(now);
brokerMessage.setUpdateTime(now);
brokerMessage.setMessage(message);
messageStoreService.insert(brokerMessage);
}
// 2. Run the actual send logic
sendKernel(message);
}
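The "insert only if absent" step above is what lets the compensation job call reliantSend again for the same message without creating a second log row. Below is a minimal sketch of that idea, assuming an in-memory map in place of the broker_message table; `MessageLogSketch`, `Entry`, `recordIfAbsent`, and `find` are hypothetical names, not part of the project.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for the broker_message table.
final class MessageLogSketch {
    // Mirrors BrokerMessageConst.TIMEOUT (1 minute).
    static final Duration TIMEOUT = Duration.ofMinutes(1);

    static final class Entry {
        final Instant createTime;
        final Instant nextRetry;

        Entry(Instant createTime, Instant nextRetry) {
            this.createTime = createTime;
            this.nextRetry = nextRetry;
        }
    }

    private final Map<String, Entry> entries = new ConcurrentHashMap<>();

    /** Records the message on its first send only; returns true if a new entry was created. */
    boolean recordIfAbsent(String messageId, Instant now) {
        Entry fresh = new Entry(now, now.plus(TIMEOUT));
        return entries.putIfAbsent(messageId, fresh) == null;
    }

    Entry find(String messageId) {
        return entries.get(messageId);
    }
}
```

In this sketch a resend leaves createTime and nextRetry untouched; in the real code those columns live in the database and are updated by the mapper.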
Handling the confirm callback
Change the confirm method in the RabbitTemplateContainer class as follows:
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
// The correlation id has the form messageId#sendTime#messageType
List<String> strings = splitter.splitToList(correlationData.getId());
String messageId = strings.get(0);
long sendTime = Long.parseLong(strings.get(1));
String messageType = strings.get(2);
if(ack) {
// When the broker returns an ACK, update the matching record in the log table to SEND_OK.
// Only reliant messages have a record in the database, so only those are looked up and updated.
if(MessageType.RELIANT.equals(messageType)) {
this.messageStoreService.succuess(messageId);
}
log.info("send message succeeded, confirm messageId={}, sendTime={}" , messageId, sendTime);
} else {
log.error("send message failed, confirm messageId={}, sendTime={}, cause={}" , messageId, sendTime, cause);
}
}
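The confirm callback depends on the correlation id carrying three '#'-separated fields, the same layout the batch-send code builds with String.format("%s#%s#%s", ...). Here is a minimal sketch of a stricter parser, using plain String.split rather than the splitter field of the original; `CorrelationIdSketch` is a hypothetical helper and the sample values in the usage are made up.

```java
import java.util.Objects;

// Hypothetical value class for "messageId#sendTime#messageType".
final class CorrelationIdSketch {
    final String messageId;
    final long sendTime;
    final String messageType;

    private CorrelationIdSketch(String messageId, long sendTime, String messageType) {
        this.messageId = messageId;
        this.sendTime = sendTime;
        this.messageType = messageType;
    }

    /** Parses the id and fails fast when it does not have exactly three fields. */
    static CorrelationIdSketch parse(String raw) {
        Objects.requireNonNull(raw, "raw");
        String[] parts = raw.split("#", -1);
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected messageId#sendTime#messageType but got: " + raw);
        }
        return new CorrelationIdSketch(parts[0], Long.parseLong(parts[1]), parts[2]);
    }

    /** Rebuilds the wire form, the inverse of parse. */
    String format() {
        return messageId + "#" + sendTime + "#" + messageType;
    }
}
```

Failing fast on a malformed id keeps a bad correlation value from surfacing later as an IndexOutOfBoundsException inside the callback.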
II. Wrapping the distributed scheduled-task component: rabbit-task
1. Add dependencies: bring in elastic-job
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>rabbit-parent</artifactId>
<groupId>com.example</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>rabbit-task</artifactId>
<properties>
<elastic-job.version>2.1.4</elastic-job.version>
</properties>
<dependencies>
<!-- spring boot dependency -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- elastic-job dependency -->
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>${elastic-job.version}</version>
</dependency>
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>${elastic-job.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
</project>
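2. Add auto-configuration
1) Create the class JobParserAutoConfigurartion. It parses the configuration that Elastic-Job uses to connect to the ZooKeeper (zk) registry center and initializes that configuration in the registry center.
The configuration properties are read into the JobZookeeperProperties class.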
@Slf4j
@Configuration
@ConditionalOnProperty(prefix = "elastic.job.zk", name = {"namespace", "serverLists"}, matchIfMissing = false)
@EnableConfigurationProperties(JobZookeeperProperties.class)
public class JobParserAutoConfigurartion {
@Bean(initMethod = "init")
public ZookeeperRegistryCenter zookeeperRegistryCenter(JobZookeeperProperties jobZookeeperProperties) {
ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(jobZookeeperProperties.getServerLists(),
jobZookeeperProperties.getNamespace());
// Apply the connection settings bound from elastic.job.zk.*
zkConfig.setBaseSleepTimeMilliseconds(jobZookeeperProperties.getBaseSleepTimeMilliseconds());
zkConfig.setMaxSleepTimeMilliseconds(jobZookeeperProperties.getMaxSleepTimeMilliseconds());
zkConfig.setConnectionTimeoutMilliseconds(jobZookeeperProperties.getConnectionTimeoutMilliseconds());
zkConfig.setSessionTimeoutMilliseconds(jobZookeeperProperties.getSessionTimeoutMilliseconds());
zkConfig.setMaxRetries(jobZookeeperProperties.getMaxRetries());
zkConfig.setDigest(jobZookeeperProperties.getDigest());
log.info("job registry center configured, zkaddress : {}, namespace : {}", jobZookeeperProperties.getServerLists(), jobZookeeperProperties.getNamespace());
return new ZookeeperRegistryCenter(zkConfig);
}
@Bean
public ElasticJobConfParser elasticJobConfParser(JobZookeeperProperties jobZookeeperProperties, ZookeeperRegistryCenter zookeeperRegistryCenter) {
return new ElasticJobConfParser(jobZookeeperProperties, zookeeperRegistryCenter);
}
}
2) Create a META-INF folder under the resources folder.
Create a spring.factories file in the META-INF folder:
# Auto Configure
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.example.task.autoconfigure.JobParserAutoConfigurartion
3) The JobZookeeperProperties class
@ConfigurationProperties(prefix = "elastic.job.zk")
@Data
public class JobZookeeperProperties {
private String namespace;
private String serverLists;
private int maxRetries = 3;
private int connectionTimeoutMilliseconds = 15000;
private int sessionTimeoutMilliseconds = 60000;
private int baseSleepTimeMilliseconds = 1000;
private int maxSleepTimeMilliseconds = 3000;
private String digest = "";
}
3. Add module wiring
1) Add an annotation that imports the JobParserAutoConfigurartion configuration
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(JobParserAutoConfigurartion.class)
public @interface EnableElasticJob {
}
2) Add the job configuration annotation ElasticJobConfig
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface ElasticJobConfig {
String name(); // name of the elastic-job job
String cron() default "";
int shardingTotalCount() default 1;
String shardingItemParameters() default "";
String jobParameter() default "";
boolean failover() default false;
boolean misfire() default true;
String description() default "";
boolean overwrite() default false;
boolean streamingProcess() default false;
String scriptCommandLine() default "";
boolean monitorExecution() default false;
public int monitorPort() default -1; //must
public int maxTimeDiffSeconds() default -1; //must
public String jobShardingStrategyClass() default ""; //must
public int reconcileIntervalMinutes() default 10; //must
public String eventTraceRdbDataSource() default ""; //must
public String listener() default ""; //must
public boolean disabled() default false; //must
public String distributedListener() default "";
public long startedTimeoutMilliseconds() default Long.MAX_VALUE; //must
public long completedTimeoutMilliseconds() default Long.MAX_VALUE; //must
public String jobExceptionHandler() default "com.dangdang.ddframe.job.executor.handler.impl.DefaultJobExceptionHandler";
public String executorServiceHandler() default "com.dangdang.ddframe.job.executor.handler.impl.DefaultExecutorServiceHandler";
}
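Because the annotation is retained at runtime, its values, including the defaults, can be read back through reflection, which is what the parser in the next step relies on. The following is a minimal sketch with a cut-down annotation; `JobConfigSketch`, `DemoJob`, and `JobConfigReader` are hypothetical names used only for illustration.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Cut-down, hypothetical version of ElasticJobConfig with four attributes.
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@interface JobConfigSketch {
    String name();
    String cron() default "";
    int shardingTotalCount() default 1;
    boolean overwrite() default false;
}

// Only name and cron are set; the other attributes fall back to their defaults.
@JobConfigSketch(name = "demoJob", cron = "0/5 * * * * ?")
final class DemoJob {
}

final class JobConfigReader {
    /** Reads the annotation from a class and renders its effective values. */
    static String describe(Class<?> clazz) {
        JobConfigSketch conf = clazz.getAnnotation(JobConfigSketch.class);
        if (conf == null) {
            return "no job config on " + clazz.getSimpleName();
        }
        return conf.name() + " cron=" + conf.cron()
                + " shards=" + conf.shardingTotalCount()
                + " overwrite=" + conf.overwrite();
    }
}
```

Without RetentionPolicy.RUNTIME, getAnnotation would return null and the job would never be discovered.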
3) Add ElasticJobConfParser, the class that parses the ElasticJobConfig annotation
@Slf4j
public class ElasticJobConfParser implements ApplicationListener<ApplicationReadyEvent> {
private JobZookeeperProperties jobZookeeperProperties;
private ZookeeperRegistryCenter zookeeperRegistryCenter;
public ElasticJobConfParser(JobZookeeperProperties jobZookeeperProperties,
ZookeeperRegistryCenter zookeeperRegistryCenter) {
this.jobZookeeperProperties = jobZookeeperProperties;
this.zookeeperRegistryCenter = zookeeperRegistryCenter;
}
@Override
public void onApplicationEvent(ApplicationReadyEvent event) {
try {
ApplicationContext applicationContext = event.getApplicationContext();
Map<String, Object> beanMap = applicationContext.getBeansWithAnnotation(ElasticJobConfig.class);
for(Iterator<?> it = beanMap.values().iterator(); it.hasNext();) {
Object confBean = it.next();
Class<?> clazz = confBean.getClass();
if(clazz.getName().indexOf("$") > 0) {
String className = clazz.getName();
clazz = Class.forName(className.substring(0, className.indexOf("$")));
}
// Get the interface type, used to decide which kind of job this is
String jobTypeName = clazz.getInterfaces()[0].getSimpleName();
// Get the ElasticJobConfig settings
ElasticJobConfig conf = clazz.getAnnotation(ElasticJobConfig.class);
String jobClass = clazz.getName();
String jobName = this.jobZookeeperProperties.getNamespace() + "." + conf.name();
String cron = conf.cron();
String shardingItemParameters = conf.shardingItemParameters();
String description = conf.description();
String jobParameter = conf.jobParameter();
String jobExceptionHandler = conf.jobExceptionHandler();
String executorServiceHandler = conf.executorServiceHandler();
String jobShardingStrategyClass = conf.jobShardingStrategyClass();
String eventTraceRdbDataSource = conf.eventTraceRdbDataSource();
String scriptCommandLine = conf.scriptCommandLine();
boolean failover = conf.failover();
boolean misfire = conf.misfire();
boolean overwrite = conf.overwrite();
boolean disabled = conf.disabled();
boolean monitorExecution = conf.monitorExecution();
boolean streamingProcess = conf.streamingProcess();
int shardingTotalCount = conf.shardingTotalCount();
int monitorPort = conf.monitorPort();
int maxTimeDiffSeconds = conf.maxTimeDiffSeconds();
int reconcileIntervalMinutes = conf.reconcileIntervalMinutes();
// Build the elastic-job (Dangdang) configuration objects
JobCoreConfiguration coreConfig = JobCoreConfiguration
.newBuilder(jobName, cron, shardingTotalCount)
.shardingItemParameters(shardingItemParameters)
.description(description)
.failover(failover)
.jobParameter(jobParameter)
.misfire(misfire)
.jobProperties(JobProperties.JobPropertiesEnum.JOB_EXCEPTION_HANDLER.getKey(), jobExceptionHandler)
.jobProperties(JobProperties.JobPropertiesEnum.EXECUTOR_SERVICE_HANDLER.getKey(), executorServiceHandler)
.build();
// Decide which kind of job to create
JobTypeConfiguration typeConfig = null;
if(ElasticJobTypeEnum.SIMPLE.getType().equals(jobTypeName)) {
typeConfig = new SimpleJobConfiguration(coreConfig, jobClass);
}
if(ElasticJobTypeEnum.DATAFLOW.getType().equals(jobTypeName)) {
typeConfig = new DataflowJobConfiguration(coreConfig, jobClass, streamingProcess);
}
if(ElasticJobTypeEnum.SCRIPT.getType().equals(jobTypeName)) {
typeConfig = new ScriptJobConfiguration(coreConfig, scriptCommandLine);
}
// LiteJobConfiguration
LiteJobConfiguration jobConfig = LiteJobConfiguration
.newBuilder(typeConfig)
.overwrite(overwrite)
.disabled(disabled)
.monitorPort(monitorPort)
.monitorExecution(monitorExecution)
.maxTimeDiffSeconds(maxTimeDiffSeconds)
.jobShardingStrategyClass(jobShardingStrategyClass)
.reconcileIntervalMinutes(reconcileIntervalMinutes)
.build();
// Create a Spring BeanDefinition
BeanDefinitionBuilder factory = BeanDefinitionBuilder.rootBeanDefinition(SpringJobScheduler.class);
factory.setInitMethodName("init");
factory.setScope("prototype");
// 1. Add the bean constructor argument, i.e. the actual job implementation
if (!ElasticJobTypeEnum.SCRIPT.getType().equals(jobTypeName)) {
factory.addConstructorArgValue(confBean);
}
// 2. Add the registry center
factory.addConstructorArgValue(this.zookeeperRegistryCenter);
// 3. Add the LiteJobConfiguration
factory.addConstructorArgValue(jobConfig);
// 4. If an eventTraceRdbDataSource is configured, add it as well
if (StringUtils.hasText(eventTraceRdbDataSource)) {
BeanDefinitionBuilder rdbFactory = BeanDefinitionBuilder.rootBeanDefinition(JobEventRdbConfiguration.class);
rdbFactory.addConstructorArgReference(eventTraceRdbDataSource);
factory.addConstructorArgValue(rdbFactory.getBeanDefinition());
}
// 5. Add the listeners
List<?> elasticJobListeners = getTargetElasticJobListeners(conf);
factory.addConstructorArgValue(elasticJobListeners);
// Next, register the factory, i.e. the SpringJobScheduler, in the Spring container
DefaultListableBeanFactory defaultListableBeanFactory = (DefaultListableBeanFactory) applicationContext.getAutowireCapableBeanFactory();
String registerBeanName = conf.name() + "SpringJobScheduler";
defaultListableBeanFactory.registerBeanDefinition(registerBeanName, factory.getBeanDefinition());
SpringJobScheduler scheduler = (SpringJobScheduler)applicationContext.getBean(registerBeanName);
scheduler.init();
log.info("started elastic-job job: " + jobName);
}
log.info("total elastic-job jobs started: {}", beanMap.values().size());
} catch (Exception e) {
log.error("elastic-job failed to start, forcing the system to exit", e);
System.exit(1);
}
}
private List<BeanDefinition> getTargetElasticJobListeners(ElasticJobConfig conf) {
List<BeanDefinition> result = new ManagedList<BeanDefinition>(2);
String listeners = conf.listener();
if (StringUtils.hasText(listeners)) {
BeanDefinitionBuilder factory = BeanDefinitionBuilder.rootBeanDefinition(listeners);
factory.setScope("prototype");
result.add(factory.getBeanDefinition());
}
String distributedListeners = conf.distributedListener();
long startedTimeoutMilliseconds = conf.startedTimeoutMilliseconds();
long completedTimeoutMilliseconds = conf.completedTimeoutMilliseconds();
if (StringUtils.hasText(distributedListeners)) {
BeanDefinitionBuilder factory = BeanDefinitionBuilder.rootBeanDefinition(distributedListeners);
factory.setScope("prototype");
factory.addConstructorArgValue(Long.valueOf(startedTimeoutMilliseconds));
factory.addConstructorArgValue(Long.valueOf(completedTimeoutMilliseconds));
result.add(factory.getBeanDefinition());
}
return result;
}
}
ElasticJobConfParser implements ApplicationListener<ApplicationReadyEvent>, so onApplicationEvent runs once all beans are initialized and the application is up.
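Two details of the parser are easy to miss: it strips everything after the first "$" in the class name so that a proxy subclass resolves back to the original class, and it takes the simple name of the first implemented interface as the job type. A minimal sketch of both steps follows; the nested `SimpleJob` interface is a local stand-in for elastic-job's interface, and `JobTypeSketch` and the proxy class name in the usage are hypothetical.

```java
// Sketch of the two class-inspection steps in ElasticJobConfParser.
final class JobTypeSketch {
    // Local stand-in for elastic-job's SimpleJob, so the example has no dependencies.
    interface SimpleJob {
        void execute();
    }

    static final class MyJob implements SimpleJob {
        @Override
        public void execute() {
            // no-op
        }
    }

    /** Drops the "$..." suffix that proxy subclasses carry in their class name. */
    static String stripProxySuffix(String className) {
        int idx = className.indexOf('$');
        return idx > 0 ? className.substring(0, idx) : className;
    }

    /** Uses the first implemented interface as the job type, as the parser does. */
    static String jobTypeName(Class<?> clazz) {
        Class<?>[] interfaces = clazz.getInterfaces();
        if (interfaces.length == 0) {
            throw new IllegalArgumentException(clazz.getName() + " implements no interface");
        }
        return interfaces[0].getSimpleName();
    }
}
```

Both steps carry assumptions worth keeping in mind: the job interface must be the first one in the implements clause, and a job written as a nested class would also have its name cut at the "$".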
4) Add the enum ElasticJobTypeEnum
public enum ElasticJobTypeEnum {
SIMPLE("SimpleJob", "simple job"),
DATAFLOW("DataflowJob", "dataflow job"),
SCRIPT("ScriptJob", "script job");
private String type;
private String desc;
private ElasticJobTypeEnum(String type, String desc) {
this.type = type;
this.desc = desc;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getDesc() {
return desc;
}
public void setDesc(String desc) {
this.desc = desc;
}
}
III. Testing the distributed scheduled tasks
1. Create the rabbit-task-test project
The project structure is as follows:

1. application.properties is configured as follows
server.port=8881
elastic.job.zk.namespace=elastic-job
elastic.job.zk.serverLists=47.xx.xx.120:2181
2. Start ElasticJob.
Add the EnableElasticJob annotation
@EnableElasticJob
@SpringBootApplication
@ServletComponentScan(basePackages = {"com.example.rabbittasktest.esjob"})
public class RabbitTaskTestApplication {
public static void main(String[] args) {
SpringApplication.run(RabbitTaskTestApplication.class, args);
}
}
3. Create the scheduled jobs
1) Create job 1, which runs every 5 seconds
@Component
@ElasticJobConfig(
name = "com.example.rabbittasktest.esjob.TestJob",
cron = "0/5 * * * * ?", // every 5 seconds
description = "test scheduled job",
overwrite = true,
shardingTotalCount = 5
)
@Slf4j
public class TestJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
log.info("running test job");
}
}
2) Create job 2
It runs every 10 seconds
@Component
@ElasticJobConfig(
name = "com.example.rabbittasktest.esjob.TestJob2",
cron = "0/10 * * * * ?", // every 10 seconds
description = "test scheduled job 2",
overwrite = true,
shardingTotalCount = 2
)
@Slf4j
public class TestJob2 implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
log.info("running test job 2");
}
}
Start the project. Both jobs logging on schedule shows that the setup works.
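With shardingTotalCount = 5, Elastic-Job calls execute once per sharding item (0 to 4) on every trigger, possibly on different instances, and each call can read its item through shardingContext.getShardingItem(). The test jobs above only log, but a real job would use the item to pick its slice of the work. A minimal sketch of one common way to do that, modulo partitioning, is below; `ShardingSketch` and `idsForShard` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: decides which record ids a given sharding item handles.
final class ShardingSketch {
    /** Returns the ids in [0, idCount) whose remainder modulo the shard count equals the item. */
    static List<Integer> idsForShard(int shardingItem, int shardingTotalCount, int idCount) {
        if (shardingTotalCount <= 0 || shardingItem < 0 || shardingItem >= shardingTotalCount) {
            throw new IllegalArgumentException("invalid sharding item " + shardingItem
                    + " for total count " + shardingTotalCount);
        }
        List<Integer> ids = new ArrayList<>();
        for (int id = shardingItem; id < idCount; id += shardingTotalCount) {
            ids.add(id);
        }
        return ids;
    }
}
```

Every id lands in exactly one shard, so the items never process the same record twice as long as all of them use the same total count.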

IV. Compensating with a scheduled task
Create the job in the rabbit-core-producer project
@Component
@ElasticJobConfig(
name = "com.example.producer.task.RetryMessageDataflowJob",
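cron = "0/10 * * * * ?", // every 10 seconds
description = "compensation job for reliable message delivery",
overwrite = true,
shardingTotalCount = 1 // there is only one broker_message table; with 100 tables you could use 10 shards, each handling 10 tables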
)
@Slf4j
public class RetryMessageDataflowJob implements DataflowJob<BrokerMessage> {
@Autowired
private MessageStoreService messageStoreService;
@Autowired
private RabbitBroker rabbitBroker;
private static final int MAX_RETRY_COUNT = 3;
@Override
public List<BrokerMessage> fetchData(ShardingContext shardingContext) {
List<BrokerMessage> list = messageStoreService.fetchTimeOutMessage4Retry(BrokerMessageStatus.SENDING);
log.info("------@@@ number of records fetched: {}", list.size());
return list;
}
@Override
public void processData(ShardingContext shardingContext, List<BrokerMessage> list) {
list.forEach(brokerMessage -> {
if(brokerMessage.getTryCount() >= MAX_RETRY_COUNT){
this.messageStoreService.failure(brokerMessage.getMessageId());
log.warn("message retries exhausted, marking the message as finally failed, messageId={}", brokerMessage.getMessageId());
}else {
// On every resend, increment the try_count column by 1
this.messageStoreService.updateTryCount(brokerMessage.getMessageId());
// Resend the message
this.rabbitBroker.reliantSend(brokerMessage.getMessage());
}
});
}
}
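The retry rule in processData can be traced without a broker or a database. Below is a minimal in-memory sketch of one compensation pass, assuming each record stays in SENDING because no confirm ever arrives; `RetrySketch`, `Record`, and `compensate` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// In-memory model of the compensation rule; no broker or database involved.
final class RetrySketch {
    static final int MAX_RETRY_COUNT = 3;

    enum Status { SENDING, SEND_OK, SEND_FAIL }

    static final class Record {
        final String messageId;
        Status status = Status.SENDING;
        int tryCount = 0;

        Record(String messageId) {
            this.messageId = messageId;
        }
    }

    /** Runs one compensation pass and returns the ids that were resent. */
    static List<String> compensate(List<Record> timedOut) {
        List<String> resent = new ArrayList<>();
        for (Record record : timedOut) {
            if (record.status != Status.SENDING) {
                continue; // already confirmed or already given up
            }
            if (record.tryCount >= MAX_RETRY_COUNT) {
                record.status = Status.SEND_FAIL; // retries exhausted
            } else {
                record.tryCount++;                // count this attempt, then resend
                resent.add(record.messageId);
            }
        }
        return resent;
    }
}
```

Under this rule a message that never gets confirmed is sent once initially, resent on three passes, and marked SEND_FAIL on the fourth.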
V. Testing reliable message delivery
1. Create the rabbit-test project
1) Add the dependency
<dependency>
<groupId>com.example</groupId>
<artifactId>rabbit-core-producer</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
2) Start RabbitMQ (see the guide on installing RabbitMQ on Linux)
An exchange named exchange1 already exists.

It is bound to queue1.

The routing key is springboot.*
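The binding key springboot.* only behaves as a pattern if exchange1 is a topic exchange, which the text does not state, so treat that as an assumption. On a topic exchange, "*" matches exactly one dot-separated word and "#" matches zero or more words. Here is a minimal sketch of that matching rule; `TopicMatchSketch` is a hypothetical helper, not RabbitMQ's implementation.

```java
// Sketch of AMQP topic matching: '*' is exactly one word, '#' is zero or more words.
final class TopicMatchSketch {
    static boolean matches(String bindingKey, String routingKey) {
        return match(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] pattern, int i, String[] key, int j) {
        if (i == pattern.length) {
            return j == key.length; // pattern used up: the key must be used up too
        }
        if (pattern[i].equals("#")) {
            // '#' may absorb zero or more of the remaining words
            for (int next = j; next <= key.length; next++) {
                if (match(pattern, i + 1, key, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == key.length) {
            return false; // pattern still needs a word but the key has none left
        }
        boolean wordMatches = pattern[i].equals("*") || pattern[i].equals(key[j]);
        return wordMatches && match(pattern, i + 1, key, j + 1);
    }
}
```

Under this rule the routing key springboot.abc used by the test controller reaches queue1, while springboot.abc.def would not.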
3. Add configuration properties
application.properties
server.servlet.context-path=/rabbittest
server.port=8001
spring.application.name=/rabbittest
spring.rabbitmq.addresses=118.xx.xx.101:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000
#spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true
spring.rabbitmq.listener.simple.auto-startup=false
elastic.job.zk.serverLists=47.xx.xx.120:2181
elastic.job.zk.namespace=elastic-job
4. Add annotations
@EnableElasticJob
@SpringBootApplication
@ComponentScan( {"com.example.rabbittest", "com.example"})
@MapperScan({"com.example.producer.mapper"})
public class RabbittestApplication {
public static void main(String[] args) {
SpringApplication.run(RabbittestApplication.class, args);
}
}
5. Add a test method
@RestController
public class TestController {
@Autowired
private ProducerClient producerClient;
@RequestMapping("/test1")
public String test1() throws Exception{
for(int i = 0 ; i < 1; i ++) {
String uniqueId = UUID.randomUUID().toString();
Map<String, Object> attributes = new HashMap<>();
attributes.put("name", "張三");
attributes.put("age", "18");
Message message = new Message(
uniqueId,
"exchange1",
"springboot.abc",
attributes,
0, MessageType.RELIANT);
producerClient.send(message);
}
Thread.sleep(100000);
return "success";
}
}
Visit http://localhost:8001/rabbittest/test1
The log is printed as follows, showing that the message was sent successfully.

Change exchange1 to exchange1-notexist. Because exchange1-notexist does not exist, the confirm callback reports a failure.
Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'exchange1-notexist' in vhost '/', class-id=60, method-id=40)
After the scheduled job has compensated 3 times and the send still fails, the message is marked as finally failed.

6. Batch sending.
public void sendMessages() {
List<Message> messages = MessageHolder.clear();
messages.forEach(message -> {
MessageHolderAyncQueue.submit((Runnable) () -> {
CorrelationData correlationData =
new CorrelationData(String.format("%s#%s#%s",
message.getMessageId(),
System.currentTimeMillis(),
message.getMessageType()));
String topic = message.getTopic();
String routingKey = message.getRoutingKey();
RabbitTemplate rabbitTemplate = rabbitTemplateContainer.getTemplate(message);
rabbitTemplate.convertAndSend(topic, routingKey, message, correlationData);
log.info("#RabbitBrokerImpl.sendMessages# send to rabbitmq, messageId: {}", message.getMessageId());
});
});
}
In essence, the messages are still sent one at a time.
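The MessageHolder class itself is not shown, so the following is only a guess at its shape: a per-thread buffer whose clear() returns what was collected and empties it, after which each message is handed to an executor individually. `MessageHolderSketch` and `sendAll` are hypothetical names, and strings stand in for Message objects.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

// Hypothetical per-thread buffer in the spirit of MessageHolder.
final class MessageHolderSketch {
    private static final ThreadLocal<List<String>> HOLDER = ThreadLocal.withInitial(ArrayList::new);

    static void add(String message) {
        HOLDER.get().add(message);
    }

    /** Returns the buffered messages and empties the buffer for the current thread. */
    static List<String> clear() {
        List<String> drained = new ArrayList<>(HOLDER.get());
        HOLDER.remove();
        return drained;
    }

    /** Drains the buffer and submits one send task per message; returns the number submitted. */
    static int sendAll(Executor executor, Consumer<String> sender) {
        List<String> drained = clear();
        for (String message : drained) {
            executor.execute(() -> sender.accept(message));
        }
        return drained.size();
    }
}
```

The batching here is purely on the caller's side: the buffer groups messages per thread, but each one still becomes its own task and its own publish.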
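7. Delayed sending
Delayed sending requires the delayed-message plugin (rabbitmq_delayed_message_exchange) to be added to RabbitMQ.
The code changes as follows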
@Override
public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
// messageProperties.setExpiration(delaultExprie);
com.example.api.Message message = (com.example.api.Message)object;
messageProperties.setDelay(message.getDelayMills());
return this.delegate.toMessage(object, messageProperties);
}
