生產者:推送消息方,主要負責給用戶推送消息。(ActiveMQ接口)
消費者:接收消息方,主要查看推送過來的消息。(用戶)
ActiveMQ安裝非常簡單,在此我就不發了,安裝完ActiveMQ后,進入網址 http://localhost:8161/admin/ ,登錄管理后台,默認用戶名:admin 密碼:admin 到此結束,開始寫代碼。
寫代碼之前,還需要開啟ActiveMQ服務,在安裝文件夾里,打開bin文件夾,運行服務,如果是32位則選用32,64選64的bin。開啟服務后,就可以快樂的copy代碼了。
一、
①、一對一推送消息(Queue模式)生產者代碼: ↓
1 public class Sender { 2 3 public static void main(String[] args) throws JMSException, InterruptedException { 4 // ConnectionFactory :連接工廠,JMS 用它創建連接 5 //61616是ActiveMQ默認端口 6 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( 7 ActiveMQConnection.DEFAULT_USER, 8 ActiveMQConnection.DEFAULT_PASSWORD, 9 "tcp://localhost:61616"); 10 11 // Connection :JMS 客戶端到JMS Provider 的連接 12 Connection connection = connectionFactory.createConnection(); 13 14 connection.start(); 15 // Session: 一個發送或接收消息的線程 16 Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); 17 18 // Destination :消息的目的地;消息發送給誰. 19 Destination destination = session.createQueue("my-queue"); 20 21 // MessageProducer:消息發送者 22 MessageProducer producer = session.createProducer(destination); 23 24 // 設置不持久化,可以更改 25 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 26 27 for(int i=0;i<10;i++){ 28 //創建文本消息 29 TextMessage message = session.createTextMessage("hello.I am producer, this is a test message"+i); 30 31 Thread.sleep(1000); 32 //發送消息 33 producer.send(message); 34 } 35 36 session.commit(); 37 session.close(); 38 connection.close(); 39 } 40 41 }
②、消費者代碼(Queue模式)接收發送過來的消息: ↓
1 // ConnectionFactory :連接工廠,JMS 用它創建連接 2 private static ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, 3 ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616"); 4 5 public static void main(String[] args) throws JMSException { 6 // Connection :JMS 客戶端到JMS Provider 的連接 7 final Connection connection = connectionFactory.createConnection(); 8 9 connection.start(); 10 // Session: 一個發送或接收消息的線程 11 final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); 12 // Destination :消息的目的地;消息送誰那獲取. 13 Destination destination = session.createQueue("my-queue"); 14 // 消費者,消息接收者 15 MessageConsumer consumer1 = session.createConsumer(destination); 16 17 consumer1.setMessageListener(new MessageListener() { 18 @Override 19 public void onMessage(Message msg) { 20 21 try { 22 23 TextMessage message = (TextMessage)msg ; 24 System.out.println("consumerOne收到消息: "+message.getText()); 25 session.commit(); 26 } catch (JMSException e) { 27 e.printStackTrace(); 28 } 29 30 } 31 }); 32 }
運行之后控制台不會退出一直監聽消息庫,對於消息發送者的十條信息,控制輸出:
consumerOne收到消息: hello.I am producer, this is a test message0
consumerOne收到消息: hello.I am producer, this is a test message1
consumerOne收到消息: hello.I am producer, this is a test message2
consumerOne收到消息: hello.I am producer, this is a test message3
consumerOne收到消息: hello.I am producer, this is a test message4
consumerOne收到消息: hello.I am producer, this is a test message5
consumerOne收到消息: hello.I am producer, this is a test message6
consumerOne收到消息: hello.I am producer, this is a test message7
consumerOne收到消息: hello.I am producer, this is a test message8
consumerOne收到消息: hello.I am producer, this is a test message9
如果此時另外一個線程也存在消費者監聽該Queue,則兩者交換輸出。(注:Queue僅使用於 1V1 ,發送推送消息給個人)
二、
①、一對多訂閱推送消息(Topic模式)生產者代碼: ↓
1 package com.ym.admin.config; 2 3 import javax.jms.Connection; 4 import javax.jms.ConnectionFactory; 5 import javax.jms.DeliveryMode; 6 import javax.jms.Destination; 7 import javax.jms.JMSException; 8 import javax.jms.MapMessage; 9 import javax.jms.MessageProducer; 10 import javax.jms.Session; 11 import javax.jms.TextMessage; 12 13 import org.apache.activemq.ActiveMQConnection; 14 import org.apache.activemq.ActiveMQConnectionFactory; 15 16 //發送消息 17 public class Sender { 18 public static void main(String[] args) throws JMSException, InterruptedException { 19 // ConnectionFactory :連接工廠,JMS 用它創建連接 20 //61616是ActiveMQ默認端口 21 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( 22 "admin", 23 "admin", 24 "tcp://localhost:61616"); 25 26 // Connection :JMS 客戶端到JMS Provider 的連接 27 Connection connection = connectionFactory.createConnection(); 28 connection.start(); 29 // Session: 一個發送或接收消息的線程 30 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //自動確認 31 32 // Destination :消息的目的地;消息發送給誰. 33 //Destination destination = session.createQueue("my-queue"); 34 Destination destination = session.createTopic("STOCKS.myTopic"); //創建topic myTopic 35 // MessageProducer:消息發送者 36 MessageProducer producer = session.createProducer(destination); 37 38 // 設置不持久化,可以更改 39 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 40 41 for(int i=0;i<3;i++){ 42 //創建文本消息 43 TextMessage message = session.createTextMessage("j"+i); 44 //發送消息 45 System.out.println("發送消息成功:"+i); 46 producer.send(message); 47 } 48 49 session.close(); 50 connection.close(); 51 } 52 }
②、消費者代碼(Topic模式)接收發送過來的消息: ↓
1 package com.ym.admin.config; 2 3 import javax.jms.Connection; 4 import javax.jms.ConnectionFactory; 5 import javax.jms.ConnectionMetaData; 6 import javax.jms.Destination; 7 import javax.jms.JMSException; 8 import javax.jms.MapMessage; 9 import javax.jms.Message; 10 import javax.jms.MessageConsumer; 11 import javax.jms.MessageListener; 12 import javax.jms.Session; 13 import javax.jms.TextMessage; 14 import javax.jms.Topic; 15 16 import org.apache.activemq.ActiveMQConnection; 17 import org.apache.activemq.ActiveMQConnectionFactory; 18 import org.springframework.jms.support.JmsUtils; 19 import org.springframework.util.StringUtils; 20 21 //接收訂閱 22 public class Receiver { 23 private static ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", 24 "admin", "tcp://localhost:61616"); 25 public static void main(String[] args) { 26 27 // Connection :JMS 客戶端到JMS Provider 的連接 28 try { 29 final Connection connection = connectionFactory.createConnection(); 30 connection.setClientID("anpei"); //持久訂閱需要設置這個。 31 connection.start(); 32 int INDIVIDUAL_ACK_TYPE = 4; 33 // Session: 一個發送或接收消息的線程 34 final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); 35 // Destination :消息的目的地;消息送誰那獲取. 36 Topic topic = session.createTopic("STOCKS.myTopic");// 創建topic 37 38 // 消費者,消息接收者 39 // MessageConsumer consumer1 = session.createConsumer(destination); 普通訂閱 40 MessageConsumer consumer1 = session.createDurableSubscriber(topic,"anpei");//持久化訂閱 41 42 consumer1.setMessageListener(new MessageListener() { 43 44 @Override 45 public void onMessage(Message msg) { 46 47 try { 48 49 TextMessage message = (TextMessage) msg; 50 System.out.println("AAAAAAAAA收到消息: " + message.getText());54 // JmsUtils.commitIfNecessary(session); 55 session.commit(); 56 57 58 } catch (JMSException e) { 59 e.printStackTrace(); 60 } 61 62 } 63 }); 64 65 /*// 再來一個消費者,消息接收者 66 MessageConsumer consumer2 = session.createConsumer(destination); 67 68 consumer2.setMessageListener(new MessageListener() { 69 @Override 70 public void onMessage(Message msg) { 71 72 try { 73 74 TextMessage message = (TextMessage) msg; 75 System.out.println("BBBBBBBBB收到消息: " + message.getText()); 76 JmsUtils.commitIfNecessary(session); 77 // session.commit(); 78 } catch (JMSException e) { 79 e.printStackTrace(); 80 } 81 82 } 83 });*/ 84 } catch (Exception e) { 85 e.printStackTrace(); 86 } 87 } 88 89 }
上面程序運行后輸出以下信息:
AAAAAAAAA收到消息: j0
AAAAAAAAA收到消息: j1
AAAAAAAAA收到消息: j2
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
// Session: 一個發送或接收消息的線程
一、事務
這一句的 true 代表是否使用事務,true:使用 false:不使用。
使用事務和不使用事務的區別:↓
如果使用事務,下面一定要session.commit();提交。 如果不使用事務,則需要把這句話刪掉,因為沒有事務可以提交,會報錯。
如果使用事務,一切都要按照事務的規則來。如果為false,則需要使用ACK機制進行確認接收。如果不確認接收,將一直收到消息通知。
二、ACK機制
Session.AUTO_ACKNOWLEDGE ACK機制,對接收的消息進行處理
- AUTO_ACKNOWLEDGE = 1 自動確認
- CLIENT_ACKNOWLEDGE = 2 客戶端手動確認
- DUPS_OK_ACKNOWLEDGE = 3 自動批量確認
- SESSION_TRANSACTED = 0 事務提交並確認
此外AcitveMQ補充了一個自定義的ACK_MODE:
- INDIVIDUAL_ACKNOWLEDGE = 4 單條消息確認
- 具體請參考 http://blog.csdn.net/lulongzhou_llz/article/details/42270113
三、持久化消息
定義:消息持久性對於可靠消息傳遞來說應該是一種比較好的方法,有了消息持久化,即使發送者和接受者不是同時在線(服務同時啟動)或者消息中心在發送者發送消息后宕機了,在消息中心重新啟動后仍然可以將消息發送出去。(意思就是說,即使消費者處於離線狀態,在重新登陸后,依然可以收到消息) - 上面一對多訂閱的代碼,是我持久化過后的代碼,通過:
connection.setClientID("zhangsan"); //持久訂閱需要設置這個。
MessageConsumer consumer1 = session.createDurableSubscriber(topic,"zhangsan");//持久化訂閱
如:我是張三,我是張三,請給我饅頭。我是王七,我是王七,請給我饅頭。對比一對一消息推送,很容易就看懂了。
上面程序接收的消息,接收一次,就不能接收了,因為 - AUTO_ACKNOWLEDGE = 1 自動確認
程序在第一次運行,就已經自動確認已經接收了消息,ActiveMQ將自動刪除已經確認的消息。
如果哪個地方有問題,歡迎留言,謝謝。