一.Active MQ介紹
ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是一個完全支持JMS1.1和J2EE 1.4規范的 JMS Provider實現,盡管JMS規范出台已經是很久的事情了,但是JMS在當今的J2EE應用中間仍然扮演着特殊的地位。
主要特點:
1. 多種語言和協議編寫客戶端。語言: Java, C, C++, C#, Ruby, Perl, Python, PHP。應用協議: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
2. 完全支持JMS1.1和J2EE 1.4規范 (持久化,XA消息,事務)
3. 對Spring的支持,ActiveMQ可以很容易內嵌到使用Spring的系統里面去,而且也支持Spring2.0的特性
4. 通過了常見J2EE服務器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的測試,其中通過JCA 1.5 resource adaptors的配置,可以讓ActiveMQ可以自動的部署到任何兼容J2EE 1.4 商業服務器上
5. 支持多種傳送協議:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
6. 支持通過JDBC和journal提供高速的消息持久化
7. 從設計上保證了高性能的集群,客戶端-服務器,點對點
8. 支持Ajax
9. 支持與Axis的整合
10. 可以很容易得調用內嵌JMS provider,進行測試
二.Active MQ的安裝啟動
1、到官網(http://activemq.apache.org/download.html)下載最新版本,根據需要,可選擇windows版本和linux版本。
2.本項目采用了windows版本,下載:apache-activemq-5.13.3.zip,解壓,如下圖:
進入bin目錄,發現有win32和win64兩個文件夾,這2個文件夾分別對應windows32位和windows64位操作系統的啟動腳本。結合對應的電腦系統,
雙擊activemq.bat腳本進行MQ服務器的啟動
得到如圖:
3.ActiveMQ默認啟動到8161端口,啟動完了后在瀏覽器地址欄輸入:http://localhost:8161/admin要求輸入用戶名密碼,默認用戶名密碼為admin、admin,這個用戶名密碼是在conf/users.properties中配置的。輸入用戶名密碼后便可看到如下圖的ActiveMQ控制台界面了。
三.Active MQ與Spring的整合配置
1.首先在項目中導入以下的jar包:
2.spring的整合配置文件:i-Factory-BEANS-JMS.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:jee="http://www.springframework.org/schema/jee"
xmlns:p="http://www.springframework.org/schema/p"
xsi:schemaLocation="
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
http://www.springframework.org/schema/jee
http://www.springframework.org/schema/jee/spring-jee-3.0.xsd">
<import resource="cn/myccit/conf/imes-baseedi-action_BEAN_Inherit.xml"/><!-- basereport模塊 -->
<!--MQ 配置-->
<bean id="jmsConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://192.168.16.233:61616" />
<!-- 是否異步發送 -->
<property name="useAsyncSend" value="true" />
<property name="closeTimeout" value="60000" />
<property name="userName" value="admin" />
<property name="password" value="admin" />
<property name="optimizeAcknowledge" value="true" />
<property name="optimizedAckScheduledAckInterval" value="10000" />
</bean>
</property>
</bean>
<!-- Spring Caching連接工廠 -->
<!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<!--<property name="clientId" value="1" /> -->
<!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="jmsConnectionFactory"></property>
<!-- 同上,同理 -->
<!-- <constructor-arg ref="amqConnectionFactory" /> -->
<!-- Session緩存數量 -->
<property name="sessionCacheSize" value="100" />
</bean>
<!-- Spring JMS Template -->
<!--<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory">
<ref local="jmsConnectionFactory" />
</property>
</bean> -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="jmsConnectionFactory" />
<property name="defaultDestination" ref="asyncQueue" />
<property name="receiveTimeout" value="10000" />
</bean>
<!--queue通道-->
<bean id="asyncQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg index="0">
<value>firstQueue</value>
</constructor-arg>
</bean>
<bean id="jmsTemplate2" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="jmsConnectionFactory" />
<property name="defaultDestination" ref="asyncTopic" />
<property name="receiveTimeout" value="10000" />
</bean>
<!--topic通道-->
<bean id="asyncTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg index="0">
<value>asyncTopic</value>
</constructor-arg>
</bean>
<!-- 配置消息消費監聽者 -->
<bean id="machineMessageListener" class="cn.myccit.ifactory.edu.service.msg.MachineDataListener" >
<property name="machineLogDAO" ref="machineLogDAO"></property>
</bean>
<!-- 消息監聽容器,配置連接工廠,監聽器是上面定義的監聽器 -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="asyncQueue" />
<!--主題(Topic)和隊列消息的主要差異體現在JmsTemplate中"pubSubDomain"是否設置為True。如果為True,則是Topic;如果是false或者默認,則是queue-->
<property name="pubSubDomain" value="false" />
<property name="messageListener" ref="machineMessageListener" />
</bean>
<!-- 消息監聽適配器 -->
<bean id="messageListenerAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<property name="delegate">
<bean class="cn.myccit.imes.baseedi.service.impl.TopicMsgServiceImpl"/>
</property>
<property name="defaultListenerMethod" value="receiveMessage"/>
</bean>
<!-- 消息監聽適配器對應的監聽容器 -->
<bean id="messageListenerAdapterContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="asyncTopic"/>
<property name="messageListener" ref="messageListenerAdapter"/><!-- 使用MessageListenerAdapter來作為消息監聽器 -->
</bean>
</beans>
四.Active MQ的工作機制
Active MQ 的工作機制主要是通過JMS(Java Message Service)作為應用程序接口,用於在兩個應用程序之間,或分布式系統中發送消息。JSM,它是一個Java平台中關於面向消息中間件(MOM)的API,通過它提供標准的產生、發送、接收消息的接口簡化企業應用的開發,翻譯為Java消息服務。我們在應用jms的結構為:
1.JMS應用程序結構支持兩種模型:
隊列模型
隊列模型下,一個生產者向一個特定的隊列發布消息,一個消費者從該隊列中讀取消息。這里,生產者知道消費者的隊列,並直接將消息發送到消費者的隊列。這種模式被概括為:只有一個消費者將獲得消息。生產者不需要在接收者消費該消息期間處於運行狀態,接收者也同樣不需要在消息發送時處於運行狀態。每一個成功處理的消息都由接收者簽收。
發布者/訂閱者模型
發布者/訂閱者模型支持向一個特定的消息主題發布消息。0或多個訂閱者可能對接收來自特定消息主題的消息感興趣。在這種模型下,發布者和訂閱者彼此不知道對方。這種模式好比是匿名公告板。這種模式被概括為:多個消費者可以獲得消息.在發布者和訂閱者之間存在時間依賴性。發布者需要建立一個訂閱(subscription),以便客戶能夠購訂閱。訂閱者必須保持持續的活動狀態以接收消息,除非訂閱者建立了持久的訂閱。在那種情況下,在訂閱者未連接時發布的消息將在訂閱者重新連接時重新發布。
2.JMS應用程序要實現的接口:
ConnectionFactory 接口(連接工廠)
用戶用來創建到JMS提供者的連接的被管對象。JMS客戶通過可移植的接口訪問連接,這樣當下層的實現改變時,代碼不需要進行修改。 管理員在JNDI名字空間中配置連接工廠,這樣,JMS客戶才能夠查找到它們。根據消息類型的不同,用戶將使用隊列連接工廠,或者主題連接工廠。
Connection 接口(連接)
連接代表了應用程序和消息服務器之間的通信鏈路。在獲得了連接工廠后,就可以創建一個與JMS提供者的連接。根據不同的連接類型,連接允許用戶創建會話,以發送和接收隊列和主題到目標。
Destination 接口(目標)
目標是一個包裝了消息目標標識符的被管對象,消息目標是指消息發布和接收的地點,或者是隊列,或者是主題。JMS管理員創建這些對象,然后用戶通過JNDI發現它們。和連接工廠一樣,管理員可以創建兩種類型的目標,點對點模型的隊列,以及發布者/訂閱者模型的主題。
MessageConsumer 接口(消息消費者)
由會話創建的對象,用於接收發送到目標的消息。消費者可以同步地(阻塞模式),或異步(非阻塞)接收隊列和主題類型的消息。
MessageProducer 接口(消息生產者)
由會話創建的對象,用於發送消息到目標。用戶可以創建某個目標的發送者,也可以創建一個通用的發送者,在發送消息時指定目標。
Message 接口(消息)
是在消費者和生產者之間傳送的對象,也就是說從一個應用程序創送到另一個應用程序。一個消息有三個主要部分:
消息頭(必須):包含用於識別和為消息尋找路由的操作設置。
一組消息屬性(可選):包含額外的屬性,支持其他提供者和用戶的兼容。可以創建定制的字段和過濾器(消息選擇器)。
一個消息體(可選):允許用戶創建五種類型的消息(文本消息,映射消息,字節消息,流消息和對象消息)。
消息接口非常靈活,並提供了許多方式來定制消息的內容。
Session 接口(會話)
表示一個單線程的上下文,用於發送和接收消息。由於會話是單線程的,所以消息是連續的,就是說消息是按照發送的順序一個一個接收的。會話的好處是它支持事務。如果用戶選擇了事務支持,會話上下文將保存一組消息,直到事務被提交才發送這些消息。在提交事務之前,用戶可以使用回滾操作取消這些消息。一個會話允許用戶創建消息生產者來發送消息,創建消息消費者來接收消息。
本項目采用的通訊方式通過配置TCP/IP和端口進行實現:
<!--MQ 配置-->
<bean id="jmsConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://192.168.16.233:61616" />
<!-- 是否異步發送 -->
<property name="useAsyncSend" value="true" />
<property name="closeTimeout" value="60000" />
<property name="userName" value="admin" />
<property name="password" value="admin" />
<property name="optimizeAcknowledge" value="true" />
<property name="optimizedAckScheduledAckInterval" value="10000" />
</bean>
</property>
</bean>
四.Active MQ展示和常用操作
1.PLC端輸出的模擬數據:
2.控制台通過接收的數據:
3.看板顯示界面顯示的數據: