MQ(隊列消息的入門)


 

  消息中間件利用高效可靠的消息傳遞機制進行平台無關的數據交流,並基於數據通信來進行分布式系統的集成,通過提供消息傳遞消息排隊模型,它可以在分布式環境下拓展進程間的通信,對於消息中間件,常見的角色大致也就有Producer(生產者).Consumer(消費者)

 

MQ     消息中間件     消息隊列

Message Queue簡稱MQ

種類:

1.Apache  ActiveMQ  

     ActiveMQ是Apache出品,最流行的,能力強勁的開源消息總線.ActiveMQ是一個完全支持JMS1.1和J2EE1.4規范的JMS Provider實現.我們在本次課程中介紹ActiveMQ的使用.

2.阿里  RocketMQ

  

3.Pivotal 開發RabbitMQ

  AMQP協議的領導實現,支持多種場景.淘寶的Mysql集群內部有使用它進行通訊,OpenStack開源雲平台的通信組件,最先在金融行業得到運用.

ZeroMQ

  史上最快的消息隊列系統.

kafka:

  Apache下的一個子項目.特點:高吞吐,在一台普通的服務器上既可以達到10W/s的吞吐速率;完全的分布式系統.適合處理海量數據.

使用場景(為什么使用MQ?):

//TODO

JMS簡介

什么是JMS?

    JMS(Java Messaging Service)是java平台上有關面向消息中間件的技術規范(可以使用jmsTemplate),它便於消息系統中的java應用程序進行消息交換,並且通過提供標准的產生,發送,接收消息的接口庫簡化企業應用的開發.

  JMS本身只定義了一系列的接口規范,是一種與廠商無關的API.用來訪問消息收發系統.它類似於JDBC(java Database Connectivity);這里,JDBC是可以用來訪問許多不同關系數據庫的的API.而JMS則提供同樣與廠商無關的訪問方法,以訪問消息收發服務.許多廠商目前都支持JMS,包括IBM的MQseries.BEA的Weblogic.JMSservice和Progress的SonicMQ,這只是幾個例子.JMS使您能夠通過消息收發服務(有時稱為消息中介程序或者路由器)從一個JMS客戶機向另一個JMS客戶機發送消息,消息是JMS中的一種類型對象,由兩部分組成:報頭消息主體.

報頭由路由信息以及有關該消息的元數據組成.

消息主體則攜帶着應用程序的數據或有效負載.

  JMS定義了五種不同的消息正文格式,以及調用的消息類型.允許你發送並接收以一些不同形式的數據,提供現有消息格式的一些級別的兼容性.

  TextMessage ---一個字符串對象

  MapMessage---一套名稱-值對

  ObjectMessage--一個序列化的java對象

  ByteMessage -- 一個字節的數據流

  StreamMessage --Java原始值的數據流

JMS消息傳遞類型

  對於消息的傳遞有兩種類型:

    一種是點對點,即一個生產者和一個消費者一一對應.

    

    另一種是發布/訂閱模式,即一個生產者產生消息並進行發送后,可以由多個消費者進行接收.

JMS入門小Demo

現在是點對點模式:

  點對點的模式主要建立在一個隊列上面,當連接一個列隊的時候,發送端不需要知道接收端是否正在接收,可以直接行ActiveMQ發送消息,發送的消息,將會先進入隊列中,如果有接收端在監聽,則會發現接收端,如果沒有接收端接收,多個接收端,但是一條消息,只會被一個接收端給接收到,那個接收端先連上ActiveMQ,則會先接收到,而后來的接收端則接收不到那條消息.

創建的是一個沒有使用任何骨架的java工程

引入的依賴為:

   <dependencies>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-client</artifactId>
            <version>5.13.4</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- java編譯插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>

   代碼

/**
 * @Auther:qingmu
 * @Description:腳踏實地,只為出人頭地
 * @Date:Created in 16:16 2019/4/23
 */

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * 點對點模式
 * 生產者
 */

public class QueueProducer {
    public static void main(String[] args) throws Exception {
        //1.創建連接工廠
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.200.128:61616");
        //2.獲取連接
        Connection connection = connectionFactory.createConnection();
        //3.啟動連接
        connection.start();
        //4.獲取session  (參數1:是否啟動事務,參數2:消息確認模式)
        /**
         *  AUTO_ACKNOWLEDGE = 1    自動確認
         •    CLIENT_ACKNOWLEDGE = 2    客戶端手動確認
         •    DUPS_OK_ACKNOWLEDGE = 3    自動批量確認
         •    SESSION_TRANSACTED = 0    事務提交並確認

         */
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.創建隊列對象
        Queue queue = session.createQueue("test-queue");
        //6.創建消息生產者
        MessageProducer producer = session.createProducer(queue);
        //7.創建消息
        TextMessage textMessage = session.createTextMessage("歡迎來到神奇的");
        //8.發送消息
        producer.send(textMessage);
        //9.關閉資源
        producer.close();
        session.close();
        connection.close();

    }
}

直接運行這個main方法,后可以mq中查看到:

消費者的代碼為:

/**
 * @Auther:qingmu
 * @Description:腳踏實地,只為出人頭地
 * @Date:Created in 16:23 2019/4/23
 */

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * 點對點模式
 * 消息消費者
 */
public class QueueConsumer {
    public static void main(String[] args) throws Exception{
        //1.創建連接工廠
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.200.128:61616");
        //2.獲取連接
        Connection connection = connectionFactory.createConnection();
        //3.啟動連接
        connection.start();
        //4.獲取session  (參數1:是否啟動事務,參數2:消息確認模式)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.創建隊列對象
        Queue queue = session.createQueue("test-queue");
        //6.創建消息消費
        MessageConsumer consumer = session.createConsumer(queue);

        //7.監聽消息
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage textMessage=(TextMessage)message;
                try {
                    System.out.println("接收到消息:"+textMessage.getText());
                } catch (JMSException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        });
        //8.等待鍵盤輸入
        System.in.read();
        //9.關閉資源
        consumer.close();
        session.close();
        connection.close();

    }
}

通過上面定義的監聽器,可以獲取到生產者生產的信息.在控制台上

在進行測試的時候,開啟兩個以上的消費者,開啟一個生產者,然后可以觀察到只能在一個消費者的控制台上進行顯示,而在另一個消費者的控制台上不能進行打印.

發布和訂閱者模式

消費生產者:

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @Auther:qingmu
 * @Description:腳踏實地,只為出人頭地
 * @Date:Created in 16:32 2019/4/23
 */
public class TopicProducer {
    public static void main(String[] args) throws Exception{
        //1.創建連接工廠
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.200.128:61616");
        //2.獲取連接
        Connection connection = connectionFactory.createConnection();
        //3.啟動連接
        connection.start();
        //4.獲取session  (參數1:是否啟動事務,參數2:消息確認模式)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.創建主題對象
        Topic topic = session.createTopic("test-topic");
        //6.創建消息生產者
        MessageProducer producer = session.createProducer(topic);
        //7.創建消息
        TextMessage textMessage = session.createTextMessage("歡迎來到神奇的世界");
        //8.發送消息
        producer.send(textMessage);
        //9.關閉資源
        producer.close();
        session.close();
        connection.close();

    }
}

 運行完以后的結果為:

 

消費者為:

/**
 * @Auther:qingmu
 * @Description:腳踏實地,只為出人頭地
 * @Date:Created in 16:36 2019/4/23
 */

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * 訂閱
 * 多對多
 */
public class TopicConsumer {
    public static void main(String[] args) throws Exception{
        //1.創建連接工廠
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.200.128:61616");
        //2.獲取連接
        Connection connection = connectionFactory.createConnection();
        //3.啟動連接
        connection.start();
        //4.獲取session  (參數1:是否啟動事務,參數2:消息確認模式)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.創建主題對象
        //Queue queue = session.createQueue("test-queue");
        Topic topic = session.createTopic("test-topic");
        //6.創建消息消費
        MessageConsumer consumer = session.createConsumer(topic);

        //7.監聽消息
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage textMessage=(TextMessage)message;
                try {
                    System.out.println("接收到消息:"+textMessage.getText());
                } catch (JMSException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        });
        //8.等待鍵盤輸入
        System.in.read();
        //9.關閉資源
        consumer.close();
        session.close();
        connection.close();

    }
}

 

運行測試:

  同時開啟2個以上的消費者,再次運行生產者,觀察每一個消費者控制台的輸出,會發現每個消費者會接收到消息.

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM