消息隊列的使用<二>:ActiveMQ的基本使用(Java)


首發時間:2019-05-16


ActiveMQ


介紹

  • ActiveMQ是Apache旗下的一款開源的消息隊列(消息中間件MOM,Message Oriented Middleware)
  • 它完全支持JMS,支持JMS1.1和J2EE 1.4 規范。
  • 支持多種網絡協議。
  • 兼容多種語言(C,C++,Java,Python,PHP)
  • 可以方便地與spring進行整合開發。
  • (其他的一些說的太多可能也不是很懂,以后想了解再自己了解吧,畢竟這里只是個小博文。)


下載、安裝和初次運行

1.下載:從ActiveMQ官網下載ActiveMQ,地址:http://activemq.apache.org/download.html

2.安裝:下載下來的是一個壓縮包,解壓即安裝,直接解壓到一個目錄即可;
3.初次運行:(在啟動ActiveMQ前,請先要已經安裝和配置好JDK)在windows版本的activemq中在activeMQ/bin下面有兩個目錄,為win32,win64,根據自己的系統位數進入不同的目錄,然后直接雙擊目錄下的activemq.bat (在linux中為: ./activemq start)
4.檢測是否啟動:ActiveMQ默認使用61616端口提供JMS服務,使用8161端口提供管理控制台服務,我們可以直接訪問activemq的管理控制台網頁來確定是否已經開始服務:localhost:8161/admin,默認的用戶名和密碼都是admin,輸入后將進入如下的界面:

5.關閉ActiveMQ:windosw中直接ctrl+c關閉cmd窗口(在linux中: ./activemq stop)


Java上初次使用activeMQ

這里以PTP模型的生產者和消費者的消息傳遞為例。


1.首先導入依賴包,以maven為例:

        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.15.9</version>
        </dependency>

2.生產者發送消息(以PTP方式為例):【這里是符合上面的“JMS應用開發基本步驟”的】

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class Producer {

    public static void main(String[] args) throws JMSException {
        //1.創建connectionfacoty,參數是activemq的服務地址,前綴tcp代表是tcp連接
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        //2.使用ConnectionFactory創建connnect,並啟動connnect
        Connection connection = connectionFactory.createConnection();
        connection.start();
        //3.使用Connection創建session,第一個參數是是否使用事務,第二個參數是確認機制
        Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
        //4.創建目的地(這里以PTP為例,所以目的地是一個Queue),參數是Queue的名字
        Destination destination = session.createQueue("tempqueue");
        //5.創建生產者,第一個參數是目的地,此時創建的生產者要與目的地進行綁定。
        MessageProducer producer = session.createProducer(destination);
        //6.使用session創建消息,這里使用TEXT類型的消息
        TextMessage textMessage = session.createTextMessage("hello world!");
        //7.生產者發送消息
        producer.send(textMessage);
        //8.提交事務
        session.commit();
        //9.關閉資源
        session.close();
        connection.close();
    }
}

3.消費者接收消息(以PTP方式為例):

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class Consumer {
    public static void main(String[] args) throws JMSException {
        //1.創建connectionfacoty
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        //2.創建connnect,並啟動connnect
        Connection connection = connectionFactory.createConnection();
        connection.start();
        //3.創建session,第一個參數是是否使用事務,第二個參數是確認機制
        Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
        //4.創建目的地【消費者與生產者的目的地相同才能進行消息傳遞】
        Destination destination = session.createQueue("tempqueue");
        //5.創建消費者,第一個參數是目的地,此時創建的消費者要與目的地進行綁定。
        MessageConsumer consumer = session.createConsumer(destination);
        //6.使用消費者接受消息消息
        TextMessage message = (TextMessage) consumer.receive();
        System.out.println(message.getText());
        //8.提交事務
        session.commit();
        //9.關閉資源
        session.close();
        connection.close();
    }
}

上述代碼解析:
1.前半部分代碼都是一樣的,都是創建ConnectionFactory,Connection,Session
2.然后創建目的地Destination,這個目的地就是要把消息存儲到哪里和從哪里取消息。
3.如果是生產者,那么由Session來創建生產者,創建的時候傳入一個目的地,來與生產者綁定,生產者調用send發送的消息都會存儲到目的地中。生產者發送的消息需要使用Session來創建,調用createXXXMessage來創建消息,創建什么類型的消息取決於使用什么方法來創建。
4.如果是消費者,那么由Session來創建消費者,創建的時候傳入一個消費者,來與消費者綁定,消費者調用receive時會從目的地中獲取消息。獲取到的結果是一個XXXMessage,通常需要轉成對應類型的Message,然后再調用對應的獲取消息體的方法來獲取消息體。例如TextMessage類型的消息要獲取消息體需要調用getText()。
5.如果使用了事務,那么需要session.commit()
6.最后關閉所有資源



設置請求屬性:

設置標准屬性:使用消息調用setJMS開頭的方法。【要注意的是為了避免發生過期的消息,任何直接通過編程方式來調用setJMSExpiration()方法都會被忽略。 】
設置自定義屬性:使用消息調用setXXXProperty的方法。

接受屬性:



可靠性機制

在上面的概念學習中你應該了解到,如果不使用事務來進行消息確定,那么需要手動使用消息來調用acknowledge來確認消息。【而且這時候是在會話層進行確認,所以在這個會話中只要一條消息進行了確認,其他消息也會被確認(即使他收了兩條消息只確認了一條)】

當使用了事務的時候,代碼中就不要使用acknowledge了,會影響消息的確認。



事務

在上面的概念學習有提到了事務,事務可以使一系列操作共同成功或失敗。下面來演示一下事務的使用。
1.首先,在創建Session的時候第一個參數是是否使用事務,要使用事務需要賦值TRUE。
2.提交事務使用session.commit(),回滾是session.rollback()
3.對於生產者,事務是確保消息發送的一致性;對於消費者,事務是確保消息消費的一致性,
4.對於事務的測試可以使用單步運行來測試,在發消息處打斷點,測試未commit時消費者是否能取到消息。



消息消費方式

在上面的概念學習中有談到消費者消費消息的兩種方式,一種是堵塞的receive,一種是監聽器。監聽器與receive最大的區別是如果沒有,那么監聽器不會等,而receive會等。所以監聽器適用於不希望堵塞程度運行的場景。

receive

上面的代碼都是使用堵塞的receive來接收的,你應該可以留意到當運行了消費者后,沒有取到消息的時候會一直堵塞在那里。receive也可以設置阻塞時間,時間到了就不再等了。

監聽器:

public class Consumer {
    public static void main(String[] args) throws JMSException {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue("tempqueue2");
        MessageConsumer consumer = session.createConsumer(destination);
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                TextMessage msg = (TextMessage) message;
                try {
                    System.out.println("..."+msg.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        session.commit();
        session.close();
        connection.close();
    }
}


消息類型

在上面有介紹到消息體有不同的類型,TextMessage,MapMessage等。
不同的消息類似影響存儲到消息隊列的消息的格式,也影響取消息的方式,取消息的方式要與消息類型對應。

TextMessage:數據類型類似於字符串。使用getText()來獲取數據。



MapMessage:數據類型類似於Map。使用getXXX(key)來獲取數據。





發布/訂閱模式

上面主要講的都是PTP模式,下面來講一下發布/訂閱模式。在上面的概念學習中,有涉及到多消費者廣播、持久化訂閱,下面將演示這些概念的實際使用。

非持久訂閱

先演示非持久訂閱的,由於非持久訂閱只能發送給在線的消費者,所以先運行消費者(多個)。【非持久訂閱的消息接收與PTP一樣可以使用receive】

然后創建生產者發生消息:

注意:在非持久化訂閱中,通常要使消費者持續receive,所以通常使用while循環來接受消息。

Message message = consumer.receive();
while(message!=null){
   TextMessage txtMsg = (TextMessage)message;
   sysout(textmsg.getText());
   message = consumer.receive();
}

持久化訂閱

要進行持久化訂閱,首先要將生產者的發送模式改成持久化模式,這個設置要在connection.start()之前

然后消費者要創建持久訂閱器,而且要在消息發送之前先運行一次把持久化訂閱器注冊到消息隊列上。
【注意:需要在連接上設置消費者ID,用來識別消費者,持久化訂閱器識別消費者依靠消費者ID,如果不設置,那么下一次“上線”的時候,由於消費者ID會變化,導致訂閱器無法與消費者進行關聯】

public class Consumer {
    public static void main(String[] args) throws JMSException {
        //1.創建connectionfacoty
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        //2.創建connnect,並啟動connnect
        Connection connection = connectionFactory.createConnection();
        connection.setClientID("001");

        //3.創建session,第一個參數是是否使用事務,第二個參數是確認機制
        Session session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createTopic("tempTopic");
        TopicSubscriber s1 = session.createDurableSubscriber((Topic) destination, "s1");
        connection.start();
        Message msg = s1.receive();
        while (msg!=null){
            TextMessage txtmsg = (TextMessage) msg;
            System.out.println(txtmsg.getText());
            session.commit(); // 由於此時使用了while,所以要在里面commit
            msg= s1.receive();
        }
        session.close();
        connection.close();
    }
}


Broker

Broker是ActiveMQ服務器實例。
在使用獨立的ActiveMQ程序的時候,有時候會創建不同需求的服務器實例,通常來說都是使用某個配置文件進行創建。
而也是可以通過代碼內建一個Broker的,內建的Broker比較小巧,適用於一些希望把Broker整合到項目中的場景。

1.通過BrokerService創建:


上述代碼報錯java.lang.NoClassDefFoundError:com/fasterxml/jackson/databind/ObjectMapper時可能缺乏以下依賴包:

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.9.5</version>
        </dependency>

2.通過BrokerFactory創建:


使用BrokerFactory創建時需要一個配置文件,這個配置文件的路徑要被傳入到BrokerFactory中,在上面的代碼中就是properties:broker.properties,配置文件里面是這個Broker的參數。
下面是broker.properties的一個例子:

useJms=true
persistent=false
brokerName=FactoryBroker

對於能配置什么,可以參考使用BrokerService創建Broker時的setXXX。



整合spring開發

現在很多的項目都使用到了Spring,所以這里也講一下與Spring的整合。
首先理一下,ActiveMQ有什么可以交給Spring來管理的?可以說可以交給Spring管理的只有Destination,ConnectionFactory,Connection和Broker,只有這幾個實例的復用性比較強,這幾個的管理會在JmsTemplate使用中展示。

如果你學過Hibernate之類的框架,你應該知道Spring對Hibernate提供HibernateTemplate來整合,HibernateTemplate封裝了Hibernate的一些方法,簡化了使用,在使用HibernateTemplate的時候,dao層的類需要繼承HibernateDaoSupport,然后同時需要在類中注入Connection,這樣HibernateTemplate才可以正常工作。JmsTemplate類似於HibernateTemplate,不過它是面向JMS的。

使用JmsTemplate

1.首先要編寫Spring配置文件:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
    <!--管理destination-->
    <bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg name="name" value="spring-queue"/> <!-- 目的地名稱 -->
    </bean>
    <bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg name="name" value="spring-topic"/> <!-- 目的地名稱 -->
    </bean>
    
    <!--管理connectionFactory-->
    <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
        <property name="connectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL">
                    <value>tcp://localhost:61616</value>
                </property>
            </bean>
        </property>
        <property name="maxConnections" value="100"></property>
    </bean>

    <!--管理JmsTemplate-->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="jmsFactory"></property>
        <property name="defaultDestination" ref="destinationQueue"></property>
        <property name="messageConverter">
            <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"></bean>
        </property>
    </bean>

</beans>

2.然后創建測試類:


import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import javax.annotation.Resource;
import javax.jms.*;

@RunWith(SpringJUnit4ClassRunner.class) //使用junit4進行測試
@ContextConfiguration(locations={"classpath:applicationContext.xml"}) //加載配置文件
public class JmsTemplateDemo {
    @Autowired
    private JmsTemplate jmsTemplate;
    @Resource(name="destinationQueue")
    private Destination destinationQueue;
    //發送
    @Test
    public  void send() throws JMSException {
        String msg = "hello world!";
        jmsTemplate.send(destinationQueue, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(msg);
            }
        });

    }
    //接收
    @Test
    public  void recv() throws JMSException {
        TextMessage message = (TextMessage) jmsTemplate.receive(destinationQueue);
        System.out.println(message.getText());
    }
}

在上面的代碼中可以使用jmsTemplate.send來發送消息,使用jmsTemplate.receive來接收消息。對於消息確認和事務管理則不需要關心,JmsTemplate會自己處理的。

監聽器

在上面的JmsTemplate接收消息中使用了receive來接收消息,Spring還支持使用監聽器來接收消息,配置監聽器,來達到一有消息就執行某些操作,這樣就省去了消費者的代碼。
1.首先,需要需要在spring配置文件中配置DefaultMessageListenerContainer【還有其他幾種監聽器】:

    <!--配置DefaultMessageListenerContainer -->
    <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="jmsFactory"></property>
        <!--監聽器 -->
        <property name="messageListener" ref="myMessageListener"></property>
        <!--監聽哪個目的地 -->
        <property name="destination" ref="destinationQueue"></property>
     </bean>

    <!--配置監聽器-->
    <bean id="myMessageListener" class="withspring.MyMessageListener">
    </bean>

2.然后,定義一個監聽器:

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class MyMessageListener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            try {
                TextMessage txtMsg = (TextMessage) message;
                String msg = txtMsg.getText();
                System.out.println("recv:"+ msg);
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        } else {
            throw new IllegalArgumentException("消息類型錯誤!");
        }
    }
}

3.最后,運行隨便在這個項目中的一個發送消息的測試方法。



使用spring集合Broker

1.通過BrokerService:

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
           
        <bean id="broker" class="org.apache.activemq.broker.BrokerService" init-method="start" destroy-method="stop">
            <property name="brokerName" value="SpringBroker"/>
            <property name="persistent" value="false"/>
            <property name="transportConnectorURIs">
                <list>
                    <value>tcp://localhost:61616</value>
                </list>
            </property>
        </bean>
    </beans>

【還可以通過BrokerFactory或BrokerFactoryBean來創建,這里省略】
【當然,上面的是比較簡單的,沒有進行權限管理,你也登錄不了管理頁,想要確定是否創建成功可以監聽接口也可以進行生產和消費消息】




后續可擴展內容


這里只是一篇小博客,寫不了太多東西。如果想要了解更精細,可以去買書來看。 下面寫一下后續可擴展學習的內容,學不學由個人考慮。
* 傳輸協議【上面介紹了tcp://localhost:61616,其實還可以允許非TCP的連接】 * 消息存儲持久化【消息是怎么進行存儲的】 * KahaDB * AMQ * JDBC * 內存存儲 * 部署與集群 * 優化


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM