JavaWeb之ActiveMQ消息過濾


前言

ActiveMQ提供了一種機制,使用它,消息服務可根據消息選擇器中的標准來執行消息過濾。生產者可在消息中放入應用程序特有的屬性,而消費者可使用基於這些屬性的選擇標准來表明對消息是否感興趣。這就簡化了客戶端的工作,並避免了向不需要這些消息的消費者傳送消息的開銷。然而,它也使得處理選擇標准的消息服務增加了一些額外開銷。 

消息選擇器是用於MessageConsumer的過濾器,可以用來過濾傳入消息的屬性和消息頭部分(但不過濾消息體),並確定是否將實際消費該消息。消息選擇器是一些字符串,它們基於某種語法,而這種語法是SQL-92的子集。可以將消息選擇器作為MessageConsumer 創建的一部分。

實現對MapMessage和TextMessage兩種消息的過濾條件的設置和消費

Producer

在消息的屬性中設置過濾條件

package com.tgb.activemqFilter;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Producer {
    // 單例模式
    // 1、連接工廠
    private ConnectionFactory connectionFactory;
    // 2、連接對象
    private Connection connection;
    // 3、Session對象
    private Session session;
    // 4、生產者
    private MessageProducer messageProducer;
    private Destination destination;

    public Producer() {
        try {
            this.connectionFactory = new ActiveMQConnectionFactory("admin",
                    "admin", "tcp://127.0.0.1:61616");
            this.connection = connectionFactory.createConnection();
            this.connection.start();
            // 設置自動簽收模式
            this.session = this.connection.createSession(false,
                    Session.AUTO_ACKNOWLEDGE);
            this.destination = this.session.createQueue("first");
            this.messageProducer = this.session.createProducer(null);
        } catch (JMSException e) {
            throw new RuntimeException(e);
        }

    }

    public Session getSession() {
        return this.session;
    }

    public void send1(/* String QueueName, Message message */) {
        try {

            Destination destination = this.session.createQueue("first");
            MapMessage msg1 = this.session.createMapMessage();
            msg1.setString("name", "張三");
            msg1.setInt("age", 20);
            // 設置用於消息過濾器的條件
            msg1.setStringProperty("name", "張三");
            msg1.setIntProperty("age", 20);
            msg1.setStringProperty("color", "bule");

            MapMessage msg2 = this.session.createMapMessage();
            msg2.setString("name", "李四");
            msg2.setInt("age", 25);
            // 設置用於消息過濾器的條件
            msg2.setStringProperty("name", "李四");
            msg2.setIntProperty("age", 25);
            msg2.setStringProperty("color", "white");

            MapMessage msg3 = this.session.createMapMessage();
            msg3.setString("name", "趙六");
            msg3.setInt("age", 30);
            // 設置用於消息過濾器的條件
            msg3.setStringProperty("name", "趙六");
            msg3.setIntProperty("age", 30);
            msg3.setStringProperty("color", "black");
            // 發送消息
            this.messageProducer.send(destination, msg1,
                    DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10);
            this.messageProducer.send(destination, msg2,
                    DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10);
            this.messageProducer.send(destination, msg3,
                    DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10);
        } catch (JMSException e) {
            throw new RuntimeException(e);
        }
    }

    public void send2() {
        try {
            Destination destination = this.session.createQueue("first");
            TextMessage message = this.session.createTextMessage("我是一個字符串");
            message.setIntProperty("age", 25);
            // 發送消息
            this.messageProducer.send(destination, message,
                    DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10);
        } catch (JMSException e) {
            throw new RuntimeException(e);
        }

    }

    public static void main(String[] args) {
        Producer producer = new Producer();
        producer.send1();
        // producer.send2();

    }
}

 

Conmuser

消費消息時,直接在session創建MessageConsumer時,將過濾條件作為參數傳入(過濾條件的寫法和SQL的寫法是很像的)

package com.tgb.activemqFilter;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Conmuser {
    // 單例模式
    // 1、連接工廠
    private ConnectionFactory connectionFactory;
    // 2、連接對象
    private Connection connection;
    // 3、Session對象
    private Session session;
    // 4、生產者
    private MessageConsumer messageConsumer;
    // 5、目的地址
    private Destination destination;
    // 消息選擇器
    public final String SELECTOR_1 = "age > 25";
    public final String SELECTOR_2 = " age > 20 and color='black'";

    public Conmuser() {
        try {
            this.connectionFactory = new ActiveMQConnectionFactory("admin",
                    "admin", "tcp://127.0.0.1:61616");
            this.connection = connectionFactory.createConnection();
            this.connection.start();
            // 設置自動簽收模式
            this.session = this.connection.createSession(false,
                    Session.AUTO_ACKNOWLEDGE);
            this.destination = this.session.createQueue("first");
            // 在構造消費者的時候,指定了 消息選擇器
            // 有選擇性的消費消息
            this.messageConsumer = this.session.createConsumer(destination,
                    SELECTOR_1);
        } catch (JMSException e) {
            throw new RuntimeException(e);
        }
    }

    public Session getSession() {
        return this.session;
    }

    // 用於監聽消息隊列的消息
    class MyLister implements MessageListener {

        @Override
        public void onMessage(Message message) {
            try {
                if (message instanceof TextMessage) {
                    TextMessage ret = (TextMessage) message;
                    System.out.println("results;" + ret.getText());
                }
                if (message instanceof MapMessage) {
                    MapMessage ret = (MapMessage) message;
                    System.out.println(ret.toString());
                    System.out.println(ret.getString("name"));
                    System.out.println(ret.getInt("age"));
                }
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }

    }

    // 用於異步監聽消息
    public void receiver() {
        try {
            this.messageConsumer.setMessageListener(new MyLister());
        } catch (JMSException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        Conmuser conmuser = new Conmuser();
        conmuser.receiver();

    }
}

 

 

測試

Messages Enqueued: 張三 20 | 李四 25 | 趙六 30

消息過濾條件:age>25

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM