ActiveMQ簡介


    我們都知道ActiveMQ是消息中間件,首先我們先來了解下一些相關的概念;

消息

“消息”是在兩台計算機間傳送的數據單位。消息可以非常簡單,例如只包含文本字符串;也可以更復雜,可能包含嵌入對象。

 

消息隊列

 

“消息隊列”是在消息的傳輸過程中保存消息的容器,

消息隊列管理器在將消息從它的源中繼到它的目標時充當中間人,

隊列的主要目的是提供路由並保證消息的傳遞;如果發送消息時接收者不可用,消息隊列會保留消息,直到可以成功地傳遞它。

 

消息隊列的特點

 

消息隊列的主要特點是異步處理,主要目的是減少請求響應時間和解耦。

所以主要的使用場景就是將比較耗時而且不需要即時(同步)返回結果的操作作為消息放入消息隊列。

同時由於使用了消息隊列,只要保證消息格式不變,消息的發送方和接收方並不需要彼此聯系,也不需要受對方的影響,即解耦和。

 

ActiveMQ簡介

ActiveMQ是Apache軟件基金會所研發的開放源代碼消息中間件;由於ActiveMQ是一個純Java程序,因此只需要操作系統支持Java虛擬機,ActiveMQ便可執行。

 Spring的支持,ActiveMQ可以很容易內嵌到使用Spring的系統里面去通過了常見J2EE服務器的測試

 

 

ActiveMQ角色介紹

1,Destination 目的地,消息發送者需要指定Destination才能發送消息,接收者需要指定Destination才能接收消息。

2,Producer 消息生產者,負責發送Message到目的地

3,Consumer/Receiver 消息消費者,負責從目的地中消費(處理/監聽/訂閱)Message

4,Message 消息 消息封裝一次通信的內容。

這里我們在spring整合ActiveMQ來寫個小例子.

先創建一個maven項目,並在pom.xml添加相關依賴,如下:

    <dependencies>
        <!-- ActiveMQ客戶端完整jar包依賴 -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.9.0</version>
        </dependency>
        <!-- ActiveMQ和Spring整合配置文件標簽處理jar包依賴 -->
        <dependency>
            <groupId>org.apache.xbean</groupId>
            <artifactId>xbean-spring</artifactId>
            <version>4.5</version>
        </dependency>
        <!-- Spring-JMS插件相關jar包依賴 -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>4.1.6.RELEASE</version>
        </dependency>
        <!-- Spring框架上下文jar包依賴 -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>4.1.6.RELEASE</version>
        </dependency>
    </dependencies>

 

然后定義消息載體類型. 即要在ActiveMQ中傳遞的數據實體類型,

消息載體對象必須實現接口java.io.Serializable, 因為消息需要在網絡中傳遞,要求必須可序列化。

代碼如下:

 

public class Order implements Serializable { private static final long serialVersionUID = 1L; private String id; private String nick; private Long price; private Date createTime; public String getId() { return id; } public void setId(String id) { this.id = id; } public String getNick() { return nick; } public void setNick(String nick) { this.nick = nick; } public Long getPrice() { return price; } public void setPrice(Long price) { this.price = price; } public Date getCreateTime() { return createTime; } public void setCreateTime(Date createTime) { this.createTime = createTime; } public static long getSerialversionuid() { return serialVersionUID; } @Override public String toString() { return "Order [id=" + id + ", nick=" + nick + ", price=" + price + ", createTime=" + createTime + "]"; } }

 

定義生產者

public class OrderProducer { private JmsTemplate template; public JmsTemplate getTemplate() { return template; } public void setTemplate(JmsTemplate template) { this.template = template; } /** * 生產者發送消息 * @param destinationName 目的地名稱 * @param order 需要發送的訂單數據 */
    public void sendOrder(String destinationName, Order order){ try{ template.send(destinationName, new MessageCreator() { /** * 通過模板模式暴露的方法設置發送的信息 */ @Override public Message createMessage(Session session) throws JMSException { ObjectMessage objectMessage = session.createObjectMessage(order); return objectMessage; } }); }catch(Exception e){ e.printStackTrace(); throw new RuntimeException("send order to MQ server error!!"); } } 

 

 

再來定義消費者:

public class OrderConsumer implements MessageListener { @Override public void onMessage(Message message) { try { ObjectMessage objectMessage=(ObjectMessage)message; Order order=(Order)objectMessage.getObject(); System.out.println("order:"+order); } catch (JMSException e) { e.printStackTrace(); } } }

在spring配置文件applicationContext.xml整合ActiveMQ,如下:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-4.0.xsd
        http://www.springframework.org/schema/jms
        http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core-5.9.0.xsd">

    <!-- 添加掃描 -->
    <context:component-scan base-package="sn.sxt.*"></context:component-scan>

    <!-- ActiveMQ 連接工廠 -->
    <!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 -->
    <!-- 需提供訪問路徑tcp://ip:61616;以及用戶名,密碼 -->
    <amq:connectionFactory id="amqConnectionFactory" brokerURL="tcp://192.168.142.190:61616" userName="admin" password="admin" />

    <!-- Spring Caching連接工廠 -->
    <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory -->
    <bean id="connectionFactory"
          class="org.springframework.jms.connection.CachingConnectionFactory">
        <!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory -->
        <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
        <!-- Session緩存數量 -->
        <property name="sessionCacheSize" value="100" />
    </bean>

    <!-- 消息生產者 start -->

    <!-- 定義JmsTemplate對象. 此類型由Spring框架JMS組件提供. 用於訪問ActiveMQ使用. -->
    <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory對象 -->
        <constructor-arg ref="connectionFactory" />
        <!-- 非pub/sub模型(發布/訂閱),即隊列模式, 默認數據可省略配置 -->
        <!-- <property name="pubSubDomain" value="false" /> -->
    </bean>

    <!-- 定義生成者對象 -->
    <bean id="orderProducer" class="sn.sxt.activemq.OrderProducer">
        <!-- 為屬性賦值 -->
        <property name="template" ref="jmsQueueTemplate"></property>
    </bean>

    <!--消息生產者 end -->

    <!-- 消息消費者 start -->

    <!-- 定義消息監聽器, 此組件為spring-jms組件定義. 可以一次注冊若干消息監聽器. 屬性解釋: destination-type - 目的地類型, queue代表消息隊列 可選值: queue | topic | durableTopic queue - 默認值. 代表消息隊列 topic - 代表消息隊列集合 durableTopic - 持久化的消息隊列集合. ActiveMQ會保證消息的消費者一定接收到此消息. container-type - 容器類型 可選值: default | simple default - 默認值. 默認容器類型, 對應DefaultMessageListenerContainer simple - 簡單容器類型, 對應SimpleMessageListenerContainer connection-factory - 鏈接工廠, 注入的是Spring-JMS組件提供的鏈接工廠對象. acknowledge - 確認方式 可選值: auto | client | dups-ok | transacted auto - 默認值, 即自動確認消息 client - 客戶端確認消息 dups-ok - 可使用副本的客戶端確認消息 transacted - 有事務的持久化消息確認機制. 需開啟對ActiveMQ的事務控制才可應用. -->
    <jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory" acknowledge="auto">
        <!-- 注冊消息監聽器. 如果需要注冊多個, 重復定義下述標簽. -->
        <jms:listener destination="test-spring" ref="orderReciver" />
    </jms:listener-container>

    <!-- 容器管理消息監聽器實現類對象 -->
    <bean id="orderReciver" class="sn.sxt.activemq.OrderConsumer"/>

    <!-- 消息消費者 end -->

</beans>

然后我們就可以進行測試了

public class Test { public static void main(String[] args) { ApplicationContext ac=new ClassPathXmlApplicationContext("applicationContext.xml"); OrderProducer op=ac.getBean("orderProducer",OrderProducer.class); Order order=new Order(); order.setId("666"); order.setNick("我很強"); order.setPrice(999l); order.setCreateTime(new Date()); op.sendOrder("test-spring",order); } }

在Linux啟動ActiveMQ后,運行測試結果如下:

order:Order [id=666, nick=我很強, price=999, createTime=Wed Aug 14 16:11:35 CST 2019]

測試完畢!!!!

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM