1.前言
消息隊列除了kafka 外,還有許多種,比如RabbitMQ 、ActiveMQ、ZeroMQ、JMQ等。
老牌的ActiveMQ ,底層使用Java寫的,資源消耗大,速度也慢,但是適合 JMS 【java message service】的使用 ,事實上,性能差,現在用的人很少了。
現在流行使用kafka,那是因為支持很大的吞吐量,處理數據速度很快,但是,對數據的處理安全性不高,而且,需要處理那么大吞吐量的應用實際上不多,
kafka更多的是使用在大數據方面,底層是 使用 zookeeper開發 。
RabbitMQ的吞吐量比kafka的低一些,實際上這沒有可比性,RabbitMQ的開發理念重點不做吞吐量,而是安全性,常使用在金融方面的應用,使用的人很多,技術成熟,
是 amqp協議的完美實現,底層使用erlang語言實現,每個節點的服務程序【broker】由交換機和 消息隊列 組成 ,消息隊列又分主隊列和鏡像隊列,如果主隊列掛掉了,
那么會選一個鏡像隊列成為主隊列,也就是說鏡像隊列只要是用來備份的。那么,讀取隊列信息如果連接到非主隊列,則需要交換機路由到指定主隊列讀取,因此這樣的單節點,
導致了吞吐量受限。
綜合上來說,RabbitMQ是最好的,如果單考慮吞吐量,那么肯定選擇kafka。
這一篇隨筆,講解RabbitMQ 的 4大交換機中的 直連交換機的簡單使用。
消息中間件不僅可以在多個服務器間使用,也可以在單個服務器使用,用於消息轉發給訂閱消息隊列的監聽器,
這里我以兩個服務器作為演示,消息生產者工程端口為1004,消息消費者工程端口為1002.
注意,需要提前安裝RabbitMQ軟件,window10 詳細安裝 的 隨筆地址 https://www.cnblogs.com/c2g5201314/p/12990634.html
消息生產者端總結: (1)使用直連交換機 ,需要給綁定的消息隊列分配路由鍵 ,也就是一串用於識別的字符串。 (2)調用rabbit模板發送消息時,需要參數分別是 直連交換機名字、路由鍵、消息字, 數據類型都是字符串 ,如果是鍵值對象則需要轉成json字符串,然后后接收的消費者端解析json即可。 (3)直連交換機發送消息的底層原理,其實是使用rabbit模板,根據指定的交換機名字查找交換機【因此不同類型的交換機名字是不允許相同的】,找到后將路由鍵和消息傳給該直連交換機,
然后該交換機根據路由鍵查找消息隊列,需要與消息隊列的路由鍵完全一樣才可以匹配成功,找到匹配的消息隊列后,將消息放入消息隊列中,然后消息隊列會自動將消息推送給監聽該消息隊列的消費者端,
當消費者接收后並且確認后,消息隊列會將該消息銷毀。 (4)如果路由鍵匹配不到消息隊列【即消息隊列不存在】,消息將會拋棄。 (5)如果匹配到了消息隊列,但是沒有監聽該消息隊列的消費者端,那么消息將一直存在該隊列中,直到有監聽該隊列的消費者端啟動后,消費該消息,消息才會從消息隊列中銷毀。

消費完清空后,將恢復為0,【因此可證明 ,數據很安全,不會丟失】
消息消費者端總結: 總結: (1)消息消費者不需要配置什么東西,只需要在配置文件添加rabbit地址端口賬號密碼,即可連接, 然后在需要的監聽類關聯指定的隊列名字即可接收到該隊列的消息 (2)如果是使用 直連交換機發送消息,該隊列的所有監聽將會使用輪詢策略做負載均衡來消費信息, 不論是將監聽放於類上還是方法上,效果都是一樣的 (3)方法上寫監聽,記得在類上加@Service或@Component注冊bean,否則消息隊列監聽注冊無效 (4)消息只能傳輸字符串,但是可以使用json字符串,獲取后再對其解析即可,可用fastjson解析,
也可以使用objectMapper強制解析【不建議使用】
2.消息生產者端
(1)目錄結構
紅箭頭標出來的兩個文件是核心文件
(2)導入依賴包
<!-- 消息中間件--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>2.1.6.RELEASE</version> </dependency>
pom.xml源碼【不可直接復制源碼,我這里是maven多模塊的子工程,需要改依賴管理的】

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>cen.cloud</groupId> <artifactId>cen-mycloud</artifactId> <version>0.0.1-SNAPSHOT</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.example</groupId> <artifactId>rabbitmq-producer-1004</artifactId> <version>0.0.1-SNAPSHOT</version> <name>rabbitmq-producer-1004</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> <!--eureka 注冊中心依賴包 --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId> </dependency> <!-- 消息中間件--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>2.1.6.RELEASE</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
(3)配置application.properties文件
完整源碼

