ActiveMQ介紹
ActiveMQ是一種消息中間件,有兩種模式,一種點對點模式 發布者將發布的消息發送給服務器,等待用戶監聽並接受數據;第二種訂閱模式 發布者將消息發布給消息服務器,讓服務器將所有的數據直接轉發給再監聽的用戶,進行一對多通信(類似微信公眾號)。
ActiveMQ主要是為了降低程序間的耦合,以及異步執行處理時間較長的代碼(如網絡通信)。
點對點模式:
發布者發布8條信息,這時有3個用戶在監聽服務器消息,則3個用戶共同消費這8條消息。服務器中的每條消息只能被一個用戶消費,這種模式服務器會存儲發布者發布的數據,當未被用戶接收的數據則會留在服務器中,等待下個監聽服務器的用戶接收數據。
訂閱模式(持久訂閱模式/非持久訂閱模式):
發布者發布消息給消息服務器,消息服務器則將消息直接轉發給監聽的用戶,這要求發布者發布消息的同時用戶也在監聽消息,若沒有用戶監聽, 則不保留數據,認為數據已發送完成。也就是發布者發布時,用戶沒在監聽消息,則不會在收到該數據。即使用戶以后再監聽也接收不到
持久訂閱模式:訂閱者會注冊一個clientId,當訂閱者離線時,ActiveMQ會為這個 ID 保存所有擁有這個ID的主題的消息,當訂閱者連接時,則會通過自己的clientId得到所有自己處於離線時所要接收主題消息
非持久訂閱模式:只有當訂閱者處於連接狀態才會接收到發布者發布出來的消息,並且發送完成后ActiveMQ則將消息丟棄。
在程序中使用ActiveMQ
從官網中下載activeMQ,下載地址:http://activemq.apache.org/download.html
解壓后,打開目錄下的bin,根據自己的系統選擇win32或win64安裝Active服務,並開啟activeMQ
開啟后瀏覽器訪問該地址:http://127.0.0.1:8161/,選擇 Manage ActiveMQ broker,輸入賬號密碼,默認都是admin
創建一個maven項目,在pom.xml文件中引入jar包
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.9.0</version> </dependency>
點對點模式
創建一個消費者和一個生產者的類
Consumer.java 消費者通過消息監聽器監聽服務器上的信息
package cn.lcf.activeMQ; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * Hello world! * */ public class Consumer { //設置連接地址 private static final String url = "tcp://127.0.0.1:61616"; //設置消息隊列名稱 private static final String queueName = "queue-text"; public static void main(String[] args) throws JMSException { // 1、創建連接工廠 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); // 2、創建連接對象 Connection createConnection = connectionFactory.createConnection(); // 3、啟動連接 createConnection.start(); // 4、創建會話 createSession第一個參數表示是否支持事務,第二個參數是客戶端接收確認模式,Session.AUTO_ACKNOWLEDGE是自動確認,Session.CLIENT_ACKNOWLEDGE 客戶端通過調用消息的 acknowledge 方法簽收消息。 Session createSession = createConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 5、創建消息目標 Queue createQueue = createSession.createQueue(queueName); // 6 、創建消費者 MessageConsumer createConsumer = createSession.createConsumer(createQueue); // 7、設置消費者監聽 createConsumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println("接收的消息為" + textMessage.getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); } }
producer.java 生產者跟消費者差不多,這是第6步開始,變成創建生產者,並發送消息,發送完成之后需要關閉連接。
package cn.lcf.activeMQ; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * Hello world! * */ public class Producer { private static final String url = "tcp://127.0.0.1:61616"; private static final String queueName = "queue-text"; public static void main( String[] args ) throws JMSException { // 1、創建連接工廠 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); // 2、創建連接對象 Connection createConnection = connectionFactory.createConnection(); // 3、啟動連接 createConnection.start(); // 4、創建會話 Session createSession = createConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 5、創建消息目標 Queue createQueue = createSession.createQueue(queueName); // 6、創建生產者 MessageProducer createProducer = createSession.createProducer(createQueue); for (int i=0;i<100;++i) { // 7、創建消息 TextMessage textMessage = createSession.createTextMessage("666 " + i); // 8、發布消息 createProducer.send(textMessage); System.out.println("發送的消息為:" + "666 " + i); } // 9、關閉連接 createConnection.close(); } }
運行下生產者的類 Producer.java,將消息存到ActiveMQ服務器上
查看ActiveMQ中的隊列信息
擁有100條信息,未出列,這時候運行一個消費者(Consumer.java)去消費這100條信息
查看ActiveMQ上的信息,100條信息全被這個1個消費者接收
清空,然后同時運行三個消費者(Consumer.java執行三次后可在console切換不同類的控制台),在運行一個生產者
可以看到這100條消息被這三個消費者平分了。
點對點模式主要用於消除程序高並發高峰對數據庫造成的巨大壓力,可以通過使用消息隊列,讓消費者進程從消息隊列中獲取數據,然后異步將數據寫入數據庫,由於消息隊列的服務處理速度遠快於數據庫,因此用戶的響應延遲可得到有效改善。
訂閱模式(非持久訂閱)
將消費者(Consumer.java)修改成以下內容,寫法相同,只是session不再是創建隊列消費者,而是創建主題消費者
package cn.lcf.TestActiveMQ; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import javax.jms.TopicSubscriber; import org.apache.activemq.ActiveMQConnectionFactory; /** * 消費者 * */ public class Consumer { private static final String URL = "tcp://127.0.0.1:61616"; //訂閱模式名稱 private static final String topicName = "topic-name"; public static void main(String[] args) throws JMSException { //創建連接工廠 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL); //創建連接 Connection createConnection = connectionFactory.createConnection();//打開連接 createConnection.start(); //創建會話 Session createSession = createConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); //創建發布/訂閱模式消息 Topic createTopic = createSession.createTopic(topicName);// 非持久訂閱 //創建消費者 MessageConsumer createConsumer = createSession.createConsumer(createTopic); //設置消費者監聽 createConsumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println("接收的消息為:"+textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); } }
將生產者(Producer.java)修改成以下內容
package cn.lcf.TestActiveMQ; import java.util.Enumeration; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; /** * 生產者 * */ public class Producer { private static final String URL = "tcp://127.0.0.1:61616"; //發布/訂閱模式名稱 private static final String topicName = "topic-name"; public static void main(String[] args) throws JMSException { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL); Connection createConnection = connectionFactory.createConnection(); createConnection.start(); Session createSession = createConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); //發布/訂閱模式 Topic createTopic = createSession.createTopic(topicName); MessageProducer createProducer = createSession.createProducer(createTopic); for (int i = 0; i < 100; i++) { TextMessage textMessage = createSession.createTextMessage("666 " + i); createProducer.send(textMessage); System.out.println("發送的消息為:" + textMessage.getText()); } createConnection.close(); } }
之后先運行消費者,在運行生產者,消費者才能接受到信息,否則生產者發布信息時若沒有在監聽的消費者則會將信息丟棄,這樣消費者是接收不到信息的。
同時運行多個消費者,在運行生產者,消費者將獲取生產者發布的所有消息
訂閱模式(持久訂閱)
持久訂閱模式的客戶端需要創建一個鏈接id,以保證服務器確認該客戶端是否已消費信息,創建完訂閱模式,之后不再是創建一個消費者,而是創建一個帶有id的用戶,這個用戶id是唯一的,若有兩個相同的id連接,則會報錯。
public static void main(String[] args) throws JMSException { //創建連接工廠 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL); //創建連接 Connection createConnection = connectionFactory.createConnection(); //創建客戶端ID createConnection.setClientID("333"); //打開連接 createConnection.start(); //創建會話 Session createSession = createConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); //創建發布/訂閱模式消息 Topic createTopic = createSession.createTopic(topicName); //創建持久訂閱 即未在發布者發布時監聽消息,在之后也能接收消息 TopicSubscriber subscriber = createSession.createDurableSubscriber(createTopic, "333"); subscriber.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println("接受消息:" + textMessage.getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); }
生產者需要將消息模式設為持久訂閱模式
public static void main(String[] args) throws JMSException { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL); Connection createConnection = connectionFactory.createConnection(); createConnection.start(); Session createSession = createConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); //發布/訂閱模式 Topic createTopic = createSession.createTopic(topicName); MessageProducer createProducer = createSession.createProducer(createTopic); //設置為持久訂閱模式 createProducer.setDeliveryMode(DeliveryMode.PERSISTENT); for (int i = 0; i < 100; i++) { TextMessage textMessage = createSession.createTextMessage("666 " + i); createProducer.send(textMessage); System.out.println("發送的消息為:" + textMessage.getText()); } createConnection.close(); }
運行用戶(Consumer.java),創建連接id,之后將用戶連接關閉,啟動生產者(Producer.java)發布消息,最后在重新連接用戶獲取信息。當用戶離線狀態時,發布者發布的消息會將信息存在activeMQ服務器上,等待用戶監聽時將消息發送給用戶。
生成的用戶會在subscribers中顯示
運行發布者(Producer.java),並且用戶處於離線狀態,則會顯示消息等待出列。
最后再次連接上用戶(Consumer.java),則用戶的能立即獲取消息。
持久傳輸和非持久傳輸最大的區別是:采用持久傳輸時,傳輸的消息會保存到磁盤中(messages are persisted to disk/database),即“存儲轉發”方式。先把消息存儲到磁盤中,然后再將消息“轉發”給訂閱者。
采用非持久傳輸時,發送的消息不會存儲到磁盤中。
采用持久傳輸時,當Borker宕機 恢復后,消息還在。采用非持久傳輸,Borker宕機重啟后,消息丟失。比如,當生產者將消息投遞給Broker后,Broker將該消息存儲到磁盤中,在Broker將消息發送給Subscriber之前,Broker宕機了,如果采用持久傳輸,Broker重啟后,從磁盤中讀出消息再傳遞給Subscriber;如果采用非持久傳輸,這條消息就丟失了。