Jms規范里的兩種message傳輸方式Topic和Queue,兩者的對比如下表():
Topic | Queue | |
概要 | Publish Subscribe messaging 發布訂閱消息 | Point-to-Point 點對點 |
有無狀態 | topic數據默認不落地,是無狀態的。 | Queue數據默認會在mq服務器上以文件形式保存,比如Active MQ一般保存在$AMQ_HOME\data\kr-store\data下面。也可以配置成DB存儲。 |
完整性保障 | 並不保證publisher發布的每條數據,Subscriber都能接受到。 | Queue保證每條數據都能被receiver接收。 |
消息是否會丟失 | 一般來說publisher發布消息到某一個topic時,只有正在監聽該topic地址的sub能夠接收到消息;如果沒有sub在監聽,該topic就丟失了。 | Sender發送消息到目標Queue,receiver可以異步接收這個Queue上的消息。Queue上的消息如果暫時沒有receiver來取,也不會丟失。 |
消息發布接收策略 | 一對多的消息發布接收策略,監聽同一個topic地址的多個sub都能收到publisher發送的消息。Sub接收完通知mq服務器 | 一對一的消息發布接收策略,一個sender發送的消息,只能有一個receiver接收。receiver接收完后,通知mq服務器已接收,mq服務器對queue里的消息采取刪除或其他操作。 |
以下是符合Jms1.1規范的使用Queue傳輸消息的代碼,使用Active mq作為Jms的實現,想要更clean的代碼,可以考慮將Amq的實現DI:
- public void testMySend() throws JMSException {
- Connection connection = new ActiveMQConnectionFactory("admin","admin","tcp://localhost:61616").createConnection();
- Session session = null;
- Queue queue = null;
- MessageProducer producer = null;
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- queue=session.createQueue("myTest");
- producer= session.createProducer(queue);
- Message msg=session.createTextMessage("hello");
- producer.send(msg);
- producer.close();
- session.close();
- connection.close();
- }
- public void testMyReceive() throws JMSException{
- Connection connection = new ActiveMQConnectionFactory("admin","admin","tcp://localhost:61616").createConnection();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue queue = session.createQueue("myTest");
- MessageConsumer consumer= session.createConsumer(queue);
- consumer.setMessageListener(new MyListener());
- connection.start();
- consumer.close();
- session.close();
- connection.close();
- }
消費端需要設定一個MessageListener:
- private class MyListener implements MessageListener{
- public void onMessage(Message message) {
- System.out.println("msg start!----------------");
- try {
- System.out.println(""+((TextMessage)message).getText());
- } catch (JMSException e) {
- e.printStackTrace();
- }
- System.out.println("msg end!----------------");
- }
- }
以下是符合Jms1.1規范的使用Topic 發送消息的代碼:
- public void testMyTopicPublisher() throws JMSException {
- Connection connection = new ActiveMQConnectionFactory("admin","admin","tcp://localhost:61616").createConnection();
- Session session = null;
- Topic topic = null;
- MessageProducer producer = null;
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- topic=session.createTopic("myTopicTest");
- producer= session.createProducer(topic);
- producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- Message msg=session.createTextMessage("hello and whatever u say");
- producer.send(msg);
- producer.close();
- session.close();
- connection.close();
- }
以下是符合Jms1.1規范的使用Topic 接收消息的代碼:
- public static void testMyTopicConsumer() throws JMSException, InterruptedException{
- Connection connection = new ActiveMQConnectionFactory("admin","admin","tcp://localhost:61616").createConnection();
- Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- Topic topic = session.createTopic("myTopicTest");
- MessageConsumer consumer= session.createConsumer(topic);
- consumer.setMessageListener(new MyListener());
- connection.start();
- }
jms在創建Session時可以有兩個參數,第一個參數是是否使用事務,第二個參數是消費者向發送者確認消息已經接收的方式:
- Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
確認消息的方式有如下三種:
AUTO_ACKNOWLEDGE(自動通知)
CLIENT_ACKNOWLEDGE(客戶端自行決定通知時機)
DUPS_OK_ACKNOWLEDGE(延時//批量通知)
如果使用的是 客戶端自行決定通知時機 方式,那么需要在MessageListener里顯式調用message.acknowledge()來通知服務器。服務器接收到通知后采取相應的操作。
轉自:
http://longdick.iteye.com/blog/465229