#工程名/項目名/應用名/服務名 spring.application.name=rabbitmq-producer-1004 #端口號 server.port=1004 #eureka注冊 eureka.client.serviceUrl.defaultZone=http://localhost:7001/eureka/ #rabbitmq配置 #spring.rabbitmq.virtual-host=/ spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 #默認賬戶 spring.rabbitmq.username=guest #默認密碼 spring.rabbitmq.password=guest # #spring.rabbitmq.listener.simple.concurrency=10 #spring.rabbitmq.listener.simple.max-concurrency=20 #spring.rabbitmq.listener.simple.prefetch=50 ## #mq.env=local # # # #日志配置 # 指定日志輸入級別【根節點,表明整個項目基本的日志級別】 logging.level.root=info # ** 表示是指定的某個文件的路徑或類的日志級別 #logging.level.**=info # 指定日志輸出位置和日志文件名 , ./指工程根目錄 logging.file=./rabbitmq-producer-1004/log/spring.log # 指定日志輸出路徑,若file和path同時配置,則file生效 # 此配置默認生成文件為spring.log #logging.file.path=./log # 控制台日志輸出格式 # -5表示從左顯示5個字符寬度 logging.pattern.console=%d{yyyy-MM-dd HH:mm:ss.SSS} %highlight(%-5level) %boldYellow(%thread) | %boldGreen(%logger) | %msg%n # 文件中輸出的格式 logging.pattern.file=%d{yyyy-MM-dd HH:mm:ss.SSS} = [%thread] = %-5level = %logger{50} - %msg%n
(4)創建rabbitmq配置類

package com.example.rabbitmqproducer1004.config; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * rabbitmq配置類---消息生產者 */ @Configuration public class RabbitmqConfig { //日志記錄器 Logger logger = LoggerFactory.getLogger(getClass()); /** * 定義 交換機、消息隊列、路由關鍵字 的名字 */ //定義交換機名字 exchange public static final String EXCHANG_1 = "exchange_1"; //定義消息隊列名字 queue public static final String QUEUE_1 = "queu_1"; //定義路由鍵 routingkey public static final String ROUTINGKEY_1 = "routing_1"; //=============================================================== /** * 下面的是 直連交換機 設置 綁定 消息隊列 到 交換機 * * DirectExchange:直連交換機,按照routingkey分發到指定隊列 */ //============================================== /** * 設置交換機類型 */ @Bean public DirectExchange directExchange() { logger.warn("設置交換機類型"); //實例交換機對象,然后注入該交換機的名字 return new DirectExchange(EXCHANG_1); } /** * 創建消息隊列 */ @Bean public Queue queue1() { logger.warn("創建消息隊列"); //實例消息隊列對象,輸入該隊列名字,如果需要該隊列持久化,則設為true,默認是false // return new Queue(QUEUE_1, true); return new Queue(QUEUE_1); } /** * 綁定 消息隊列 到 交換機【一個 交換機 允許被多個 消息隊列 綁定】 */ @Bean public Binding binding() { logger.warn("綁定 消息隊列 到 交換機"); //使用綁定構造器將 指定的隊列 綁定到 指定的交換機上 ,Direct交換機需要攜帶 路由鍵 return BindingBuilder.bind(queue1()).to(directExchange()).with(ROUTINGKEY_1); } }
(5)創建消息生產類

package com.example.rabbitmqproducer1004.rabbitmqFactory; import com.example.rabbitmqproducer1004.config.RabbitmqConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import java.util.UUID; /** * 消息生產類 */ @Component //實現接口 public class SendMessage { Logger logger = LoggerFactory.getLogger(this.getClass()); @Autowired private RabbitTemplate rabbitTemplate; /** * 發送消息 * * 參數是消息內容 */ public void send(String message){ logger.warn("發送消息,內容:"+message); //發送消息 ,參數分別是 : 指定的交換機名字 、指定的路由鍵、消息字符串 rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANG_1,RabbitmqConfig.ROUTINGKEY_1,message); } }
(6)controller層,調用消息生產類

package com.example.rabbitmqproducer1004.controller; import com.example.rabbitmqproducer1004.rabbitmqFactory.SendMessage; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class DController { @Autowired private SendMessage sendMessage; @RequestMapping("/mq") public String mq(String msg){ sendMessage.send(msg); return "發送成功"; } }
3.消息消費者端
(1)目錄結構
紅箭頭標出來的文件是核心文件
(2)導入依賴包
<!-- 消息中間件--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>2.1.6.RELEASE</version> </dependency>
完整的pom.xml

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>cen.cloud</groupId> <artifactId>cen-mycloud</artifactId> <version>0.0.1-SNAPSHOT</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.example</groupId> <artifactId>rabbitmq-consumer-1002</artifactId> <version>0.0.1-SNAPSHOT</version> <name>rabbitmq-consumer-1002</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> <!--eureka 注冊中心依賴包 --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId> </dependency> <!-- <!–健康檢測管理中心 ,可刷新配置文件–>--> <!-- <dependency>--> <!-- <groupId>org.springframework.boot</groupId>--> <!-- <artifactId>spring-boot-starter-actuator</artifactId>--> <!-- </dependency>--> <!-- 消息中間件--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>2.1.6.RELEASE</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
(3)配置application.properties文件
完整源碼

#工程名/項目名/應用名/服務名 spring.application.name=rabbitmq-consumer-1002 #端口號 server.port=1002 #eureka注冊 eureka.client.serviceUrl.defaultZone=http://localhost:7001/eureka/ #rabbitmq配置 #spring.rabbitmq.virtual-host=/ spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 #默認賬戶 spring.rabbitmq.username=guest #默認密碼 spring.rabbitmq.password=guest # #spring.rabbitmq.listener.simple.concurrency=10 #spring.rabbitmq.listener.simple.max-concurrency=20 #spring.rabbitmq.listener.simple.prefetch=50 ## #mq.env=local # # # #日志配置 # 指定日志輸入級別【根節點,表明整個項目基本的日志級別】 logging.level.root=info # ** 表示是指定的某個文件的路徑或類的日志級別 #logging.level.**=info # 指定日志輸出位置和日志文件名 , ./指工程根目錄 logging.file=./rabbitmq-consumer-1002/log/spring.log # 指定日志輸出路徑,若file和path同時配置,則file生效 # 此配置默認生成文件為spring.log #logging.file.path=./log # 控制台日志輸出格式 # -5表示從左顯示5個字符寬度 logging.pattern.console=%d{yyyy-MM-dd HH:mm:ss.SSS} %highlight(%-5level) %boldYellow(%thread) | %boldGreen(%logger) | %msg%n # 文件中輸出的格式 logging.pattern.file=%d{yyyy-MM-dd HH:mm:ss.SSS} = [%thread] = %-5level = %logger{50} - %msg%n
(4)rabbitmq配置類

package com.example.rabbitmqconsumer1002.config; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Configuration; import org.springframework.stereotype.Component; /** * rabbitmq的消費者配置類 */ @Configuration public class RabbitConfig { private final Logger logger = LoggerFactory.getLogger(this.getClass()); //定義需要關聯的消息隊列名字 queue public static final String QUEUE_1 = "queu_1"; }
是的,你沒看錯,就這么點東西
(5)消息隊列監聽
接聽方式分兩種方式,
一種是放在類上

package com.example.rabbitmqconsumer1002.rabbitmqListener; import com.example.rabbitmqconsumer1002.config.RabbitConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * 消息監聽類--發短信 */ //注冊bean @Component //設置需要監聽的消息隊列 @RabbitListener(queues = RabbitConfig.QUEUE_1) public class SendMessageListener { Logger logger = LoggerFactory.getLogger(getClass()); //消息事件處理 @RabbitHandler public void sendMessage(String msg) { logger.warn("我是端口1002的消費者,收到信息:" + msg); } }
一種是放在方法上 【但記得給這個方法的類注冊bean】

package com.example.rabbitmqconsumer1002.rabbitmqListener; import com.example.rabbitmqconsumer1002.config.RabbitConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import org.springframework.stereotype.Service; /** * 記得加@Service或@Component注冊bean,否則消息隊列監聽注冊無效 */ //@Service @Component public class SMService { Logger logger = LoggerFactory.getLogger(getClass()); @RabbitListener(queues = RabbitConfig.QUEUE_1) public void kk(String msg){ logger.warn("我是端口1002的消費者--方法監聽--,收到信息:" + msg); } }
4.測試
必須先啟動消息生產者端的工程,會自動在rabbitmq創建消息隊列和交換機,然后再啟動消息消費者端,
否則消費者端因為監聽不到該指定消息隊列而報錯。
(1)啟動消息生產端
打印的初始化循序
【請忽略日志級別,那是因為我故意設為警告級別,紅色看起來明顯】
瀏覽器輸入網址 http://127.0.0.1:15672/
可進入rabbitmq監控頁面
使用默認賬號密碼登錄
選擇 connection 選項,可以查看當前連接rabbitmq的主機信息
選擇exchange選項,可以看到新建的交換機
選擇queue選項,可看到新建的消息隊列
(2)啟動消息消費端
啟動后,再次選擇 connection 選項,可以看的消費者端也連接好了
(3)調用消息生產者的接口發消息,訪問網址 http://localhost:1004/mq?msg=你大爺,幫我發短信85345
提示發送成功
查看生產者控制台
現在去看消費者的控制台
可見 ,消費者 成功從消息隊列獲取到了消息。