RabbitMQ --- 直連交換機 【 無回調方法,不能獲取消費結果 】


1.前言

  消息隊列除了kafka 外,還有許多種,比如RabbitMQ 、ActiveMQ、ZeroMQ、JMQ等。

  老牌的ActiveMQ ,底層使用Java寫的,資源消耗大,速度也慢,但是適合 JMS 【java message service】的使用 ,事實上,性能差,現在用的人很少了。

  現在流行使用kafka,那是因為支持很大的吞吐量,處理數據速度很快,但是,對數據的處理安全性不高,而且,需要處理那么大吞吐量的應用實際上不多,

kafka更多的是使用在大數據方面,底層是 使用 zookeeper開發 。

  RabbitMQ的吞吐量比kafka的低一些,實際上這沒有可比性,RabbitMQ的開發理念重點不做吞吐量,而是安全性,常使用在金融方面的應用,使用的人很多,技術成熟,

是 amqp協議的完美實現,底層使用erlang語言實現,每個節點的服務程序【broker】由交換機和 消息隊列 組成 ,消息隊列又分主隊列和鏡像隊列,如果主隊列掛掉了, 

那么會選一個鏡像隊列成為主隊列,也就是說鏡像隊列只要是用來備份的。那么,讀取隊列信息如果連接到非主隊列,則需要交換機路由到指定主隊列讀取,因此這樣的單節點,

導致了吞吐量受限。

  綜合上來說,RabbitMQ是最好的,如果單考慮吞吐量,那么肯定選擇kafka。

  這一篇隨筆,講解RabbitMQ 的 4大交換機中的 直連交換機的簡單使用。

  消息中間件不僅可以在多個服務器間使用,也可以在單個服務器使用,用於消息轉發給訂閱消息隊列的監聽器,

這里我以兩個服務器作為演示,消息生產者工程端口為1004,消息消費者工程端口為1002.

注意,需要提前安裝RabbitMQ軟件,window10 詳細安裝 的 隨筆地址 https://www.cnblogs.com/c2g5201314/p/12990634.html
消息生產者端總結:
(1)使用直連交換機 ,需要給綁定的消息隊列分配路由鍵 ,也就是一串用於識別的字符串。
(2)調用rabbit模板發送消息時,需要參數分別是 直連交換機名字、路由鍵、消息字,
  數據類型都是字符串 ,如果是鍵值對象則需要轉成json字符串,然后后接收的消費者端解析json即可。
(3)直連交換機發送消息的底層原理,其實是使用rabbit模板,根據指定的交換機名字查找交換機【因此不同類型的交換機名字是不允許相同的】,找到后將路由鍵和消息傳給該直連交換機,
  然后該交換機根據路由鍵查找消息隊列,需要與消息隊列的路由鍵完全一樣才可以匹配成功,找到匹配的消息隊列后,將消息放入消息隊列中,然后消息隊列會自動將消息推送給監聽該消息隊列的消費者端,
  當消費者接收后並且確認后,消息隊列會將該消息銷毀。 (
4)如果路由鍵匹配不到消息隊列【即消息隊列不存在】,消息將會拋棄。 (5)如果匹配到了消息隊列,但是沒有監聽該消息隊列的消費者端,那么消息將一直存在該隊列中,直到有監聽該隊列的消費者端啟動后,消費該消息,消息才會從消息隊列中銷毀。

 

 消費完清空后,將恢復為0,【因此可證明 ,數據很安全,不會丟失】

 

 




 

消息消費者端總結:
總結:
(1)消息消費者不需要配置什么東西,只需要在配置文件添加rabbit地址端口賬號密碼,即可連接,
  然后在需要的監聽類關聯指定的隊列名字即可接收到該隊列的消息
(2)如果是使用 直連交換機發送消息,該隊列的所有監聽將會使用輪詢策略做負載均衡來消費信息,
  不論是將監聽放於類上還是方法上,效果都是一樣的
(3)方法上寫監聽,記得在類上加@Service或@Component注冊bean,否則消息隊列監聽注冊無效
(4)消息只能傳輸字符串,但是可以使用json字符串,獲取后再對其解析即可,可用fastjson解析,
  也可以使用objectMapper強制解析【不建議使用】

 

2.消息生產者端

 

(1)目錄結構

 

 

 紅箭頭標出來的兩個文件是核心文件

(2)導入依賴包

        <!-- 消息中間件-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>2.1.6.RELEASE</version>
        </dependency>

pom.xml源碼【不可直接復制源碼,我這里是maven多模塊的子工程,需要改依賴管理的】

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>cen.cloud</groupId>
        <artifactId>cen-mycloud</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>rabbitmq-producer-1004</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>rabbitmq-producer-1004</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>


        <!--eureka 注冊中心依賴包 -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
        </dependency>

        <!-- 消息中間件-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>2.1.6.RELEASE</version>
        </dependency>


    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>
View Code

(3)配置application.properties文件

 

 

 完整源碼

#工程名/項目名/應用名/服務名
spring.application.name=rabbitmq-producer-1004
#端口號
server.port=1004
#eureka注冊
eureka.client.serviceUrl.defaultZone=http://localhost:7001/eureka/


#rabbitmq配置
#spring.rabbitmq.virtual-host=/
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
#默認賬戶
spring.rabbitmq.username=guest
#默認密碼
spring.rabbitmq.password=guest

#
#spring.rabbitmq.listener.simple.concurrency=10
#spring.rabbitmq.listener.simple.max-concurrency=20
#spring.rabbitmq.listener.simple.prefetch=50
##
#mq.env=local



#
#
#
#日志配置
# 指定日志輸入級別【根節點,表明整個項目基本的日志級別】
logging.level.root=info
# ** 表示是指定的某個文件的路徑或類的日志級別
#logging.level.**=info

# 指定日志輸出位置和日志文件名 , ./指工程根目錄
logging.file=./rabbitmq-producer-1004/log/spring.log

# 指定日志輸出路徑,若file和path同時配置,則file生效
# 此配置默認生成文件為spring.log
#logging.file.path=./log

# 控制台日志輸出格式
# -5表示從左顯示5個字符寬度
logging.pattern.console=%d{yyyy-MM-dd HH:mm:ss.SSS} %highlight(%-5level) %boldYellow(%thread) | %boldGreen(%logger) | %msg%n

# 文件中輸出的格式
logging.pattern.file=%d{yyyy-MM-dd HH:mm:ss.SSS} = [%thread] = %-5level = %logger{50} - %msg%n
View Code

(4)創建rabbitmq配置類

package com.example.rabbitmqproducer1004.config;


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * rabbitmq配置類---消息生產者
 */
@Configuration
public class RabbitmqConfig {
    //日志記錄器
    Logger logger = LoggerFactory.getLogger(getClass());



    /**
     * 定義 交換機、消息隊列、路由關鍵字 的名字
     */

    //定義交換機名字 exchange
    public static final String EXCHANG_1 = "exchange_1";

    //定義消息隊列名字 queue
    public static final String QUEUE_1 = "queu_1";


    //定義路由鍵 routingkey
    public static final String ROUTINGKEY_1 = "routing_1";


    //===============================================================

    /**
     * 下面的是 直連交換機 設置 綁定 消息隊列 到 交換機
     *
     * DirectExchange:直連交換機,按照routingkey分發到指定隊列
     */
    //==============================================

    /**
     * 設置交換機類型
     */
    @Bean
    public DirectExchange directExchange() {
        logger.warn("設置交換機類型");
        //實例交換機對象,然后注入該交換機的名字
        return new DirectExchange(EXCHANG_1);
    }

    /**
     * 創建消息隊列
     */
    @Bean
    public Queue queue1() {
        logger.warn("創建消息隊列");
        //實例消息隊列對象,輸入該隊列名字,如果需要該隊列持久化,則設為true,默認是false
//        return new Queue(QUEUE_1, true);
        return new Queue(QUEUE_1);
    }

    /**
     * 綁定 消息隊列 到 交換機【一個 交換機 允許被多個 消息隊列 綁定】
     */
    @Bean
    public Binding binding() {
        logger.warn("綁定 消息隊列 到 交換機");
        //使用綁定構造器將 指定的隊列 綁定到 指定的交換機上 ,Direct交換機需要攜帶 路由鍵
        return BindingBuilder.bind(queue1()).to(directExchange()).with(ROUTINGKEY_1);
    }


}
View Code

(5)創建消息生產類

package com.example.rabbitmqproducer1004.rabbitmqFactory;


import com.example.rabbitmqproducer1004.config.RabbitmqConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.util.UUID;

/**
 * 消息生產類
 */
@Component
//實現接口
public class SendMessage  {

    Logger logger = LoggerFactory.getLogger(this.getClass());


    @Autowired
    private RabbitTemplate rabbitTemplate;


    /**
     * 發送消息
     *
     * 參數是消息內容
     */
    public void send(String message){
        logger.warn("發送消息,內容:"+message);

        //發送消息 ,參數分別是 : 指定的交換機名字 、指定的路由鍵、消息字符串
        rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANG_1,RabbitmqConfig.ROUTINGKEY_1,message);
    }



}
View Code

(6)controller層,調用消息生產類

package com.example.rabbitmqproducer1004.controller;

import com.example.rabbitmqproducer1004.rabbitmqFactory.SendMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class DController {

    @Autowired
    private SendMessage sendMessage;

    @RequestMapping("/mq")
    public String mq(String msg){
        sendMessage.send(msg);
        return "發送成功";
    }

}
View Code

3.消息消費者端

(1)目錄結構

 紅箭頭標出來的文件是核心文件

(2)導入依賴包

        <!-- 消息中間件-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>2.1.6.RELEASE</version>
        </dependency>

完整的pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>cen.cloud</groupId>
        <artifactId>cen-mycloud</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>rabbitmq-consumer-1002</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>rabbitmq-consumer-1002</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!--eureka 注冊中心依賴包 -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
        </dependency>

        <!--        &lt;!&ndash;健康檢測管理中心 ,可刷新配置文件&ndash;&gt;-->
        <!--        <dependency>-->
        <!--            <groupId>org.springframework.boot</groupId>-->
        <!--            <artifactId>spring-boot-starter-actuator</artifactId>-->
        <!--        </dependency>-->

        <!-- 消息中間件-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>2.1.6.RELEASE</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>
View Code

(3)配置application.properties文件

 完整源碼

#工程名/項目名/應用名/服務名
spring.application.name=rabbitmq-consumer-1002
#端口號
server.port=1002
#eureka注冊
eureka.client.serviceUrl.defaultZone=http://localhost:7001/eureka/

#rabbitmq配置
#spring.rabbitmq.virtual-host=/
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
#默認賬戶
spring.rabbitmq.username=guest
#默認密碼
spring.rabbitmq.password=guest
#
#spring.rabbitmq.listener.simple.concurrency=10
#spring.rabbitmq.listener.simple.max-concurrency=20
#spring.rabbitmq.listener.simple.prefetch=50
##
#mq.env=local

#
#
#
#日志配置
# 指定日志輸入級別【根節點,表明整個項目基本的日志級別】
logging.level.root=info
# ** 表示是指定的某個文件的路徑或類的日志級別
#logging.level.**=info

# 指定日志輸出位置和日志文件名 , ./指工程根目錄
logging.file=./rabbitmq-consumer-1002/log/spring.log

# 指定日志輸出路徑,若file和path同時配置,則file生效
# 此配置默認生成文件為spring.log
#logging.file.path=./log

# 控制台日志輸出格式
# -5表示從左顯示5個字符寬度
logging.pattern.console=%d{yyyy-MM-dd HH:mm:ss.SSS} %highlight(%-5level) %boldYellow(%thread) | %boldGreen(%logger) | %msg%n

# 文件中輸出的格式
logging.pattern.file=%d{yyyy-MM-dd HH:mm:ss.SSS} = [%thread] = %-5level = %logger{50} - %msg%n
View Code

(4)rabbitmq配置類

package com.example.rabbitmqconsumer1002.config;


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

/**
 * rabbitmq的消費者配置類
 */
@Configuration
public class RabbitConfig {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());


    //定義需要關聯的消息隊列名字 queue
    public static final String QUEUE_1 = "queu_1";

    

}
View Code

是的,你沒看錯,就這么點東西

(5)消息隊列監聽

接聽方式分兩種方式,

一種是放在類上

package com.example.rabbitmqconsumer1002.rabbitmqListener;

import com.example.rabbitmqconsumer1002.config.RabbitConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 消息監聽類--發短信
 */
//注冊bean
@Component
//設置需要監聽的消息隊列
@RabbitListener(queues = RabbitConfig.QUEUE_1)
public class SendMessageListener {
    Logger logger = LoggerFactory.getLogger(getClass());

    //消息事件處理
    @RabbitHandler
    public void sendMessage(String msg) {
        logger.warn("我是端口1002的消費者,收到信息:" + msg);
    }


}
View Code

一種是放在方法上 【但記得給這個方法的類注冊bean】

 

package com.example.rabbitmqconsumer1002.rabbitmqListener;

import com.example.rabbitmqconsumer1002.config.RabbitConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;

/**
 * 記得加@Service或@Component注冊bean,否則消息隊列監聽注冊無效
 */
//@Service
@Component
public class SMService {
    Logger logger = LoggerFactory.getLogger(getClass());
    @RabbitListener(queues = RabbitConfig.QUEUE_1)
    public void kk(String msg){
        logger.warn("我是端口1002的消費者--方法監聽--,收到信息:" + msg);

    }

}
View Code

 

4.測試

必須先啟動消息生產者端的工程,會自動在rabbitmq創建消息隊列和交換機,然后再啟動消息消費者端,

否則消費者端因為監聽不到該指定消息隊列而報錯。

(1)啟動消息生產端

打印的初始化循序

 

 

 【請忽略日志級別,那是因為我故意設為警告級別,紅色看起來明顯】

瀏覽器輸入網址 http://127.0.0.1:15672/

 可進入rabbitmq監控頁面

使用默認賬號密碼登錄

 

 

 

選擇 connection 選項,可以查看當前連接rabbitmq的主機信息

 

 

 選擇exchange選項,可以看到新建的交換機

 

 

 

選擇queue選項,可看到新建的消息隊列

 

 

 (2)啟動消息消費端

啟動后,再次選擇 connection 選項,可以看的消費者端也連接好了

 

 (3)調用消息生產者的接口發消息,訪問網址  http://localhost:1004/mq?msg=你大爺,幫我發短信85345

 

 提示發送成功

 

 查看生產者控制台

 

 

現在去看消費者的控制台

 

 

可見 ,消費者 成功從消息隊列獲取到了消息。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM