一、前言
消息發送到Broker,消費者通過Destination可以訂閱消費某個特定的通道內的消息。一些特殊情況下,需要消費者對消息過濾下再進行消費,也就是篩選出某些特定消息。ActiveMQ提供了SQL92表達式語法的自定義消息篩選功能。非常方便快捷的能夠開發出具有消息篩選功能的應用。
ActiveMQ 支持:
- 數字表達式:
>,>=,<,<=,BETWEEN,=. - 字符表達式:
=,<>,IN. IS NULL或則IS NOT NULL.- 邏輯
AND, 邏輯OR, 邏輯NOT.
常數類型:
- 數字:3.1415926, 5。
- 字符: ‘a’,必須帶有單引號。
NULL,特別的常量。- 布爾類型:
TRUE,FALSE
二、程序案例
生產者:
package com.cfang.prebo.activemq.selector; import java.util.Scanner; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; public class Producer { public static void main(String[] args) { ConnectionFactory connectionFactory = null; Connection connection = null; Session session = null; Destination destination = null; MessageProducer producer = null; Message message = null; try { Scanner scanner = new Scanner(System.in); connectionFactory = new ActiveMQConnectionFactory("tcp://172.31.31.160:61618"); connection = connectionFactory.createConnection(null, null); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue("TP_Q_TEST_SELECTOR00"); producer = session.createProducer(destination); while(true) { String line = scanner.nextLine(); if("exit".equals(line)) { break; } message = session.createTextMessage(line); message.setIntProperty("applicationName", line.length()); message.setStringProperty("result", "RT"); producer.send(message); } } catch (Exception e) { e.printStackTrace(); } finally { if(producer != null){ // 回收消息發送者 try { producer.close(); } catch (JMSException e) { e.printStackTrace(); } } if(session != null){ // 回收會話對象 try { session.close(); } catch (JMSException e) { e.printStackTrace(); } } if(connection != null){ // 回收連接對象 try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } }
如上,生產者還可以設置更多的條件,ActiveMQ也提供了全基本類型的 setXXXXXProperty方法去設置條件。
消費者:
package com.cfang.prebo.activemq.selector;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ConsumerA {
public static void main(String[] args) {
ConnectionFactory connectionFactory = null;
Connection connection = null;
Session session = null;
Destination destination = null;
MessageConsumer consumer = null;
try {
connectionFactory = new ActiveMQConnectionFactory("tcp://172.31.31.160:61618");
connection = connectionFactory.createConnection(null, null);
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue("TP_Q_TEST_SELECTOR00");
consumer = session.createConsumer(destination,"applicationName=2 and result='RT'");
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
System.out.println(message);
}
});
} catch (Exception e) {
e.printStackTrace();
} finally {
}
}
}
如上,消費者就只消費 applicationName = 2 且 result = 'RT' 的消息。
三、小結
1、提供了篩選功能,可以減少 destination 的數量。可以用於實現特定機器,特定消息(灰度?)。
2、如果同時兩個消費者的話,一個異常不能消費了,那么消息就會產生積壓。對另一個正常的消費者而言,性能會下降,消費時間可能會變長。
