ActiveMQ消息隊列的使用及應用


 

 

這里就不說怎么安裝了,直接解壓出來就行了。

 

謝絕轉載,作者保留所有權力

 

 

目錄: 

復制代碼
一:JMQ的兩種消息模式
    1.1:點對點的消息模式
    1.2:訂閱模式
二:點對點的實現代碼
    2.1:點對點的發送端
    2.2:點對點的接收端
三:訂閱/發布模式的實現代碼
    3.1:訂閱模式的發送端
    3.2:訂閱模式的接收端
四:發送消息的數據類型
    4.1:傳遞javabean對象
    4.2:發送文件
五:ActiveMQ的應用
    5.1:保證消息的成功處理
    5.2:避免消息隊列的並發
        5.2.1:主動接收隊列消息
        5.2.2:使用多個接收端
    5.3:消息有效期的管理
    5.4:過期消息,處理失敗的消息如何處理

六:ActiveMQ的安全配置

  6.1:管理后台的密碼設置

   6.2:生產消費者的連接密碼

 

復制代碼

 

 

 

一:JMQ的兩種消息模式

消息列隊有兩種消息模式,一種是點對點的消息模式,還有一種就是訂閱的模式.

 

1.1:點對點的消息模式

 

點對點的模式主要建立在一個隊列上面,當連接一個列隊的時候,發送端不需要知道接收端是否正在接收,可以直接向ActiveMQ發送消息,發送的消息,將會先進入隊列中,如果有接收端在監聽,則會發向接收端,如果沒有接收端接收,則會保存在activemq服務器,直到接收端接收消息,點對點的消息模式可以有多個發送端,多個接收端,但是一條消息,只會被一個接收端給接收到,哪個接收端先連上ActiveMQ,則會先接收到,而后來的接收端則接收不到那條消息

 

1.2:訂閱模式

 

訂閱/發布模式,同樣可以有着多個發送端與多個接收端,但是接收端與發送端存在時間上的依賴,就是如果發送端發送消息的時候,接收端並沒有監聽消息,那么ActiveMQ將不會保存消息,將會認為消息已經發送,換一種說法,就是發送端發送消息的時候,接收端不在線,是接收不到消息的,哪怕以后監聽消息,同樣也是接收不到的。這個模式還有一個特點,那就是,發送端發送的消息,將會被所有的接收端給接收到,不類似點對點,一條消息只會被一個接收端給接收到。

 

 

 

 

二:點對點的實現代碼

這里使用java來實現一下ActiveMQ的點對點模式。

ActiveMQ版本為 5.13.3

項目使用MAVEN來構建

復制代碼
    <dependencies>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-core</artifactId>
            <version>5.7.0</version>
        </dependency>
    </dependencies>
復制代碼

都是當前最新的版本

 

2.1:點對點的發送端

按 Ctrl+C 復制代碼
按 Ctrl+C 復制代碼

 

 

2.2:點對點的接收端

 

按 Ctrl+C 復制代碼
按 Ctrl+C 復制代碼

 

 

 

 

 

三:訂閱/發布模式的實現代碼

3.1:訂閱模式的發送端

復制代碼
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class TOPSend {
    //連接賬號
        private String userName = "";
        //連接密碼
        private String password = "";
        //連接地址
        private String brokerURL = "tcp://192.168.0.130:61616";
        //connection的工廠
        private ConnectionFactory factory;
        //連接對象
        private Connection connection;
        //一個操作會話
        private Session session;
        //目的地,其實就是連接到哪個隊列,如果是點對點,那么它的實現是Queue,如果是訂閱模式,那它的實現是Topic
        private Destination destination;
        //生產者,就是產生數據的對象
        private MessageProducer producer;
        
        public static void main(String[] args) {
            TOPSend send = new TOPSend();
            send.start();
        }
        
        public void start(){
            try {
                //根據用戶名,密碼,url創建一個連接工廠
                factory = new ActiveMQConnectionFactory(userName, password, brokerURL);
                //從工廠中獲取一個連接
                connection = factory.createConnection();
                //測試過這個步驟不寫也是可以的,但是網上的各個文檔都寫了
                connection.start();
                //創建一個session
                //第一個參數:是否支持事務,如果為true,則會忽略第二個參數,被jms服務器設置為SESSION_TRANSACTED
                //第二個參數為false時,paramB的值可為Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一個。
                //Session.AUTO_ACKNOWLEDGE為自動確認,客戶端發送和接收消息不需要做額外的工作。哪怕是接收端發生異常,也會被當作正常發送成功。
                //Session.CLIENT_ACKNOWLEDGE為客戶端確認。客戶端接收到消息后,必須調用javax.jms.Message的acknowledge方法。jms服務器才會當作發送成功,並刪除消息。
                //DUPS_OK_ACKNOWLEDGE允許副本的確認模式。一旦接收方應用程序的方法調用從處理消息處返回,會話對象就會確認消息的接收;而且允許重復確認。
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                //創建一個到達的目的地,其實想一下就知道了,activemq不可能同時只能跑一個隊列吧,這里就是連接了一個名為"text-msg"的隊列,這個會話將會到這個隊列,當然,如果這個隊列不存在,將會被創建
                
                
                
                //=======================================================
                //點對點與訂閱模式唯一不同的地方,就是這一行代碼,點對點創建的是Queue,而訂閱模式創建的是Topic
                destination = session.createTopic("topic-text");
                //=======================================================
                
                
                
                
                //從session中,獲取一個消息生產者
                producer = session.createProducer(destination);
                //設置生產者的模式,有兩種可選
                //DeliveryMode.PERSISTENT 當activemq關閉的時候,隊列數據將會被保存 保存在緩存中,生產環境中可以存入數據庫,redis避免丟失
                //DeliveryMode.NON_PERSISTENT 當activemq關閉的時候,隊列里面的數據將會被清空
                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
                
                //創建一條消息,當然,消息的類型有很多,如文字,字節,對象等,可以通過session.create..方法來創建出來
                TextMessage textMsg = session.createTextMessage("哈哈");
                long s = System.currentTimeMillis();
                for(int i = 0 ; i < 100 ; i ++){
                    //發送一條消息
                    producer.send(textMsg);
                }
                long e = System.currentTimeMillis();
                System.out.println("發送消息成功");
                System.out.println(e - s);
                //即便生產者的對象關閉了,程序還在運行哦
                producer.close();
                
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
}
復制代碼

 

 

3.2:訂閱模式的接收端

復制代碼
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class TOPSend {
    //連接賬號
        private String userName = "";
        //連接密碼
        private String password = "";
        //連接地址
        private String brokerURL = "tcp://192.168.0.130:61616";
        //connection的工廠
        private ConnectionFactory factory;
        //連接對象
        private Connection connection;
        //一個操作會話
        private Session session;
        //目的地,其實就是連接到哪個隊列,如果是點對點,那么它的實現是Queue,如果是訂閱模式,那它的實現是Topic
        private Destination destination;
        //生產者,就是產生數據的對象
        private MessageProducer producer;
        
        public static void main(String[] args) {
            TOPSend send = new TOPSend();
            send.start();
        }
        
        public void start(){
            try {
                //根據用戶名,密碼,url創建一個連接工廠
                factory = new ActiveMQConnectionFactory(userName, password, brokerURL);
                //從工廠中獲取一個連接
                connection = factory.createConnection();
                //測試過這個步驟不寫也是可以的,但是網上的各個文檔都寫了
                connection.start();
                //創建一個session
                //第一個參數:是否支持事務,如果為true,則會忽略第二個參數,被jms服務器設置為SESSION_TRANSACTED
                //第二個參數為false時,paramB的值可為Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一個。
                //Session.AUTO_ACKNOWLEDGE為自動確認,客戶端發送和接收消息不需要做額外的工作。哪怕是接收端發生異常,也會被當作正常發送成功。
                //Session.CLIENT_ACKNOWLEDGE為客戶端確認。客戶端接收到消息后,必須調用javax.jms.Message的acknowledge方法。jms服務器才會當作發送成功,並刪除消息。
                //DUPS_OK_ACKNOWLEDGE允許副本的確認模式。一旦接收方應用程序的方法調用從處理消息處返回,會話對象就會確認消息的接收;而且允許重復確認。
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                //創建一個到達的目的地,其實想一下就知道了,activemq不可能同時只能跑一個隊列吧,這里就是連接了一個名為"text-msg"的隊列,這個會話將會到這個隊列,當然,如果這個隊列不存在,將會被創建
                
                
                
                //=======================================================
                //點對點與訂閱模式唯一不同的地方,就是這一行代碼,點對點創建的是Queue,而訂閱模式創建的是Topic
                destination = session.createTopic("topic-text");
                //=======================================================
                
                
                
                
                //從session中,獲取一個消息生產者
                producer = session.createProducer(destination);
                //設置生產者的模式,有兩種可選
                //DeliveryMode.PERSISTENT 當activemq關閉的時候,隊列數據將會被保存
                //DeliveryMode.NON_PERSISTENT 當activemq關閉的時候,隊列里面的數據將會被清空
                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
                
                //創建一條消息,當然,消息的類型有很多,如文字,字節,對象等,可以通過session.create..方法來創建出來
                TextMessage textMsg = session.createTextMessage("哈哈");
                long s = System.currentTimeMillis();
                for(int i = 0 ; i < 100 ; i ++){
                    //發送一條消息
                    textMsg.setText("哈哈" + i);
                    producer.send(textMsg);
                }
                long e = System.currentTimeMillis();
                System.out.println("發送消息成功");
                System.out.println(e - s);
                //即便生產者的對象關閉了,程序還在運行哦
                producer.close();
                
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
}
復制代碼

 

四:發送消息的數據類型

上面的代碼演示,全部都是發送字符串,但是ActiveMQ支持哪些數據呢?

大家可以看一下  javax.jms.Message 這個接口,只要是這個接口的數據,都可以被發送。

 

或者這樣看起來有點麻煩,那么看到上面的代碼,創建消息,是通過session這個對象來創建的,那我們來看一下這里有哪些可以被創建的呢?

復制代碼
            //純字符串的數據
            session.createTextMessage();
            //序列化的對象
            session.createObjectMessage();
            //流,可以用來傳遞文件等
            session.createStreamMessage();
            //用來傳遞字節
            session.createBytesMessage();
            //這個方法創建出來的就是一個map,可以把它當作map來用,當你看了它的一些方法,你就懂了
            session.createMapMessage();
            //這個方法,拿到的是javax.jms.Message,是所有message的接口
            session.createMessage();
復制代碼

 

 

4.1:傳遞javabean對象

傳遞一個java對象,可能是最多的使用方式了,而且這種數據接收與使用都方便,那么,下面的代碼就來演示下如何發送一個java對象

當然了,這個對象必須序列化,也就是實現Serializable接口

 

復制代碼
            //通過這個方法,可以把一個對象發送出去,當然,這個對象需要序列化,因為一切在網絡在傳輸的,都是字節
            ObjectMessage obj = session.createObjectMessage();
            for(int i = 0 ; i < 100 ; i ++){
                Person p = new Person(i,"名字");
                obj.setObject(p);
                producer.send(obj);
            }
復制代碼

 

那么在接收端要怎么接收這個對象呢?

 

復制代碼
            //實現一個消息的監聽器
            //實現這個監聽器后,以后只要有消息,就會通過這個監聽器接收到
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    try {
                        //同樣的,強轉為ObjectMessage,然后拿到對象,強轉為Person
                        Person p = (Person) ((ObjectMessage)message).getObject();
                        System.out.println(p);
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                    
                }
            });
復制代碼

 

 

 

 

4.2:發送文件

 

發送文件,這里用BytesMessage

            BytesMessage bb = session.createBytesMessage();
            bb.writeBytes(new byte[]{2});

至於這里的new Byte[]{2},肯定不是這樣寫的,從文件里面拿流出來即可

 

 

接收的話

復制代碼
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    
                    BytesMessage bm = (BytesMessage)message;
                    FileOutputStream out = null;
                    try {
                        out = new FileOutputStream("d:/1.ext");
                    } catch (FileNotFoundException e2) {
                        e2.printStackTrace();
                    }
                    byte[] by = new byte[1024];
                    int len = 0 ;
                    try {
                        while((len = bm.readBytes(by))!= -1){
                            out.write(by,0,len);
                        }
                    } catch (JMSException | IOException e1) {
                        e1.printStackTrace();
                    }
                    
                }
            });
復制代碼

 

 

 

 

 

 

五:ActiveMQ的應用

5.1:保證消息的成功處理

消息發送成功后,接收端接收到了消息。然后進行處理,但是可能由於某種原因,高並發也好,IO阻塞也好,反正這條消息在接收端處理失敗了。而點對點的特性是一條消息,只會被一個接收端給接收,只要接收端A接收成功了,接收端B,就不可能接收到這條消息,如果是一些普通的消息還好,但是如果是一些很重要的消息,比如說用戶的支付訂單,用戶的退款,這些與金錢相關的,是必須保證成功的,那么這個時候要怎么處理呢?

 

我們可以使用  CLIENT_ACKNOWLEDGE  模式

 

之前其實就有提到當創建一個session的時候,需要指定其事務,及消息的處理模式,當時使用的是

 

session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

 

AUTO_ACKNOWLEDGE 

這一個代碼的是,當消息發送給接收端之后,就自動確認成功了,而不管接收端有沒有處理成功,而一旦確認成功后,就會把隊列里面的消息給清除掉,避免下一個接收端接收到同樣的消息。

那么,它還有另外一個模式,那就是 CLIENT_ACKNOWLEDGE

這行要寫在接收端里面,不是寫在發送端的

 

session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

 

這行代碼以后,如果接收端不確認消息,那么activemq將會把這條消息一直保留,直到有一個接收端確定了消息。

那么要怎么確認消息呢?

在接收端接收到消息的時候,調用javax.jms.Message的acknowledge方法

復制代碼
@Override
                public void onMessage(Message message) {
                    try {
                        //獲取到接收的數據
                        String text = ((TextMessage)message).getText();
                        System.out.println(text);
                        //確認接收,並成功處理了消息
                        message.acknowledge();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
復制代碼

這樣,當消息處理成功之后,確認消息,如果不確定,activemq將會發給下一個接收端處理

 注意:只在點對點中有效,訂閱模式,即使不確認,也不會保存消息

 

 

5.2:避免消息隊列的並發

JMQ設計出來的原因,就是用來避免並發的,和溝通兩個系統之間的交互。

 

5.2.1:主動接收隊列消息

 

先看一下之前的代碼:

復制代碼
            //實現一個消息的監聽器
            //實現這個監聽器后,以后只要有消息,就會通過這個監聽器接收到
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    try {
                        //獲取到接收的數據
                        String text = ((TextMessage)message).getText();
                        System.out.println(text);
                        //確認接收,並成功處理了消息
                        message.acknowledge();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
復制代碼

之前的代碼里面,實現了一個監聽器,監聽消息的傳遞,這樣只要每有一個消息,都會即時的傳遞到程序中。

但是,這樣的處理,在高並發的時候,因為它是被動接收,並沒有考慮到程序的處理能力,可能會壓跨系統,那要怎么辦呢?

 

答案就是把被動變為主動,當程序有着處理消息的能力時,主動去接收一條消息進行處理

 

實現的代碼如下:

復制代碼
      if(當程序有能力處理){//當程序有能力處理時接收
                    Message receive = consumer.receive();
           //這個可以設置超時時間,超過則不等待消息 
            recieve.receive(10000); //其實receive是一個阻塞式方法,一定會拿到值的 if(null != receive){ String text = ((TextMessage)receive).getText(); receive.acknowledge(); System.out.println(text); }else{ //沒有值嘛 // } }
復制代碼

 

通過上面的代碼,就可以讓程序自已判斷,自己是否有能力接收這條消息,如果不能接收,那就給別的接收端接收,或者等自己有能力處理的時候接收

 

 

5.2.2:使用多個接收端

ActiveMQ是支持多個接收端的,如果當程序無法處理這么多數據的時候,可以考慮多個線程,或者增加服務器來處理。

 

 

 

5.3:消息有效期的管理

這樣的場景也是有的,一條消息的有效時間,當發送一條消息的時候,可能希望這條消息在指定的時間被處理,如果超過了指定的時間,那么這條消息就失效了,就不需要進行處理了,那么我們可以使用ActiveMQ的設置有效期來實現

 

代碼如下:

復制代碼
            TextMessage msg = session.createTextMessage("哈哈");
            for(int i = 0 ; i < 100 ; i ++){
                //設置該消息的超時時間
                producer.setTimeToLive(i * 1000);
                producer.send(msg);
            }
復制代碼

 

這里每一條消息的有效期都是不同的,打開ip:8161/admin/就可以查看到,里面的消息越來越少了。

 

過期的消息是不會被接收到的。

 

過期的消息會從隊列中清除,並存儲到ActiveMQ.DLQ這個隊列里面,這個稍后會解釋。

 

 

5.4:過期消息,處理失敗的消息如何處理

 過期的、處理失敗的消息,將會被ActiveMQ置入“ActiveMQ.DLQ”這個隊列中。

這個隊列是ActiveMQ自動創建的。

如果需要查看這些未被處理的消息,可以進入這個隊列中查看

//指定一個目的地,也就是一個隊列的位置
destination = session.createQueue("ActiveMQ.DLQ");

這樣就可以進入隊列中,然后實現接口,或者通過receive()方法,就可以拿到未被處理的消息,從而保證正確的處理

 

 

六:ActiveMQ的安全配置

6.1:管理后台的密碼設置

我們都知道,打開ip:8161/admin/ 就是activemq的管理控制台,它的默認賬號和密碼都是admin,在生產環境肯定需要更改密碼的,這要怎么做呢?

在activemq/conf/jetty.xml中找到

復制代碼
   <pre name="code" class="html"> <bean id="securityConstraint" class="org.eclipse.jetty.util.security.Constraint">
        <property name="name" value="BASIC" />
        <property name="roles" value="admin" />
         <!-- 把這個改為true,當然,高版本的已經改為了true -->
        <property name="authenticate" value="true" />
  </bean>
復制代碼

高版本的已經默認成為了true。所以我們直接進行下一步即可

在activemq/conf/jetty-realm.properties文件中配置,打開如下

復制代碼
## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements.  See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License.  You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------

# Defines users that can access the web (console, demo, etc.)
# username: password [,rolename ...]
#用戶名,密碼,角色
admin: admin, admin
user: user, user
復制代碼

注意:大家重點看倒數第二行,那里三個分別是用戶名,密碼,角色,其中admin角色是固定的

 

 

6.2:生產消費者的連接密碼

注意:activemq默認是不需要密碼,生產消費者就可以連接的

我們需要經過配置,才能設置密碼,這一步在生產環境中一定要配置

找到activemq/conf/activemq.xml,並打開
在<broker>節點中,在<systemUsage>節點上面,增加如下的一個插件

復制代碼
<plugins>
             <simpleAuthenticationPlugin>
                 <users>
                     <authenticationUser username="${activemq.username}" password="${activemq.password}" groups="users,admins"/>
                 </users>
             </simpleAuthenticationPlugin>
         </plugins>
復制代碼

這樣就開啟了密碼認證
然后賬號密碼的配置在activemq/conf/credentials.properties文件中

打開這個文件如下

復制代碼
## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements.  See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License.  You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------

# Defines credentials that will be used by components (like web console) to access the broker

#賬號
activemq.username=admin
#密碼
activemq.password=123456
guest.password=password
復制代碼

 

 

這樣就配置完畢了。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM