ActiveMQ消息選擇器Selector


一、前言

  消息發送到Broker,消費者通過Destination可以訂閱消費某個特定的通道內的消息。一些特殊情況下,需要消費者對消息過濾下再進行消費,也就是篩選出某些特定消息。ActiveMQ提供了SQL92表達式語法的自定義消息篩選功能。非常方便快捷的能夠開發出具有消息篩選功能的應用。

  ActiveMQ 支持:

  1. 數字表達式: >,>=,<,<=,BETWEEN,=.
  2. 字符表達式:=,<>,IN.
  3. IS NULL 或則 IS NOT NULL.
  4. 邏輯AND, 邏輯OR, 邏輯NOT.

  常數類型:

  1. 數字:3.1415926, 5。
  2. 字符: ‘a’,必須帶有單引號。
  3. NULL,特別的常量。
  4. 布爾類型: TRUEFALSE

二、程序案例

  生產者:

package com.cfang.prebo.activemq.selector;

import java.util.Scanner;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Producer {

    public static void main(String[] args) {
        ConnectionFactory connectionFactory = null;
        Connection connection = null;
        Session session = null;
        Destination destination = null;
        MessageProducer producer = null;
        Message message = null;
        try {
            Scanner scanner = new Scanner(System.in);
            connectionFactory = new ActiveMQConnectionFactory("tcp://172.31.31.160:61618");
            connection = connectionFactory.createConnection(null, null);
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            destination = session.createQueue("TP_Q_TEST_SELECTOR00");
            producer = session.createProducer(destination);
            while(true) {
                String line = scanner.nextLine();
                if("exit".equals(line)) {
                    break;
                }
                message = session.createTextMessage(line);
                message.setIntProperty("applicationName", line.length());
                message.setStringProperty("result", "RT");
                producer.send(message);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if(producer != null){ // 回收消息發送者
                try {
                    producer.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if(session != null){ // 回收會話對象
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if(connection != null){ // 回收連接對象
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
}

  如上,生產者還可以設置更多的條件,ActiveMQ也提供了全基本類型的 setXXXXXProperty方法去設置條件。

  消費者:

package com.cfang.prebo.activemq.selector;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

public class ConsumerA {

	public static void main(String[] args) {
		ConnectionFactory connectionFactory = null;
		Connection connection = null;
		Session session = null;
		Destination destination = null;
		MessageConsumer consumer = null;
		try {
			connectionFactory = new ActiveMQConnectionFactory("tcp://172.31.31.160:61618");
			connection = connectionFactory.createConnection(null, null);
			connection.start();
			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			destination = session.createQueue("TP_Q_TEST_SELECTOR00");
			consumer = session.createConsumer(destination,"applicationName=2 and result='RT'");
			consumer.setMessageListener(new MessageListener() {
				public void onMessage(Message message) {
					System.out.println(message);
				}
			});
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			
		}
	}
}

  如上,消費者就只消費  applicationName = 2 且  result = 'RT' 的消息。

三、小結

  1、提供了篩選功能,可以減少 destination 的數量。可以用於實現特定機器,特定消息(灰度?)。

  2、如果同時兩個消費者的話,一個異常不能消費了,那么消息就會產生積壓。對另一個正常的消費者而言,性能會下降,消費時間可能會變長。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM