RabbitMQ Base Component Encapsulation in Practice, Part 2 (Wrapping elastic-job for Distributed Scheduled Tasks)


 

I. Reliable message delivery

1. Create a service in the rabbit-core-producer project

@Service
public class MessageStoreService {

	@Autowired
	private BrokerMessageMapper brokerMessageMapper;
	
	public int insert(BrokerMessage brokerMessage) {
		return this.brokerMessageMapper.insert(brokerMessage);
	}
	
	public BrokerMessage selectByMessageId(String messageId) {
		return this.brokerMessageMapper.selectByPrimaryKey(messageId);
	}

	public void succuess(String messageId) {
		this.brokerMessageMapper.changeBrokerMessageStatus(messageId,
				BrokerMessageStatus.SEND_OK.getCode(),
				new Date());
	}
	
	public void failure(String messageId) {
		this.brokerMessageMapper.changeBrokerMessageStatus(messageId,
				BrokerMessageStatus.SEND_FAIL.getCode(),
				new Date());
	}
	
	public List<BrokerMessage> fetchTimeOutMessage4Retry(BrokerMessageStatus brokerMessageStatus){
		return this.brokerMessageMapper.queryBrokerMessageStatus4Timeout(brokerMessageStatus.getCode());
	}
	
	public int updateTryCount(String brokerMessageId) {
		return this.brokerMessageMapper.update4TryCount(brokerMessageId, new Date());
	}
	
	
	
	
}

  

 

2. Define the message send statuses

public enum BrokerMessageStatus {

	SENDING("0"),
	SEND_OK("1"),
	SEND_FAIL("2"),
	SEND_FAIL_A_MOMENT("3");
	
	private String code;
	
	private BrokerMessageStatus(String code) {
		this.code = code;
	}
	
	public String getCode() {
		return this.code;
	}
	
}
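
Because the status is stored as a string code, code that reads rows back usually needs the reverse mapping as well. The following is a minimal sketch and not part of the original project: `StatusCodeSketch` and `fromCode` are hypothetical names, and the enum only mirrors the four codes defined above.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for BrokerMessageStatus; the codes mirror the enum above.
enum StatusCodeSketch {
    SENDING("0"), SEND_OK("1"), SEND_FAIL("2"), SEND_FAIL_A_MOMENT("3");

    // Reverse index from the stored string code to the enum constant.
    private static final Map<String, StatusCodeSketch> BY_CODE = new HashMap<>();

    static {
        for (StatusCodeSketch s : values()) {
            BY_CODE.put(s.code, s);
        }
    }

    private final String code;

    StatusCodeSketch(String code) {
        this.code = code;
    }

    String getCode() {
        return code;
    }

    // Hypothetical helper: resolve a code read from the database.
    static StatusCodeSketch fromCode(String code) {
        StatusCodeSketch s = BY_CODE.get(code);
        if (s == null) {
            throw new IllegalArgumentException("unknown status code: " + code);
        }
        return s;
    }
}
```

Failing fast on an unknown code is a design choice for the sketch; a real mapper might prefer to return a default instead.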

  

Constants:

public interface BrokerMessageConst {
	
	// Timeout: 1 minute (added to the send time to compute nextRetry)
	int TIMEOUT = 1;
}

  

Add a reliable-send method to RabbitBrokerImpl

    @Override
    public void reliantSend(Message message) {
        message.setMessageType(MessageType.RELIANT);
        BrokerMessage bm = messageStoreService.selectByMessageId(message.getMessageId());
        if (bm == null) {
            // 1. Record the send in the database message log first
            Date now = new Date();
            BrokerMessage brokerMessage = new BrokerMessage();
            brokerMessage.setMessageId(message.getMessageId());
            brokerMessage.setStatus(BrokerMessageStatus.SENDING.getCode());
            // tryCount does not need to be set on the first send
            brokerMessage.setNextRetry(DateUtils.addMinutes(now, BrokerMessageConst.TIMEOUT));
            brokerMessage.setCreateTime(now);
            brokerMessage.setUpdateTime(now);
            brokerMessage.setMessage(message);
            messageStoreService.insert(brokerMessage);
        }
        // 2. Run the actual send logic (on a retry the log record already exists, so only this step runs)
        sendKernel(message);
    }

  

Handling broker confirmations

Change the confirm method in the RabbitTemplateContainer class as follows:

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        // The correlation id has the form messageId#sendTime#messageType
        List<String> strings = splitter.splitToList(correlationData.getId());
        String messageId = strings.get(0);
        long sendTime = Long.parseLong(strings.get(1));
        String messageType = strings.get(2);
        if (ack) {
            // When the broker returns an ACK, set the matching record in the log table to SEND_OK.
            // Only reliant messages are recorded in the database, so only they need the update.
            if (MessageType.RELIANT.equals(messageType)) {
                this.messageStoreService.succuess(messageId);
            }
            log.info("send message ok, confirm messageId={}, sendTime={}", messageId, sendTime);
        } else {
            log.error("send message failed, confirm messageId={}, sendTime={}, cause={}", messageId, sendTime, cause);
        }
    }
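
The confirm callback depends on the correlation id carrying three `#`-separated fields, which is the `%s#%s#%s` format the sender builds in sendMessages. The following is a minimal sketch of that round trip in plain Java. `ConfirmIdSketch`, `encode`, and `parse` are hypothetical names, and the sketch assumes the message id itself never contains `#` (true for UUIDs).

```java
// Hypothetical helper mirroring the messageId#sendTime#messageType convention.
final class ConfirmIdSketch {
    final String messageId;
    final long sendTime;
    final String messageType;

    private ConfirmIdSketch(String messageId, long sendTime, String messageType) {
        this.messageId = messageId;
        this.sendTime = sendTime;
        this.messageType = messageType;
    }

    // Build the id the producer would attach to CorrelationData.
    static String encode(String messageId, long sendTime, String messageType) {
        return String.format("%s#%s#%s", messageId, sendTime, messageType);
    }

    // Recover the three fields inside the confirm callback.
    static ConfirmIdSketch parse(String correlationId) {
        String[] parts = correlationId.split("#", -1);
        if (parts.length != 3) {
            throw new IllegalArgumentException("unexpected correlation id: " + correlationId);
        }
        return new ConfirmIdSketch(parts[0], Long.parseLong(parts[1]), parts[2]);
    }
}
```

Validating the field count up front turns a malformed id into a clear error rather than an index failure deep inside the callback.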

  

II. Wrapping the distributed scheduled-task component: rabbit-task

1. Add dependencies: bring in elastic-job

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>rabbit-parent</artifactId>
        <groupId>com.example</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>rabbit-task</artifactId>

    <properties>
        <elastic-job.version>2.1.4</elastic-job.version>
    </properties>
    <dependencies>
        <!-- spring boot dependency -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <!--  elastic-job dependency -->
        <dependency>
            <groupId>com.dangdang</groupId>
            <artifactId>elastic-job-lite-core</artifactId>
            <version>${elastic-job.version}</version>
        </dependency>
        <dependency>
            <groupId>com.dangdang</groupId>
            <artifactId>elastic-job-lite-spring</artifactId>
            <version>${elastic-job.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies>

</project>

  

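2. Add auto-configuration

1) Create the class JobParserAutoConfigurartion, which reads the settings Elastic-Job needs to connect to the ZooKeeper (zk) registry center and initializes the registry center from them.

The configuration properties are bound to the JobZookeeperProperties class.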

@Slf4j
@Configuration
@ConditionalOnProperty(prefix = "elastic.job.zk", name = {"namespace", "serverLists"}, matchIfMissing = false)
@EnableConfigurationProperties(JobZookeeperProperties.class)
public class JobParserAutoConfigurartion {

	@Bean(initMethod = "init")
	public ZookeeperRegistryCenter zookeeperRegistryCenter(JobZookeeperProperties jobZookeeperProperties) {
		ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(jobZookeeperProperties.getServerLists(),
				jobZookeeperProperties.getNamespace());
		// Copy the tuning values from the bound properties (copying them from zkConfig itself would be a no-op)
		zkConfig.setBaseSleepTimeMilliseconds(jobZookeeperProperties.getBaseSleepTimeMilliseconds());
		zkConfig.setMaxSleepTimeMilliseconds(jobZookeeperProperties.getMaxSleepTimeMilliseconds());
		zkConfig.setConnectionTimeoutMilliseconds(jobZookeeperProperties.getConnectionTimeoutMilliseconds());
		zkConfig.setSessionTimeoutMilliseconds(jobZookeeperProperties.getSessionTimeoutMilliseconds());
		zkConfig.setMaxRetries(jobZookeeperProperties.getMaxRetries());
		zkConfig.setDigest(jobZookeeperProperties.getDigest());
		log.info("job registry center configured, zkaddress : {}, namespace : {}", jobZookeeperProperties.getServerLists(), jobZookeeperProperties.getNamespace());
		return new ZookeeperRegistryCenter(zkConfig);
	}
	
	@Bean
	public ElasticJobConfParser elasticJobConfParser(JobZookeeperProperties jobZookeeperProperties, ZookeeperRegistryCenter zookeeperRegistryCenter) {
		return new ElasticJobConfParser(jobZookeeperProperties, zookeeperRegistryCenter);
	}
	
}

 

2) Create a META-INF folder under resources

Create spring.factories in the META-INF folder

# Auto Configure
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.example.task.autoconfigure.JobParserAutoConfigurartion

  

3) The JobZookeeperProperties class

@ConfigurationProperties(prefix = "elastic.job.zk")
@Data
public class JobZookeeperProperties {

	private String namespace;
	
	private String serverLists;
	
	private int maxRetries = 3;

	private int connectionTimeoutMilliseconds = 15000;
	
	private int sessionTimeoutMilliseconds = 60000;
	
	private int baseSleepTimeMilliseconds = 1000;
	
	private int maxSleepTimeMilliseconds = 3000;
	
	private String digest = "";
	
}

  

3. Add module assembly

1) Add an annotation that imports the JobParserAutoConfigurartion configuration

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(JobParserAutoConfigurartion.class)
public @interface EnableElasticJob {

}

  

2) Add the job configuration annotation ElasticJobConfig

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface ElasticJobConfig {

	String name();	// name of the elastic-job job
	
	String cron() default "";
	
	int shardingTotalCount() default 1;
	
	String shardingItemParameters() default "";
	
	String jobParameter() default "";
	
	boolean failover() default false;
	
	boolean misfire() default true;
	
	String description() default "";
	
	boolean overwrite() default false;
	
	boolean streamingProcess() default false;
	
	String scriptCommandLine() default "";
	
	boolean monitorExecution() default false;
	
	public int monitorPort() default -1;	//must

	public int maxTimeDiffSeconds() default -1;	//must

	public String jobShardingStrategyClass() default "";	//must

	public int reconcileIntervalMinutes() default 10;	//must

	public String eventTraceRdbDataSource() default "";	//must

	public String listener() default "";	//must

	public boolean disabled() default false;	//must

	public String distributedListener() default "";

	public long startedTimeoutMilliseconds() default Long.MAX_VALUE;	//must

	public long completedTimeoutMilliseconds() default Long.MAX_VALUE;		//must

	public String jobExceptionHandler() default "com.dangdang.ddframe.job.executor.handler.impl.DefaultJobExceptionHandler";

	public String executorServiceHandler() default "com.dangdang.ddframe.job.executor.handler.impl.DefaultExecutorServiceHandler";
	
}

  

3) Add ElasticJobConfParser, the class that parses the ElasticJobConfig annotation

@Slf4j
public class ElasticJobConfParser implements ApplicationListener<ApplicationReadyEvent> {

	private JobZookeeperProperties jobZookeeperProperties;
	
	private ZookeeperRegistryCenter zookeeperRegistryCenter;
	
	public ElasticJobConfParser(JobZookeeperProperties jobZookeeperProperties,
			ZookeeperRegistryCenter zookeeperRegistryCenter) {
		this.jobZookeeperProperties = jobZookeeperProperties;
		this.zookeeperRegistryCenter = zookeeperRegistryCenter;
	}

	@Override
	public void onApplicationEvent(ApplicationReadyEvent event) {
		try {
			ApplicationContext applicationContext = event.getApplicationContext();
			Map<String, Object> beanMap = applicationContext.getBeansWithAnnotation(ElasticJobConfig.class);
			for(Iterator<?> it = beanMap.values().iterator(); it.hasNext();) {
				Object confBean = it.next();
				Class<?> clazz = confBean.getClass();
				if(clazz.getName().indexOf("$") > 0) {
					String className = clazz.getName();
					clazz = Class.forName(className.substring(0, className.indexOf("$")));
				}
				// Take the first implemented interface; its simple name tells us the job type
				String jobTypeName = clazz.getInterfaces()[0].getSimpleName();
				// Read the ElasticJobConfig annotation
				ElasticJobConfig conf = clazz.getAnnotation(ElasticJobConfig.class);
				
				String jobClass = clazz.getName();
				String jobName = this.jobZookeeperProperties.getNamespace() + "." + conf.name();
				String cron = conf.cron();
				String shardingItemParameters = conf.shardingItemParameters();
				String description = conf.description();
				String jobParameter = conf.jobParameter();
				String jobExceptionHandler = conf.jobExceptionHandler();
				String executorServiceHandler = conf.executorServiceHandler();

				String jobShardingStrategyClass = conf.jobShardingStrategyClass();
				String eventTraceRdbDataSource = conf.eventTraceRdbDataSource();
				String scriptCommandLine = conf.scriptCommandLine();

				boolean failover = conf.failover();
				boolean misfire = conf.misfire();
				boolean overwrite = conf.overwrite();
				boolean disabled = conf.disabled();
				boolean monitorExecution = conf.monitorExecution();
				boolean streamingProcess = conf.streamingProcess();

				int shardingTotalCount = conf.shardingTotalCount();
				int monitorPort = conf.monitorPort();
				int maxTimeDiffSeconds = conf.maxTimeDiffSeconds();
				int reconcileIntervalMinutes = conf.reconcileIntervalMinutes();				
				
				// Build elastic-job's (Dangdang's) configuration objects from the annotation values
				JobCoreConfiguration coreConfig = JobCoreConfiguration
						.newBuilder(jobName, cron, shardingTotalCount)
						.shardingItemParameters(shardingItemParameters)
						.description(description)
						.failover(failover)
						.jobParameter(jobParameter)
						.misfire(misfire)
						.jobProperties(JobProperties.JobPropertiesEnum.JOB_EXCEPTION_HANDLER.getKey(), jobExceptionHandler)
						.jobProperties(JobProperties.JobPropertiesEnum.EXECUTOR_SERVICE_HANDLER.getKey(), executorServiceHandler)
						.build();
				
				// Decide which type of job to create
				JobTypeConfiguration typeConfig = null;
				if(ElasticJobTypeEnum.SIMPLE.getType().equals(jobTypeName)) {
					typeConfig = new SimpleJobConfiguration(coreConfig, jobClass);
				}
				
				if(ElasticJobTypeEnum.DATAFLOW.getType().equals(jobTypeName)) {
					typeConfig = new DataflowJobConfiguration(coreConfig, jobClass, streamingProcess);
				}
				
				if(ElasticJobTypeEnum.SCRIPT.getType().equals(jobTypeName)) {
					typeConfig = new ScriptJobConfiguration(coreConfig, scriptCommandLine);
				}
				
				// LiteJobConfiguration
				LiteJobConfiguration jobConfig = LiteJobConfiguration
						.newBuilder(typeConfig)
						.overwrite(overwrite)
						.disabled(disabled)
						.monitorPort(monitorPort)
						.monitorExecution(monitorExecution)
						.maxTimeDiffSeconds(maxTimeDiffSeconds)
						.jobShardingStrategyClass(jobShardingStrategyClass)
						.reconcileIntervalMinutes(reconcileIntervalMinutes)
						.build();
				
				// Create a Spring bean definition for the scheduler
				BeanDefinitionBuilder factory = BeanDefinitionBuilder.rootBeanDefinition(SpringJobScheduler.class);
				factory.setInitMethodName("init");
				factory.setScope("prototype");
				
				// 1. Add the constructor argument for the actual job implementation bean
				if (!ElasticJobTypeEnum.SCRIPT.getType().equals(jobTypeName)) {
					factory.addConstructorArgValue(confBean);
				}
				// 2. Add the registry center
				factory.addConstructorArgValue(this.zookeeperRegistryCenter);
				// 3. Add the LiteJobConfiguration
				factory.addConstructorArgValue(jobConfig);

				// 4. If eventTraceRdbDataSource is set, add it as well
				if (StringUtils.hasText(eventTraceRdbDataSource)) {
					BeanDefinitionBuilder rdbFactory = BeanDefinitionBuilder.rootBeanDefinition(JobEventRdbConfiguration.class);
					rdbFactory.addConstructorArgReference(eventTraceRdbDataSource);
					factory.addConstructorArgValue(rdbFactory.getBeanDefinition());
				}
				
				// 5. Add the listeners
				List<?> elasticJobListeners = getTargetElasticJobListeners(conf);
				factory.addConstructorArgValue(elasticJobListeners);
				
				// Register the SpringJobScheduler bean definition with the Spring container
				DefaultListableBeanFactory defaultListableBeanFactory = (DefaultListableBeanFactory) applicationContext.getAutowireCapableBeanFactory();

				String registerBeanName = conf.name() + "SpringJobScheduler";
				defaultListableBeanFactory.registerBeanDefinition(registerBeanName, factory.getBeanDefinition());
				SpringJobScheduler scheduler = (SpringJobScheduler)applicationContext.getBean(registerBeanName);
				scheduler.init();
				log.info("started elastic-job job: {}", jobName);
			}
			log.info("total elastic-job jobs started: {}", beanMap.values().size());
			
		} catch (Exception e) {
			log.error("elastic-job startup failed, forcing the system to exit", e);
			System.exit(1);
		}
	}
	
	private List<BeanDefinition> getTargetElasticJobListeners(ElasticJobConfig conf) {
		List<BeanDefinition> result = new ManagedList<BeanDefinition>(2);
		String listeners = conf.listener();
		if (StringUtils.hasText(listeners)) {
			BeanDefinitionBuilder factory = BeanDefinitionBuilder.rootBeanDefinition(listeners);
			factory.setScope("prototype");
			result.add(factory.getBeanDefinition());
		}

		String distributedListeners = conf.distributedListener();
		long startedTimeoutMilliseconds = conf.startedTimeoutMilliseconds();
		long completedTimeoutMilliseconds = conf.completedTimeoutMilliseconds();

		if (StringUtils.hasText(distributedListeners)) {
			BeanDefinitionBuilder factory = BeanDefinitionBuilder.rootBeanDefinition(distributedListeners);
			factory.setScope("prototype");
			factory.addConstructorArgValue(Long.valueOf(startedTimeoutMilliseconds));
			factory.addConstructorArgValue(Long.valueOf(completedTimeoutMilliseconds));
			result.add(factory.getBeanDefinition());
		}
		return result;
	}

}  

ElasticJobConfParser implements ApplicationListener<ApplicationReadyEvent>, so onApplicationEvent runs once all beans have been initialized and the application is up.
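
Two details of the parser are easy to miss: it trims a proxied class name at the first `$` to get back to the user's class, and it infers the job type from the simple name of the first implemented interface. The following is a minimal sketch of both steps in plain Java. `JobTypeSketch` and its nested interfaces are hypothetical stand-ins for the elastic-job types, and the proxy-style class name in the usage note is only illustrative.

```java
// Hypothetical stand-ins; the real interfaces come from elastic-job.
final class JobTypeSketch {
    interface SimpleJob {}
    interface DataflowJob {}

    static class DemoSimpleJob implements SimpleJob {}
    static class DemoDataflowJob implements DataflowJob {}

    // Same rule as the parser: cut the name at the first '$' if there is one.
    static String baseClassName(String className) {
        int i = className.indexOf('$');
        return i > 0 ? className.substring(0, i) : className;
    }

    // Same rule as the parser: the first interface decides the job type.
    static String jobTypeOf(Class<?> clazz) {
        Class<?>[] interfaces = clazz.getInterfaces();
        if (interfaces.length == 0) {
            throw new IllegalArgumentException(clazz.getName() + " implements no interface");
        }
        return interfaces[0].getSimpleName();
    }
}
```

For a name such as `com.example.TestJob$$Proxy$$1a2b`, `baseClassName` yields `com.example.TestJob`. Note that the same rule also trims the names of nested classes, so under this approach job classes are best declared as top-level classes, with the job interface listed first.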

 

 

4) Add the enum ElasticJobTypeEnum

public enum ElasticJobTypeEnum {

	SIMPLE("SimpleJob", "simple job"),
	DATAFLOW("DataflowJob", "dataflow (streaming) job"),
	SCRIPT("ScriptJob", "script job");
	
	private String type;
	
	private String desc;
	
	private ElasticJobTypeEnum(String type, String desc) {
		this.type = type;
		this.desc = desc;
	}

	public String getType() {
		return type;
	}

	public void setType(String type) {
		this.type = type;
	}

	public String getDesc() {
		return desc;
	}

	public void setDesc(String desc) {
		this.desc = desc;
	}

}

  

 

III. Testing the distributed scheduled tasks

1. Create the rabbit-task-test project


 

 

1. Configure application.properties as follows

server.port=8881
elastic.job.zk.namespace=elastic-job
elastic.job.zk.serverLists=47.xx.xx.120:2181

  

2. Start ElasticJob

Add the EnableElasticJob annotation

@EnableElasticJob
@SpringBootApplication
@ServletComponentScan(basePackages = {"com.example.rabbittasktest.esjob"})
public class RabbitTaskTestApplication {

    public static void main(String[] args) {
        SpringApplication.run(RabbitTaskTestApplication.class, args);
    }

}

  

3. Create scheduled jobs

1) Create job 1, which runs every 5 seconds

@Component
@ElasticJobConfig(
        name = "com.example.rabbittasktest.esjob.TestJob",
        cron = "0/5 * * * * ?", // every 5 seconds
        description = "test scheduled job",
        overwrite = true,
        shardingTotalCount = 5
)
@Slf4j
public class TestJob implements SimpleJob {
    @Override
    public void execute(ShardingContext shardingContext) {
        log.info("running test job");
    }
}

  

  

2) Create job 2

It runs every 10 seconds

@Component
@ElasticJobConfig(
        name = "com.example.rabbittasktest.esjob.TestJob2",
        cron = "0/10 * * * * ?", // every 10 seconds
        description = "test scheduled job 2",
        overwrite = true,
        shardingTotalCount = 2
)
@Slf4j
public class TestJob2 implements SimpleJob {
    @Override
    public void execute(ShardingContext shardingContext) {
        log.info("running test job2");
    }
}
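
The two cron expressions differ only in the seconds field, `0/5` versus `0/10`, which reads "start at second 0, then every N seconds". The following is a minimal sketch that expands such a field into the seconds of a minute on which the job fires. `CronSecondsSketch` is a hypothetical helper that handles only the `start/step` form, not full cron syntax.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: expands a "start/step" seconds field only.
final class CronSecondsSketch {
    static List<Integer> expand(String secondsField) {
        String[] parts = secondsField.split("/");
        if (parts.length != 2) {
            throw new IllegalArgumentException("expected start/step, got: " + secondsField);
        }
        // "*" as the start value is treated as second 0.
        int start = parts[0].equals("*") ? 0 : Integer.parseInt(parts[0]);
        int step = Integer.parseInt(parts[1]);
        if (step <= 0) {
            throw new IllegalArgumentException("step must be positive");
        }
        List<Integer> seconds = new ArrayList<>();
        for (int s = start; s < 60; s += step) {
            seconds.add(s);
        }
        return seconds;
    }
}
```

Under this reading, `0/5` fires 12 times per minute and `0/10` fires 6 times per minute.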

  

Start the project. If both jobs log their output on schedule, the setup works.

 

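IV. Compensating with a scheduled job

Create the job in the rabbit-core-producer project

@Component
@ElasticJobConfig(
        name = "com.example.producer.task.RetryMessageDataflowJob",
        cron = "0/10 * * * * ?", // every 10 seconds
        description = "compensation job for reliable message delivery",
        overwrite = true,
        shardingTotalCount = 1  // there is only one broker_message table; with 100 tables, for example, you could use 10 shards, each handling 10 tables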
)
@Slf4j
public class RetryMessageDataflowJob implements DataflowJob<BrokerMessage> {

    @Autowired
    private MessageStoreService messageStoreService;

    @Autowired
    private RabbitBroker rabbitBroker;

    private static final int MAX_RETRY_COUNT = 3;

    @Override
    public List<BrokerMessage> fetchData(ShardingContext shardingContext) {
        List<BrokerMessage>  list = messageStoreService.fetchTimeOutMessage4Retry(BrokerMessageStatus.SENDING);
        log.info("------@@@ number of records fetched: {}", list.size());
        return list;
    }

    @Override
    public void processData(ShardingContext shardingContext, List<BrokerMessage> list) {
        list.forEach(brokerMessage -> {
            if(brokerMessage.getTryCount() >= MAX_RETRY_COUNT){
                this.messageStoreService.failure(brokerMessage.getMessageId());
                log.warn("message retries exhausted, marking as finally failed, messageId={}", brokerMessage.getMessageId());
            }else {
                // increment the try_count column on every resend
                this.messageStoreService.updateTryCount(brokerMessage.getMessageId());
                // resend the message
                this.rabbitBroker.reliantSend(brokerMessage.getMessage());
            }
        });
    }
}
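
The comment on shardingTotalCount mentions splitting many tables across shards. The following is a minimal sketch of one way to do that, assuming tables are numbered from 0 and named `broker_message_<n>` (both the naming scheme and the `TableShardingSketch` helper are hypothetical). In a real job the two inputs would come from `shardingContext.getShardingItem()` and `shardingContext.getShardingTotalCount()`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: assigns table n to shard (n % shardingTotalCount).
final class TableShardingSketch {
    static List<String> tablesFor(int shardingItem, int shardingTotalCount, int tableCount) {
        if (shardingItem < 0 || shardingItem >= shardingTotalCount) {
            throw new IllegalArgumentException("sharding item out of range: " + shardingItem);
        }
        List<String> tables = new ArrayList<>();
        // Step through the tables in strides of shardingTotalCount.
        for (int t = shardingItem; t < tableCount; t += shardingTotalCount) {
            tables.add("broker_message_" + t);
        }
        return tables;
    }
}
```

With 100 tables and 10 shards, each shard handles exactly 10 tables, and every table belongs to exactly one shard, so fetchData on different shards would never pick up the same row.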

  

V. Testing reliable message delivery

1. Create the rabbit-test project

1) Add the dependency

        <dependency>
            <groupId>com.example</groupId>
            <artifactId>rabbit-core-producer</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>

  

2) Start RabbitMQ (see "Installing RabbitMQ on Linux")

An exchange named exchange1 already exists, with queue1 bound to it under the routing key springboot.*
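
A binding key such as `springboot.*` only makes sense on a topic exchange, so this assumes exchange1 is of type topic. In topic matching, `*` stands for exactly one dot-separated word and `#` for zero or more words. The following is a minimal sketch of that rule in plain Java; `TopicMatchSketch` is a hypothetical helper, not RabbitMQ's own implementation.

```java
// Hypothetical helper illustrating AMQP topic-style matching.
final class TopicMatchSketch {
    static boolean matches(String bindingKey, String routingKey) {
        return match(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] pattern, int i, String[] key, int j) {
        if (i == pattern.length) {
            // Pattern consumed: succeed only if the key is consumed too.
            return j == key.length;
        }
        if (pattern[i].equals("#")) {
            // '#' may absorb zero or more words.
            for (int next = j; next <= key.length; next++) {
                if (match(pattern, i + 1, key, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == key.length) {
            return false;
        }
        // '*' matches any single word; otherwise the words must be equal.
        if (pattern[i].equals("*") || pattern[i].equals(key[j])) {
            return match(pattern, i + 1, key, j + 1);
        }
        return false;
    }
}
```

Under this rule the routing key `springboot.abc` used in the test below matches the binding, while `springboot.abc.def` would not.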

 

3. Add configuration properties

application.properties

server.servlet.context-path=/rabbittest
server.port=8001
spring.application.name=/rabbittest

spring.rabbitmq.addresses=118.xx.xx.101:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000
#spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true
spring.rabbitmq.listener.simple.auto-startup=false

elastic.job.zk.serverLists=47.xx.xx.120:2181
elastic.job.zk.namespace=elastic-job

  

4. Add annotations

@EnableElasticJob
@SpringBootApplication
@ComponentScan( {"com.example.rabbittest", "com.example"})
@MapperScan({"com.example.producer.mapper"})
public class RabbittestApplication {

    public static void main(String[] args) {
        SpringApplication.run(RabbittestApplication.class, args);
    }

}

  

5. Add a test endpoint

@RestController
public class TestController {

    @Autowired
    private ProducerClient producerClient;

    @RequestMapping("/test1")
    public String test1() throws Exception{
        for(int i = 0 ; i < 1; i ++) {
            String uniqueId = UUID.randomUUID().toString();
            Map<String, Object> attributes = new HashMap<>();
            attributes.put("name", "張三");
            attributes.put("age", "18");
            Message message = new Message(
                    uniqueId,
                    "exchange1",
                    "springboot.abc",
                    attributes,
                    0, MessageType.RELIANT);
            producerClient.send(message);
        }

        Thread.sleep(100000);

        return  "success";
    }
}  

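Visit http://localhost:8001/rabbittest/test1

The log output shows that the message was sent successfully.

Next, change exchange1 to exchange1-notexist. Because that exchange does not exist, the broker closes the channel and the confirm callback reports a failure:

Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'exchange1-notexist' in vhost '/', class-id=60, method-id=40)

After the compensation job has retried 3 times without success, the message is marked as finally failed.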
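
The retry lifecycle can be traced with a small in-memory model. The following is a simplified sketch, not the project's code: `RetrySimulationSketch` and its `Record` type are hypothetical, the broker's answer is passed in as a boolean instead of arriving through the asynchronous confirm callback, and the nextRetry timeout check is left out. The limit of 3 and the order of the checks follow RetryMessageDataflowJob.

```java
// Hypothetical in-memory model of one broker_message row.
final class RetrySimulationSketch {
    enum Status { SENDING, SEND_OK, SEND_FAIL }

    static final int MAX_RETRY_COUNT = 3;

    static final class Record {
        Status status = Status.SENDING;
        int tryCount = 0;
    }

    // One pass of the compensation job over a single record.
    static void compensate(Record record, boolean brokerAcks) {
        if (record.status != Status.SENDING) {
            return; // only unconfirmed messages are picked up
        }
        if (record.tryCount >= MAX_RETRY_COUNT) {
            record.status = Status.SEND_FAIL; // retries exhausted
            return;
        }
        record.tryCount++; // counted before the resend, as in processData
        if (brokerAcks) {
            record.status = Status.SEND_OK; // what the confirm callback would do
        }
    }
}
```

When the broker never acknowledges, the first three passes each resend the message and the fourth pass marks it SEND_FAIL, so with a 10-second cron and a 1-minute timeout the final failure is recorded some time after the third resend rather than immediately.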

 

6. Batch sending

 public void sendMessages() {
        List<Message> messages = MessageHolder.clear();
        messages.forEach(message -> {
            MessageHolderAyncQueue.submit((Runnable) () -> {
                CorrelationData correlationData =
                        new CorrelationData(String.format("%s#%s#%s",
                                message.getMessageId(),
                                System.currentTimeMillis(),
                                message.getMessageType()));
                String topic = message.getTopic();
                String routingKey = message.getRoutingKey();
                RabbitTemplate rabbitTemplate = rabbitTemplateContainer.getTemplate(message);
                rabbitTemplate.convertAndSend(topic, routingKey, message, correlationData);
                log.info("#RabbitBrokerImpl.sendMessages# send to rabbitmq, messageId: {}", message.getMessageId());
            });
        });
    }

Under the hood, the messages are still sent one at a time.
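
The pattern behind sendMessages is "collect first, then submit each item to a pool". The following is a minimal sketch in plain Java, under the assumption that the holder is a thread-local list; the source does not show how MessageHolder is implemented. `BatchSendSketch` is a hypothetical name, and a counter stands in for the actual RabbitTemplate call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a holder plus per-message asynchronous submission.
final class BatchSendSketch {
    // Assumption: messages are collected per calling thread.
    private static final ThreadLocal<List<String>> HOLDER = ThreadLocal.withInitial(ArrayList::new);

    static void add(String message) {
        HOLDER.get().add(message);
    }

    // Return the collected messages and reset the holder.
    static List<String> clear() {
        List<String> copy = new ArrayList<>(HOLDER.get());
        HOLDER.remove();
        return copy;
    }

    // Submit one task per message and wait until all have run.
    static int sendAll(ExecutorService pool) {
        List<String> messages = clear();
        AtomicInteger sent = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(messages.size());
        for (String message : messages) {
            pool.submit(() -> {
                sent.incrementAndGet(); // stand-in for the real send call
                done.countDown();
            });
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while sending", e);
        }
        return sent.get();
    }
}
```

The batch therefore saves the caller from issuing separate send calls, but each message still travels to the broker as its own publish.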

 

7. Delayed sending

Delayed sending requires the delayed-message plugin to be installed in RabbitMQ.

The code changes as follows:

	@Override
	public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
//		messageProperties.setExpiration(delaultExprie);
		com.example.api.Message message = (com.example.api.Message)object;
		messageProperties.setDelay(message.getDelayMills());
		return this.delegate.toMessage(object, messageProperties);
	}

  

 

