activemq安裝與簡單消息發送接收實例


安裝環境:
Activemq5.11.1, jdk1.7(activemq5.11.1版本需要jdk升級到1.7),虛擬機: 192.168.147.131
[root@localhost software]# pwd
/export/software [root@localhost software]# tar -zxvf apache-activemq-5.11.1-bin.tar.gz [root@localhost software]# mv apache-activemq-5.11.1 /usr/local
配置Nginx代理Activemq后台管理應用默認綁定的8161端口  
upstream tomcat_tools.activemq.local { server 127.0.0.1:8161  weight=10 max_fails=2 fail_timeout=300s; } server { listen 80; server_name tools.activemq.local.com; root /usr/local/apache-activemq-5.11.1/webapps/; access_log /usr/local/apache-activemq-5.11.1/logs/tools.activemq.local.com_access.log main; error_log /usr/local/apache-activemq-5.11.1/logs/tools.activemq.local.com_error.log warn; error_page 403 404 /40x.html; location / { index index.html index.htm; proxy_next_upstream http_500 http_502 http_503 http_504 error timeout invalid_header; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_pass http://tomcat_tools.activemq.local;
 } #靜態文件,nginx自己處理 location ~ ^/(images|javascript|js|css|flash|media|static)/ { #過期30天,靜態文件不怎么更新,過期可以設大一點, #如果頻繁更新,則可以設置得小一點。 expires 30d; } }
重啟nginx
啟動activemq
[root@localhost linux-x86-64]# pwd
/usr/local/apache-activemq-5.11.1/bin/linux-x86-64 [root@localhost linux-x86-64]# ./activemq start
 
        
配置host[192.168.147.131 tools.activemq.local.com]

登錄activemq的后台,默認賬號 admin/admin
http://tools.activemq.local.com/admin

實例展示MQ消息的發送和接收[消息類型分為queue 和 Topic]
pom引入
 <dependency>
     <groupId>org.apache.activemq</groupId>
     <artifactId>activemq-all</artifactId>
     <version>5.11.1</version>
 </dependency>
Queue類型消息

1、定義消息destination和brokerUrl[61616為activemq用於消息通訊的端口]
public class Constant { public static final String brokerURL = "tcp://192.168.147.131:61616"; public static final String queueDestination = "testQueue"; }
 
        
2、編寫消息的發送程序
package com.mq.base.queue; import javax.jms.*; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /** * created on 2015/6/4 * @author dennisit@163.com * @version 1.0 */
public class MqSender { public static void main(String[] args) throws JMSException { // 默認的賬號和密碼為null
        String username = ActiveMQConnection.DEFAULT_USER; String password = ActiveMQConnection.DEFAULT_PASSWORD; // 初始化連接工廠, DEFAULT_BROKER_URL =failover://tcp://localhost:61616
        ConnectionFactory factory = new ActiveMQConnectionFactory(username, password, Constant.brokerURL); // 創建連接
        Connection connection = factory.createConnection(); connection.start(); // 創建會話
        Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 創建消息主題Queue
        Destination destination = session.createQueue(Constant.queueDestination); // MessageProducer負責發送消息
        MessageProducer producer = session.createProducer(destination); // 消息不持久化
 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); ObjectMessage message = session.createObjectMessage("hello world..."); producer.send(message); // 只有commit之后,消息才會進入隊列
 session.commit(); System.out.println("send..."); // 測試狀態,這里把關閉會話和連接注釋掉了。 // session.close(); // connection.close();
 } }
 
        

 執行消息發送,在管理后台查看


3、編寫消息的消費程序

package com.mq.base.queue; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.ObjectMessage; import javax.jms.Session; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /** * created on 2015/6/4 * @author dennisit@163.com * @version 1.0 */
public class MqReceiver { public static void main(String[] args) throws JMSException { // 默認的賬號和密碼為null
        String username = ActiveMQConnection.DEFAULT_USER; String password = ActiveMQConnection.DEFAULT_PASSWORD; // 初始化連接工廠, DEFAULT_BROKER_URL =failover://tcp://localhost:61616
        ConnectionFactory factory = new ActiveMQConnectionFactory(username, password, Constant.brokerURL); // 創建連接
        Connection connection = factory.createConnection(); connection.start(); // 創建會話
        Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue(Constant.queueDestination); // MessageConsumer負責接受消息
        MessageConsumer consumer = session.createConsumer(destination); ObjectMessage message = (ObjectMessage)consumer.receive(); if (null != message) { String messageString = (String)message.getObject(); System.out.println("Receive : " + messageString); } // 測試狀態,這里把關閉會話和連接注釋掉了。 // session.close(); // connection.close();
 } }

執行這段代碼會輸出接收到的消息內容:

管理后台在查看queue中心結果如下:

Topic類型消息

1、定義消息destination和brokerUrl[61616為activemq用於消息通訊的端口]

public class Constant { public static final String brokerURL = "tcp://192.168.147.131:61616"; public static final String topicDestination = "testTopic"; }

2、編寫消息生產者

package com.mq.base.topic; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * created on 2015/6/4 * @author dennisit@163.com * @version 1.0 */
public class MqSender { public static void main(String[] args) throws JMSException { // 默認的賬號和密碼為null
        String username = ActiveMQConnection.DEFAULT_USER; String password = ActiveMQConnection.DEFAULT_PASSWORD; // 初始化連接工廠, DEFAULT_BROKER_URL =failover://tcp://localhost:61616
        ConnectionFactory factory = new ActiveMQConnectionFactory(username, password, com.mq.base.queue.Constant.brokerURL); // 創建連接
        Connection connection = factory.createConnection(); connection.start(); // 創建會話
        Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 創建消息主題Topic,和Queue的區別就在此
        Destination destination = session.createTopic(Constant.topicDestination); // MessageProducer負責發送消息
        MessageProducer producer = session.createProducer(destination); // 消息不持久化
 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); TextMessage message = session.createTextMessage(); // createObjectMessage("hello world...");
        message.setStringProperty("msgId","topicMessage"); producer.send(message); // 只有commit之后,消息才會進入隊列
 session.commit(); System.out.println("send..."); // 測試狀態,這里把關閉會話和連接注釋掉了。 // session.close(); // connection.close();
 } }

3、編寫消息消費者

package com.mq.base.topic; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * created on 2015/6/4 * @author dennisit@163.com * @version 1.0 */
public class MqReceiver { public static void main(String[] args) throws JMSException { // 默認的賬號和密碼為null
        String username = ActiveMQConnection.DEFAULT_USER; String password = ActiveMQConnection.DEFAULT_PASSWORD; // 初始化連接工廠, DEFAULT_BROKER_URL =failover://tcp://localhost:61616
        ConnectionFactory factory = new ActiveMQConnectionFactory(username, password, com.mq.base.queue.Constant.brokerURL); // 創建連接
        Connection connection = factory.createConnection(); connection.start(); // 創建會話
        Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createTopic(Constant.topicDestination); // MessageConsumer負責接受消息
        MessageConsumer consumer = session.createConsumer(destination); TextMessage message = (TextMessage)consumer.receive(); if (null != message) { String messageString = message.getStringProperty("msgId"); System.out.println("Receive : " + messageString); session.commit(); } // 測試狀態,這里把關閉會話和連接注釋掉了。 // session.close(); // connection.close();
 } }

先啟動消費者:

啟動生產者,生產消息,此時會接收到消息如圖:

觀察topic后台管理

 

Queue模型消息和Topic模型消息區別

queue[點對點模型]
1、只有一個消費者
每條消息只有一個消費者,如果這條消息被消費,那么其它消費者不能接受到此消息。
2、時間無關性
消息的消費和時間無關,只要消息被發送了,在消息過期之前,如果沒有其他消費者消費了這個消息,那么客戶端可以在任何時候來消費這條消息。
3、消費者必須確認
消費者收到消息之后,必須向Message Provider確認,否則會被認為消息沒有被消費,仍然可以被其他消費者消費。可以設置自動確認。這個特點其實也是保證一條消息只能由一個消費者來消費。
4、非持久化的消息只發一次
非持久化的消息,可能會丟失,因為消息會過期,另外Message Provider可能宕機。
5、持久化的消息嚴格發一次
消息可以被持久化,比如持久化在文件系統或者數據庫中,這樣可以避免Message Provider的異常或者其它異常導致消息丟失。

Topic[發布者/訂閱者模型]
1、每條消息可以有多個訂閱者
2、訂閱者只能消費它們訂閱topic之后的消息
3、非持久化訂閱,訂閱者必須保持為活動狀態才能使用這些消息,如果一個訂閱者A斷開了10分鍾,那么A就會收不到這10分鍾內的消息。
4、持久化訂閱,Message Provider會保存這些消息,即使訂閱者因為網絡原因斷開了,再重新連接以后,能讓消費這些消息。
5、是否使用持久化訂閱,需要根據業務場景判斷。

 

轉載請注明出處:[http://www.cnblogs.com/dennisit/p/4551182.html]

 
       


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM