ActiveMQ發布-訂閱消息模式


一、訂閱雜志
我們很多人都訂過雜志,其過程很簡單。只要告訴郵局我們所要訂的雜志名、投遞的地址,付了錢就OK。出版社定期會將出版的雜志交給郵局,郵局會根據訂閱的列表,將雜志送達消費者手中。這樣我們就可以看到每一期精彩的雜志了。

仔細思考一下訂雜志的過程,我們會發現這樣幾個特點:
1、消費者訂雜志不需要直接找出版社;
2、出版社只需要把雜志交給郵局;
3、郵局將雜志送達消費者。
郵局在整個過程中扮演了非常重要的中轉作用,在出版社和消費者相互不需要知道對方的情況下,郵局完成了雜志的投遞。

二、 發布-訂閱消息模式
剛剛講了訂閱雜志,下面我們會講傳統調用模式演化到發布-訂閱消息模式。

有些網站在注冊用戶成功后發一封激活郵件,用戶收到郵件后點擊激活鏈接后才能使用該網站。一般的做法是在注冊用戶業務邏輯中調用發送郵件的邏輯。這樣用戶業務就依賴於郵件業務。如果以后改為短信激活,注冊用戶業務邏輯就必須修改為調用發送短信的邏輯。如果要注冊后給用戶加點積分,再加一段邏輯。經過多次修改,我們發現很簡單的注冊用戶業務已經越來越復雜,越來越難以維護。相信很多開發者都會有類似痛苦的經歷。

即使用戶業務實現中對其他業務是接口依賴,也避免不了業務變化帶來的依賴影響。怎么辦?解耦!將注冊用戶業務邏輯中注冊成功后的處理剝離出來。

再回頭看看“訂閱雜志”,如果沒有郵局,出版社就必須自己將雜志送達所有消費者。這種情形就和現在的注冊用戶業務一樣。我們發現問題了,在用戶業務和其他業務之間缺少了郵局所扮角色。

我們把郵局抽象成一個管理消息的地方,叫“消息管理器”。注冊用戶成功后發送一個消息給消息管理器,由消息管理器轉發該消息給需要處理的業務。現在,用戶業務只依賴於消息管理器了,它再也不會為了注冊用戶成功后的其他處理而煩惱。

注冊用戶的改造就是借鑒了“訂閱雜志”這樣原始的模式。我們再進一步抽象,用戶業務就是消息的“生產者”,它將消息發布到消息管理器。郵件業務就是消息的“消費者”,它將收到的消息進行處理。郵局可以訂閱很多種雜志,雜志都是通過某種編號來區分;消息管理器也可以管理多種消息,每種消息都會有一個“主題”來區分,消費者都是通過主題來訂閱的。

發布-訂閱消息模式已經呈現在我們面前,在這里,對於發布者來說,它和所有的訂閱者就構成了一個1對多的關系。這種關系如下圖所示:

示例:
1、Publish.java:消息發布者

package com.ljq.durian.test.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Publish {
    private ConnectionFactory factory;
    private Connection connection;
    private Session session;
    private MessageProducer producer;

    public Publish() {
        try {
            factory = new ActiveMQConnectionFactory("ljq", "ljq", "failover:(tcp://localhost:61616)?Randomize=false");
            connection = factory.createConnection();
            connection.start();
            session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            producer = session.createProducer(null);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void sendMessage() throws Exception {
        Destination destination = session.createTopic("Topic001");
        TextMessage msg = session.createTextMessage("我是消息內容...");
        producer.send(destination, msg);
        
        if(connection != null){
            connection.close();
        }    
    }

    public static void main(String[] args) throws Exception {
        Publish publish= new Publish();
        publish.sendMessage();
    }
}

2、Subscriber1.java:消息訂閱者

package com.ljq.durian.test.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Subscriber1 {
    private ConnectionFactory factory;
    private Connection connection;
    private Session session;

    public Subscriber1() {
        try {
            factory = new ActiveMQConnectionFactory("ljq", "ljq", "failover:(tcp://localhost:61616)?Randomize=false");
            connection = factory.createConnection();
            connection.start();
            session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void receive() throws Exception {
        Destination topic = session.createTopic("Topic001") ;
        MessageConsumer consumer = session.createConsumer(topic);
        consumer.setMessageListener(new Listener());
    }

    class Listener implements MessageListener {
        @Override
        public void onMessage(Message message) {
            try {
                TextMessage tm = (TextMessage) message;
                System.out.println("Subscriber1 Received message: " + tm.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }

    }

    public static void main(String[] args) throws Exception {
        Subscriber1 subscriber = new Subscriber1();
        subscriber.receive();
    }
}

3、Subscriber2.java:消息訂閱者

package com.ljq.durian.test.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Subscriber2 {
    private ConnectionFactory factory;
    private Connection connection;
    private Session session;

    public Subscriber2() {
        try {
            factory =
                    new ActiveMQConnectionFactory("ljq", "ljq",
                            "failover:(tcp://192.168.1.101:61616)?Randomize=false");
            connection = factory.createConnection();
            connection.start();
            session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void receive() throws Exception {
        Destination topic = session.createTopic("Topic001") ;
        MessageConsumer consumer = session.createConsumer(topic);
        consumer.setMessageListener(new Listener());
    }

    class Listener implements MessageListener {
        public void onMessage(Message message) {
            System.out.println(message);
            try {
                TextMessage tm = (TextMessage) message;
                System.out.println("Subscriber2 Received message: " + tm.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }

    }

    public static void main(String[] args) throws Exception {
        Subscriber2 subscriber = new Subscriber2();
        subscriber.receive();
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM