ActiveMQ的Broker


一、什么是ActiveMQ的Broker

  相當於一個ActiveMQ服務器實例.說白了,Broker其實就是實現了用代碼的形式啟動ActiveMQ,將MQ嵌入到Java代碼中.以便隨時需要隨時啟動,在用的時候再去啟動這樣能節省了資源,也保證了可用性.這種方式,我們實際開發中很少采用,因為他缺少太多了東西,比如:日志,數據存儲等等.

  

二、使用指定的配置文件來啟動Broker

  Broker:一個Broker就是Linux安裝的一個activemq軟件,我們只要把activemq.xml配置文件多復制幾份,就能在一台服務器上啟動多個Broker.

  具體步驟:

  1、到ActiveMQ的安裝目錄下面,進入到conf配置文件目錄,拷貝一份activemq.xml配置文件(/usr/local/activemq/apache-activemq-5.15.5/conf)

// 當前位置,ActiveMQ安裝位置的conf配置文件目錄
[root@localhost conf]# pwd
/usr/local/activemq/apache-activemq-5.15.5/conf
// 拷貝activemq.xml並命名為activemq02.xml
[root@localhost conf]# cp activemq.xml activemq02.xml
[root@localhost conf]# ll
總用量 96
-rw-r--r--. 1 root root 5911 9月  23 19:01 activemq02.xml
-rw-r--r--. 1 root root 5911 8月   6 2018 activemq.xml
-rw-r--r--. 1 root root 1370 8月   6 2018 broker.ks
-rw-r--r--. 1 root root  592 8月   6 2018 broker-localhost.cert
-rw-r--r--. 1 root root  665 8月   6 2018 broker.ts
-rw-r--r--. 1 root root 1357 8月   6 2018 client.ks
-rw-r--r--. 1 root root  665 8月   6 2018 client.ts
-rw-r--r--. 1 root root 1172 8月   6 2018 credentials-enc.properties
-rw-r--r--. 1 root root 1121 8月   6 2018 credentials.properties
-rw-r--r--. 1 root root  962 8月   6 2018 groups.properties
-rw-r--r--. 1 root root 1011 8月   6 2018 java.security
-rw-r--r--. 1 root root 1087 8月   6 2018 jetty-realm.properties
-rw-r--r--. 1 root root 7795 8月   6 2018 jetty.xml
-rw-r--r--. 1 root root  965 8月   6 2018 jmx.access
-rw-r--r--. 1 root root  964 8月   6 2018 jmx.password
-rw-r--r--. 1 root root 3071 8月   6 2018 log4j.properties
-rw-r--r--. 1 root root 1207 8月   6 2018 logging.properties
-rw-r--r--. 1 root root 1016 8月   6 2018 login.config
-rw-r--r--. 1 root root  961 8月   6 2018 users.properties

  

  2、使用命令 ./activemq start xbean:file:/usr/local/activemq/apache-activemq-5.15.5/conf/activemq02.xml啟動Broker

  (我們之前使用命令 ./activemq start:實際上就是默認啟動 /usr/local/activemq/apache-activemq-5.15.5/conf/activmq.xml配置文件,實際上完整的命令是:./activemq start xbean:file:/usr/local/activemq/apache-activemq-5.15.5/conf/activemq.xml)

  

  3、啟動完成之后,生產者生產消息

  

  4、消費者消費消息

 

三、嵌入式Broker啟動

  1、EmbedBroker API

public class EmbedBroker {
    public static void main(String[] args) throws Exception {
        //ActiveMQ也支持在vm中通信基於嵌入的broker
        BrokerService brokerService = new BrokerService();
        brokerService.setPopulateJMSXUserID(true);
        brokerService.addConnector("tcp://localhost:61616");
        brokerService.start();
    }
}

  啟動控制台如下,可以看出本地確實啟動了一個ActiveMQ實例

  

  2、生產者

public class JmsQueueProducer {
    public static final String BROKER_URL = "tcp://127.0.0.1:61616";
    public static final String QUEUE_NAME = "queue01";
    public static final String TEXT_MESSAGE_NAME = "textMessage";

    public static void main(String[] args) throws JMSException {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(QUEUE_NAME);
        MessageProducer producer = session.createProducer(queue);
        for (int i = 1; i < 9; i++) {
            TextMessage textMessage = session.createTextMessage(TEXT_MESSAGE_NAME + i);
            producer.send(queue, textMessage);
        }
        System.out.println("生產者成功發送消息至MQ QUEUE......");
        // 釋放資源
        producer.close();
        session.close();
        connection.close();
    }
}

  

  3、消費者

public class JmsQueueConsumer {
    public static final String BROKER_URL = "tcp://127.0.0.1:61616";
    public static final String QUEUE_NAME = "queue01";

    public static void main(String[] args) throws IOException, JMSException {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(QUEUE_NAME);
        MessageConsumer consumer = session.createConsumer(queue);

        while (true) {
            Message message = consumer.receive(4 * 1000);
            if (message != null && message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                System.out.println("接收到的消息是:" + textMessage.getText());
            } else {
                break;
            }
        }
        consumer.close();
        session.close();
        connection.close();
    }
}

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM