ActiveMq(二) - 點對點模式


1、概念

JMS有兩種消息通信模型,點對點模型(point to point)(p2p)和發布/訂閱模型(pub/sub),ActiveMq實現了JMS接口,同樣,ActiveMq也實現了這兩種模型。 點對點模型規定了一個消息只能有一個接收者,而發布/訂閱模型允許一個消息可以有多個消息接收者

點對點模型

當一個消息生產者產生一個消息時,會把消息放入一個隊列(Queue)中,然后消息消費者從Queue中讀取消息,如果同時有多個消費者讀取消息,ActiveMq保證消息只會被一個消費者讀取到,消費者讀取到消息之后需要向ActiveMq發送一條確認信息,確認消息已經被接收,此時,隊列(Queue)中的消息出隊,整個流程就處理完了。

2、幾個重要的類

> * ConnectionFactory:工廠類接口,通過這個工廠類接口就可以創建一個與ActiveMq的連接,即Connection。 > * Connection:連接,可以通過這個連接創建會話Session,連接默認是關閉的,需要顯式調用Connection的start方法。 > * Session:會話,通過會話類可以創建生產者或消費者、創建隊列Queue、創建消息對象。 > * MessageProducer:消息生產者,可將消息發送到隊列中。 > * Queue:隊列,存放消息的隊列。 > * MessageConsumer:消費者,從隊列中取出消息。 > * Message:消息接口,常用的實現類是TextMessage和ObjectMessage。

3、三種消息確認機制

> * Session.AUTO_ACKNOWLEDGE:自動確認,當消費接收到消息時,會自動的向ActiveMq發送收到消息確認 > * Session.CLIENT_ACKNOWLEDGE:消費者手動調用Message的acknowledge方法手動確認收到消息。發生在Session層面,當手動調用確認方法時,會把Session中所有已消費的消息都確認 > * Session.DUPS_OK_ACKNOWLEDGE:不必確認已收到消息。但它可能會引起消息的重復接收,但是降低了Session的開銷(不需要再去ACKNOWLEDGE一次),所以只有客戶端可以重復接收消息時,才可使用此模式

4、生產者代碼

``` package com.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Producer {
public static void main(String[] args) throws JMSException {
//通過tcp協議獲取連接工廠
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://server01:61616");

	//通過連接工廠獲取連接
	Connection connection = factory.createConnection();
	connection.start();//開啟連接
	
	/**
	 * 創建session,此處有兩個參數
	 * 	第一個布爾類型的參數表示是否開啟事務
	 *  第二個表示消息確認簽收模式,此處為自動確認
	 */
	Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
	
	//創建消息隊列
	Queue queue = session.createQueue("queue");
	
	//創建消息生產者
	MessageProducer producer = session.createProducer(queue);
    //producer.setDeliveryMode(DeliveryMode.PERSISTENT);
    //DeliveryMode.PERSISTENT:持久化(默認),當ActiveMq關閉時,隊列中的消息會保存
    //DeliveryMode.NON_PERSISTENT:非持久化,當ActiveMq關閉時,隊列中的消息會丟失
	
	//生產者發送5條消息
	for(int i = 0 ; i < 5; i++){
		Message message = session.createTextMessage("消息" + i);
		producer.send(message);
		TextMessage text = (TextMessage)message;
		System.out.println("生產者發送消息:" + text.getText());
	}
}

}

啟動生產者,看到控制台打印出5條記錄:

生產者發送消息:消息0
生產者發送消息:消息1
生產者發送消息:消息2
生產者發送消息:消息3
生產者發送消息:消息4

再訪問ActiveMq控制台 http://server01:8161,查看Queues中的消息數量
![](https://images2018.cnblogs.com/blog/1373276/201804/1373276-20180415010903949-884323074.png)

此時看到Queues中已經多了5條消息,消費者數量為0
> * Name:隊列名稱
> * Number Of Pending Messages :等待被處理的消息數量
> * Number Of Consumers :消費者數量
> * Messages Enqueued :入隊的消息數量
> * Messages Dequeued:出隊的消息數量

<h1>5、消費者代碼</h1>

package com.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Consumer {
public static void main(String[] args) throws JMSException {
//通過tcp協議獲取連接工廠
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://server01:61616");

	//通過連接工廠獲取連接
	Connection connection = factory.createConnection();
	connection.start();////開啟連接
	
	/**
	 * 創建session,此處有兩個參數
	 * 	第一個布爾類型的參數表示是否開啟事務
	 *  第二個表示消息確認簽收模式,此處為自動簽收
	 */
	Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
	
	//創建消息隊列
	Queue queue = session.createQueue("queue");
	
	//創建消費者
	MessageConsumer consumer = session.createConsumer(queue);
	
	while(true){
		Message message = consumer.receive();
		if(message == null){
			break;
		}
		TextMessage textMessage = (TextMessage)message;
		System.out.println("消費者收到消息:" + textMessage.getText());
	}
}

}

啟動消費者,看到Eclipse控制台收到5條記錄,說明此時消費者已經收到消息了。

消費者收到消息:消息0
消費者收到消息:消息1
消費者收到消息:消息2
消費者收到消息:消息3
消費者收到消息:消息4


此時再看看ActiveMq控制台,待消費的消息數量已經為0,已經有一個消費者了,出隊的消息數量也是5,說明消費者已經向ActiveMq發送確認收到消息的通知了,隊列中的消息已經被消費完了。此時若再次啟動消費者,Eclipse控制台不會再打印消息。
![](https://images2018.cnblogs.com/blog/1373276/201804/1373276-20180415012048914-1223879859.png)

<h1>6、消費者監聽模式</h1>
消費者接收消息有兩個方法,上面講的方式一是調用receive方法來接收,第二種方式是注冊監聽器的方式來獲取消息。
創建兩個消費者,分別監聽在Queue上,當消費者產生消息放入Queue中時,兩個消費者都會去接收消息,但是ActiveMq會保證一個消息只有一個消費者能收到。

package com.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class ConsumerListener {
public static void main(String[] args) throws JMSException {
//通過tcp協議獲取連接工廠
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://server01:61616");

	//通過連接工廠獲取連接
	Connection connection = factory.createConnection();
	connection.start();////開啟連接
	
	/**
	 * 創建session,此處有兩個參數
	 * 	第一個布爾類型的參數表示是否開啟事務
	 *  第二個表示消息確認簽收模式,此處為自動簽收
	 */
	Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
	
	//創建消息隊列
	Queue queue = session.createQueue("queue");
	
	//創建消費者
	MessageConsumer consumer = session.createConsumer(queue);

	//監聽模式
	consumer.setMessageListener(new MessageListener() {
		
		public void onMessage(Message message) {
			TextMessage textMessage = (TextMessage)message;
			try {
				System.out.println("消費者一收到消息:" + textMessage.getText());
			} catch (JMSException e) {
				e.printStackTrace();
			}
		}
	});
	
	MessageConsumer consumer1 = session.createConsumer(queue);
	consumer1.setMessageListener(new MessageListener() {
		public void onMessage(Message message) {
			TextMessage textMessage = (TextMessage)message;
			try {
				System.out.println("消費者二收到消息:" + textMessage.getText());
			} catch (JMSException e) {
				e.printStackTrace();
			}
		}
	});
}

}

先啟動消費者程序,此時會看到ActiveMq控制台已經存在兩個消費者了,但是還沒有消息。
![](https://images2018.cnblogs.com/blog/1373276/201804/1373276-20180415013155244-1046460892.png)

再啟動生產者,會看到Eclipse控制台打印出5條記錄,表明5個消息分別被不同的消費者接收到了,也就是同一個消息只能被一個消費者接收

消費者一收到消息:消息0
消費者二收到消息:消息1
消費者一收到消息:消息2
消費者二收到消息:消息3
消費者一收到消息:消息4

此時再看一下ActiveMq控制台,待處理的消息已經為0,入隊和出隊的消息記錄都是5。
![](https://images2018.cnblogs.com/blog/1373276/201804/1373276-20180415013445828-567862121.png)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM