JMS--ActiveMQ的簡單使用


一.消息中間件概述

  消息中間件利用高效可靠的消息傳遞機制進行平台無關的數據交流,並基於數據通信來進行分布式系統的集成。通過提供消息傳遞和消息排隊模型,它可以在分布式環境下擴展進程間的通信。對於消息中間件,常見的角色大致也就有 Producer(生產者)、Consumer(消費者)。消息隊列中間件是分布式系統中重要的組件,主要解決應用解耦,異步消息,流量削鋒等問題,實現高性能,高可用,可伸縮和最終一致性架構。

1.1常見消息中間件

ActiveMQ

ActiveMQ是 Apache 出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是一個完全支持 JMS1.1 和J2EE 1.4 規范的 JMS Provider 實現。

RabbitMQ

AMQP 協議的領導實現,支持多種場景。淘寶的 MySQL 集群內部有使用它進行通訊,OpenStack 開源雲平台的通信組件,最先在金融行業得到運用。

ZeroMQ

史上最快的消息隊列系統。

Kafka

Apache 下的一個子項目 。特點:高吞吐,在一台普通的服務器上既可以達到 10W/s 的吞吐速率;完全的分布式系統。適合處理海量數據(消息丟失率較高)。

1.2應用場景

  • 異步處理

  • 應用解耦

  • 流量削峰

  • 消息通訊

二.JMS消息服務

  JMS(Java Messaging Service)是 Java 平台上有關面向消息中間件的技術規范,它便於消息系統中的Java 應用程序進行消息交換,並且通過提供標准的產生、發送、接收消息的接口簡化企業應用的開發。JMS 本身只定義了一系列的接口規范,是一種與廠商無關的 API,用來訪問消息收發系統。它類似JDBC(java Database Connectivity):這里,JDBC 是可以用來訪問許多不同關系數據庫的 API,而 JMS則提供同樣與廠商無關的訪問方法,以訪問消息收發服務。

2.1JMS消息模型

消息中間件一般有兩種傳遞模式:點對點模式(P2P)和發布-訂閱模式(Pub/Sub)。

2.1.1點對點模型

點對點模型(Pointer-to-Pointer):即生產者和消費者之間的消息往來。每個消息都被發送到特定的消息隊列,接收者從隊列中獲取消息。隊列保留着消息,直到他們被消費或超時。

特點

  • 每個消息只有一個消費者(一旦被消費,就不在消息隊列中了)

  • 發送者和接收者之間沒有依賴,直接發送,不管是否有消費者

  • 接收者成功接收消息后需向隊列應答成功

2.1.2發布/訂閱模型

  發布/訂閱(Publish-Subscribe):包含三個角色:主體(Topic),發布者(Publisher),訂閱者(Subscriber),多個發布者將消息發送到 topic,系統將這些消息投遞到訂閱此 topic 的訂閱者。發布者發送到 topic 的消息,只有訂閱了 topic 的訂閱者才會收到消息。topic 實現了發布和訂閱,當你發布一個消息,所有訂閱這個 topic 的服務都能得到這個消息。

特點

  • 每個消息可有有多個消費者

  • 發布者和訂閱者之間有時間上的依賴

  • 針對某個主題(Topic)的訂閱者,它必須創建一個訂閱者之后,才能消費發布者的消息,而且為了消費消息,訂閱者必須保持運行的狀態

2.2JMS編程模型

ConnectionFactory

創建Connection對象的工廠,針對兩種不同的 jms 消息模型,分別有 QueueConnectionFactory 和TopicConnectionFactory 兩種。

Destination

  Destination 的意思是消息生產者的消息發送目標或者說消息消費者的消息來源。對於消息生產者來說,它的 Destination 是某個隊列(Queue)或某個主題(Topic);對於消息消費者來說,它的 Destination 也是某個隊列或主題(即消息來源)。所以,Destination 實際上就是兩種類型的對象:Queue、Topic。

Connection

  Connection 表示在客戶端和 JMS 系統之間建立的鏈接(對 TCP/IP socket 的包裝)。Connection 可以產生一個或多個 Session。

Session

  Session 是我們對消息進行操作的接口,可以通過 session 創建生產者、消費者、消息等。Session 提供了事務的功能,如果需要使用 session 發送/接收多個消息時,可以將這些發送/接收動作放到一個事務中。

Producer

  Producer(消息生產者):消息生產者由 Session 創建,並用於將消息發送到 Destination。同樣,消息生產者分兩種類型:QueueSender和TopicPublisher。可以調用消息生產者的方法(send或publish方法)發送消息。

Consumer

  Consumer(消息消費者):消息消費者由 Session 創建,用於接收被發送到 Destination 的消息。兩種類型:QueueReceiver 和 TopicSubscriber。可分別通過 session 的 createReceiver(Queue)或createSubscriber(Topic)來創建。當然,也可以 session 的 creatDurableSubscriber 方法來創建持久化的訂閱者。

MessageListener

  消息監聽器。如果注冊了消息監聽器,一旦消息到達,將自動調用監聽器的 onMessage 方法。EJB 中的 MDB(Message-Driven Bean)就是一種 MessageListener。

三.消息隊列ActiveMQ

  ActiveMQ 是由 Apache 出品的一款開源消息中間件,旨在為應用程序提供高效、可擴展、穩定、安全的企業級消息通信。它的設計目標是提供標准的、面向消息的、多語言的應用集成消息通信中間件。ActiveMQ 實現了JMS 1.1 並提供了很多附加的特性,比如 JMX 管理、主從管理、消息組通信、消息優先級、延遲接收消息、虛擬接收者、消息持久化、消息隊列監控等等。

官網:http://activemq.apache.org/

解壓安裝后進入管理界面:localhost:8161 用戶名密碼均為:admin

3.1點對點模式

第一步:新建兩個Maven工程並都導入activemq坐標

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-client</artifactId>
    <version>5.13.4</version>
</dependency>

第二步:編寫消息生產者

/**
 * 生產消費者模式:生產者
 * @author Mr.song
 * @date 2019/05/24 20:50
 */
public class QueueProducerTest {

    public static void main(String[] args) throws Exception {
        //1.獲取連接工廠
        ConnectionFactory connectionFactory = new
                ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
        //2.從工廠獲取連接
        Connection connection = connectionFactory.createConnection();
        //3.啟動連接
        connection.start();
        //4.通過連接獲取會話: 參數1-是否支持事務,  參數2-消息的確認模式
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.使用會話創建隊列的目的地
        Queue queue = session.createQueue("queue-demo");
        //6.創建消息的生產這對象
        MessageProducer producer = session.createProducer(queue);
        //7.創建消息內容(使用會話對象創建)
        TextMessage textMessage = session.createTextMessage("activeMQ的生產消費模型第一個消息來了");
        //8.發送消息
        producer.send(queue,textMessage);
        //9.釋放資源
        producer.close();
        session.close();
        connection.close();
    }
}

第三步:編寫消息消費者

/**
 * ActiveMQ的生產消費模式-消費者
 * @author Mr.song
 * @date 2019/05/25 15:15
 */
public class QueueConsumerTest {
    public static void main(String[] args) throws JMSException {
        //1.創建連接工廠
        ConnectionFactory factory = new
                ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
        //2.創建連接
        Connection connection = factory.createConnection();
        //3.啟動連接
        connection.start();
        //4.創建會話
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.創建消息目的地
        Queue queue = session.createQueue("queue-demo");
        //6.創建消息的消費者
        MessageConsumer consumer = session.createConsumer(queue);
        //7.使用消費者接受消息:采用監聽器輪詢接受消息
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                //將message進行轉換
                try {
                    TextMessage textMessage = (TextMessage) message;
                    System.out.println("1號消費者:"+textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        //8.如需,可釋放資源
//        consumer.close();
//        session.close();
//        connection.close();
    }
}

 

Tips: 創建session的兩個參數:

  • 第一個 : 是否使用事務

  • 第二個 : 消息的確認模式

    • AUTO_ACKNOWLEDGE = 1 自動確認

    • CLIENT_ACKNOWLEDGE = 2 客戶端手動確

    • DUPS_OK_ACKNOWLEDGE = 3 自動批量確認

    • SESSION_TRANSACTED = 0 事務提交並確認

第四步:運行測試

3.2發布訂閱模式

第一步:新建兩個Maven工程並都導入activemq坐標

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-client</artifactId>
    <version>5.13.4</version>
</dependency>

第二步:編寫消息生產者

/**
 * 發布訂閱模式的發布者
 * @author Mr.song
 * @date 2019/05/25 15:36
 */
public class TopicProduceTest {

    public static void main(String[] args) throws JMSException {
        //1.獲取連接工廠
        ConnectionFactory connectionFactory = new
                ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
        //2.獲取連接
        Connection connection = connectionFactory.createConnection();
        //3.開啟連接
        connection.start();
        //4.獲取會話
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.創建消息隊列的目的地,創建的是發布訂閱模型的隊列
        Topic topic = session.createTopic("topic-demo");
        //6.創建消息的生產者對象
        MessageProducer producer = session.createProducer(topic);
        //7.創建消息內容
        TextMessage textMessage = session.createTextMessage("ActiveMQ的發布訂閱模型消息來了");
        //8.發送消息,指定發布到哪個隊列
        producer.send(topic,textMessage);
        //9.關閉資源
        producer.close();
        session.close();
        connection.close();
    }
}

第三步:編寫消息消費者

/**
 * ActiveMQ發布訂閱模式的消費者
 * @author Mr.song
 * @date 2019/05/25 15:49
 */
public class TopicConsumerTest {
    public static void main(String[] args) throws JMSException {
        //1.創建連接工廠
        ConnectionFactory connectionFactory = new
                ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
        //2.創建連接
        Connection connection = connectionFactory.createConnection();
        //3.開啟連接
        connection.start();
        //4.創建會話
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.創建消費者目的地
        Topic topic = session.createTopic("topic-demo");
        //6.創建消費者
        MessageConsumer consumer = session.createConsumer(topic);
        //7.使用消費者接受消息:使用監聽器進行輪詢
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                //進行消息轉換
                try {
                    TextMessage textMessage = (TextMessage) message;
                    System.out.println("訂閱到了消息:"+textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        //8.如果需要,可以關閉資源
//        consumer.close();
//        session.close();
//        connection.close();
    }
}

第四步:運行測試

四.Spring整合JMS

ActiveMQ可以通過Spring的配置文件方式很容易嵌入到Spring應用中。

4.1點對點模式/發布訂閱模式

第一步:創建Maven工程並導入相關坐標

<!-- activemq  start -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.11.2</version>
</dependency>
<dependency>
    <groupId>javax.jms</groupId>
    <artifactId>javax.jms-api</artifactId>
    <version>2.0.1</version>
</dependency>
<!-- activemq  end -->
<!-- spring 與 mq整合  start -->
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context</artifactId>
    <version>5.0.4.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <version>5.0.4.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.apache.xbean</groupId>
    <artifactId>xbean-spring</artifactId>
    <version>4.13</version>
</dependency>
<!-- spring 與 mq整合  end -->
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-test</artifactId>
    <version>5.0.4.RELEASE</version>
</dependency>

第二步:編寫消息監聽器

/**
 * 生成消費模式,消息監聽器
 * @author Mr.song
 * @date 2019/05/25 16:20
 */
@Component
public class QueueListener implements MessageListener {
    /**
     * 獲取到消息進行相關的處理
     * @param message
     */
    @Override
    public void onMessage(Message message) {
        try {//1.消息轉型
            MapMessage mapMessage = (MapMessage) message;
            String phone = mapMessage.getString("phone");
            String code = mapMessage.getString("code");
            System.out.println("消費者端得到的手機號及驗證碼是:"+phone+"=="+code);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

//==================================================
/**
 * 發布訂閱模式,消息監聽器
 * @author Mr.song
 * @date 2019/05/25 16:25
 */
@Component
public class TopicListener implements MessageListener {
    /**
     * 獲取到消息進行相關的處理
     * @param message
     */
    @Override
    public void onMessage(Message message) {
        try {//1.消息轉型和獲取
            MapMessage mapMessage = (MapMessage) message;
            String phone = mapMessage.getString("phone");
            String code = mapMessage.getString("code");
            System.out.println("訂閱者獲得的手機號和驗證碼:"+phone+"=="+code);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

第三步:編寫消息發布者配置文件(applicaitionContext-mq.xml)

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core.xsd">

    <!-- 1.配置連接工廠,ActiveMQ的連接工廠 -->
    <amq:connectionFactory id="amqConnectionFactory" brokerURL="tcp://127.0.0.1:61616"
                           userName="admin" password="admin"/>
    <!-- 2.配置Spring支持會話緩存的連接工廠 -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <!-- 注入供應商的連接工廠 -->
        <property name="targetConnectionFactory" ref="amqConnectionFactory"/>
        <!-- 設置session緩存的大小: 100個會話 -->
        <property name="sessionCacheSize" value="100"/>
    </bean>

    <!--=================== 通過配置,選擇點對點/發布訂閱模式 ======================= -->
    
    <!-- 3.配置Spring提供的jms模板 : 點對點模式 -->
    <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 注入連接工廠,Spring的那個 -->
        <property name="connectionFactory" ref="connectionFactory"/>
        <!-- 指定是否是發布訂閱模型:false即是點對點模式 -->
        <property name="pubSubDomain" value="false"/>
    </bean>

    <!-- 3.配置Spring提供的jms模板 : 發布訂閱模式 -->
    <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 注入工廠連接 -->
        <property name="connectionFactory" ref="connectionFactory"/>
        <!-- 指定是否是發布訂閱模型:true即是發布訂閱模式 -->
        <property name="pubSubDomain" value="true"/>
    </bean>
</beans>

第四步:編寫消息消費者配置文件(applicationContext-listener.xml)

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/jms
        http://www.springframework.org/schema/jms/spring-jms.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core.xsd">

    <!-- 1.配置Spring容器啟動時要掃描的包 -->
    <context:component-scan base-package="cn.dintalk.listener"/>

    <!-- 2.配置連接工廠:ActiveMQ的連接工廠 -->
    <amq:connectionFactory id="amqConnectionFactory" brokerURL="tcp://127.0.0.1:61616"
                           userName="admin" password="admin"/>
    <!-- 3.配置Spring支持會話緩存的連接工廠 -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <!-- 注入連接工廠:供應商提供的 -->
        <property name="targetConnectionFactory" ref="amqConnectionFactory"/>
        <!-- 設置會話緩存大小: 100個會話 -->
        <property name="sessionCacheSize" value="100"/>
    </bean>

  <!-- ================== 配置不同模式下的消息監聽器 =================== -->  
    
    <!-- 4.配置生產消費模式的監聽器 -->
    <jms:listener-container destination-type="queue">
        <!-- 配置監聽器類,和消息目的地 -->
        <jms:listener destination="spring-queue" ref="queueListener"/>
    </jms:listener-container>
    
    <!-- 5.配置發布訂閱模式的監聽器 -->
    <jms:listener-container destination-type="topic">
        <jms:listener destination="spring-topic" ref="topicListener"/>
    </jms:listener-container>
</beans>

第五步:編寫消息生產者(兩種模式都有)

/**
 * Spring整合ActiveMQ的 生產/發布測試類
 * @author Mr.song
 * @date 2019/05/25 16:34
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("classpath:applicationContext-mq.xml")
public class SpringActiveMQProducerTest {

    @Autowired //點對點模式的jms模板
    private JmsTemplate jmsQueueTemplate;
    @Autowired //發布訂閱模式的jms模板
    private JmsTemplate jmsTopicTemplate;
    // 1.點對點模式
    @Test
    public void testQueueProducer(){
        jmsQueueTemplate.send("spring-queue", new MessageCreator() {
            /**
             * 消息生成器
             * @param session
             * @return
             * @throws JMSException
             */
            @Override
            public Message createMessage(Session session) throws JMSException {
                MapMessage mapMessage = session.createMapMessage();
                mapMessage.setString("phone","12345678901");
                mapMessage.setString("code","6542");
                return mapMessage;
            }
        });
    }
    //2.發布訂閱模式
    @Test
    public void testTopicProducer(){
        jmsTopicTemplate.send("spring-topic", new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                MapMessage mapMessage = session.createMapMessage();
                mapMessage.setString("phone","12345678901");
                mapMessage.setString("code","5648");
                return mapMessage;
            }
        });
    }
}

第六步:編寫消息消費者(注冊監聽器)(兩種模式都有)

/**
 * @author Mr.song
 * @date 2019/05/25 17:42
 */
public class SpringActiveMQConsumerTest {
    //1.加載配置文件,注冊消息監聽器
    public static void main(String[] args) {
        ClassPathXmlApplicationContext ac = new
                ClassPathXmlApplicationContext("classpath:applicationContext-listener.xml");
        ac.start();
    }
}

第七步:運行測試

關注微信公眾號, 隨時隨地學習

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM