ActiveMQ學習總結


一、消息中間件的產生的背景

1.在網絡通訊中,Http請求默認采用同步請求方式,基於請求與響應模式

2.在客戶端與服務器進行通訊時,客戶端調用服務端接口后,必須等待服務端完成處理后返回結果給客戶端才能繼續執行,這種情況屬於同步調用方式。

3.如果服務器端發生網絡延遲、不可達的情況,可能客戶端也會受到影響。

二、什么是消息中間件

消息隊列中間件是分布式系統中重要的組件,主要解決應用解耦,異步消息,流量削鋒等問題,實現高性能,高可用,可伸縮和最終一致性架構。

目前使用較多的消息隊列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ

實現方式:面向消息的中間件(MessageOrlented MiddlewareMOM)較好的解決了以上問題。發送者將消息發送給消息服務器,消息服務器將消感存放在若千隊列中,

在合適的時候再將消息轉發給接收者。這種模式下,發送和接收是異步的,發送者無需等待; 二者的生命周期未必相同: 發送消息的時候接收者不一定運行,

接收消息的時候發送者也不一定運行;一對多通信: 對於一個消息可以有多個接收者。

原理圖

三、JMS介紹

1、什么是JMS

JMS是java的消息服務器,JMS的客戶端之間可以通過JMS服務進行異步的消息傳輸,是一個Java平台中面向消息中間件的API

角色划分

   1.提供者: 實現JMS規范的消息中間件服務器 (存放消息容器)

  2.客戶端:發送或接收消息的應用程序

  3.生產者/發布者: 創建並發送消息的客戶端(向消息容器存放消息)

  4.消費者/訂閱者:接收並處理消息的客戶端

  5.消息:應用程序之間傳遞的數據內容

  6.消息模式:在客戶端之間傳遞消息的方式,JMS中定義了主題和隊列兩種模式 點對點與發布訂閱模式。

2、什么是消息模型

  •  Point-to-Point(P2P) --- 點對點(生產者發送一條消息到queue,只有一個消費者能收到)
  • Publish/Subscribe(Pub/Sub)---  發布訂閱(發布者發送到topic的消息,只有訂閱了topic的訂閱者才會收到消息)

     即點對點和發布訂閱模型

    P2P(點對點)

  • p2p模型圖

  • 相關概念
  1. 消息隊列(Queue)
  2. 發送者(Sender)
  3. 接收者(Receiver)
  4. 每個消息都被發送到一個特定的隊列,接收者從隊列中獲取消息。隊列保留着消息,直到他們被消費或超時。
  •  p2p特點
  1. 每個消息只有一個消費者(Consumer)(即一旦被消費,消息就不再在消息隊列中)
  2. 發送者和接收者之間在時間上沒有依賴性,也就是說當發送者發送了消息之后,不管接收者有沒有正在運行,它不會影響到消息被發送到隊列
  3. 接收者在成功接收消息之后需向隊列應答成功,如果你希望發送的每個消息都應該被成功處理的話,那么你需要P2P模式。
  •  應用場景

             A用戶與B用戶發送消息

     Pub/Sub (發布與訂閱)        

  • Pub/Sub模式圖

 

  • 相關概念

            1.主題(Topic)

            2.發布者(Publisher)

            3.訂閱者(Subscriber) 
            4.客戶端將消息發送到主題。多個發布者將消息發送到Topic,系統將這些消息傳遞給多個訂閱者。

  • Pub/Sub的特點 

            每個消息可以有多個消費者

                         發布者和訂閱者之間有時間上的依賴性。針對某個主題(Topic)的訂閱者,它必須創建一個訂閱者之后,才能消費發布者的消息,而且為了消費消息,訂閱者必須保持運行的狀態。

                  為了緩和這樣嚴格的時間相關性,JMS允許訂閱者創建一個可持久化的訂閱。這樣,即使訂閱者沒有被激活(運行),它也能接收到發布者的消息。  如果你希望發送的消息可以不被做任何處理、

                   或者被一個消息者處理、或者可以被多個消費者處理的話,那么可以采用Pub/Sub模型

           消息的消費 
                 在JMS中,消息的產生和消息是異步的。對於消費來說,JMS的消息者可以通過兩種方式來消費消息。 
         ○ 同步 
                訂閱者或接收者調用receive方法來接收消息,receive方法在能夠接收到消息之前(或超時之前)將一直阻塞 
          ○ 異步 
               訂閱者或接收者可以注冊為一個消息監聽器。當消息到達之后,系統自動調用監聽器的onMessage方法。

  •  發布訂閱與點對點通訊方式區別:

                點對點 只能保證一個消費者進行消費 一對一 發布訂閱 只要集群服務訂閱該主題都會受收到消息 一對多

 四、ActiveMQ

1、ActiveMQ的消息形式

    應用場景:

  •  異步處理
  •  實現系統之間的解耦
  • 流量削鋒
  • 消息通訊

異步處理應用場景

異步處理 場景說明:用戶注冊后,需要發注冊郵件和注冊短信。傳統的做法有兩種 1.串行的方式2.並行方式

a、串行方式:將注冊信息寫入數據庫成功后,

發送注冊郵件,再發送注冊短信。以上三個任務全部完成后,返回給客戶端。

b.並行方式:將注冊信息寫入數據庫成功后,發送注冊郵件的同時,發送注冊短信。以上三個任務完成后,返回給客戶端。

與串行的差別是,並行的方式可以提高處理的時間

引入消息隊列,將不是必須的業務邏輯,異步處理。改造后的架構如下:

 

按照以上約定,用戶的響應時間相當於是注冊信息寫入數據庫的時間,也就是50毫秒。注冊郵件,發送短信寫入消息隊列后,直接返回,因此寫入消息隊列的速度很快,基本可以忽略,因此用戶的響應時間可能是50毫秒。因此架構改變后,系統的吞吐量提高到每秒20 QPS。比串行提高了3倍,比並行提高了兩倍。

解耦應用場景

場景說明:用戶下單后,訂單系統需要通知庫存系統。傳統的做法是,訂單系統調用庫存系統的接口。如下圖:

傳統模式的缺點:假如庫存系統無法訪問,則訂單減庫存將失敗,從而導致訂單失敗,訂單系統與庫存系統耦合 耗時時間接口 統一采用MQ推送 不建議才同步方

如何解決以上問題呢?引入應用消息隊列后的方案,如下圖:

 

訂單系統:用戶下單后,訂單系統完成持久化處理,將消息寫入消息隊列,返回用戶訂單下單成功 庫存系統:訂閱下單的消息,

采用拉/推的方式,獲取下單信息,庫存系統根據下單信息,進行庫存操作 假如:在下單時庫存系統不能正常使用。也不影響正常下單,

因為下單后,訂單系統寫入消息隊列就不再關心其他的后續操作了。實現訂單系統與庫存系統的應用解耦

流量削峰應用解耦

流量削鋒也是消息隊列中的常用場景,一般在秒殺或團搶活動中使用廣泛。 應用場景:秒殺活動,一般會因為流量過大,導致流量暴增,應用掛掉。

為解決這個問題,一般需要在應用前端加入消息隊列。

a、可以控制活動的人數

b、可以緩解短時間內高流量壓垮應用

 

用戶的請求,服務器接收后,首先寫入消息隊列。假如消息隊列長度超過最大數量,則直接拋棄用戶請求或跳轉到錯誤頁面。

秒殺業務根據消息隊列中的請求信息,再做后續處理 秒殺如何實現核心Redis+MQ+服務保護機制(服務降級、隔離、熔斷)+服務限流+圖形驗證+token

對於消息的傳遞有兩種類型:

  •  一種是點對點的,即一個生產者和一個消費者一一對應;
  • 另一種是發布/訂閱模式,即一個生產者產生消息並進行發送后,可以由多個消費者進行接收。

JMS定義了五種不同的消息正文格式,以及調用的消息類型,允許你發送並接收以一些不同形式的數據,提供現有消息格式的一些級別的兼容性。

  · StreamMessage -- Java原始值的數據流

  · MapMessage--一套名稱-值對

  · TextMessage--一個字符串對象(常用)

  · ObjectMessage--一個序列化的 Java對象

  · BytesMessage--一個字節的數據流

2、activeMQ window安裝

進入http://activemq.apache.org/下載ActiveMQ

下載window或者Linux版本

ActiveMQ部署其實很簡單,和所有Java一樣,要跑java程序就必須先安裝JDK並配置好環境變量,這個很簡單。

然后解壓下載的apache-activemq-5.10-20140603.133406-78-bin.zip壓縮包到一個目錄,得到解壓后的目錄結構如下圖:

進入bin目錄,發現有win32和win64兩個文件夾,這2個文件夾分別對應windows32位和windows64位操作系統的啟動腳本。

看自己的系統位數進入,會看到如下目錄

其中activemq.bat便是啟動腳本,雙擊啟動。

ActiveMQ默認啟動到8161端口,啟動完了后在瀏覽器地址欄輸入:http://localhost:8161/admin要求輸入用戶名密碼,默認用戶名密碼為admin、admin,這個用戶名密碼是在conf/users.properties中配置的。輸入用戶名密碼后便可看到如下圖的ActiveMQ控制台界面了。

 3、ActiveMQ Linux 安裝

  • 安裝環境

           1.需要jdk

           2.安裝Linux系統。生產環境都是Linux系統。

  • 安裝步驟

     第一步: 把ActiveMQ 的壓縮包上傳到Linux系統。

              第二步:解壓縮。

               第三步:啟動。

       使用bin目錄下的activemq命令啟動:

                  [root@localhost bin]# ./activemq start

          關閉:

              [root@localhost bin]# ./activemq stop

          查看狀態:

               [root@localhost bin]# ./activemq status

               //修改主機名

               Vim /etc/hosts

                

            //查看主機名

            

      注意:如果ActiveMQ整合spring使用不要使用activemq-all-5.12.0.jar包。建議使用5.11.2

           進入管理后台:http://192.168.25.168:8161/admin用戶名:admin密碼:admin

 4、ActiveMQ控制台介紹

Number Of Consumers   消費者 這個是消費者端的消費者數量 
Number Of Pending Messages   等待消費的消息 這個是當前未出隊列的數量。可以理解為總接收數-總出隊列數 
Messages Enqueued     進入隊列的消息  進入隊列的總數量,包括出隊列的。 這個數量只增不減 
Messages Dequeued    出了隊列的消息  可以理解為是消費這消費掉的數量 
     這個要分兩種情況理解 
             在queues里它和進入隊列的總數量相等(因為一個消息只會被成功消費一次),如果暫時不等是因為消費者還沒來得及消費。 
            在 topics里 它因為多消費者從而導致數量會比入隊列數高。 
    簡單的理解上面的意思就是 

  •  當有一個消息進入這個隊列時,等待消費的消息是1,進入隊列的消息是1。 
  •  當消息消費后,等待消費的消息是0,進入隊列的消息是1,出隊列的消息是1. 
  •  在來一條消息時,等待消費的消息是1,進入隊列的消息就是2. 
  •  沒有消費者時  Pending Messages   和 入隊列數量一樣 
  •  有消費者消費的時候 Pedding會減少 出隊列會增加 
  •   到最后 就是 入隊列和出隊列的數量一樣多 
  •   以此類推,進入隊列的消息和出隊列的消息是池子,等待消費的消息是水流。 

 5、ActiveMQ使用方法

         5.1 Queue

  • Producer         

                  生產者:生產消息,發送端。 生產者生產了一個消息,只能由一個消費者進行消費

         pom文件引入依賴

 <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-core</artifactId>
            <version>5.7.0</version>
 </dependency>

      步驟:

     第一步:創建ConnectionFactory對象,需要指定服務端ip及端口號。

     第二步:使用ConnectionFactory對象創建一個Connection對象。

     第三步:開啟連接,調用Connection對象的start方法。

     第四步:使用Connection對象創建一個Session對象。

     第五步:使用Session對象創建一個Destination對象(topicqueue),此處創建一個Queue對象。

     第六步:使用Session對象創建一個Producer對象。

     第七步:創建一個Message對象,創建一個TextMessage對象。

     第八步:使用Producer對象發送消息。

              第九步:關閉資源。

public class provider { private  static String queue = "yehui1"; public static void main(String[] args) throws JMSException { // 第一步:創建ConnectionFactory對象,需要指定服務端ip及端口號。 //brokerURL服務器的ip及端口號
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); // 第二步:使用ConnectionFactory對象創建一個Connection對象。
        Connection connection = factory.createConnection(); // 第三步:開啟連接,調用Connection對象的start方法。
 connection.start(); // 第四步:使用Connection對象創建一個Session對象。 //第一個參數:是否開啟事務。true:開啟事務,第二個參數忽略。 //第二個參數:當第一個參數為false時,才有意義。消息的應答模式。1、自動應答2、手動應答。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 第五步:使用Session對象創建一個Destination對象(topic、queue),此處創建一個Queue對象。 //參數:隊列的名稱。
        Destination destination = session.createQueue(queue); // 第六步:使用Session對象創建一個Producer對象。
        MessageProducer producer = session.createProducer(destination); for (int i=0;i<10;i++) { // 第七步:創建一個Message對象,創建一個TextMessage對象。
            TextMessage textMessage = session.createTextMessage("消費者你好我來了" + i); // 第八步:使用Producer對象發送消息。
 producer.send(textMessage); } System.out.println("生產結束"); // 第九步:關閉資源。
 producer.close(); session.close(); connection.close(); } }
  • Consumer       

             消費者:接收消息。

               步驟:

         第一步:創建一個ConnectionFactory對象。

                   第二步:從ConnectionFactory對象中獲得一個Connection對象。

                  第三步:開啟連接。調用Connection對象的start方法。

                  第四步:使用Connection對象創建一個Session對象。

                  第五步:使用Session對象創建一個Destination對象。和發送端保持一致queue,並且隊列的名稱一致。

                  第六步:使用Session對象創建一個Consumer對象。

                  第七步:接收消息。

                  第八步:打印消息。

                  第九步:關閉資源

public class Comsoner {
    private  static String queue = "yehui1"; public static void main(String[] args) throws JMSException, IOException { System.out.println("消費結束"); // 第一步:創建一個ConnectionFactory對象。
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); // 第二步:從ConnectionFactory對象中獲得一個Connection對象。
        Connection connection = factory.createConnection(); // 第三步:開啟連接。調用Connection對象的start方法。
 connection.start(); // 第四步:使用Connection對象創建一個Session對象。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 第五步:使用Session對象創建一個Destination對象。和發送端保持一致queue,並且隊列的名稱一致。
        Queue queue = session.createQueue(Comsoner.queue); // 第六步:使用Session對象創建一個Consumer對象。
        MessageConsumer consumer = session.createConsumer(queue); // 第七步:接收消息。
        consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { //取消息的內容
                    String text = textMessage.getText(); System.out.println(text); } catch (JMSException e) { e.printStackTrace(); } } }); //等待鍵盤輸入
 System.in.read(); // 第九步:關閉資源
 consumer.close(); session.close(); connection.close(); } }

 5.2 Topic 

          Topic 話題,生產者生產了一個消息,可以由多個消費者進行消費

  •  Producer

              使用步驟:

                     第一步:創建ConnectionFactory對象,需要指定服務端ip及端口號。

                     第二步:使用ConnectionFactory對象創建一個Connection對象。

                     第三步:開啟連接,調用Connection對象的start方法。

                     第四步:使用Connection對象創建一個Session對象。

                     第五步:使用Session對象創建一個Destination對象(topic、queue),此處創建一個Topic對象。

                      第六步:使用Session對象創建一個Producer對象。

                       第七步:創建一個Message對象,創建一個TextMessage對象。

                      第八步:使用Producer對象發送消息。

                     第九步:關閉資源。

public class Provider { private static String queue = "yehui-topic"; public static void main(String[] args) throws JMSException { //第一步:創建ConnectionFactory對象,需要指定服務端ip及端口號。 // brokerURL服務器的ip及端口號
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); // 第二步:使用ConnectionFactory對象創建一個Connection對象。
        Connection connection = factory.createConnection(); // 第三步:開啟連接,調用Connection對象的start方法。
 connection.start(); // 第四步:使用Connection對象創建一個Session對象。 // 第一個參數:是否開啟事務。true:開啟事務,第二個參數忽略。 // 第二個參數:當第一個參數為false時,才有意義。消息的應答模式。1、自動應答2、手動應答。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 第五步:使用Session對象創建一個Destination對象(topic、queue),此處創建一個topic對象。 // 參數:話題的名稱。
        Destination destination = session.createTopic(queue); // 第六步:使用Session對象創建一個Producer對象。
        MessageProducer producer = session.createProducer(destination); for (int i=0;i<10;i++) { // 第七步:創建一個Message對象,創建一個TextMessage對象。
            TextMessage textMessage = session.createTextMessage("消費者你好我來了" + i); // 第八步:使用Producer對象發送消息。
 producer.send(textMessage); } // 第九步:關閉資源。
 producer.close(); session.close(); connection.close(); } }
  • Consumer

          消費者:接收消息。

        使用步驟:

     第一步:創建一個ConnectionFactory對象。

     第二步:從ConnectionFactory對象中獲得一個Connection對象。

     第三步:開啟連接。調用Connection對象的start方法。

     第四步:使用Connection對象創建一個Session對象。

     第五步:使用Session對象創建一個Destination對象。和發送端保持一致topic,並且話題的名稱一致。

     第六步:使用Session對象創建一個Consumer對象。

              第七步:接收消息。

              第八步:打印消息。

             第九步:關閉資源

public class Comsoner { private  static String queue = "yehui-topic"; public static void main(String[] args) throws JMSException, IOException { //第一步:創建ConnectionFactory對象,需要指定服務端ip及端口號。 // brokerURL服務器的ip及端口號
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); // 第二步:使用ConnectionFactory對象創建一個Connection對象。
        Connection connection = factory.createConnection(); // 第三步:開啟連接,調用Connection對象的start方法。
 connection.start(); // 第四步:使用Connection對象創建一個Session對象。 // 第一個參數:是否開啟事務。true:開啟事務,第二個參數忽略。 // 第二個參數:當第一個參數為false時,才有意義。消息的應答模式。1、自動應答2、手動應答。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 第五步:使用Session對象創建一個Destination對象。和發送端保持一致topic,並且話題的名稱一致。
        Topic topic = session.createTopic(Comsoner.queue); // 第六步:使用Session對象創建一個Consumer對象。
        MessageConsumer consumer = session.createConsumer(topic); // 第七步:接收消息。
        consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { // 取消息的內容
                    String text = textMessage.getText(); System.out.println(text); } catch (JMSException e) { e.printStackTrace(); } } }); System.out.println("topic的消費端03。。。。。"); // 等待鍵盤輸入
 System.in.read(); // 第九步:關閉資源
 consumer.close(); session.close(); connection.close(); } }

6、JMS消息可靠機制

ActiveMQ消息簽收機制:

       客戶端成功接收一條消息的標志是一條消息被簽收,成功應答。

消息的簽收情形分兩種:

       1、帶事務的session

                如果session帶有事務,並且事務成功提交,則消息被自動簽收。如果事務回滾,則消息會被再次傳送。

       2、不帶事務的session

            不帶事務的session的簽收方式,取決於session的配置。

   Activemq支持一下三種模式:

   Session.AUTO_ACKNOWLEDGE  消息自動簽收

   Session.CLIENT_ACKNOWLEDGE  客戶端調用acknowledge方法手動簽收

    textMessage.acknowledge();//手動簽收

   Session.DUPS_OK_ACKNOWLEDGE 不是必須簽收,消息可能會重復發送。在第二次重新傳送消息的時候,消息只有在被確認之后,

才認為已經被成功地消費了。消息的成功消費通常包含三個階段:客戶接收消息、客戶處理消息和消息被確認。 在事務性會話中,當一個事務被提交的時候,確認自動發生。在非事務性會話中,消息何時被確認取決於創建會話時的應答模式(acknowledgement mode)。該參數有以下三個可選值:

Number Of Consumers  消費者 這個是消費者端的消費者數量 
Number Of Pending Messages 等待消費的消息 這個是當前未出隊列的數量。可以理解為總接收數-總出隊列數 
Messages Enqueued 進入隊列的消息  進入隊列的總數量,包括出隊列的。 這個數量只增不減 
Messages Dequeued 出了隊列的消息  可以理解為是消費這消費掉的數量 

代碼案例:

provider 
public class provider { private  static String queue = "yehui1"; public static void main(String[] args) throws JMSException { // 第一步:創建ConnectionFactory對象,需要指定服務端ip及端口號。 //brokerURL服務器的ip及端口號
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); // 第二步:使用ConnectionFactory對象創建一個Connection對象。
        Connection connection = factory.createConnection(); // 第三步:開啟連接,調用Connection對象的start方法。
 connection.start(); // 第四步:使用Connection對象創建一個Session對象。 //第一個參數:是否開啟事務。true:開啟事務,第二個參數忽略。 //第二個參數:當第一個參數為false時,才有意義。消息的應答模式。1、自動應答2、手動應答。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 第五步:使用Session對象創建一個Destination對象(topic、queue),此處創建一個Queue對象。 //參數:隊列的名稱。
        Destination destination = session.createQueue(queue); // 第六步:使用Session對象創建一個Producer對象。
        MessageProducer producer = session.createProducer(destination); for (int i=0;i<10;i++) { // 第七步:創建一個Message對象,創建一個TextMessage對象。
            TextMessage textMessage = session.createTextMessage("消費者你好我來了" + i); // 第八步:使用Producer對象發送消息。
 producer.send(textMessage); } System.out.println("生產結束"); // 第九步:關閉資源。
 producer.close(); session.close(); connection.close(); } }

Comsoner

public class Comsoner { private  static String queue = "yehui1"; public static void main(String[] args) throws JMSException, IOException { System.out.println("消費結束"); // 第一步:創建一個ConnectionFactory對象。
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); // 第二步:從ConnectionFactory對象中獲得一個Connection對象。
        Connection connection = factory.createConnection(); // 第三步:開啟連接。調用Connection對象的start方法。
 connection.start(); // 第四步:使用Connection對象創建一個Session對象。手動簽收
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); // 第五步:使用Session對象創建一個Destination對象。和發送端保持一致queue,並且隊列的名稱一致。
        Queue queue = session.createQueue(Comsoner.queue); // 第六步:使用Session對象創建一個Consumer對象。
        MessageConsumer consumer = session.createConsumer(queue); // 第七步:接收消息。
        consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { //取消息的內容
                    String text = textMessage.getText(); System.out.println(text); //需要手動簽收 如何這里不手動簽收,將不會消費
 textMessage.acknowledge(); } catch (JMSException e) { e.printStackTrace(); } } }); //等待鍵盤輸入
 System.in.read(); // 第九步:關閉資源
 consumer.close(); session.close(); connection.close(); } }

7、JMS可靠消息機制-持久話機制

PERSISTENT:指示JMS provider持久保存消息,以保證消息不會因為JMS provider的失敗而丟失

NON_PERSISTENT:不要求JMS provider持久保存消息

// 設置消息持久化 producer.setDeliveryMode(DeliveryMode.PERSISTENT);

8、整合Eamil實現異步發送郵件信息

采用MQ方式發送方郵件原理圖:

整合Eamil實現異步發送郵件信息

采用qq接口郵件接口

qq授權碼申請:

 

  生產者:

   Maven依賴:

 <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-parent</artifactId>
        <version>2.0.6.RELEASE</version>
    </parent>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>
        <!-- fastjson 依賴 -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>

    </dependencies>

yml文件

spring: activemq: broker-url: tcp://127.0.0.1:61616
 user: admin password: admin queue: springboot-queue server: port: 8080

config類

@Configuration public class QueueConfig { @Value("${queue}") public String queue; @Bean public Queue getQueue(){ return new ActiveMQQueue(queue); } }

發送郵件代碼

@RestController public class QueueController { @Autowired private Destination queue; @Autowired private JmsMessagingTemplate messagingTemplate; @RequestMapping("/index") public String index(){ messagingTemplate.convertAndSend(queue,"測試消息隊列"); return "index"; } /** * 發送郵件代碼 * @return
     */ @RequestMapping("/sendEmail") public String sendEmail(){ JSONObject jsonObject = new JSONObject(); String userName = System.currentTimeMillis() + ""; jsonObject.put("userName", userName); jsonObject.put("eamil", "郵箱賬號"); String msg = jsonObject.toJSONString(); System.out.println("生產者向消費者發送內容:" + msg); messagingTemplate.convertAndSend(queue, msg); return "sendEmail"; } }

啟動類:

@SpringBootApplication public class AppStart { public static void main(String[] args) { SpringApplication.run(AppStart.class); } }

消費者:

  maven依賴

<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-parent</artifactId>
        <version>2.0.6.RELEASE</version>
    </parent>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!--activemq依賴-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
        <!--郵件代碼-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-mail</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
    </dependencies>

yml文件

spring: activemq: broker-url: tcp://127.0.0.1:61616
 user: admin password: admin #郵件配置 mail: #郵箱協議 host: smtp.qq.com #用戶賬號 username: 郵箱賬號 #授權碼 password: 郵箱授權碼 enable: true smtp: auth: true starttls: enable: true required: true queue: springboot-queue server: port: 8081

發送郵件代碼

@Component public class Consumer { @Autowired private JavaMailSender javaMailSender; @JmsListener(destination = "${queue}") public void receive(String msg){ if(StringUtils.isBlank(msg)){ return; } sendSimpleMail(msg); System.out.println("監聽器收到msg:" + JSONObject.parseObject(msg).toJSONString()); } public void sendSimpleMail(String msg) { try { JSONObject jsonObject = JSONObject.parseObject(msg); String userName = jsonObject.getString("userName"); String eamil = jsonObject.getString("eamil"); SimpleMailMessage simpleMailMessage = new SimpleMailMessage(); //郵件發送人
 simpleMailMessage.setFrom(eamil); //郵件接收人
 simpleMailMessage.setTo(eamil); //郵件主題
            simpleMailMessage.setSubject("郵件測試"); //郵件內容
            simpleMailMessage.setText("郵件測試內容"); javaMailSender.send(simpleMailMessage); } catch (Exception e) { System.out.println("郵件發送失敗"); } } }

啟動類

@SpringBootApplication public class App { public static void main(String[] args) { SpringApplication.run(App.class); } }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM