這里就不說怎么安裝了,直接解壓出來就行了。
謝絕轉載,作者保留所有權力
目錄:
一:JMQ的兩種消息模式
1.1:點對點的消息模式
1.2:訂閱模式
二:點對點的實現代碼
2.1:點對點的發送端
2.2:點對點的接收端
三:訂閱/發布模式的實現代碼
3.1:訂閱模式的發送端
3.2:訂閱模式的接收端
四:發送消息的數據類型
4.1:傳遞javabean對象
4.2:發送文件
五:ActiveMQ的應用
5.1:保證消息的成功處理
5.2:避免消息隊列的並發
5.2.1:主動接收隊列消息
5.2.2:使用多個接收端
5.3:消息有效期的管理
5.4:過期消息,處理失敗的消息如何處理
六:ActiveMQ的安全配置
6.1:管理后台的密碼設置
6.2:生產消費者的連接密碼
一:JMQ的兩種消息模式
消息列隊有兩種消息模式,一種是點對點的消息模式,還有一種就是訂閱的模式.
1.1:點對點的消息模式
點對點的模式主要建立在一個隊列上面,當連接一個列隊的時候,發送端不需要知道接收端是否正在接收,可以直接向ActiveMQ發送消息,發送的消息,將會先進入隊列中,如果有接收端在監聽,則會發向接收端,如果沒有接收端接收,則會保存在activemq服務器,直到接收端接收消息,點對點的消息模式可以有多個發送端,多個接收端,但是一條消息,只會被一個接收端給接收到,哪個接收端先連上ActiveMQ,則會先接收到,而后來的接收端則接收不到那條消息
1.2:訂閱模式
訂閱/發布模式,同樣可以有着多個發送端與多個接收端,但是接收端與發送端存在時間上的依賴,就是如果發送端發送消息的時候,接收端並沒有監聽消息,那么ActiveMQ將不會保存消息,將會認為消息已經發送,換一種說法,就是發送端發送消息的時候,接收端不在線,是接收不到消息的,哪怕以后監聽消息,同樣也是接收不到的。這個模式還有一個特點,那就是,發送端發送的消息,將會被所有的接收端給接收到,不類似點對點,一條消息只會被一個接收端給接收到。
二:點對點的實現代碼
這里使用java來實現一下ActiveMQ的點對點模式。
ActiveMQ版本為 5.13.3
項目使用MAVEN來構建
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.7.0</version>
</dependency>
</dependencies>
都是當前最新的版本
2.1:點對點的發送端
2.2:點對點的接收端
三:訂閱/發布模式的實現代碼
3.1:訂閱模式的發送端
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class TOPSend {
//連接賬號
private String userName = "";
//連接密碼
private String password = "";
//連接地址
private String brokerURL = "tcp://192.168.0.130:61616";
//connection的工廠
private ConnectionFactory factory;
//連接對象
private Connection connection;
//一個操作會話
private Session session;
//目的地,其實就是連接到哪個隊列,如果是點對點,那么它的實現是Queue,如果是訂閱模式,那它的實現是Topic
private Destination destination;
//生產者,就是產生數據的對象
private MessageProducer producer;
public static void main(String[] args) {
TOPSend send = new TOPSend();
send.start();
}
public void start(){
try {
//根據用戶名,密碼,url創建一個連接工廠
factory = new ActiveMQConnectionFactory(userName, password, brokerURL);
//從工廠中獲取一個連接
connection = factory.createConnection();
//測試過這個步驟不寫也是可以的,但是網上的各個文檔都寫了
connection.start();
//創建一個session
//第一個參數:是否支持事務,如果為true,則會忽略第二個參數,被jms服務器設置為SESSION_TRANSACTED
//第二個參數為false時,paramB的值可為Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一個。
//Session.AUTO_ACKNOWLEDGE為自動確認,客戶端發送和接收消息不需要做額外的工作。哪怕是接收端發生異常,也會被當作正常發送成功。
//Session.CLIENT_ACKNOWLEDGE為客戶端確認。客戶端接收到消息后,必須調用javax.jms.Message的acknowledge方法。jms服務器才會當作發送成功,並刪除消息。
//DUPS_OK_ACKNOWLEDGE允許副本的確認模式。一旦接收方應用程序的方法調用從處理消息處返回,會話對象就會確認消息的接收;而且允許重復確認。
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//創建一個到達的目的地,其實想一下就知道了,activemq不可能同時只能跑一個隊列吧,這里就是連接了一個名為"text-msg"的隊列,這個會話將會到這個隊列,當然,如果這個隊列不存在,將會被創建
//=======================================================
//點對點與訂閱模式唯一不同的地方,就是這一行代碼,點對點創建的是Queue,而訂閱模式創建的是Topic
destination = session.createTopic("topic-text");
//=======================================================
//從session中,獲取一個消息生產者
producer = session.createProducer(destination);
//設置生產者的模式,有兩種可選
//DeliveryMode.PERSISTENT 當activemq關閉的時候,隊列數據將會被保存 保存在緩存中,生產環境中可以存入數據庫,redis避免丟失
//DeliveryMode.NON_PERSISTENT 當activemq關閉的時候,隊列里面的數據將會被清空
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
//創建一條消息,當然,消息的類型有很多,如文字,字節,對象等,可以通過session.create..方法來創建出來
TextMessage textMsg = session.createTextMessage("哈哈");
long s = System.currentTimeMillis();
for(int i = 0 ; i < 100 ; i ++){
//發送一條消息
producer.send(textMsg);
}
long e = System.currentTimeMillis();
System.out.println("發送消息成功");
System.out.println(e - s);
//即便生產者的對象關閉了,程序還在運行哦
producer.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
3.2:訂閱模式的接收端
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class TOPSend {
//連接賬號
private String userName = "";
//連接密碼
private String password = "";
//連接地址
private String brokerURL = "tcp://192.168.0.130:61616";
//connection的工廠
private ConnectionFactory factory;
//連接對象
private Connection connection;
//一個操作會話
private Session session;
//目的地,其實就是連接到哪個隊列,如果是點對點,那么它的實現是Queue,如果是訂閱模式,那它的實現是Topic
private Destination destination;
//生產者,就是產生數據的對象
private MessageProducer producer;
public static void main(String[] args) {
TOPSend send = new TOPSend();
send.start();
}
public void start(){
try {
//根據用戶名,密碼,url創建一個連接工廠
factory = new ActiveMQConnectionFactory(userName, password, brokerURL);
//從工廠中獲取一個連接
connection = factory.createConnection();
//測試過這個步驟不寫也是可以的,但是網上的各個文檔都寫了
connection.start();
//創建一個session
//第一個參數:是否支持事務,如果為true,則會忽略第二個參數,被jms服務器設置為SESSION_TRANSACTED
//第二個參數為false時,paramB的值可為Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一個。
//Session.AUTO_ACKNOWLEDGE為自動確認,客戶端發送和接收消息不需要做額外的工作。哪怕是接收端發生異常,也會被當作正常發送成功。
//Session.CLIENT_ACKNOWLEDGE為客戶端確認。客戶端接收到消息后,必須調用javax.jms.Message的acknowledge方法。jms服務器才會當作發送成功,並刪除消息。
//DUPS_OK_ACKNOWLEDGE允許副本的確認模式。一旦接收方應用程序的方法調用從處理消息處返回,會話對象就會確認消息的接收;而且允許重復確認。
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//創建一個到達的目的地,其實想一下就知道了,activemq不可能同時只能跑一個隊列吧,這里就是連接了一個名為"text-msg"的隊列,這個會話將會到這個隊列,當然,如果這個隊列不存在,將會被創建
//=======================================================
//點對點與訂閱模式唯一不同的地方,就是這一行代碼,點對點創建的是Queue,而訂閱模式創建的是Topic
destination = session.createTopic("topic-text");
//=======================================================
//從session中,獲取一個消息生產者
producer = session.createProducer(destination);
//設置生產者的模式,有兩種可選
//DeliveryMode.PERSISTENT 當activemq關閉的時候,隊列數據將會被保存
//DeliveryMode.NON_PERSISTENT 當activemq關閉的時候,隊列里面的數據將會被清空
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
//創建一條消息,當然,消息的類型有很多,如文字,字節,對象等,可以通過session.create..方法來創建出來
TextMessage textMsg = session.createTextMessage("哈哈");
long s = System.currentTimeMillis();
for(int i = 0 ; i < 100 ; i ++){
//發送一條消息
textMsg.setText("哈哈" + i);
producer.send(textMsg);
}
long e = System.currentTimeMillis();
System.out.println("發送消息成功");
System.out.println(e - s);
//即便生產者的對象關閉了,程序還在運行哦
producer.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
四:發送消息的數據類型
上面的代碼演示,全部都是發送字符串,但是ActiveMQ支持哪些數據呢?
大家可以看一下 javax.jms.Message 這個接口,只要是這個接口的數據,都可以被發送。
或者這樣看起來有點麻煩,那么看到上面的代碼,創建消息,是通過session這個對象來創建的,那我們來看一下這里有哪些可以被創建的呢?
//純字符串的數據
session.createTextMessage();
//序列化的對象
session.createObjectMessage();
//流,可以用來傳遞文件等
session.createStreamMessage();
//用來傳遞字節
session.createBytesMessage();
//這個方法創建出來的就是一個map,可以把它當作map來用,當你看了它的一些方法,你就懂了
session.createMapMessage();
//這個方法,拿到的是javax.jms.Message,是所有message的接口
session.createMessage();
4.1:傳遞javabean對象
傳遞一個java對象,可能是最多的使用方式了,而且這種數據接收與使用都方便,那么,下面的代碼就來演示下如何發送一個java對象
當然了,這個對象必須序列化,也就是實現Serializable接口
//通過這個方法,可以把一個對象發送出去,當然,這個對象需要序列化,因為一切在網絡在傳輸的,都是字節
ObjectMessage obj = session.createObjectMessage();
for(int i = 0 ; i < 100 ; i ++){
Person p = new Person(i,"名字");
obj.setObject(p);
producer.send(obj);
}
那么在接收端要怎么接收這個對象呢?
//實現一個消息的監聽器
//實現這個監聽器后,以后只要有消息,就會通過這個監聽器接收到
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
//同樣的,強轉為ObjectMessage,然后拿到對象,強轉為Person
Person p = (Person) ((ObjectMessage)message).getObject();
System.out.println(p);
} catch (JMSException e) {
e.printStackTrace();
}
}
});
4.2:發送文件
發送文件,這里用BytesMessage
BytesMessage bb = session.createBytesMessage();
bb.writeBytes(new byte[]{2});
至於這里的new Byte[]{2},肯定不是這樣寫的,從文件里面拿流出來即可
接收的話
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
BytesMessage bm = (BytesMessage)message;
FileOutputStream out = null;
try {
out = new FileOutputStream("d:/1.ext");
} catch (FileNotFoundException e2) {
e2.printStackTrace();
}
byte[] by = new byte[1024];
int len = 0 ;
try {
while((len = bm.readBytes(by))!= -1){
out.write(by,0,len);
}
} catch (JMSException | IOException e1) {
e1.printStackTrace();
}
}
});
五:ActiveMQ的應用
5.1:保證消息的成功處理
消息發送成功后,接收端接收到了消息。然后進行處理,但是可能由於某種原因,高並發也好,IO阻塞也好,反正這條消息在接收端處理失敗了。而點對點的特性是一條消息,只會被一個接收端給接收,只要接收端A接收成功了,接收端B,就不可能接收到這條消息,如果是一些普通的消息還好,但是如果是一些很重要的消息,比如說用戶的支付訂單,用戶的退款,這些與金錢相關的,是必須保證成功的,那么這個時候要怎么處理呢?
我們可以使用 CLIENT_ACKNOWLEDGE 模式
之前其實就有提到當創建一個session的時候,需要指定其事務,及消息的處理模式,當時使用的是
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
AUTO_ACKNOWLEDGE
這一個代碼的是,當消息發送給接收端之后,就自動確認成功了,而不管接收端有沒有處理成功,而一旦確認成功后,就會把隊列里面的消息給清除掉,避免下一個接收端接收到同樣的消息。
那么,它還有另外一個模式,那就是 CLIENT_ACKNOWLEDGE
這行要寫在接收端里面,不是寫在發送端的
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
這行代碼以后,如果接收端不確認消息,那么activemq將會把這條消息一直保留,直到有一個接收端確定了消息。
那么要怎么確認消息呢?
在接收端接收到消息的時候,調用javax.jms.Message的acknowledge方法
@Override
public void onMessage(Message message) {
try {
//獲取到接收的數據
String text = ((TextMessage)message).getText();
System.out.println(text);
//確認接收,並成功處理了消息
message.acknowledge();
} catch (JMSException e) {
e.printStackTrace();
}
}
這樣,當消息處理成功之后,確認消息,如果不確定,activemq將會發給下一個接收端處理
注意:只在點對點中有效,訂閱模式,即使不確認,也不會保存消息
5.2:避免消息隊列的並發
JMQ設計出來的原因,就是用來避免並發的,和溝通兩個系統之間的交互。
5.2.1:主動接收隊列消息
先看一下之前的代碼:
//實現一個消息的監聽器
//實現這個監聽器后,以后只要有消息,就會通過這個監聽器接收到
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
//獲取到接收的數據
String text = ((TextMessage)message).getText();
System.out.println(text);
//確認接收,並成功處理了消息
message.acknowledge();
} catch (JMSException e) {
e.printStackTrace();
}
}
});
之前的代碼里面,實現了一個監聽器,監聽消息的傳遞,這樣只要每有一個消息,都會即時的傳遞到程序中。
但是,這樣的處理,在高並發的時候,因為它是被動接收,並沒有考慮到程序的處理能力,可能會壓跨系統,那要怎么辦呢?
答案就是把被動變為主動,當程序有着處理消息的能力時,主動去接收一條消息進行處理
實現的代碼如下:
if(當程序有能力處理){//當程序有能力處理時接收
Message receive = consumer.receive();
//這個可以設置超時時間,超過則不等待消息
recieve.receive(10000);
//其實receive是一個阻塞式方法,一定會拿到值的
if(null != receive){
String text = ((TextMessage)receive).getText();
receive.acknowledge();
System.out.println(text);
}else{
//沒有值嘛
//
}
}
通過上面的代碼,就可以讓程序自已判斷,自己是否有能力接收這條消息,如果不能接收,那就給別的接收端接收,或者等自己有能力處理的時候接收
5.2.2:使用多個接收端
ActiveMQ是支持多個接收端的,如果當程序無法處理這么多數據的時候,可以考慮多個線程,或者增加服務器來處理。
5.3:消息有效期的管理
這樣的場景也是有的,一條消息的有效時間,當發送一條消息的時候,可能希望這條消息在指定的時間被處理,如果超過了指定的時間,那么這條消息就失效了,就不需要進行處理了,那么我們可以使用ActiveMQ的設置有效期來實現
代碼如下:
TextMessage msg = session.createTextMessage("哈哈");
for(int i = 0 ; i < 100 ; i ++){
//設置該消息的超時時間
producer.setTimeToLive(i * 1000);
producer.send(msg);
}
這里每一條消息的有效期都是不同的,打開ip:8161/admin/就可以查看到,里面的消息越來越少了。
過期的消息是不會被接收到的。
過期的消息會從隊列中清除,並存儲到ActiveMQ.DLQ這個隊列里面,這個稍后會解釋。
5.4:過期消息,處理失敗的消息如何處理
過期的、處理失敗的消息,將會被ActiveMQ置入“ActiveMQ.DLQ”這個隊列中。
這個隊列是ActiveMQ自動創建的。
如果需要查看這些未被處理的消息,可以進入這個隊列中查看
//指定一個目的地,也就是一個隊列的位置
destination = session.createQueue("ActiveMQ.DLQ");
這樣就可以進入隊列中,然后實現接口,或者通過receive()方法,就可以拿到未被處理的消息,從而保證正確的處理
六:ActiveMQ的安全配置
6.1:管理后台的密碼設置
我們都知道,打開ip:8161/admin/ 就是activemq的管理控制台,它的默認賬號和密碼都是admin,在生產環境肯定需要更改密碼的,這要怎么做呢?
在activemq/conf/jetty.xml中找到
<pre name="code" class="html"> <bean id="securityConstraint" class="org.eclipse.jetty.util.security.Constraint">
<property name="name" value="BASIC" />
<property name="roles" value="admin" />
<!-- 把這個改為true,當然,高版本的已經改為了true -->
<property name="authenticate" value="true" />
</bean>
高版本的已經默認成為了true。所以我們直接進行下一步即可
在activemq/conf/jetty-realm.properties文件中配置,打開如下
## --------------------------------------------------------------------------- ## Licensed to the Apache Software Foundation (ASF) under one or more ## contributor license agreements. See the NOTICE file distributed with ## this work for additional information regarding copyright ownership. ## The ASF licenses this file to You under the Apache License, Version 2.0 ## (the "License"); you may not use this file except in compliance with ## the License. You may obtain a copy of the License at ## ## http://www.apache.org/licenses/LICENSE-2.0 ## ## Unless required by applicable law or agreed to in writing, software ## distributed under the License is distributed on an "AS IS" BASIS, ## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ## See the License for the specific language governing permissions and ## limitations under the License. ## --------------------------------------------------------------------------- # Defines users that can access the web (console, demo, etc.) # username: password [,rolename ...] #用戶名,密碼,角色 admin: admin, admin user: user, user
注意:大家重點看倒數第二行,那里三個分別是用戶名,密碼,角色,其中admin角色是固定的
6.2:生產消費者的連接密碼
注意:activemq默認是不需要密碼,生產消費者就可以連接的
我們需要經過配置,才能設置密碼,這一步在生產環境中一定要配置
找到activemq/conf/activemq.xml,並打開
在<broker>節點中,在<systemUsage>節點上面,增加如下的一個插件
<plugins>
<simpleAuthenticationPlugin>
<users>
<authenticationUser username="${activemq.username}" password="${activemq.password}" groups="users,admins"/>
</users>
</simpleAuthenticationPlugin>
</plugins>
這樣就開啟了密碼認證
然后賬號密碼的配置在activemq/conf/credentials.properties文件中
打開這個文件如下
## --------------------------------------------------------------------------- ## Licensed to the Apache Software Foundation (ASF) under one or more ## contributor license agreements. See the NOTICE file distributed with ## this work for additional information regarding copyright ownership. ## The ASF licenses this file to You under the Apache License, Version 2.0 ## (the "License"); you may not use this file except in compliance with ## the License. You may obtain a copy of the License at ## ## http://www.apache.org/licenses/LICENSE-2.0 ## ## Unless required by applicable law or agreed to in writing, software ## distributed under the License is distributed on an "AS IS" BASIS, ## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ## See the License for the specific language governing permissions and ## limitations under the License. ## --------------------------------------------------------------------------- # Defines credentials that will be used by components (like web console) to access the broker #賬號 activemq.username=admin #密碼 activemq.password=123456 guest.password=password
這樣就配置完畢了。

