我們都知道ActiveMQ是消息中間件,首先我們先來了解下一些相關的概念;
消息
“消息”是在兩台計算機間傳送的數據單位。消息可以非常簡單,例如只包含文本字符串;也可以更復雜,可能包含嵌入對象。
消息隊列
“消息隊列”是在消息的傳輸過程中保存消息的容器,
消息隊列管理器在將消息從它的源中繼到它的目標時充當中間人,
隊列的主要目的是提供路由並保證消息的傳遞;如果發送消息時接收者不可用,消息隊列會保留消息,直到可以成功地傳遞它。
消息隊列的特點
消息隊列的主要特點是異步處理,主要目的是減少請求響應時間和解耦。
所以主要的使用場景就是將比較耗時而且不需要即時(同步)返回結果的操作作為消息放入消息隊列。
同時由於使用了消息隊列,只要保證消息格式不變,消息的發送方和接收方並不需要彼此聯系,也不需要受對方的影響,即解耦和。
ActiveMQ簡介
ActiveMQ是Apache軟件基金會所研發的開放源代碼消息中間件;由於ActiveMQ是一個純Java程序,因此只需要操作系統支持Java虛擬機,ActiveMQ便可執行。
對Spring的支持,ActiveMQ可以很容易內嵌到使用Spring的系統里面去通過了常見J2EE服務器的測試
ActiveMQ角色介紹
1,Destination 目的地,消息發送者需要指定Destination才能發送消息,接收者需要指定Destination才能接收消息。
2,Producer 消息生產者,負責發送Message到目的地
3,Consumer/Receiver 消息消費者,負責從目的地中消費(處理/監聽/訂閱)Message
4,Message 消息 消息封裝一次通信的內容。
這里我們在spring整合ActiveMQ來寫個小例子.
先創建一個maven項目,並在pom.xml添加相關依賴,如下:
<dependencies>
<!-- ActiveMQ客戶端完整jar包依賴 -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.9.0</version>
</dependency>
<!-- ActiveMQ和Spring整合配置文件標簽處理jar包依賴 -->
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>4.5</version>
</dependency>
<!-- Spring-JMS插件相關jar包依賴 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>4.1.6.RELEASE</version>
</dependency>
<!-- Spring框架上下文jar包依賴 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>4.1.6.RELEASE</version>
</dependency>
</dependencies>
然后定義消息載體類型. 即要在ActiveMQ中傳遞的數據實體類型,
消息載體對象必須實現接口java.io.Serializable, 因為消息需要在網絡中傳遞,要求必須可序列化。
代碼如下:
public class Order implements Serializable { private static final long serialVersionUID = 1L; private String id; private String nick; private Long price; private Date createTime; public String getId() { return id; } public void setId(String id) { this.id = id; } public String getNick() { return nick; } public void setNick(String nick) { this.nick = nick; } public Long getPrice() { return price; } public void setPrice(Long price) { this.price = price; } public Date getCreateTime() { return createTime; } public void setCreateTime(Date createTime) { this.createTime = createTime; } public static long getSerialversionuid() { return serialVersionUID; } @Override public String toString() { return "Order [id=" + id + ", nick=" + nick + ", price=" + price + ", createTime=" + createTime + "]"; } }
定義生產者
public class OrderProducer { private JmsTemplate template; public JmsTemplate getTemplate() { return template; } public void setTemplate(JmsTemplate template) { this.template = template; } /** * 生產者發送消息 * @param destinationName 目的地名稱 * @param order 需要發送的訂單數據 */
public void sendOrder(String destinationName, Order order){ try{ template.send(destinationName, new MessageCreator() { /** * 通過模板模式暴露的方法設置發送的信息 */ @Override public Message createMessage(Session session) throws JMSException { ObjectMessage objectMessage = session.createObjectMessage(order); return objectMessage; } }); }catch(Exception e){ e.printStackTrace(); throw new RuntimeException("send order to MQ server error!!"); } }
再來定義消費者:
public class OrderConsumer implements MessageListener { @Override public void onMessage(Message message) { try { ObjectMessage objectMessage=(ObjectMessage)message; Order order=(Order)objectMessage.getObject(); System.out.println("order:"+order); } catch (JMSException e) { e.printStackTrace(); } } }
在spring配置文件applicationContext.xml整合ActiveMQ,如下:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.0.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core-5.9.0.xsd">
<!-- 添加掃描 -->
<context:component-scan base-package="sn.sxt.*"></context:component-scan>
<!-- ActiveMQ 連接工廠 -->
<!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 -->
<!-- 需提供訪問路徑tcp://ip:61616;以及用戶名,密碼 -->
<amq:connectionFactory id="amqConnectionFactory" brokerURL="tcp://192.168.142.190:61616" userName="admin" password="admin" />
<!-- Spring Caching連接工廠 -->
<!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
<!-- Session緩存數量 -->
<property name="sessionCacheSize" value="100" />
</bean>
<!-- 消息生產者 start -->
<!-- 定義JmsTemplate對象. 此類型由Spring框架JMS組件提供. 用於訪問ActiveMQ使用. -->
<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory對象 -->
<constructor-arg ref="connectionFactory" />
<!-- 非pub/sub模型(發布/訂閱),即隊列模式, 默認數據可省略配置 -->
<!-- <property name="pubSubDomain" value="false" /> -->
</bean>
<!-- 定義生成者對象 -->
<bean id="orderProducer" class="sn.sxt.activemq.OrderProducer">
<!-- 為屬性賦值 -->
<property name="template" ref="jmsQueueTemplate"></property>
</bean>
<!--消息生產者 end -->
<!-- 消息消費者 start -->
<!-- 定義消息監聽器, 此組件為spring-jms組件定義. 可以一次注冊若干消息監聽器. 屬性解釋: destination-type - 目的地類型, queue代表消息隊列 可選值: queue | topic | durableTopic queue - 默認值. 代表消息隊列 topic - 代表消息隊列集合 durableTopic - 持久化的消息隊列集合. ActiveMQ會保證消息的消費者一定接收到此消息. container-type - 容器類型 可選值: default | simple default - 默認值. 默認容器類型, 對應DefaultMessageListenerContainer simple - 簡單容器類型, 對應SimpleMessageListenerContainer connection-factory - 鏈接工廠, 注入的是Spring-JMS組件提供的鏈接工廠對象. acknowledge - 確認方式 可選值: auto | client | dups-ok | transacted auto - 默認值, 即自動確認消息 client - 客戶端確認消息 dups-ok - 可使用副本的客戶端確認消息 transacted - 有事務的持久化消息確認機制. 需開啟對ActiveMQ的事務控制才可應用. -->
<jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory" acknowledge="auto">
<!-- 注冊消息監聽器. 如果需要注冊多個, 重復定義下述標簽. -->
<jms:listener destination="test-spring" ref="orderReciver" />
</jms:listener-container>
<!-- 容器管理消息監聽器實現類對象 -->
<bean id="orderReciver" class="sn.sxt.activemq.OrderConsumer"/>
<!-- 消息消費者 end -->
</beans>
然后我們就可以進行測試了
public class Test { public static void main(String[] args) { ApplicationContext ac=new ClassPathXmlApplicationContext("applicationContext.xml"); OrderProducer op=ac.getBean("orderProducer",OrderProducer.class); Order order=new Order(); order.setId("666"); order.setNick("我很強"); order.setPrice(999l); order.setCreateTime(new Date()); op.sendOrder("test-spring",order); } }
在Linux啟動ActiveMQ后,運行測試結果如下:
order:Order [id=666, nick=我很強, price=999, createTime=Wed Aug 14 16:11:35 CST 2019]
測試完畢!!!!