一、消息中間件的產生的背景
1.在網絡通訊中,Http請求默認采用同步請求方式,基於請求與響應模式
2.在客戶端與服務器進行通訊時,客戶端調用服務端接口后,必須等待服務端完成處理后返回結果給客戶端才能繼續執行,這種情況屬於同步調用方式。
3.如果服務器端發生網絡延遲、不可達的情況,可能客戶端也會受到影響。
二、什么是消息中間件
消息隊列中間件是分布式系統中重要的組件,主要解決應用解耦,異步消息,流量削鋒等問題,實現高性能,高可用,可伸縮和最終一致性架構。
目前使用較多的消息隊列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ
實現方式:面向消息的中間件(MessageOrlented MiddlewareMOM)較好的解決了以上問題。發送者將消息發送給消息服務器,消息服務器將消感存放在若千隊列中,
在合適的時候再將消息轉發給接收者。這種模式下,發送和接收是異步的,發送者無需等待; 二者的生命周期未必相同: 發送消息的時候接收者不一定運行,
接收消息的時候發送者也不一定運行;一對多通信: 對於一個消息可以有多個接收者。
原理圖
三、JMS介紹
1、什么是JMS
JMS是java的消息服務器,JMS的客戶端之間可以通過JMS服務進行異步的消息傳輸,是一個Java平台中面向消息中間件的API
角色划分
1.提供者: 實現JMS規范的消息中間件服務器 (存放消息容器)
2.客戶端:發送或接收消息的應用程序
3.生產者/發布者: 創建並發送消息的客戶端(向消息容器存放消息)
4.消費者/訂閱者:接收並處理消息的客戶端
5.消息:應用程序之間傳遞的數據內容
6.消息模式:在客戶端之間傳遞消息的方式,JMS中定義了主題和隊列兩種模式 點對點與發布訂閱模式。
2、什么是消息模型
- Point-to-Point(P2P) --- 點對點(生產者發送一條消息到queue,只有一個消費者能收到)
- Publish/Subscribe(Pub/Sub)--- 發布訂閱(發布者發送到topic的消息,只有訂閱了topic的訂閱者才會收到消息)
即點對點和發布訂閱模型
P2P(點對點)
- p2p模型圖
- 相關概念
- 消息隊列(Queue)
- 發送者(Sender)
- 接收者(Receiver)
- 每個消息都被發送到一個特定的隊列,接收者從隊列中獲取消息。隊列保留着消息,直到他們被消費或超時。
- p2p特點
- 每個消息只有一個消費者(Consumer)(即一旦被消費,消息就不再在消息隊列中)
- 發送者和接收者之間在時間上沒有依賴性,也就是說當發送者發送了消息之后,不管接收者有沒有正在運行,它不會影響到消息被發送到隊列
- 接收者在成功接收消息之后需向隊列應答成功,如果你希望發送的每個消息都應該被成功處理的話,那么你需要P2P模式。
- 應用場景
A用戶與B用戶發送消息
Pub/Sub (發布與訂閱)
- Pub/Sub模式圖
- 相關概念
1.主題(Topic)
2.發布者(Publisher)
3.訂閱者(Subscriber)
4.客戶端將消息發送到主題。多個發布者將消息發送到Topic,系統將這些消息傳遞給多個訂閱者。
- Pub/Sub的特點
每個消息可以有多個消費者
發布者和訂閱者之間有時間上的依賴性。針對某個主題(Topic)的訂閱者,它必須創建一個訂閱者之后,才能消費發布者的消息,而且為了消費消息,訂閱者必須保持運行的狀態。
為了緩和這樣嚴格的時間相關性,JMS允許訂閱者創建一個可持久化的訂閱。這樣,即使訂閱者沒有被激活(運行),它也能接收到發布者的消息。 如果你希望發送的消息可以不被做任何處理、
或者被一個消息者處理、或者可以被多個消費者處理的話,那么可以采用Pub/Sub模型
消息的消費
在JMS中,消息的產生和消息是異步的。對於消費來說,JMS的消息者可以通過兩種方式來消費消息。
○ 同步
訂閱者或接收者調用receive方法來接收消息,receive方法在能夠接收到消息之前(或超時之前)將一直阻塞
○ 異步
訂閱者或接收者可以注冊為一個消息監聽器。當消息到達之后,系統自動調用監聽器的onMessage方法。
- 發布訂閱與點對點通訊方式區別:
點對點 只能保證一個消費者進行消費 一對一 發布訂閱 只要集群服務訂閱該主題都會受收到消息 一對多
四、ActiveMQ
1、ActiveMQ的消息形式
應用場景:
- 異步處理
- 實現系統之間的解耦
- 流量削鋒
- 消息通訊
異步處理應用場景
異步處理 場景說明:用戶注冊后,需要發注冊郵件和注冊短信。傳統的做法有兩種 1.串行的方式2.並行方式
a、串行方式:將注冊信息寫入數據庫成功后,
發送注冊郵件,再發送注冊短信。以上三個任務全部完成后,返回給客戶端。
b.並行方式:將注冊信息寫入數據庫成功后,發送注冊郵件的同時,發送注冊短信。以上三個任務完成后,返回給客戶端。
與串行的差別是,並行的方式可以提高處理的時間
引入消息隊列,將不是必須的業務邏輯,異步處理。改造后的架構如下:
按照以上約定,用戶的響應時間相當於是注冊信息寫入數據庫的時間,也就是50毫秒。注冊郵件,發送短信寫入消息隊列后,直接返回,因此寫入消息隊列的速度很快,基本可以忽略,因此用戶的響應時間可能是50毫秒。因此架構改變后,系統的吞吐量提高到每秒20 QPS。比串行提高了3倍,比並行提高了兩倍。
解耦應用場景
場景說明:用戶下單后,訂單系統需要通知庫存系統。傳統的做法是,訂單系統調用庫存系統的接口。如下圖:
傳統模式的缺點:假如庫存系統無法訪問,則訂單減庫存將失敗,從而導致訂單失敗,訂單系統與庫存系統耦合 耗時時間接口 統一采用MQ推送 不建議才同步方
如何解決以上問題呢?引入應用消息隊列后的方案,如下圖:
訂單系統:用戶下單后,訂單系統完成持久化處理,將消息寫入消息隊列,返回用戶訂單下單成功 庫存系統:訂閱下單的消息,
采用拉/推的方式,獲取下單信息,庫存系統根據下單信息,進行庫存操作 假如:在下單時庫存系統不能正常使用。也不影響正常下單,
因為下單后,訂單系統寫入消息隊列就不再關心其他的后續操作了。實現訂單系統與庫存系統的應用解耦
流量削峰應用解耦
流量削鋒也是消息隊列中的常用場景,一般在秒殺或團搶活動中使用廣泛。 應用場景:秒殺活動,一般會因為流量過大,導致流量暴增,應用掛掉。
為解決這個問題,一般需要在應用前端加入消息隊列。
a、可以控制活動的人數
b、可以緩解短時間內高流量壓垮應用
用戶的請求,服務器接收后,首先寫入消息隊列。假如消息隊列長度超過最大數量,則直接拋棄用戶請求或跳轉到錯誤頁面。
秒殺業務根據消息隊列中的請求信息,再做后續處理 秒殺如何實現核心Redis+MQ+服務保護機制(服務降級、隔離、熔斷)+服務限流+圖形驗證+token
對於消息的傳遞有兩種類型:
- 一種是點對點的,即一個生產者和一個消費者一一對應;
- 另一種是發布/訂閱模式,即一個生產者產生消息並進行發送后,可以由多個消費者進行接收。
JMS定義了五種不同的消息正文格式,以及調用的消息類型,允許你發送並接收以一些不同形式的數據,提供現有消息格式的一些級別的兼容性。
· StreamMessage -- Java原始值的數據流
· MapMessage--一套名稱-值對
· TextMessage--一個字符串對象(常用)
· ObjectMessage--一個序列化的 Java對象
· BytesMessage--一個字節的數據流
2、activeMQ window安裝
進入http://activemq.apache.org/下載ActiveMQ
下載window或者Linux版本
ActiveMQ部署其實很簡單,和所有Java一樣,要跑java程序就必須先安裝JDK並配置好環境變量,這個很簡單。
然后解壓下載的apache-activemq-5.10-20140603.133406-78-bin.zip壓縮包到一個目錄,得到解壓后的目錄結構如下圖:
進入bin目錄,發現有win32和win64兩個文件夾,這2個文件夾分別對應windows32位和windows64位操作系統的啟動腳本。
看自己的系統位數進入,會看到如下目錄
其中activemq.bat便是啟動腳本,雙擊啟動。
ActiveMQ默認啟動到8161端口,啟動完了后在瀏覽器地址欄輸入:http://localhost:8161/admin要求輸入用戶名密碼,默認用戶名密碼為admin、admin,這個用戶名密碼是在conf/users.properties中配置的。輸入用戶名密碼后便可看到如下圖的ActiveMQ控制台界面了。
3、ActiveMQ Linux 安裝
- 安裝環境
1.需要jdk
2.安裝Linux系統。生產環境都是Linux系統。
- 安裝步驟
第一步: 把ActiveMQ 的壓縮包上傳到Linux系統。
第二步:解壓縮。
第三步:啟動。
使用bin目錄下的activemq命令啟動:
[root@localhost bin]# ./activemq start
關閉:
[root@localhost bin]# ./activemq stop
查看狀態:
[root@localhost bin]# ./activemq status
//修改主機名
Vim /etc/hosts
//查看主機名
注意:如果ActiveMQ整合spring使用不要使用activemq-all-5.12.0.jar包。建議使用5.11.2
進入管理后台:http://192.168.25.168:8161/admin用戶名:admin密碼:admin
4、ActiveMQ控制台介紹
Number Of Consumers 消費者 這個是消費者端的消費者數量
Number Of Pending Messages 等待消費的消息 這個是當前未出隊列的數量。可以理解為總接收數-總出隊列數
Messages Enqueued 進入隊列的消息 進入隊列的總數量,包括出隊列的。 這個數量只增不減
Messages Dequeued 出了隊列的消息 可以理解為是消費這消費掉的數量
這個要分兩種情況理解
在queues里它和進入隊列的總數量相等(因為一個消息只會被成功消費一次),如果暫時不等是因為消費者還沒來得及消費。
在 topics里 它因為多消費者從而導致數量會比入隊列數高。
簡單的理解上面的意思就是
- 當有一個消息進入這個隊列時,等待消費的消息是1,進入隊列的消息是1。
- 當消息消費后,等待消費的消息是0,進入隊列的消息是1,出隊列的消息是1.
- 在來一條消息時,等待消費的消息是1,進入隊列的消息就是2.
- 沒有消費者時 Pending Messages 和 入隊列數量一樣
- 有消費者消費的時候 Pedding會減少 出隊列會增加
- 到最后 就是 入隊列和出隊列的數量一樣多
- 以此類推,進入隊列的消息和出隊列的消息是池子,等待消費的消息是水流。
5、ActiveMQ使用方法
5.1 Queue
- Producer
生產者:生產消息,發送端。 生產者生產了一個消息,只能由一個消費者進行消費
pom文件引入依賴
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.7.0</version>
</dependency>
步驟:
第一步:創建ConnectionFactory對象,需要指定服務端ip及端口號。
第二步:使用ConnectionFactory對象創建一個Connection對象。
第三步:開啟連接,調用Connection對象的start方法。
第四步:使用Connection對象創建一個Session對象。
第五步:使用Session對象創建一個Destination對象(topic、queue),此處創建一個Queue對象。
第六步:使用Session對象創建一個Producer對象。
第七步:創建一個Message對象,創建一個TextMessage對象。
第八步:使用Producer對象發送消息。
第九步:關閉資源。
public class provider { private static String queue = "yehui1"; public static void main(String[] args) throws JMSException { // 第一步:創建ConnectionFactory對象,需要指定服務端ip及端口號。 //brokerURL服務器的ip及端口號
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); // 第二步:使用ConnectionFactory對象創建一個Connection對象。
Connection connection = factory.createConnection(); // 第三步:開啟連接,調用Connection對象的start方法。
connection.start(); // 第四步:使用Connection對象創建一個Session對象。 //第一個參數:是否開啟事務。true:開啟事務,第二個參數忽略。 //第二個參數:當第一個參數為false時,才有意義。消息的應答模式。1、自動應答2、手動應答。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 第五步:使用Session對象創建一個Destination對象(topic、queue),此處創建一個Queue對象。 //參數:隊列的名稱。
Destination destination = session.createQueue(queue); // 第六步:使用Session對象創建一個Producer對象。
MessageProducer producer = session.createProducer(destination); for (int i=0;i<10;i++) { // 第七步:創建一個Message對象,創建一個TextMessage對象。
TextMessage textMessage = session.createTextMessage("消費者你好我來了" + i); // 第八步:使用Producer對象發送消息。
producer.send(textMessage); } System.out.println("生產結束"); // 第九步:關閉資源。
producer.close(); session.close(); connection.close(); } }
- Consumer
消費者:接收消息。
步驟:
第一步:創建一個ConnectionFactory對象。
第二步:從ConnectionFactory對象中獲得一個Connection對象。
第三步:開啟連接。調用Connection對象的start方法。
第四步:使用Connection對象創建一個Session對象。
第五步:使用Session對象創建一個Destination對象。和發送端保持一致queue,並且隊列的名稱一致。
第六步:使用Session對象創建一個Consumer對象。
第七步:接收消息。
第八步:打印消息。
第九步:關閉資源
public class Comsoner { private static String queue = "yehui1"; public static void main(String[] args) throws JMSException, IOException { System.out.println("消費結束"); // 第一步:創建一個ConnectionFactory對象。 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); // 第二步:從ConnectionFactory對象中獲得一個Connection對象。 Connection connection = factory.createConnection(); // 第三步:開啟連接。調用Connection對象的start方法。 connection.start(); // 第四步:使用Connection對象創建一個Session對象。 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 第五步:使用Session對象創建一個Destination對象。和發送端保持一致queue,並且隊列的名稱一致。 Queue queue = session.createQueue(Comsoner.queue); // 第六步:使用Session對象創建一個Consumer對象。 MessageConsumer consumer = session.createConsumer(queue); // 第七步:接收消息。 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { //取消息的內容 String text = textMessage.getText(); System.out.println(text); } catch (JMSException e) { e.printStackTrace(); } } }); //等待鍵盤輸入 System.in.read(); // 第九步:關閉資源 consumer.close(); session.close(); connection.close(); } }
5.2 Topic
Topic 話題,生產者生產了一個消息,可以由多個消費者進行消費
- Producer
使用步驟:
第一步:創建ConnectionFactory對象,需要指定服務端ip及端口號。
第二步:使用ConnectionFactory對象創建一個Connection對象。
第三步:開啟連接,調用Connection對象的start方法。
第四步:使用Connection對象創建一個Session對象。
第五步:使用Session對象創建一個Destination對象(topic、queue),此處創建一個Topic對象。
第六步:使用Session對象創建一個Producer對象。
第七步:創建一個Message對象,創建一個TextMessage對象。
第八步:使用Producer對象發送消息。
第九步:關閉資源。
public class Provider { private static String queue = "yehui-topic"; public static void main(String[] args) throws JMSException { //第一步:創建ConnectionFactory對象,需要指定服務端ip及端口號。 // brokerURL服務器的ip及端口號
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); // 第二步:使用ConnectionFactory對象創建一個Connection對象。
Connection connection = factory.createConnection(); // 第三步:開啟連接,調用Connection對象的start方法。
connection.start(); // 第四步:使用Connection對象創建一個Session對象。 // 第一個參數:是否開啟事務。true:開啟事務,第二個參數忽略。 // 第二個參數:當第一個參數為false時,才有意義。消息的應答模式。1、自動應答2、手動應答。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 第五步:使用Session對象創建一個Destination對象(topic、queue),此處創建一個topic對象。 // 參數:話題的名稱。
Destination destination = session.createTopic(queue); // 第六步:使用Session對象創建一個Producer對象。
MessageProducer producer = session.createProducer(destination); for (int i=0;i<10;i++) { // 第七步:創建一個Message對象,創建一個TextMessage對象。
TextMessage textMessage = session.createTextMessage("消費者你好我來了" + i); // 第八步:使用Producer對象發送消息。
producer.send(textMessage); } // 第九步:關閉資源。
producer.close(); session.close(); connection.close(); } }
- Consumer
消費者:接收消息。
使用步驟:
第一步:創建一個ConnectionFactory對象。
第二步:從ConnectionFactory對象中獲得一個Connection對象。
第三步:開啟連接。調用Connection對象的start方法。
第四步:使用Connection對象創建一個Session對象。
第五步:使用Session對象創建一個Destination對象。和發送端保持一致topic,並且話題的名稱一致。
第六步:使用Session對象創建一個Consumer對象。
第七步:接收消息。
第八步:打印消息。
第九步:關閉資源
public class Comsoner { private static String queue = "yehui-topic"; public static void main(String[] args) throws JMSException, IOException { //第一步:創建ConnectionFactory對象,需要指定服務端ip及端口號。 // brokerURL服務器的ip及端口號
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); // 第二步:使用ConnectionFactory對象創建一個Connection對象。
Connection connection = factory.createConnection(); // 第三步:開啟連接,調用Connection對象的start方法。
connection.start(); // 第四步:使用Connection對象創建一個Session對象。 // 第一個參數:是否開啟事務。true:開啟事務,第二個參數忽略。 // 第二個參數:當第一個參數為false時,才有意義。消息的應答模式。1、自動應答2、手動應答。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 第五步:使用Session對象創建一個Destination對象。和發送端保持一致topic,並且話題的名稱一致。
Topic topic = session.createTopic(Comsoner.queue); // 第六步:使用Session對象創建一個Consumer對象。
MessageConsumer consumer = session.createConsumer(topic); // 第七步:接收消息。
consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { // 取消息的內容
String text = textMessage.getText(); System.out.println(text); } catch (JMSException e) { e.printStackTrace(); } } }); System.out.println("topic的消費端03。。。。。"); // 等待鍵盤輸入
System.in.read(); // 第九步:關閉資源
consumer.close(); session.close(); connection.close(); } }
6、JMS消息可靠機制
ActiveMQ消息簽收機制:
客戶端成功接收一條消息的標志是一條消息被簽收,成功應答。
消息的簽收情形分兩種:
1、帶事務的session
如果session帶有事務,並且事務成功提交,則消息被自動簽收。如果事務回滾,則消息會被再次傳送。
2、不帶事務的session
不帶事務的session的簽收方式,取決於session的配置。
Activemq支持一下三種模式:
Session.AUTO_ACKNOWLEDGE 消息自動簽收
Session.CLIENT_ACKNOWLEDGE 客戶端調用acknowledge方法手動簽收
textMessage.acknowledge();//手動簽收
Session.DUPS_OK_ACKNOWLEDGE 不是必須簽收,消息可能會重復發送。在第二次重新傳送消息的時候,消息只有在被確認之后,
才認為已經被成功地消費了。消息的成功消費通常包含三個階段:客戶接收消息、客戶處理消息和消息被確認。 在事務性會話中,當一個事務被提交的時候,確認自動發生。在非事務性會話中,消息何時被確認取決於創建會話時的應答模式(acknowledgement mode)。該參數有以下三個可選值:
Number Of Consumers 消費者 這個是消費者端的消費者數量
Number Of Pending Messages 等待消費的消息 這個是當前未出隊列的數量。可以理解為總接收數-總出隊列數
Messages Enqueued 進入隊列的消息 進入隊列的總數量,包括出隊列的。 這個數量只增不減
Messages Dequeued 出了隊列的消息 可以理解為是消費這消費掉的數量
代碼案例:
provider
public class provider { private static String queue = "yehui1"; public static void main(String[] args) throws JMSException { // 第一步:創建ConnectionFactory對象,需要指定服務端ip及端口號。 //brokerURL服務器的ip及端口號
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); // 第二步:使用ConnectionFactory對象創建一個Connection對象。
Connection connection = factory.createConnection(); // 第三步:開啟連接,調用Connection對象的start方法。
connection.start(); // 第四步:使用Connection對象創建一個Session對象。 //第一個參數:是否開啟事務。true:開啟事務,第二個參數忽略。 //第二個參數:當第一個參數為false時,才有意義。消息的應答模式。1、自動應答2、手動應答。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 第五步:使用Session對象創建一個Destination對象(topic、queue),此處創建一個Queue對象。 //參數:隊列的名稱。
Destination destination = session.createQueue(queue); // 第六步:使用Session對象創建一個Producer對象。
MessageProducer producer = session.createProducer(destination); for (int i=0;i<10;i++) { // 第七步:創建一個Message對象,創建一個TextMessage對象。
TextMessage textMessage = session.createTextMessage("消費者你好我來了" + i); // 第八步:使用Producer對象發送消息。
producer.send(textMessage); } System.out.println("生產結束"); // 第九步:關閉資源。
producer.close(); session.close(); connection.close(); } }
Comsoner
public class Comsoner { private static String queue = "yehui1"; public static void main(String[] args) throws JMSException, IOException { System.out.println("消費結束"); // 第一步:創建一個ConnectionFactory對象。
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); // 第二步:從ConnectionFactory對象中獲得一個Connection對象。
Connection connection = factory.createConnection(); // 第三步:開啟連接。調用Connection對象的start方法。
connection.start(); // 第四步:使用Connection對象創建一個Session對象。手動簽收
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); // 第五步:使用Session對象創建一個Destination對象。和發送端保持一致queue,並且隊列的名稱一致。
Queue queue = session.createQueue(Comsoner.queue); // 第六步:使用Session對象創建一個Consumer對象。
MessageConsumer consumer = session.createConsumer(queue); // 第七步:接收消息。
consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { //取消息的內容
String text = textMessage.getText(); System.out.println(text); //需要手動簽收 如何這里不手動簽收,將不會消費
textMessage.acknowledge(); } catch (JMSException e) { e.printStackTrace(); } } }); //等待鍵盤輸入
System.in.read(); // 第九步:關閉資源
consumer.close(); session.close(); connection.close(); } }
7、JMS可靠消息機制-持久話機制
PERSISTENT:指示JMS provider持久保存消息,以保證消息不會因為JMS provider的失敗而丟失
NON_PERSISTENT:不要求JMS provider持久保存消息
// 設置消息持久化 producer.setDeliveryMode(DeliveryMode.PERSISTENT);
8、整合Eamil實現異步發送郵件信息
采用MQ方式發送方郵件原理圖:
整合Eamil實現異步發送郵件信息
采用qq接口郵件接口
qq授權碼申請:
生產者:
Maven依賴:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-parent</artifactId>
<version>2.0.6.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!-- fastjson 依賴 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
</dependencies>
yml文件
spring: activemq: broker-url: tcp://127.0.0.1:61616
user: admin password: admin queue: springboot-queue server: port: 8080
config類
@Configuration public class QueueConfig { @Value("${queue}") public String queue; @Bean public Queue getQueue(){ return new ActiveMQQueue(queue); } }
發送郵件代碼
@RestController public class QueueController { @Autowired private Destination queue; @Autowired private JmsMessagingTemplate messagingTemplate; @RequestMapping("/index") public String index(){ messagingTemplate.convertAndSend(queue,"測試消息隊列"); return "index"; } /** * 發送郵件代碼 * @return
*/ @RequestMapping("/sendEmail") public String sendEmail(){ JSONObject jsonObject = new JSONObject(); String userName = System.currentTimeMillis() + ""; jsonObject.put("userName", userName); jsonObject.put("eamil", "郵箱賬號"); String msg = jsonObject.toJSONString(); System.out.println("生產者向消費者發送內容:" + msg); messagingTemplate.convertAndSend(queue, msg); return "sendEmail"; } }
啟動類:
@SpringBootApplication public class AppStart { public static void main(String[] args) { SpringApplication.run(AppStart.class); } }
消費者:
maven依賴
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-parent</artifactId>
<version>2.0.6.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--activemq依賴-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
<!--郵件代碼-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-mail</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
</dependencies>
yml文件
spring: activemq: broker-url: tcp://127.0.0.1:61616
user: admin password: admin #郵件配置 mail: #郵箱協議 host: smtp.qq.com #用戶賬號 username: 郵箱賬號 #授權碼 password: 郵箱授權碼 enable: true smtp: auth: true starttls: enable: true required: true queue: springboot-queue server: port: 8081
發送郵件代碼
@Component public class Consumer { @Autowired private JavaMailSender javaMailSender; @JmsListener(destination = "${queue}") public void receive(String msg){ if(StringUtils.isBlank(msg)){ return; } sendSimpleMail(msg); System.out.println("監聽器收到msg:" + JSONObject.parseObject(msg).toJSONString()); } public void sendSimpleMail(String msg) { try { JSONObject jsonObject = JSONObject.parseObject(msg); String userName = jsonObject.getString("userName"); String eamil = jsonObject.getString("eamil"); SimpleMailMessage simpleMailMessage = new SimpleMailMessage(); //郵件發送人
simpleMailMessage.setFrom(eamil); //郵件接收人
simpleMailMessage.setTo(eamil); //郵件主題
simpleMailMessage.setSubject("郵件測試"); //郵件內容
simpleMailMessage.setText("郵件測試內容"); javaMailSender.send(simpleMailMessage); } catch (Exception e) { System.out.println("郵件發送失敗"); } } }
啟動類
@SpringBootApplication public class App { public static void main(String[] args) { SpringApplication.run(App.class); } }