ActiveMQ學習之Broker


一、概念

           相當於一個ActiveMQ服務實例

                    Broker其實就是實現了用代碼的形式啟動了ActiveMQ將MQ嵌入到java代碼中,以便隨時用隨時啟動,在用的時候再去啟動這樣能節約資源,也保證了可靠性。

二、按照不同配置文件啟動ActiveMQ

           1、 先將ActiveMQ根目錄下conf文件夾中的activemq.xml復制一份並重命名為activemq02.xml

                命令(cp activemq.xml activemq02.xml)

           2、啟動activemq02.xml,默認啟動的是activemq.xml

                命令(./activemq start xbean:file:/usr/local/activeMQ/apache-activemq-5.15.11/conf/activemq02.xml)

三、嵌入式Broker

           用ActiveMQ Broker作為獨立的消息服務器來構建java應用。ActiveMQ也支持在虛擬機中通信,基於嵌入式的broker,能夠無縫的集成其他java應用

四、代碼

            1、pom.xml中引入包

  <!--activemq--> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.9</version> </dependency> <!--fastjson--> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.9.9</version> </dependency>

            2、broker代碼             


import org.apache.activemq.broker.BrokerService; /** * @ProjectName: springbootActiveMQ * @Package: cn.**.test * @Author: huat * @Date: 2020/1/10 16:04 * @Version: 1.0 */ public class EmbedBroker { public static void main(String[] args) throws Exception { //ActiveMQ也支持在虛擬機中通信,嵌入broker BrokerService brokerService=new BrokerService(); //將activeMQ嵌入到java程序中 brokerService.setUseJmx(true); //現在是將activeMQ嵌入到java程序中,所以使用本機 brokerService.addConnector("tcp://127.0.0.1:61616"); //啟動程序 brokerService.start(); } } 

            3、隊列生產者代碼    


import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * @ProjectName: springbootActiveMQ * @Package: cn.**.test * @Author: huat * @Date: 2020/1/2 17:04 * @Version: 1.0 */ public class ActiveMQTest { //url路徑 private static final String ACTRIVE_URL="tcp://127.0.0.1:61616"; //隊列名稱 private static final String QUEUE_NAME="queue01"; public static void main(String[] args) { //1、創建連接工廠 //如果賬號密碼沒有修改的話,賬號密碼默認均為admin ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTRIVE_URL); try { //2、通過連接工廠獲取連接 Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); //3、創建session會話 //里面會有兩個參數,第一個為事物,第二個是簽收 Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); //4、創建目的地(具體是隊列還是主題),這里是創建隊列 Queue queue=session.createQueue(QUEUE_NAME); //5、創建消息生產者,隊列模式 MessageProducer messageProducer = session.createProducer(queue); //6、通過messageProducer生產三條消息發送到MQ消息隊列中 for (int i=0;i<3;i++){ //7、創建消息 TextMessage textMessage = session.createTextMessage("msg----->" + i);//創建一個文本消息 //8、通過messageProducer發送給mq messageProducer.send(textMessage); //9、數據非持久化 messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT); } messageProducer.close(); session.commit(); session.close(); connection.close(); System.out.println("消息發送成功"); } catch (JMSException e) { e.printStackTrace(); } } } 

            3、隊列消費者代碼   

import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * @ProjectName: springbootActiveMQ * @Package: cn.**.test * @Author: huat * @Date: 2020/1/3 8:47 * @Version: 1.0 */ public class ActiveMQConsumer { //url路徑 private static final String ACTRIVE_URL="tcp://127.0.0.1:61616"; //隊列名稱 private static final String QUEUE_NAME="queue01"; public static void main(String[] args) { //1、創建連接工廠 //如果賬號密碼沒有修改的話,賬號密碼默認均為admin ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTRIVE_URL); //如果賬號密碼修改的話 //第一個參數為賬號,第二個為密碼,第三個為請求的url //ActiveMQConnectionFactory activeMQConnectionFactory1=new ActiveMQConnectionFactory("admin","admin",ACTRIVE_URL); try { //2、通過連接工廠獲取連接 Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); //3、創建session會話 //里面會有兩個參數,第一個為事物,第二個是簽收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //4、這里接受的queue的名稱要和發送者的一致 Queue queue = session.createQueue(QUEUE_NAME); //5、創建消費者 MessageConsumer consumer = session.createConsumer(queue); //6、通過監聽的方式消費消息 while(true){ //MessageConsumer 調用的receive方法為同步調用,在消息到達之前一直阻塞線程 //用什么格式發送,這里就用什么格式接受 //receive等待消息,不限制時間 TextMessage message=(TextMessage)consumer.receive(); //receive帶參數等待消息,限制時間,單位毫秒 //TextMessage message=(TextMessage)consumer.receive(4000L); if(null != message){ System.out.println("接受的消息為------>"+message.getText()); }else{ break; } } //7、閉資源 consumer.close(); session.close(); connection.close(); }catch (Exception e){ e.printStackTrace(); } } } 

 

 來源:站長平台


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM