SpringBoot集成ActiveMQ實例詳解


在項目開發的過程中我們經常會遇到類似的業務場景:用戶申請提現,后台進行賬務處理、發送提現短信、調用銀行打款通道。

image

在這個過程中調用三方通道(短信或銀行通道)都比較耗時,同時賬務處理可能也是由專門的賬務系統進行處理。那么,為了提高並發和相應速度,后面的三個操作都可以通過異步進行處理。這就用到了消息隊列。

消息隊列中間件是分布式系統中重要的組件,主要解決應用耦合、異步消息、流量削鋒等問題,實現高性能、高可用、可伸縮和最終一致性架構,是大型分布式系統不可缺少的中間件。

市面上比較常見的消息隊列有:ActiveMQ、RabbitMQ、ZeroMQ、Kafka、MetaMQ、RocketMQ。

在Spring Boot的starter中專門集成了ActiveMQ,因此,本篇文章我們就來講講對ActiveMQ的集成。

JMS規范

JMS即Java消息服務(Java Message Service)應用程序接口,是一個Java平台中關於面向消息中間件(MOM)的API,用於在兩個應用程序之間,或分布式系統中發送消息,進行異步通信。Java消息服務是一個與具體平台無關的API,絕大多數MOM提供商都對JMS提供支持。

JMS的消息機制有2種模型,一種是隊列的形式(Point to Point—)發送的消息只能被一個消費者消費;一種是訂閱(Topic)模式,可以被多個訂閱者訂閱,訂閱者都會接收到同樣的消息。

而ActiveMQ就是對JMS的實現之一。

ActiveMQ介紹

ActiveMQ是一種開源的基於JMS(Java Message Servie)規范的一種消息中間件的實現,ActiveMQ的設計目標是提供標准的、面向消息的、能夠跨越多語言和多系統的應用集成消息通信中間件。

它為企業應用中消息傳遞提供高可用、出色性能、可擴展、穩定和安全保障。

ActiveMQ實現JMS規范並在此之上提供大量額外的特性。ActiveMQ支持隊列和訂閱兩種模式的消息發送。

AcitveMQ的數據傳送流程如下圖:

image

ActiveMQ的兩種消息傳遞類型:

(1)點對點傳輸,即一個生產者對應一個消費者,生產者向broke推送數據,數據存儲在broke的一個隊列中,當消費者接受該條隊列里的數據。

(2)基於發布/訂閱模式的傳輸,即根據訂閱話題來接收相應數據,一個生產者可向多個消費者推送數據,與MQTT協議的實現是類似的。

兩種消息傳遞類型的不同,點對點傳輸消費者可以接收到在連接之前生產者所推送的數據,而基於發布/訂閱模式的傳輸方式消費者只能接收到連接之后生產者推送的數據。

Spring Boot集成ActiveMQ

Spring Boot針對ActiveMQ專門提供了spring-boot-starter-activemq,用來支持ActiveMQ在Spring Boot的自動集成配置。在此基礎上我們可以很輕易的進行集成和使用。

創建項目並引入依賴

創建標准的Spring Boot項目,並在項目中引入以下依賴:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

此時如果不需要web或其他相關處理,只引入該依賴即可。如果使用pool的話, 就需要在pom中加入以下依賴:

<dependency>
     <groupId>org.apache.activemq</groupId>
     <artifactId>activemq-pool</artifactId>
</dependency>

配置文件

在application.properties中添加如下配置:

# 基於內存的ActiveMQ
spring.activemq.in-memory=true
# 不使用連接池,如果使用連接池還需在pom中添加activemq-pool的依賴
spring.activemq.pool.enabled=false

# 獨立安裝的ActiveMQ
#spring.activemq.broker-url=tcp://127.0.0.1:61616
#spring.activemq.user=admin
#spring.activemq.password=admin

上述配置中有兩套配置,Spring Boot支持基於內存ActiveMQ和基於獨立安裝的ActiveMQ。正常請求基於內存的形式是為了方便測試而使用,基於獨立安裝的形式才是真正用於生產環境。此處為了講解功能,方便測試,采用基於內存的形式。

隊列模式實例

首先,我們來實現基於隊列(Queue)形式的實現。這里需要用到兩個類ActiveMQQueue和JmsMessagingTemplate。前者是由ActiveMQ對javax.jms.Queue的接口實現。后者為Spring提供發送消息的工具類,結合Queue對消息進行發送。

JmsMessagingTemplate默認已經被實例化,直接拿來使用即可。而ActiveMQQueue則需要我們進行實例化,並傳入消息隊列的名稱。

@Configuration
public class MyMqConfig {

	@Bean
	public Queue queue() {
		return new ActiveMQQueue("sms.queue");
	}
}

Spring Boot中很常規的實例化操作,不再贅述。當實例化完ActiveMQQueue之后,我們的隊列便創建完成,下面創建對應的生產者和消費者。

生產者對應代碼如下:

@Component
public class Producer {

	@Resource
	private JmsMessagingTemplate jmsMessagingTemplate;

	@Resource
	private Queue queue;

	public void sendMsg(String msg) {
		System.out.println("發送消息內容 :" + msg);
		this.jmsMessagingTemplate.convertAndSend(this.queue, msg);
	}

}

此處用到JmsMessagingTemplate和Queue,上面已經提到,這兩個類都已經完成了初始化。消費者對應的配置如下:

@Component
public class Consumer {

	@JmsListener(destination = "sms.queue")
	public void receiveMsg(String text) {
		System.out.println("接收到消息 : "+text);
	}
}

Spring提供了注解式監聽器端點:使用@JmsListener。使用@JmsListener托管bean的帶注釋方法對其進行訂閱。在Java8中,@JmsListener是一個可重復的注解,可以關聯多個JMS destinations到同一個方法中。而在Java 6和7中,可以使用@JmsListeners注解。

其中destination指定監控的消息隊列名稱為“sms.queue”。當隊列sms.queue中有消息發送時會觸發此方法的執行,text為消息內容。

上面完成了隊列初始化、生產者和消費者代碼的編寫,下面通過單元測試來驗證是否能夠正確發送和處理消息。

@RunWith(SpringRunner.class)
@SpringBootTest
public class ActiveMqTests {

	@Autowired
	private Producer producer;

	@Test
	public void sendSimpleQueueMessage() {
		this.producer.sendMsg("提現200.00元");
	}
}

執行單元測試,會發現在日志中打印如下信息:

發送消息內容 :提現200.00元
接收到消息 : 提現200.00元

說明消息可以正常發送和接收。如果是基於內存模式,在執行單元測試時會打印出“javax.jms.JMSException: peer (vm://localhost#1) stopped.”異常日志,這是Info級別的錯誤,是ActiveMQ的一個bug。

訂閱模式實例

廣播發送的消息,可以被多個消費者接收。這里我們就在原有的基礎上進行廣播消息的添加。

首先,Spring Boot集成ActiveMQ時默認只支持隊列或者廣播之一,通過配置項spring.jms.pub-sub-domain來指定,true 為廣播模式,false為隊列模式,默認情況下支持隊列模式。

此時要使用廣播模式,則需在配置文件中添加如下配置:

spring.jms.pub-sub-domain=true

需要注意的是,此時隊列模式不可正常工作。

然后在MyMqConfig中添加:

@Bean
public Topic topic() {
	return new ActiveMQTopic("sms.topic");
}

這里創建了ActiveMQTopic,並將topic的名稱指定為sms.topic。

Producer中新增如下代碼:

@Resource
private Topic topic;

public void sendTopic(String msg) {
	System.out.println("發送Topic消息內容 :"+msg);
	this.jmsMessagingTemplate.convertAndSend(this.topic, msg);
}

為了演示多個廣播接收者,在Comsumer中新增兩個消費者:

@JmsListener(destination = "sms.topic")
public void receiveTopic1(String text) {
	System.out.println("receiveTopic1接收到Topic消息 : " + text);
}

@JmsListener(destination = "sms.topic")
public void receiveTopic2(String text) {
	System.out.println("receiveTopic2接收到Topic消息 : " + text);
}

單元測試類中新增如下測試:

@Test
public void sendSimpleTopicMessage() {
	this.producer.sendTopic("提現200.00元");
}

此時,執行單元測試,便可看到如下日志信息:

發送Topic消息內容 :提現200.00元
receiveTopic2接收到Topic消息 : 提現200.00元
receiveTopic1接收到Topic消息 : 提現200.00元

說明消息發送成功。

同時支持兩種形式

在上面的實例中,要么支持隊列模式要么支持廣播模式,如果在生產環境中兩者都需要支持,那么就需要自定義JmsListenerContainerFactory實例。當然,如果Spring Boot默認的配置無法滿足需求,也可以自定義該類,這里只是其中場景之一。

基本配置和使用步驟:通過DefaultJmsListenerContainerFactory創建自定義的JmsListenerContainerFactory實例,在@JmsListener注解中通過containerFactory屬性進行引用。

在MyMqConfig配置類中新增如下配置:

@Bean("queueListenerFactory")
public JmsListenerContainerFactory<?> queueListenerFactory(ConnectionFactory connectionFactory) {
	DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
	factory.setConnectionFactory(connectionFactory);
	factory.setPubSubDomain(false);
	return factory;
}

@Bean("topicListenerFactory")
public JmsListenerContainerFactory<?> topicListenerFactory(ConnectionFactory connectionFactory) {
	DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
	factory.setConnectionFactory(connectionFactory);
	//設置為發布訂閱方式, 默認情況下使用的生產消費者方式
	factory.setPubSubDomain(true);
	return factory;
}

這里分別實例化了基於隊列和訂閱的工廠類。然后分別在對應的消費者方法上添加containerFactory屬性。示例代碼如下:

@JmsListener(destination = "sms.queue", containerFactory = "queueListenerFactory")
public void receiveMsg(String text) {
	System.out.println("接收到消息 : " + text);
}

@JmsListener(destination = "sms.topic", containerFactory = "topicListenerFactory")
public void receiveTopic1(String text) {
	System.out.println("receiveTopic1接收到Topic消息 : " + text);
}

分別執行兩種形式的消息,發現都正常互利。同時,此時配置文件中的項spring.jms.pub-sub-domain也無效了。

其他事項

1、activeMq的端口號是61616;

2、使用topic,需要配置spring.jms.pub-sub-domain=true;

3、queue如果沒有消費者,會將信息存儲到queue中;

4、發送的消息為對象的時候,需要將對象序列化;消費者接收對象信息時需要使用ObjectMessage進行轉化;

5、使用JmsListener注解中的containerFactory屬性,可以配置spring.jms.pub-sub屬性,實現同時接收queque和topic;

6、queue為點對點模式;tipic為發布訂閱模式;

7、示例中的消息隊列名稱(sms.queue和sms.topic)可根據需要設置成配置屬性;

源碼地址:https://github.com/secbr/springboot-learn/tree/master/springboot-activemq

參考文章:

https://www.cnblogs.com/xiguadadage/p/11217604.html

https://blog.csdn.net/bihansheng2010/article/details/87190645


程序新視界:精彩和成長都不容錯過

程序新視界-微信公眾號


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM