RabbitMQ的使用


            接到的項目是:spring的項目做spring整合rabbitMQ的作生產者,而測試使用springboot整合RmQ做消費者,交換機模式---Topic,這里還涉及到隊列和消息的持久化,這里稍作總結!

 1:設置了隊列和消息的持久化之后,當broker服務重啟的之后,消息依舊存在

 

spring整合rabbitMQ的作生產者:

pom.xml:

<!-- 添加springboot對amqp的支持 -->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>1.3.5.RELEASE</version>
        </dependency>
         <!--無此類會報錯,具體原因不詳-->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-aspects</artifactId>
            <version>3.2.8.RELEASE</version>
        </dependency>

 

rabbitMq.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
     http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
     http://www.springframework.org/schema/rabbit
     http://www.springframework.org/schema/rabbit/spring-rabbit-1.2.xsd">

    <!-- RabbitMQ公共配置部分 start -->

    <!--配置connection-factory,指定連接rabbit server參數 -->
    <rabbit:connection-factory id="connectionFactory"
                               virtual-host="/" username="guest" password="guest" host="172.24.245.90"
                               port="5672" />

    <!--通過指定下面的admin信息,當前producer中的exchange和queue會在rabbitmq服務器上自動生成 -->
    <rabbit:admin id="connectAdmin" connection-factory="connectionFactory" />

    <!-- RabbitMQ公共配置部分 end -->



    <!-- ~~~~~~~~~~~~~~~~~~~~~華麗的分割線~~~~~~~~~~~~~~~~~~~~~~~~~~ -->

    <!-- 定義 topic方式的exchange、隊列、消息收發 start -->

    <!--定義queue -->
    <!--中durable是是否持久划的標志,默認是true-->
    <rabbit:queue name="topic_queue_t" durable="true"
                  auto-delete="false" exclusive="false" declared-by="connectAdmin" />

    <!--定義topic類型exchange,綁定direct_queue_test -->
    <rabbit:topic-exchange name="exchange_topic">
        <rabbit:bindings>
            <rabbit:binding queue="topic_queue_t" pattern="notice.*" />
        </rabbit:bindings>
    </rabbit:topic-exchange>

    <!--定義rabbit template用於數據的接收和發送 -->
    <rabbit:template id="topicAmqpTemplate"
                     connection-factory="connectionFactory" exchange="exchange_topic" />

    <!-- 消息接收者 -->

    <!--<bean id="topicMessageReceiver" class="com.dcits.ensemble.service.sms.rabbit.TopicMessageReceiver"></bean>-->

    <!-- queue litener 觀察 監聽模式 當有消息到達時會通知監聽在對應的隊列上的監聽對象 -->

   <!-- <rabbit:listener-container
            connection-factory="connectionFactory">
        <rabbit:listener queues="topic_queue_t" ref="topicMessageReceiver" />
    </rabbit:listener-container>-->

    <!-- 定義 topic方式的exchange、隊列、消息收發 end -->


    <!-- ~~~~~~~~~~~~~~~~~~~~~華麗的分割線~~~~~~~~~~~~~~~~~~~~~~~~~~ -->

</beans>

 

aapplication.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd">

    <import resource="classpath*:rabbitMq.xml" />

    <!-- 掃描指定package下所有帶有如@controller,@services,@resource,@ods並把所注釋的注冊為Spring Beans -->
    <!--<context:component-scan base-package="com.dcits.ensemble.service.sms.rabbit, com.dcits.ensemble.service.sms.rabbit" />-->
    <context:component-scan base-package="com.dcits.ensemble.service.sms.rabbit" />

    <!-- 激活annotation功能 -->
    <context:annotation-config />
    <!-- 激活annotation功能 -->
    <context:spring-configured />

</beans>

生產者:

 

package com.dcits.ensemble.service.sms.rabbit;
 
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;


import javax.annotation.Resource;
import java.io.IOException;
 
@Service
public class TopicMessageProducer {
    private Logger logger = LoggerFactory.getLogger(TopicMessageProducer.class);
 
    @Resource(name = "topicAmqpTemplate")
    private AmqpTemplate topicAmqpTemplate;
    /**
     * @author:LiFangTao
     * @date: 2019/6/4
     * @description:
     * topicAmqpTemplate.convertAndSend(String routingKey, Object object),異步調用生產者
     */

    @Async
    public void sendMessage(Object message) throws IOException {
        logger.info("to send message:{}", message);
        //未持久化的消息
        topicAmqpTemplate.convertAndSend("notice.info", message);

       //rabbitMQ的消息持久化
        /*ConnectionFactory factory=new ConnectionFactory(); //創建連接工廠
        factory.setHost("172.24.245.90");
        Connection connection=factory.newConnection(); //創建連接
        Channel channel=connection.createChannel();//創建信道
        //將隊列設置為持久化之后,還需要將消息也設為可持久化的,MessageProperties.PERSISTENT_TEXT_PLAIN
        channel.basicPublish("exchange_topic","notice.info", MessageProperties.PERSISTENT_TEXT_PLAIN,message.toString().getBytes());
        System.out.println("持久化結束");*/

    }
}

 

springboot整合rabbitMQ的作消費者:

pom.xml:

 

<!-- 添加springboot對amqp的支持 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

 

Mqconfig:

package com.example.test002.mq;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class Conf {

        @Bean(name="message")
        public Queue queueMessage() {
            return new Queue("topic_queue_t");
        }

        @Bean
        public TopicExchange exchange() {
            return new TopicExchange("exchange_topic");
        }

        @Bean
        Binding bindingExchangeMessage(@Qualifier("message") Queue queueMessage, TopicExchange exchange) {
            return BindingBuilder.bind(queueMessage).to(exchange).with("notice.info");
        }


}
TopicReceiver:

package com.example.test002.mq;
 
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
 
/**
 * Created by Administrator on 2018/4/10.
 */
@Component
public class TopicReceiver {
 
    @RabbitListener(queues ="topic_queue_t" )
    public void receiveMessage1(String str){
        System.out.println("我是消費者----------- , "+str);
    }
 

}

測試持久化:

 

一:未持久化的MQ:

 

注意,RabbitMQ不允許對一個已經存在的隊列用不同的參數重新聲明,對於試圖這么做的程序,會報錯,所以,改動之前代碼之前,要在控制台中把原來的隊列刪除

 

 

 

步驟:
1,啟動rabbitmq server
2,運行以上java代碼
3,使用rabbitmqctl查看消息

 

 

4,關閉rabbitmq server,再啟動
5,使用rabbitmqctl查看消息

A:只隊列持久化,未消息持久化

 

 

   答:仍可別消費(已持久化)

B: 未隊列持久化,只消息持久化(不存在)

C:都未持久化

 

 

 

重啟前;

 

重啟后:(無隊列)

 

 

 

二:持久化后的MQ:

 

 

1隊列持久化

 

2消息持久化:

 

3測試:

 

步驟:
1,啟動rabbitmq server
2,運行以上java代碼
3,使用rabbitmqctl查看消息

 

 


4,關閉rabbitmq server,再啟動
5,使用rabbitmqctl查看消息

 

6:關閉生產者項目,啟動消費者項目:


  已消費(持久化成功)

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM