接到的項目是:spring的項目做spring整合rabbitMQ的作生產者,而測試使用springboot整合RmQ做消費者,交換機模式---Topic,這里還涉及到隊列和消息的持久化,這里稍作總結!
1:設置了隊列和消息的持久化之后,當broker服務重啟的之后,消息依舊存在
spring整合rabbitMQ的作生產者:
pom.xml:
<!-- 添加springboot對amqp的支持 --> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.3.5.RELEASE</version> </dependency> <!--無此類會報錯,具體原因不詳--> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-aspects</artifactId> <version>3.2.8.RELEASE</version> </dependency>
rabbitMq.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.2.xsd"> <!-- RabbitMQ公共配置部分 start --> <!--配置connection-factory,指定連接rabbit server參數 --> <rabbit:connection-factory id="connectionFactory" virtual-host="/" username="guest" password="guest" host="172.24.245.90" port="5672" /> <!--通過指定下面的admin信息,當前producer中的exchange和queue會在rabbitmq服務器上自動生成 --> <rabbit:admin id="connectAdmin" connection-factory="connectionFactory" /> <!-- RabbitMQ公共配置部分 end --> <!-- ~~~~~~~~~~~~~~~~~~~~~華麗的分割線~~~~~~~~~~~~~~~~~~~~~~~~~~ --> <!-- 定義 topic方式的exchange、隊列、消息收發 start --> <!--定義queue --> <!--其中durable是是否持久划的標志,默認是true--> <rabbit:queue name="topic_queue_t" durable="true" auto-delete="false" exclusive="false" declared-by="connectAdmin" /> <!--定義topic類型exchange,綁定direct_queue_test --> <rabbit:topic-exchange name="exchange_topic"> <rabbit:bindings> <rabbit:binding queue="topic_queue_t" pattern="notice.*" /> </rabbit:bindings> </rabbit:topic-exchange> <!--定義rabbit template用於數據的接收和發送 --> <rabbit:template id="topicAmqpTemplate" connection-factory="connectionFactory" exchange="exchange_topic" /> <!-- 消息接收者 --> <!--<bean id="topicMessageReceiver" class="com.dcits.ensemble.service.sms.rabbit.TopicMessageReceiver"></bean>--> <!-- queue litener 觀察 監聽模式 當有消息到達時會通知監聽在對應的隊列上的監聽對象 --> <!-- <rabbit:listener-container connection-factory="connectionFactory"> <rabbit:listener queues="topic_queue_t" ref="topicMessageReceiver" /> </rabbit:listener-container>--> <!-- 定義 topic方式的exchange、隊列、消息收發 end --> <!-- ~~~~~~~~~~~~~~~~~~~~~華麗的分割線~~~~~~~~~~~~~~~~~~~~~~~~~~ --> </beans>
aapplication.xml:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd"> <import resource="classpath*:rabbitMq.xml" /> <!-- 掃描指定package下所有帶有如@controller,@services,@resource,@ods並把所注釋的注冊為Spring Beans --> <!--<context:component-scan base-package="com.dcits.ensemble.service.sms.rabbit, com.dcits.ensemble.service.sms.rabbit" />--> <context:component-scan base-package="com.dcits.ensemble.service.sms.rabbit" /> <!-- 激活annotation功能 --> <context:annotation-config /> <!-- 激活annotation功能 --> <context:spring-configured /> </beans>
生產者:
package com.dcits.ensemble.service.sms.rabbit; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.io.IOException; @Service public class TopicMessageProducer { private Logger logger = LoggerFactory.getLogger(TopicMessageProducer.class); @Resource(name = "topicAmqpTemplate") private AmqpTemplate topicAmqpTemplate; /** * @author:LiFangTao * @date: 2019/6/4 * @description: * topicAmqpTemplate.convertAndSend(String routingKey, Object object),異步調用生產者 */ @Async public void sendMessage(Object message) throws IOException { logger.info("to send message:{}", message); //未持久化的消息 topicAmqpTemplate.convertAndSend("notice.info", message); //rabbitMQ的消息持久化 /*ConnectionFactory factory=new ConnectionFactory(); //創建連接工廠 factory.setHost("172.24.245.90"); Connection connection=factory.newConnection(); //創建連接 Channel channel=connection.createChannel();//創建信道 //將隊列設置為持久化之后,還需要將消息也設為可持久化的,MessageProperties.PERSISTENT_TEXT_PLAIN channel.basicPublish("exchange_topic","notice.info", MessageProperties.PERSISTENT_TEXT_PLAIN,message.toString().getBytes()); System.out.println("持久化結束");*/ } }
springboot整合rabbitMQ的作消費者:
pom.xml:
<!-- 添加springboot對amqp的支持 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
Mqconfig:
package com.example.test002.mq; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class Conf { @Bean(name="message") public Queue queueMessage() { return new Queue("topic_queue_t"); } @Bean public TopicExchange exchange() { return new TopicExchange("exchange_topic"); } @Bean Binding bindingExchangeMessage(@Qualifier("message") Queue queueMessage, TopicExchange exchange) { return BindingBuilder.bind(queueMessage).to(exchange).with("notice.info"); } }
TopicReceiver:
package com.example.test002.mq; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * Created by Administrator on 2018/4/10. */ @Component public class TopicReceiver { @RabbitListener(queues ="topic_queue_t" ) public void receiveMessage1(String str){ System.out.println("我是消費者----------- , "+str); } }
測試持久化:
一:未持久化的MQ:
注意,RabbitMQ不允許對一個已經存在的隊列用不同的參數重新聲明,對於試圖這么做的程序,會報錯,所以,改動之前代碼之前,要在控制台中把原來的隊列刪除
步驟:
1,啟動rabbitmq server
2,運行以上java代碼
3,使用rabbitmqctl查看消息
4,關閉rabbitmq server,再啟動
5,使用rabbitmqctl查看消息
A:只隊列持久化,未消息持久化
答:仍可別消費(已持久化)
B: 未隊列持久化,只消息持久化(不存在)
C:都未持久化
重啟前;
重啟后:(無隊列)
二:持久化后的MQ:
1隊列持久化:
2消息持久化:
3測試:
步驟:
1,啟動rabbitmq server
2,運行以上java代碼
3,使用rabbitmqctl查看消息
4,關閉rabbitmq server,再啟動
5,使用rabbitmqctl查看消息
6:關閉生產者項目,啟動消費者項目:
已消費(持久化成功)