Java ActiveMQ 講解(一)理解JMS 和 ActiveMQ基本使用


最近的項目中用到了mq,之前自己一直在碼農一樣的照葫蘆畫瓢。最近幾天研究了下,把自己所有看下來的文檔和了解總結一下。

一. 認識JMS

1.概述

對於JMS,百度百科,是這樣介紹的:JMS即Java消息服務(Java Message Service)應用程序接口是一個Java平台中關於面向消息中間件(MOM)的API,用於在兩個應用程序之間,或分布式系統中發送消息,進行異步通信。Java消息服務是一個與具體平台無關的API,絕大多數MOM提供商都對JMS提供支持。

簡短來說,JMS是一種與廠商無關的 API,用來訪問消息收發系統消息。它類似於JDBC(Java Database Connectivity),提供了應用程序之間異步通信的功能。

JMS1.0是jsr 194里規定的規范(關於jsr規范,請點擊)。目前最新的規范是JSR 343,JMS2.0。

好了,說了這么多,其實只是在說,JMS只是sun公司為了統一廠商的接口規范,而定義出的一組api接口。

2. JMS體系結構

描述如下:

  • JMS提供者(JMS的實現者,比如activemq jbossmq等)
  • JMS客戶(使用提供者發送消息的程序或對象,例如在12306中,負責發送一條購票消息到處理隊列中,用來解決購票高峰問題,那么,發送消息到隊列的程序和從隊列獲取消息的程序都叫做客戶)
  • JMS生產者,JMS消費者(生產者及負責創建並發送消息的客戶,消費者是負責接收並處理消息的客戶)
  • JMS消息(在JMS客戶之間傳遞數據的對象)
  • JMS隊列(一個容納那些被發送的等待閱讀的消息的區域)
  • JMS主題(一種支持發送消息給多個訂閱者的機制)

3. JMS對象模型

  • 連接工廠(connectionfactory)客戶端使用JNDI查找連接工廠,然后利用連接工廠創建一個JMS連接。
  • JMS連接 表示JMS客戶端和服務器端之間的一個活動的連接,是由客戶端通過調用連接工廠的方法建立的。
  • JMS會話 session 標識JMS客戶端和服務端的會話狀態。會話建立在JMS連接上,標識客戶與服務器之間的一個會話進程。
  • JMS目的 Destinatio 又稱為消息隊列,是實際的消息源
  • 生產者和消費者
  • 消息類型,分為隊列類型(優先先進先出)以及訂閱類型

二. ActiveMQ

1. ActiveMQ的安裝

  1. 從官網下載安裝包,http://activemq.apache.org/download.html
  2. 賦予運行權限 chmod +x,windows可以忽略此步
  3. 運行 ./active start | stop

啟動后,activeMQ會占用兩個端口,一個是負責接收發送消息的tcp端口:61616,一個是基於web負責用戶界面化管理的端口:8161。這兩個端口可以在conf下面的xml中找到。http服務器使用了jettry。
這里有個問題是啟動mq后,很長時間管理界面才可以顯示出來。

2. 用Java訪問ActiveMQ

先附上Bean代碼:

public class MqBean implements Serializable{
	private Integer age;
	private String name;
	public Integer getAge() {
		return age;
	}
	public void setAge(Integer age) {
		this.age = age;
	}
	public String getName() {
		return name;
	}
	public void setName(String name) {
		this.name = name;
	}
}

2.1 隊列消息的發送:

public static void main(String[] args) {
	ConnectionFactory connectionFactory;
	Connection connection;
	Session session;
	Destination destination;
	MessageProducer producer;
	connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.3.159:61616");
	try {
		connection = connectionFactory.createConnection();
		connection.start();
		//第一個參數是是否是事務型消息,設置為true,第二個參數無效
		//第二個參數是
		//Session.AUTO_ACKNOWLEDGE為自動確認,客戶端發送和接收消息不需要做額外的工作。異常也會確認消息,應該是在執行之前確認的
		//Session.CLIENT_ACKNOWLEDGE為客戶端確認。客戶端接收到消息后,必須調用javax.jms.Message的acknowledge方法。jms服務器才會刪除消息。可以在失敗的
		//時候不確認消息,不確認的話不會移出隊列,一直存在,下次啟動繼續接受。接收消息的連接不斷開,其他的消費者也不會接受(正常情況下隊列模式不存在其他消費者)
		//DUPS_OK_ACKNOWLEDGE允許副本的確認模式。一旦接收方應用程序的方法調用從處理消息處返回,會話對象就會確認消息的接收;而且允許重復確認。在需要考慮資源使用時,這種模式非常有效。
		//待測試
		session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
		destination = session.createQueue("test-queue");
		producer = session.createProducer(destination);
		producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
		//優先級不能影響先進先出。。。那這個用處究竟是什么呢呢呢呢
		MqBean bean = new MqBean();
		bean.setAge(13);
		for(int i=0;i<100;i++){
			bean.setName("小黃"+i);
			producer.send(session.createObjectMessage(bean));
		}
		producer.close();
		System.out.println("呵呵");
	} catch (JMSException e) {
		e.printStackTrace();
	}
}

注:在上面的代碼中,確認模式有三種,里面的DUPS_OK_ACKNOWLEDGE和AUTO_ACKNOWLEDGE一直沒明白有什么區別。因為無法測試。不過大概也明白了一些。其實主要是MQ處理消息的流程決定的:

  1. 消息從生成方客戶端傳送到消息服務器。
  2. 消息服務器讀取消息。
  3. 消息被放置到持久性存儲器當中(出於可靠性的考慮)。
  4. 消息服務器確認收到消息(出於可靠性的考慮)。
  5. 消息服務器確定消息的路由。
  6. 消息服務器寫出消息。
  7. 消息從消息服務器傳送到使用方客戶端。
  8. 使用方客戶端確認收到消息(出於可靠性的考慮)。
  9. 消息服務器處理客戶端確認(出於可靠性的考慮)。
  10. 消息服務器確定已經處理客戶端確認。

這些步驟是連續的,所以任何步驟都可能成為消息從生成方客戶端到使用方客戶端的傳送過程的瓶頸。這些步驟中的大多數都取決於消息傳送系統的物理特征:網絡帶寬、計算機處理速度和消息服務器體系結構等等。但是,有一些步驟還取決於消息傳送應用程序的特征和該應用程序要求的可靠性級別。
其實就是基於可靠性還是性能的選擇.

2.2 隊列消息的接收:

public static void main(String[] args) {
	ConnectionFactory connectionFactory;
	// Connection :JMS 客戶端到JMS Provider 的連接  
	Connection connection = null;
	// Session: 一個發送或接收消息的線程  
	Session session;
	// Destination :消息的目的地;消息發送給誰.  
	Destination destination;
	// 消費者,消息接收者  
	MessageConsumer consumer;
	connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.3.159:61616");
	try {
		// 構造從工廠得到連接對象  
		connection = connectionFactory.createConnection();
		// 啟動  
		connection.start();
		// 獲取操作連接  
		//這個最好還是有事務
		session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
		// 獲取session注意參數值xingbo.xu-queue是一個服務器的queue,須在在ActiveMq的console配置  
		destination = session.createQueue("test-queue");
		consumer = session.createConsumer(destination);
		consumer.setMessageListener(new MessageListener() {
			@Override
			public void onMessage(Message message) {
				try {
					MqBean bean = (MqBean) ((ObjectMessage)message).getObject();
					System.out.println(bean);
					if (null != message) {
						System.out.println("收到消息" + bean.getName());
					}
				} catch (Exception e) {
					// TODO: handle exception
				}
			}
		});
	} catch (Exception e) {
		e.printStackTrace();
	}
}

注:對於隊列來說,比較簡單的優化策略,應該就是隊列分載了。由於每個消費者都是單線程的,所以可以設置多個消費者來提高速度。
大家可以復制個消費者自己測試下,在消費者中添加sleep測試下效果。

2.3 訂閱消息的發送

public static void main(String[] args) {
	ConnectionFactory connectionFactory;
	Connection connection;
	Session session;
	Destination destination;
	MessageProducer producer;
	connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.3.159:61616");
	try {
		connection = connectionFactory.createConnection();
		connection.start();
		
		session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
		destination = session.createTopic("test-topic");
		producer = session.createProducer(destination);
		producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
		//優先級不能影響先進先出。。。那這個用處究竟是什么呢呢呢呢
		MqBean bean = new MqBean();
		bean.setAge(13);
		for(int i=0;i<100;i++){
			Thread.sleep(1000);
			bean.setName("小黃"+i);
			producer.send(session.createObjectMessage(bean));
		}
		producer.close();
		System.out.println("呵呵");
	} catch (Exception e) {
		e.printStackTrace();
	}
}

2.4 訂閱消息的接收

public static void main(String[] args) {
	ConnectionFactory connectionFactory;
	// Connection :JMS 客戶端到JMS Provider 的連接  
	Connection connection = null;
	// Session: 一個發送或接收消息的線程  
	Session session;
	// Destination :消息的目的地;消息發送給誰.  
	Destination destination;
	// 消費者,消息接收者  
	MessageConsumer consumer;
	connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.3.159:61616");
	try {
		// 構造從工廠得到連接對象  
		connection = connectionFactory.createConnection();
		// 啟動  
		connection.start();
		// 獲取操作連接  
		//這個最好還是有事務
		session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
		// 獲取session注意參數值xingbo.xu-queue是一個服務器的queue,須在在ActiveMq的console配置  
		destination = session.createQueue("test-queue");
		consumer = session.createConsumer(destination);
		consumer.setMessageListener(new MessageListener() {
			@Override
			public void onMessage(Message message) {
				try {
					MqBean bean = (MqBean) ((ObjectMessage)message).getObject();
					System.out.println(bean);
					if (null != message) {
						System.out.println("收到消息" + bean.getName());
					}
				} catch (Exception e) {
					// TODO: handle exception
				}
			}
		});
	} catch (Exception e) {
		e.printStackTrace();
	}
}

以上的消息發送后,如果沒有接收到,可以登錄自己的MQ管理頁面:http://192.168.3.159:8161/admin/ ,默認帳號密碼都是admin,查看隊列中的消息

enter image description here

Number Of Pending Messages 等待消費的消息 這個是當前未出隊列的數量。可以理解為總接收數-總出隊列數
Messages Enqueued 進入隊列的消息 進入隊列的總數量,包括出隊列的。 這個數量只增不減
Messages Dequeued 出了隊列的消息 可以理解為是消費這消費掉的數量


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM