一、前言
消息發送到Broker,消費者通過Destination可以訂閱消費某個特定的通道內的消息。一些特殊情況下,需要消費者對消息過濾下再進行消費,也就是篩選出某些特定消息。ActiveMQ提供了SQL92表達式語法的自定義消息篩選功能。非常方便快捷的能夠開發出具有消息篩選功能的應用。
ActiveMQ 支持:
- 數字表達式:
>
,>=
,<
,<=
,BETWEEN
,=
. - 字符表達式:
=
,<>
,IN
. IS NULL
或則IS NOT NULL
.- 邏輯
AND
, 邏輯OR
, 邏輯NOT
.
常數類型:
- 數字:3.1415926, 5。
- 字符: ‘a’,必須帶有單引號。
NULL
,特別的常量。- 布爾類型:
TRUE
,FALSE
二、程序案例
生產者:
package com.cfang.prebo.activemq.selector; import java.util.Scanner; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; public class Producer { public static void main(String[] args) { ConnectionFactory connectionFactory = null; Connection connection = null; Session session = null; Destination destination = null; MessageProducer producer = null; Message message = null; try { Scanner scanner = new Scanner(System.in); connectionFactory = new ActiveMQConnectionFactory("tcp://172.31.31.160:61618"); connection = connectionFactory.createConnection(null, null); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue("TP_Q_TEST_SELECTOR00"); producer = session.createProducer(destination); while(true) { String line = scanner.nextLine(); if("exit".equals(line)) { break; } message = session.createTextMessage(line); message.setIntProperty("applicationName", line.length()); message.setStringProperty("result", "RT"); producer.send(message); } } catch (Exception e) { e.printStackTrace(); } finally { if(producer != null){ // 回收消息發送者 try { producer.close(); } catch (JMSException e) { e.printStackTrace(); } } if(session != null){ // 回收會話對象 try { session.close(); } catch (JMSException e) { e.printStackTrace(); } } if(connection != null){ // 回收連接對象 try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } }
如上,生產者還可以設置更多的條件,ActiveMQ也提供了全基本類型的 setXXXXXProperty方法去設置條件。
消費者:
package com.cfang.prebo.activemq.selector; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; public class ConsumerA { public static void main(String[] args) { ConnectionFactory connectionFactory = null; Connection connection = null; Session session = null; Destination destination = null; MessageConsumer consumer = null; try { connectionFactory = new ActiveMQConnectionFactory("tcp://172.31.31.160:61618"); connection = connectionFactory.createConnection(null, null); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue("TP_Q_TEST_SELECTOR00"); consumer = session.createConsumer(destination,"applicationName=2 and result='RT'"); consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { System.out.println(message); } }); } catch (Exception e) { e.printStackTrace(); } finally { } } }
如上,消費者就只消費 applicationName = 2 且 result = 'RT' 的消息。
三、小結
1、提供了篩選功能,可以減少 destination 的數量。可以用於實現特定機器,特定消息(灰度?)。
2、如果同時兩個消費者的話,一個異常不能消費了,那么消息就會產生積壓。對另一個正常的消費者而言,性能會下降,消費時間可能會變長。