Active MQ學習筆記


一.Active MQ介紹

ActiveMQ Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是一個完全支持JMS1.1J2EE 1.4規范的 JMS Provider實現,盡管JMS規范出台已經是很久的事情了,但是JMS在當今的J2EE應用中間仍然扮演着特殊的地位。

主要特點:

1. 多種語言和協議編寫客戶端。語言: Java, C, C++, C#, Ruby, Perl, Python, PHP。應用協議: OpenWire,Stomp REST,WS Notification,XMPP,AMQP

2. 完全支持JMS1.1J2EE 1.4規范 (持久化,XA消息,事務)

3. Spring的支持,ActiveMQ可以很容易內嵌到使用Spring的系統里面去,而且也支持Spring2.0的特性

4. 通過了常見J2EE服務器(Geronimo,JBoss 4, GlassFish,WebLogic)的測試,其中通過JCA 1.5 resource adaptors的配置,可以讓ActiveMQ可以自動的部署到任何兼容J2EE 1.4 商業服務器上

5. 支持多種傳送協議:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA

6. 支持通過JDBCjournal提供高速的消息持久化

7. 從設計上保證了高性能的集群,客戶端-服務器,點對點

8. 支持Ajax

9. 支持與Axis的整合

10. 可以很容易得調用內嵌JMS provider,進行測試

 

 

 

 

二.Active MQ的安裝啟動

1、到官網http://activemq.apache.org/download.html下載最新版本,根據需要,可選擇windows版本和linux版本。

2.本項目采用了windows版本,下載:apache-activemq-5.13.3.zip,解壓,如下圖:

進入bin目錄,發現有win32和win64兩個文件夾,這2個文件夾分別對應windows32位和windows64位操作系統的啟動腳本。結合對應的電腦系統,

雙擊activemq.bat腳本進行MQ服務器的啟動

 

得到如圖:

 

3.ActiveMQ默認啟動到8161端口,啟動完了后在瀏覽器地址欄輸入:http://localhost:8161/admin要求輸入用戶名密碼,默認用戶名密碼為admin、admin,這個用戶名密碼是在conf/users.properties中配置的。輸入用戶名密碼后便可看到如下圖的ActiveMQ控制台界面了。

 

三.Active MQSpring的整合配置

1.首先在項目中導入以下的jar包:

2.spring的整合配置文件:i-Factory-BEANS-JMS.xml

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 

xmlns:aop="http://www.springframework.org/schema/aop"

xmlns:tx="http://www.springframework.org/schema/tx" 

xmlns:context="http://www.springframework.org/schema/context"

xmlns:jee="http://www.springframework.org/schema/jee"

xmlns:p="http://www.springframework.org/schema/p"

xsi:schemaLocation="

    http://www.springframework.org/schema/context

    http://www.springframework.org/schema/context/spring-context-3.0.xsd 

    http://www.springframework.org/schema/beans

    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd

    http://www.springframework.org/schema/tx

    http://www.springframework.org/schema/tx/spring-tx-3.0.xsd

    http://www.springframework.org/schema/aop

    http://www.springframework.org/schema/aop/spring-aop-3.0.xsd

    http://www.springframework.org/schema/jee

    http://www.springframework.org/schema/jee/spring-jee-3.0.xsd">

 

<import resource="cn/myccit/conf/imes-baseedi-action_BEAN_Inherit.xml"/><!-- basereport模塊 -->

    <!--MQ 配置-->

    <bean id="jmsConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">  

        <property name="connectionFactory">  

            <bean class="org.apache.activemq.ActiveMQConnectionFactory">  

                <property name="brokerURL" value="tcp://192.168.16.233:61616" />  

                 <!-- 是否異步發送 -->  

                <property name="useAsyncSend" value="true" />  

                <property name="closeTimeout" value="60000" />  

                <property name="userName" value="admin" />  

                <property name="password" value="admin" />  

                <property name="optimizeAcknowledge" value="true" /> 

                <property name="optimizedAckScheduledAckInterval" value="10000" />  

            </bean>  

        </property>  

    </bean>  

 

     <!-- Spring Caching連接工廠 -->

    <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory -->  

    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">

     <!--<property name="clientId" value="1" />  -->

        <!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory -->  

        <property name="targetConnectionFactory" ref="jmsConnectionFactory"></property>

        <!-- 同上,同理 -->

        <!-- <constructor-arg ref="amqConnectionFactory" /> -->

        <!-- Session緩存數量 -->

        <property name="sessionCacheSize" value="100" />

    </bean>

     

    <!-- Spring JMS Template -->  

    <!--<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">  

        <property name="connectionFactory">  

            <ref local="jmsConnectionFactory" />  

        </property>  

    </bean>  -->

 

  <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">

    <property name="connectionFactory" ref="jmsConnectionFactory" />

    <property name="defaultDestination" ref="asyncQueue" />

    <property name="receiveTimeout" value="10000" />

  </bean>              

             <!--queue通道-->  

      <bean id="asyncQueue" class="org.apache.activemq.command.ActiveMQQueue">  

        <constructor-arg index="0">  

            <value>firstQueue</value>  

        </constructor-arg>  

      </bean>  

    

  <bean id="jmsTemplate2" class="org.springframework.jms.core.JmsTemplate">

    <property name="connectionFactory" ref="jmsConnectionFactory" />

    <property name="defaultDestination" ref="asyncTopic" />

    <property name="receiveTimeout" value="10000" />

  </bean>              

            <!--topic通道-->  

   <bean id="asyncTopic" class="org.apache.activemq.command.ActiveMQTopic">  

        <constructor-arg index="0">  

            <value>asyncTopic</value>  

        </constructor-arg>  

    </bean>  

    

 

  <!-- 配置消息消費監聽者 -->

  <bean id="machineMessageListener" class="cn.myccit.ifactory.edu.service.msg.MachineDataListener" >

   <property name="machineLogDAO" ref="machineLogDAO"></property>

  </bean>

  <!-- 消息監聽容器,配置連接工廠,監聽器是上面定義的監聽器 -->

  <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">

    <property name="connectionFactory" ref="connectionFactory" />

    <property name="destination" ref="asyncQueue" />

    <!--主題(Topic)和隊列消息的主要差異體現在JmsTemplate中"pubSubDomain"是否設置為True。如果為True,則是Topic;如果是false或者默認,則是queue-->

    <property name="pubSubDomain" value="false" />

    <property name="messageListener" ref="machineMessageListener" />

  </bean>

 

  

  

    <!-- 消息監聽適配器 -->  

    <bean id="messageListenerAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">  

        <property name="delegate">  

            <bean class="cn.myccit.imes.baseedi.service.impl.TopicMsgServiceImpl"/>  

        </property>  

        <property name="defaultListenerMethod" value="receiveMessage"/>  

    </bean>  

 

    <!-- 消息監聽適配器對應的監聽容器 -->  

    <bean id="messageListenerAdapterContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">  

        <property name="connectionFactory" ref="connectionFactory"/>  

        <property name="destination" ref="asyncTopic"/>  

        <property name="messageListener" ref="messageListenerAdapter"/><!-- 使用MessageListenerAdapter來作為消息監聽器 -->  

    </bean>  

    

</beans>

 

 

四.Active MQ的工作機制

Active MQ 的工作機制主要是通過JMSJava Message Service作為應用程序接口,用於在兩個應用程序之間,或分布式系統中發送消息。JSM它是一個Java平台中關於面向消息中間件(MOM)的API,通過它提供標准的產生、發送、接收消息的接口簡化企業應用的開發,翻譯為Java消息服務。我們在應用jms的結構為:

1.JMS應用程序結構支持兩種模型

隊列模型

隊列模型下,一個生產者向一個特定的隊列發布消息,一個消費者從該隊列中讀取消息。這里,生產者知道消費者的隊列,並直接將消息發送到消費者的隊列。這種模式被概括為:只有一個消費者將獲得消息。生產者不需要在接收者消費該消息期間處於運行狀態,接收者也同樣不需要在消息發送時處於運行狀態。每一個成功處理的消息都由接收者簽收。

發布者/訂閱者模型

發布者/訂閱者模型支持向一個特定的消息主題發布消息。0或多個訂閱者可能對接收來自特定消息主題的消息感興趣。在這種模型下,發布者和訂閱者彼此不知道對方。這種模式好比是匿名公告板。這種模式被概括為:多個消費者可以獲得消息.在發布者和訂閱者之間存在時間依賴性。發布者需要建立一個訂閱(subscription),以便客戶能夠購訂閱。訂閱者必須保持持續的活動狀態以接收消息,除非訂閱者建立了持久的訂閱。在那種情況下,在訂閱者未連接時發布的消息將在訂閱者重新連接時重新發布

  2.JMS應用程序要實現的接口:

ConnectionFactory 接口(連接工廠)

用戶用來創建到JMS提供者的連接的被管對象。JMS客戶通過可移植的接口訪問連接,這樣當下層的實現改變時,代碼不需要進行修改。 管理員在JNDI名字空間中配置連接工廠,這樣,JMS客戶才能夠查找到它們。根據消息類型的不同,用戶將使用隊列連接工廠,或者主題連接工廠。

Connection 接口(連接)

連接代表了應用程序和消息服務器之間的通信鏈路。在獲得了連接工廠后,就可以創建一個與JMS提供者的連接。根據不同的連接類型,連接允許用戶創建會話,以發送和接收隊列和主題到目標。

Destination 接口(目標)

目標是一個包裝了消息目標標識符的被管對象,消息目標是指消息發布和接收的地點,或者是隊列,或者是主題。JMS管理員創建這些對象,然后用戶通過JNDI發現它們。和連接工廠一樣,管理員可以創建兩種類型的目標,點對點模型的隊列,以及發布者/訂閱者模型的主題。

MessageConsumer 接口(消息消費者)

由會話創建的對象,用於接收發送到目標的消息。消費者可以同步地(阻塞模式),或異步(非阻塞)接收隊列和主題類型的消息。

MessageProducer 接口(消息生產者)

由會話創建的對象,用於發送消息到目標。用戶可以創建某個目標的發送者,也可以創建一個通用的發送者,在發送消息時指定目標。

Message 接口(消息)

是在消費者和生產者之間傳送的對象,也就是說從一個應用程序創送到另一個應用程序。一個消息有三個主要部分:

消息頭(必須):包含用於識別和為消息尋找路由的操作設置。

一組消息屬性(可選):包含額外的屬性,支持其他提供者和用戶的兼容。可以創建定制的字段和過濾器(消息選擇器)。

一個消息體(可選):允許用戶創建五種類型的消息(文本消息,映射消息,字節消息,流消息和對象消息)。

消息接口非常靈活,並提供了許多方式來定制消息的內容。

Session 接口(會話)

表示一個單線程的上下文,用於發送和接收消息。由於會話是單線程的,所以消息是連續的,就是說消息是按照發送的順序一個一個接收的。會話的好處是它支持事務。如果用戶選擇了事務支持,會話上下文將保存一組消息,直到事務被提交才發送這些消息。在提交事務之前,用戶可以使用回滾操作取消這些消息。一個會話允許用戶創建消息生產者來發送消息,創建消息消費者來接收消息。

 

本項目采用的通訊方式通過配置TCP/IP和端口進行實現:

<!--MQ 配置-->

    <bean id="jmsConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">  

        <property name="connectionFactory">  

            <bean class="org.apache.activemq.ActiveMQConnectionFactory">  

                <property name="brokerURL" value="tcp://192.168.16.233:61616" />  

                 <!-- 是否異步發送 -->  

                <property name="useAsyncSend" value="true" />  

                <property name="closeTimeout" value="60000" />  

                <property name="userName" value="admin" />  

                <property name="password" value="admin" />  

                <property name="optimizeAcknowledge" value="true" /> 

                <property name="optimizedAckScheduledAckInterval" value="10000" />  

            </bean>  

        </property>  

    </bean>  

四.Active MQ展示和常用操作

1.PLC端輸出的模擬數據:

2.控制台通過接收的數據:

3.看板顯示界面顯示的數據:

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM