1、ActiveMQ中消息的管理機制:使用ActiveMQ的目的必然是處理消息,大體步驟如下:
1)通過ConnectionFactory連接到ActiveMQ服務器
2)通過ConnectionFactory創建Connection
3)通過Connection獲取Session
4)通過Session創建消息的目的地,即隊列(Queue)或主題(Topic)
5)通過Session創建消息生產者,生產的目的地。即4)中創建的Queue或Topic
6)通過Session創建消息
7)通過消息生產者將消息發送至消息的目的地
8)消費者在相同的消息目的地消費消息
注:至於消息的目的地應該是隊列(Queue)還是主題(Topic),需要看該消息是只能被消費一次、還是需要被消費多次,也就是說Queue中的消息適合點對點的模式,而Topic中的消息則適合發布/訂閱模式。Queue和Topic都是Destination接口的實現
2、消費目的地的兩種模式:
1)點對點(P2P)模型
點對點模型,采用的是隊列(Queue)作為消息載體。在該模式中,一條消息只能被一個消費者消費,沒有被消費的,只能留在隊列中,等待被消費,或者超時。
2)發布/訂閱(Pub/Sub)模型
Topic模式適用於發布/訂閱,生產者將消息發布到Topic中,每個消息都可能有多個消費者,屬於1:N的關系,生產者和消費者之間有時間上的相關性,訂閱某一個主題的消費者只能消費自它訂閱之后生產者發布的消息,如果生產者在發布消息時沒有訂閱者,那發布的消息就是一條垃圾消息,因此生產者在發布消息時應該檢查當前是否有訂閱者。
JMS規范還允許客戶創建持久訂閱,這在一定程度上放松了時間上的相關性,持久訂閱允許消費者消費它在未處於激活狀態時生產者發送到主題的消息。
發布/訂閱模型采用的是主題(Topic)作為消息通訊載體。該模式類似微信公眾號的模式。發布者發布一條信息,然后將該信息傳遞給所有的訂閱者。注意:訂閱者想要接收到該信息,必須在該信息發布之前訂閱。
3、Message Groups(消息分組)
邏輯上,可以看成是一種並發的Exclusive Consumer(排它消費)。JMS消息屬性JMXGroupID被用來區分Message Group。
消息分組的特性:
a.保證所有具有相同JMSGroupID的消息會被分發到相同的消費者(只要這個Consumer保持Active);
b.Message Groups也是一種負載均衡的機制;
在一個消息被分發到Consumer前,Broker會檢查消息的JMSGroupID屬性。如果存在,那么broker會檢查是否有某個Consumer擁有這個Message Group。如果沒有,那么broker會選擇一個Consumer,並將它關聯到這個Message Group。此后,這個Consumer會接收這個Message Group的所有消息,直到Consumer被關閉。
從4.1版本開始,ActiveMQ支持一個布爾字段JMSXGroupFirstForConsumer。當某個message group的第一個消息被發送到consumer的時候,這個字段被設置。如果客戶使用failover transport連接到broker。在由於網絡問題等造成客戶重新連接到broker的時候,相同message group的消息可能會被分發到不同與之前的consumer,因此JMSXGroupFirstForConsumer字段也會被重新設置。
4、點對點模式和發布/訂閱(Pub/Sub)模式下生產消息和消費消息示例:
4.1 依賴引入:
<!--activemq依賴引入-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.7.0</version>
</dependency>
##activemq service info
activemq.address=127.0.0.1
activemq.port=61616
activemq.username=admin
activemq.password=admin
4.2 代碼示列
1)生產和消費請求:ActiveMqQueueController
package com.zj.weblearn.controller.activemq; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; import java.util.HashMap; import java.util.Map; /* * @Copyright (C), 2002-2020, * @ClassName: ActiveMqApp * @Author: * @Date: 2020/9/24 15:49 * @Description: * @History: * @Version:1.0 */ @Controller() @RequestMapping("/activemq/queue") public class ActiveMqQueueController { @Autowired com.zj.weblearn.serviceImpl.activemq.QueueMessageServiceImpl queueMessageServiceImpl; //點對點模式:生產者生產消息 http://localhost:8080/activemq/queue/produceMess.do @RequestMapping("/produceMess") @ResponseBody public Map producerSendMessage(){ Map resultMap=new HashMap(); resultMap.put("isSuccess",false); Map<String,String> message=new HashMap(); for(int i=0;i<10;i++){ message.put(String.valueOf(i),"this is "+i+" message"); } message.put(String.valueOf(10),""); try { //向隊列中發送消息 queueMessageServiceImpl.p2pProducerSendMess(message); resultMap.put("isSuccess",true); }catch (Exception e){ resultMap.put("errorMessage","生產消息異常"); } return resultMap; } //消費者消費消息 http://localhost:8080/activemq/queue/consumeMess.do @RequestMapping("/consumeMess") @ResponseBody public Map consumeMessage(){ Map resultMap=new HashMap(); resultMap.put("isSuccess",false); try {//消費隊列中消息的三種方法: //queueMessageServiceImpl.p2pconsumerConsumeMessMethod1ByReceive();//消費方法1 //queueMessageServiceImpl.p2pconsumerConsumeMessMethod2BySetReceiveTime();//消費方法2 queueMessageServiceImpl.p2pconsumerConsumeMessMethod3ByMessageListener();//消費方法3 resultMap.put("isSuccess",true); }catch (Exception e){ resultMap.put("errorMessage","消息消費異常"+e); } return resultMap; } //一邊生產一邊消費 http://localhost:8080/activemq/queue/sideProductSideConsume.do @RequestMapping("/sideProductSideConsume") @ResponseBody public Map sideProductSideConsumeMessage(){ Map resultMap=new HashMap(); resultMap.put("isSuccess",false); Map<String,String> message=new HashMap(); try {// queueMessageServiceImpl.sideProductSideConsumeMessage(message);//消費方法3 resultMap.put("isSuccess",true); }catch (Exception e){ resultMap.put("errorMessage","消息消費異常"+e); } return resultMap; } //消息分組 http://localhost:8080/activemq/queue/messageGroupBy.do @RequestMapping("/messageGroupBy") @ResponseBody public Map messageGroupBy(){ Map resultMap=new HashMap(); resultMap.put("isSuccess",false); Map<String,String> message=new HashMap(); try {// queueMessageServiceImpl.messageConsumedByGroup(message);//消費方法3 resultMap.put("isSuccess",true); }catch (Exception e){ resultMap.put("errorMessage","消息消費異常"+e); } return resultMap; } }
2)創建連接工具方法:ActiveMqConnectionUtils
import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.JMSException; /* * @Copyright (C), 2002-2020, * @ClassName: RabbitMqConnectionUtils * @Author: * @Date: 2020/9/23 9:42 * @Description: * @History: * @Version:1.0 */ public class ActiveMqConnectionUtils { public static javax.jms.Connection getConnection() { javax.jms.Connection connection = null; try { javax.jms.ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ReadPropertiesUtils1.getValue("activemq.username"), ReadPropertiesUtils1.getValue("activemq.password"), "tcp://127.0.0.1:61616"); connection = connectionFactory.createConnection(); connection.start(); } catch (JMSException e) { e.printStackTrace(); } return connection; } }
3)點對點模式:生產和消費消息方法生產和消費消息方法(包含:receive()阻塞消費、receive(5000)消費時設置超時時間、consumer.setMessageListener(new MessageListener(){})消費時設置監聽是非阻塞的;)
import com.zj.weblearn.utils.ActiveMqConnectionUtils; import org.apache.activemq.broker.jmx.CompositeDataConstants; import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Service; import javax.jms.*; import java.io.UnsupportedEncodingException; import java.util.Map; /* * @Copyright (C), 2002-2020, * @ClassName: QueueMessageServiceImpl * @Author: * @Date: 2020/9/25 10:07 * @Description: * @History: * @Version:1.0 */ @Service public class QueueMessageServiceImpl { /* * 實現步驟 * 1.建立ConnectionFactory工廠對象,需要填入用戶名、密碼、連接地址(一般使用默認,如果沒有修改的話) * 2.通過ConnectionFactory對象創建一個Connection連接,並且調用Connection的start方法開啟連接,Connection方法默認是關閉的 * 3.通過Connection對象創建Session會話(上下文環境對象),用於接收消息,參數1是是否啟用事物,參數2是簽收模式,一般設置為自動簽收 * 4.通過Session對象創建Destination對象,指的是一個客戶端用來制定生產消息目標和消費消息來源的對象。在PTP的模式中,Destination被稱作隊列,在Pub/Sub模式中,Destination被稱作主題(Topic) * 5.通過Session對象創建消息的發送和接收對象(生產者和消費者) * 6.通過MessageProducer的setDeliverMode方法為其設置持久化或者非持久化特性 * 7.使用JMS規范的TextMessage形式創建數據(通過Session對象),並用MessageProducer的send方法發送數據。客戶端同理。記得關閉 */ //點對點模式,生產者生產消息 public void p2pProducerSendMess(Map<String, String> message) { Connection connection = ActiveMqConnectionUtils.getConnection(); try { Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("OrderCancleQueue"); MessageProducer producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); for (Map.Entry<String, String> entry : message.entrySet()) { TextMessage textMessage = session.createTextMessage(); MapMessage mapMessage=session.createMapMessage(); textMessage.setText(entry.getValue().toString()); producer.send(textMessage); } //此處有一個奇怪的現象,生產者還未把自身的10個消息全部發送完成,消費者就開始消費了,並且消費消息的順序沒有保證 System.out.println("p2pProducerSendMess>>>send message end"); if (connection != null) { connection.close(); } } catch (JMSException e) { e.printStackTrace(); } } //點對點模式,消費者消費消息 總共有3種,方法1:通過receive()方法實現 public void p2pconsumerConsumeMessMethod1ByReceive() { Connection connection = ActiveMqConnectionUtils.getConnection(); try { Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("OrderCancleQueue"); MessageConsumer consumer = session.createConsumer(destination); while (true) { /* consumer.receive()是一個阻塞方法,在接收不到隊列中的消息時就一直阻塞直到接收到消息為止,因此while循環是不可能被中斷的,下面的資源也不會得到釋放,就會一直和ActiveMQ保持連接。 */ TextMessage textMessage = (TextMessage) consumer.receive(); String messageInfo=textMessage.getText(); if(StringUtils.isNotEmpty(messageInfo)){ System.out.println("p2pconsumerConsumeMessMethod1>>>consumer>>consume>>message>>"+new String (textMessage.getText().getBytes("iso8859-1"),"UTF-8")); }else{ System.out.println("p2pconsumerConsumeMessMethod1>>>consumer>>consume>>message>> end"); break; } } } catch (JMSException | UnsupportedEncodingException e) { e.printStackTrace(); } } //點對點模式,消費者消費消息 總共有3種,方法2:通過receive(Long var) 設置超時時間實現 public void p2pconsumerConsumeMessMethod2BySetReceiveTime() { Connection connection = ActiveMqConnectionUtils.getConnection(); try { Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("OrderCancleQueue"); MessageConsumer consumer = session.createConsumer(destination); while (true) { /* consumer.receive(4000l) */ TextMessage textMessage = (TextMessage) consumer.receive(5000);//5s String messageInfo=textMessage.getText(); if(StringUtils.isNotEmpty(messageInfo)){ System.out.println("p2pconsumerConsumeMessMethod1>>>consumer>>consume>>message>>"+new String (textMessage.getText().getBytes("iso8859-1"),"UTF-8")); }else{ System.out.println("p2pconsumerConsumeMessMethod1>>>consumer>>consume>>message>> end"); break; } } } catch (JMSException | UnsupportedEncodingException e) { e.printStackTrace(); } } //點對點模式,消費者消費消息 總共有3種,方法3:采用消息監聽器:messageListener /* * 1)使用消息監聽(即MessageListener)的方式也是非阻塞的,在監聽不到隊列中的消息時不會一直等待 * 2)使用消息監聽的方式在監聽到有消息時會開啟一個新的線程處理消息,因此如果我們在主線程中釋放了資源就可能會看不到onMessage()方法中的打印結果 * 3)使用消息監聽的方式一旦我們釋放了連接等資源,在生產者再次向監聽的隊列中發送消息時就監聽不到新發送的消息了,這就失去了監聽的意義,那么問題來了:使用消息監聽的方式到底該不該釋放連接等資源呢?釋放了就會監聽不到新的消息,不就失去了監聽的意義了嗎?不釋放又會占用系統資源,應該怎么辦呢? * * 答案是不釋放,因為MQ是用在分布式系統內部的,用於各個系統之間的消息通信(所有系統的總量其實並不會很多),對於消費者端而言基本上一個隊列或者一個Topic的連接數最多是分布式子系統的個數(在集群環境下最多是子系統的個數的n倍,n的值並不會很大,也就是每個子系統都要消費所有隊列和訂閱所有Topic的消息時的所需的連接數是最多的),因此連接數並不會很多(估算的話最多是:隊列的個數 * 系統的總個數 * 集群的倍數 + Topic的個數 * 系統的總個數 * 集群的倍數),所以沒必要考慮負載和系統資源占用的問題。雖然是這樣,但我們並不能隨便使用MQ,不能僅僅為了一個異步處理就開啟並占用一個MQ的連接,在異步處理的時候可以采用開啟線程的方式 * * 4)ActiveMQ隊列中的消息在消費時采用的是輪詢負載的機制,即每個消費者輪流消費消息,當然負載機制是可以配置的 * * */ public void p2pconsumerConsumeMessMethod3ByMessageListener() { Connection connection = ActiveMqConnectionUtils.getConnection(); try { //【注:第一個參數表示是否開啟事務(如果采用事務方式,在發送完消息后需要關閉session,否則消息不能發送出去);第二個參數為應答模式,我們使用生產一次,消費一次,自動應答這種模式;】 Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("OrderCancleQueue"); MessageConsumer consumer = session.createConsumer(destination); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage msg = (TextMessage) message; String text = null; try { text = msg.getText(); } catch (JMSException e) { e.printStackTrace(); } System.out.println(">>son thread:" + Thread.currentThread().getName()); System.out.println(text); } }); } catch (JMSException e) { e.printStackTrace(); } } public void sideProductSideConsumeMessage(Map<String, String> message) { for(int i=0;i<10;i++){ message.put(String.valueOf(i),"this is "+i+" message"); } Connection connection = ActiveMqConnectionUtils.getConnection(); try { Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("OrderCancleQueue"); MessageProducer producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); for (Map.Entry<String, String> entry : message.entrySet()) { TextMessage textMessage = session.createTextMessage(); textMessage.setText(entry.getValue().toString()); //發送消息 producer.send(textMessage); } MessageConsumer consumer = session.createConsumer(destination); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage msg = (TextMessage) message; String text = null; try { text = msg.getText(); } catch (JMSException e) { e.printStackTrace(); } System.out.println(">>son thread>>" + Thread.currentThread().getName()+"consume message"+text); } }); } catch (JMSException e) { e.printStackTrace(); } } public void messageConsumedByGroup(Map<String, String> message) { for(int i=0;i<10;i++){ message.put(String.valueOf(i),"this is "+i+" message"); } Connection connection = ActiveMqConnectionUtils.getConnection(); try { Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("OrderCancleQueue"); MessageProducer producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); int i=0; for (Map.Entry<String, String> entry : message.entrySet()) { i++; TextMessage textMessage = session.createTextMessage(); textMessage.setStringProperty(CompositeDataConstants.JMSXGROUP_ID, i % 2 + ""); textMessage.setText(entry.getValue().toString()); //發送消息 producer.send(textMessage); //同時創建多個消費者,以便進行不同組中信息的消費 MessageConsumer consumer = session.createConsumer(destination); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage msg = (TextMessage) message; String text = null; try { text = msg.getText(); System.out.println(">>son thread>>" + Thread.currentThread().getName()+" consume message("+text+") group name>>"+message.getStringProperty(CompositeDataConstants.JMSXGROUP_ID)); } catch (JMSException e) { e.printStackTrace(); } } }); } } catch (JMSException e) { e.printStackTrace(); } } }
4)發布/訂閱(Pub/Sub)模型:生產和消費消息方法
import com.zj.weblearn.utils.ActiveMqConnectionUtils; import org.springframework.stereotype.Service; import javax.jms.*; import java.util.Map; /* * @Copyright (C), 2002-2020, * @ClassName: TopicMessageServiceImpl * @Author: * @Date: 2020/9/24 16:11 * @Description: * @History: * @Version:1.0 */ @Service public class TopicMessageServiceImpl { //發布/訂閱(Pub/Sub)模型,發布者發布信息: public void pubSubProducerPubMess(Map<String,String> message) { Connection connection = ActiveMqConnectionUtils.getConnection(); try { Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 5、使用Session對象創建一個Destination對象。兩種形式queue、topic,現在應該使用topic Topic topic = session.createTopic("test-topic"); // 6、使用Session對象創建一個Producer對象。 MessageProducer producer = session.createProducer(topic); // 7、創建一個Message對象,可以使用TextMessage。 for(Map.Entry entry:message.entrySet()){ TextMessage textMessage = session.createTextMessage("send message " + entry.getValue()); // 8、發送消息 producer.send(textMessage); } // 9、關閉資源 producer.close(); session.close(); connection.close(); } catch (JMSException e) { e.printStackTrace(); } } //發布/訂閱(Pub/Sub)模型,訂閱者訂閱信息 public void pubSubConsumerSubMess() { Connection connection = ActiveMqConnectionUtils.getConnection(); try { Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 創建一個Destination對象。topic對象 Topic topic = session.createTopic("test-topic"); // 使用Session對象創建一個消費者對象。 MessageConsumer consumer = session.createConsumer(topic); System.out.println("topic consumer start sub message"); // 接收消息 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { // 打印結果 TextMessage textMessage = (TextMessage) message; String text; try { text = textMessage.getText(); System.out.println("this is recived messages>>" + text); } catch (JMSException e) { e.printStackTrace(); } } }); // 關閉資源 //consumer.close(); // session.close(); // connection.close(); } catch (JMSException e) { e.printStackTrace(); } } }
5、控制台中的相關描述:http://localhost:8161/admin/
Number Of Consumers 消費者 這個是消費者端的消費者數量 。
Number Of Pending Messages 等待消費的消息 這個是當前未出隊列的數量。可以理解為總接收數-總出隊列數
Messages Enqueued 進入隊列的消息 進入隊列的總數量,包括出隊列的。 這個數量只增不減
Messages Dequeued 出了隊列的消息 可以理解為是消費這消費掉的數量
