前言
ActiveMQ提供了一種機制,使用它,消息服務可根據消息選擇器中的標准來執行消息過濾。生產者可在消息中放入應用程序特有的屬性,而消費者可使用基於這些屬性的選擇標准來表明對消息是否感興趣。這就簡化了客戶端的工作,並避免了向不需要這些消息的消費者傳送消息的開銷。然而,它也使得處理選擇標准的消息服務增加了一些額外開銷。
消息選擇器是用於MessageConsumer的過濾器,可以用來過濾傳入消息的屬性和消息頭部分(但不過濾消息體),並確定是否將實際消費該消息。消息選擇器是一些字符串,它們基於某種語法,而這種語法是SQL-92的子集。可以將消息選擇器作為MessageConsumer 創建的一部分。
實現對MapMessage和TextMessage兩種消息的過濾條件的設置和消費
Producer
在消息的屬性中設置過濾條件
package com.tgb.activemqFilter; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class Producer { // 單例模式 // 1、連接工廠 private ConnectionFactory connectionFactory; // 2、連接對象 private Connection connection; // 3、Session對象 private Session session; // 4、生產者 private MessageProducer messageProducer; private Destination destination; public Producer() { try { this.connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://127.0.0.1:61616"); this.connection = connectionFactory.createConnection(); this.connection.start(); // 設置自動簽收模式 this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE); this.destination = this.session.createQueue("first"); this.messageProducer = this.session.createProducer(null); } catch (JMSException e) { throw new RuntimeException(e); } } public Session getSession() { return this.session; } public void send1(/* String QueueName, Message message */) { try { Destination destination = this.session.createQueue("first"); MapMessage msg1 = this.session.createMapMessage(); msg1.setString("name", "張三"); msg1.setInt("age", 20); // 設置用於消息過濾器的條件 msg1.setStringProperty("name", "張三"); msg1.setIntProperty("age", 20); msg1.setStringProperty("color", "bule"); MapMessage msg2 = this.session.createMapMessage(); msg2.setString("name", "李四"); msg2.setInt("age", 25); // 設置用於消息過濾器的條件 msg2.setStringProperty("name", "李四"); msg2.setIntProperty("age", 25); msg2.setStringProperty("color", "white"); MapMessage msg3 = this.session.createMapMessage(); msg3.setString("name", "趙六"); msg3.setInt("age", 30); // 設置用於消息過濾器的條件 msg3.setStringProperty("name", "趙六"); msg3.setIntProperty("age", 30); msg3.setStringProperty("color", "black"); // 發送消息 this.messageProducer.send(destination, msg1, DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10); this.messageProducer.send(destination, msg2, DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10); this.messageProducer.send(destination, msg3, DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10); } catch (JMSException e) { throw new RuntimeException(e); } } public void send2() { try { Destination destination = this.session.createQueue("first"); TextMessage message = this.session.createTextMessage("我是一個字符串"); message.setIntProperty("age", 25); // 發送消息 this.messageProducer.send(destination, message, DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10); } catch (JMSException e) { throw new RuntimeException(e); } } public static void main(String[] args) { Producer producer = new Producer(); producer.send1(); // producer.send2(); } }
Conmuser
消費消息時,直接在session創建MessageConsumer時,將過濾條件作為參數傳入(過濾條件的寫法和SQL的寫法是很像的)
package com.tgb.activemqFilter; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class Conmuser { // 單例模式 // 1、連接工廠 private ConnectionFactory connectionFactory; // 2、連接對象 private Connection connection; // 3、Session對象 private Session session; // 4、生產者 private MessageConsumer messageConsumer; // 5、目的地址 private Destination destination; // 消息選擇器 public final String SELECTOR_1 = "age > 25"; public final String SELECTOR_2 = " age > 20 and color='black'"; public Conmuser() { try { this.connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://127.0.0.1:61616"); this.connection = connectionFactory.createConnection(); this.connection.start(); // 設置自動簽收模式 this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE); this.destination = this.session.createQueue("first"); // 在構造消費者的時候,指定了 消息選擇器 // 有選擇性的消費消息 this.messageConsumer = this.session.createConsumer(destination, SELECTOR_1); } catch (JMSException e) { throw new RuntimeException(e); } } public Session getSession() { return this.session; } // 用於監聽消息隊列的消息 class MyLister implements MessageListener { @Override public void onMessage(Message message) { try { if (message instanceof TextMessage) { TextMessage ret = (TextMessage) message; System.out.println("results;" + ret.getText()); } if (message instanceof MapMessage) { MapMessage ret = (MapMessage) message; System.out.println(ret.toString()); System.out.println(ret.getString("name")); System.out.println(ret.getInt("age")); } } catch (JMSException e) { throw new RuntimeException(e); } } } // 用於異步監聽消息 public void receiver() { try { this.messageConsumer.setMessageListener(new MyLister()); } catch (JMSException e) { throw new RuntimeException(e); } } public static void main(String[] args) { Conmuser conmuser = new Conmuser(); conmuser.receiver(); } }
測試
Messages Enqueued: 張三 20 | 李四 25 | 趙六 30
消息過濾條件:age>25