一、訂閱雜志
我們很多人都訂過雜志,其過程很簡單。只要告訴郵局我們所要訂的雜志名、投遞的地址,付了錢就OK。出版社定期會將出版的雜志交給郵局,郵局會根據訂閱的列表,將雜志送達消費者手中。這樣我們就可以看到每一期精彩的雜志了。
仔細思考一下訂雜志的過程,我們會發現這樣幾個特點:
1、消費者訂雜志不需要直接找出版社;
2、出版社只需要把雜志交給郵局;
3、郵局將雜志送達消費者。
郵局在整個過程中扮演了非常重要的中轉作用,在出版社和消費者相互不需要知道對方的情況下,郵局完成了雜志的投遞。
二、 發布-訂閱消息模式
剛剛講了訂閱雜志,下面我們會講傳統調用模式演化到發布-訂閱消息模式。
有些網站在注冊用戶成功后發一封激活郵件,用戶收到郵件后點擊激活鏈接后才能使用該網站。一般的做法是在注冊用戶業務邏輯中調用發送郵件的邏輯。這樣用戶業務就依賴於郵件業務。如果以后改為短信激活,注冊用戶業務邏輯就必須修改為調用發送短信的邏輯。如果要注冊后給用戶加點積分,再加一段邏輯。經過多次修改,我們發現很簡單的注冊用戶業務已經越來越復雜,越來越難以維護。相信很多開發者都會有類似痛苦的經歷。
即使用戶業務實現中對其他業務是接口依賴,也避免不了業務變化帶來的依賴影響。怎么辦?解耦!將注冊用戶業務邏輯中注冊成功后的處理剝離出來。
再回頭看看“訂閱雜志”,如果沒有郵局,出版社就必須自己將雜志送達所有消費者。這種情形就和現在的注冊用戶業務一樣。我們發現問題了,在用戶業務和其他業務之間缺少了郵局所扮角色。
我們把郵局抽象成一個管理消息的地方,叫“消息管理器”。注冊用戶成功后發送一個消息給消息管理器,由消息管理器轉發該消息給需要處理的業務。現在,用戶業務只依賴於消息管理器了,它再也不會為了注冊用戶成功后的其他處理而煩惱。
注冊用戶的改造就是借鑒了“訂閱雜志”這樣原始的模式。我們再進一步抽象,用戶業務就是消息的“生產者”,它將消息發布到消息管理器。郵件業務就是消息的“消費者”,它將收到的消息進行處理。郵局可以訂閱很多種雜志,雜志都是通過某種編號來區分;消息管理器也可以管理多種消息,每種消息都會有一個“主題”來區分,消費者都是通過主題來訂閱的。
發布-訂閱消息模式已經呈現在我們面前,在這里,對於發布者來說,它和所有的訂閱者就構成了一個1對多的關系。這種關系如下圖所示:
示例:
1、Publish.java:消息發布者
package com.ljq.durian.test.activemq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class Publish { private ConnectionFactory factory; private Connection connection; private Session session; private MessageProducer producer; public Publish() { try { factory = new ActiveMQConnectionFactory("ljq", "ljq", "failover:(tcp://localhost:61616)?Randomize=false"); connection = factory.createConnection(); connection.start(); session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); producer = session.createProducer(null); } catch (Exception e) { e.printStackTrace(); } } public void sendMessage() throws Exception { Destination destination = session.createTopic("Topic001"); TextMessage msg = session.createTextMessage("我是消息內容..."); producer.send(destination, msg); if(connection != null){ connection.close(); } } public static void main(String[] args) throws Exception { Publish publish= new Publish(); publish.sendMessage(); } }
2、Subscriber1.java:消息訂閱者
package com.ljq.durian.test.activemq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class Subscriber1 { private ConnectionFactory factory; private Connection connection; private Session session; public Subscriber1() { try { factory = new ActiveMQConnectionFactory("ljq", "ljq", "failover:(tcp://localhost:61616)?Randomize=false"); connection = factory.createConnection(); connection.start(); session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); } catch (Exception e) { e.printStackTrace(); } } public void receive() throws Exception { Destination topic = session.createTopic("Topic001") ; MessageConsumer consumer = session.createConsumer(topic); consumer.setMessageListener(new Listener()); } class Listener implements MessageListener { @Override public void onMessage(Message message) { try { TextMessage tm = (TextMessage) message; System.out.println("Subscriber1 Received message: " + tm.getText()); } catch (JMSException e) { e.printStackTrace(); } } } public static void main(String[] args) throws Exception { Subscriber1 subscriber = new Subscriber1(); subscriber.receive(); } }
3、Subscriber2.java:消息訂閱者
package com.ljq.durian.test.activemq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class Subscriber2 { private ConnectionFactory factory; private Connection connection; private Session session; public Subscriber2() { try { factory = new ActiveMQConnectionFactory("ljq", "ljq", "failover:(tcp://192.168.1.101:61616)?Randomize=false"); connection = factory.createConnection(); connection.start(); session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); } catch (Exception e) { e.printStackTrace(); } } public void receive() throws Exception { Destination topic = session.createTopic("Topic001") ; MessageConsumer consumer = session.createConsumer(topic); consumer.setMessageListener(new Listener()); } class Listener implements MessageListener { public void onMessage(Message message) { System.out.println(message); try { TextMessage tm = (TextMessage) message; System.out.println("Subscriber2 Received message: " + tm.getText()); } catch (JMSException e) { e.printStackTrace(); } } } public static void main(String[] args) throws Exception { Subscriber2 subscriber = new Subscriber2(); subscriber.receive(); } }