RabbitMQ 從入門到精通 (一)


[TOC]

初識RabbitMQ

RabbitMQ 是一個開源的消息代理和隊列服務器,用來通過普通協議在完全不同的應用之間共享數據,RabbitMQ是使用 Erlang語言來編寫的,並且RabbitMQ是基於AMQP協議的

RabbitMQ的優點:

  • 開源、性能優秀、穩定性保障
  • 提供可靠性消息投遞模式(confirm)、返回模式(return)
  • 與SpringAMQP完美的整合、API豐富
  • 集群模式豐富,表達式配置,HA模式,鏡像隊列模型
  • 保證數據不丟失的前提下做到高可靠性、可用性

RabbitMQ官網

RabbitMQ的整體架構:

  RabbitMQ的消息流轉:

 

 

AMQP

AMQP全稱: Advanced Message Queuing Protocol

AMQP翻譯: 高級消息隊列協議

**AMQP定義:**是具有現代特征的二進制協議。是一個提供統一消息服務的應用層標准高級消息隊列協議,是應用層協議的一個開放標准,為面向消息的中間件設計

   

AMQP核心概念:

  • **Server:**又稱Broker,接受客戶端的連接,實現AMQP實體服務
  • **Connection:**連接,應用程序與Broker的網絡連接
  • **Channel:**網絡信道,幾乎所有的操作都在Channel中進行,Channel是進行消息讀寫的通道。客戶端可建立多個Channel,每個Channel代表一個會話任務
  • **Message:**消息,服務器和應用程序之間傳送的數據,由Properties和Body組成。Properties可以對消息進行修飾,比如消息的優先級、延遲等高級特性;Body則是消息體的內容
  • **Virtual host:**虛擬地址,用於進行邏輯隔離,最上層的消息路由。同一個Virtual Host里面不能有相同名稱的Exchange或Queue
  • **Exchange:**交換機,接收消息,根據路由鍵轉發消息到綁定的隊列
  • **Binding:**Exchange和Queue之間的虛擬連接,binding中可以包含routing key
  • **Routing key:**一個路由規則,虛擬機可用它確定如何路由一個特定消息
  • **Queue:**也稱為Message Queue,消息隊列,保存消息並將它們轉發給消費者

 

 

RabbitMQ的極速入門

后台啟動: ./rabbitmq start &

關閉: ./rabbitmqctl stop

節點狀態: ./rabbitmqctl status

管控台: http://ip:15672

 

 

RabbitMQ生產消費快速入門:

環境: springboot+jdk1.7+rabbitmq3.6.5 (Maven依賴配置)

 <parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>1.5.9.RELEASE</version>
  </parent>
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		
		<dependency>
			<groupId>com.rabbitmq</groupId>
			<artifactId>amqp-client</artifactId>
			<version>3.6.5</version>
		</dependency>
	</dependencies>

 

public class Procuder {
	public static void main(String[] args) throws Exception {
		
		//1.創建一個ConnectionFactory 並進行配置
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("192.168.244.11");
		connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/");
		connectionFactory.setHandshakeTimeout(20000);
		//2.通過連接工廠創建連接
		Connection connection = connectionFactory.newConnection();
		
		//3.通過Connection 創建一個 Channel
		Channel channel = connection.createChannel();
	
		/**
		 * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
		 * exchange:指定交換機 不指定 則默認 (AMQP default交換機) 通過routingkey進行匹配 
		 * props 消息屬性
		 * body 消息體
		 */
		//4.通過Channel發送數據
		for(int i = 0; i < 5; i++){
		  System.out.println("生產消息:" + i);
		  String msg = "Hello RabbitMQ" + i;
	      channel.basicPublish("", "test", null, msg.getBytes());
		}
		
		
		//5.記得關閉相關的連接
		channel.close();
		connection.close();
	}
}

 

public class Consumer {
	public static void main(String[] args) throws Exception{
				//1.創建一個ConnectionFactory 並進行配置
				ConnectionFactory connectionFactory = new ConnectionFactory();
				connectionFactory.setHost("192.168.244.11");
				connectionFactory.setPort(5672);
				connectionFactory.setVirtualHost("/");
				connectionFactory.setHandshakeTimeout(20000);
				//2.通過連接工廠創建連接
				Connection connection = connectionFactory.newConnection();
				
				//3.通過Connection 創建一個 Channel
				Channel channel = connection.createChannel();
				
				//4. 聲明創建一個隊列
				String queueName = "test";
				/**
				 * durable 是否持久化
				 * exclusive 獨占的  相當於加了一把鎖
				 */
				channel.queueDeclare(queueName,true,false,false,null);
				
				//5.創建消費者
				QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
				
				//6.設置channel
				/**
				 * ACK: 當一條消息從生產端發到消費端,消費端接收到消息后會馬上回送一個ACK信息給broker,告訴它這條消息收到了
				 * autoack: 
				 * true  自動簽收 當消費者一收到消息就表示消費者收到了消息,消費者收到了消息就會立即從隊列中刪除。
				 * false 手動簽收 當消費者收到消息在合適的時候來顯示的進行確認,說我已經接收到了該消息了,RabbitMQ可以從隊列中刪除該消息了
				 * 
				 */
				channel.basicConsume(queueName, true, queueingConsumer);
				
				//7.獲取消息
				while(true){
					Delivery delivery = queueingConsumer.nextDelivery();
					String msg = new String(delivery.getBody());
					System.err.println("消費端:" + msg);
					//Envelope envelope = delivery.getEnvelope();
				}
	}
}

 

Exchange(交換機)詳解

Exchange: 接收消息,並根據路由鍵轉發消息所綁定的隊列

 

交換機屬性:

  • Name: 交換機名稱
  • Type: 交換機類型 diect、topic、fanout、headers
  • **Durability:**是否需要持久化,true為持久化
  • AutoDelete: 當最后一個綁定到Exchange的隊列刪除后,自動刪除該Exchange
  • Internal: 當前Exchange是否用於RabbitMQ內部使用,默認為false (百分之99的情況默認為false 除非對Erlang語言較了解,做一些擴展)
  • **Arguments:**擴展參數, 用於擴展AMQP協議可自定化使用

 

Direct Exchange

所有發送到Direct Exchange的消息被轉發到RouteKey指定的Queue

**注意:**Direct模式可以使用RabbitMQ自帶的Exchange: default Exchange,所以不需要將Exchange進行任何綁定(binding)操作,消息傳遞時,RoutingKey必須完全匹配才會被隊列接收,否則該消息會被拋棄

 

public class ProducerDirectExchange {
	public static void main(String[] args) throws Exception {
		//1.創建ConnectionFactory
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("192.168.244.11");
		connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/");
		
		//2.創建Connection
		Connection connection = connectionFactory.newConnection();
		//3.創建Channel
		Channel channel = connection.createChannel();
		//4.聲明
		String exchangeName = "test_direct_exchange";
		String routingKey = "test.direct";
		//5.發送
		String msg = "Hello World RabbitMQ4 Direct Exchange Message";
		channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
	}
}

 

public class ConsumerDirectExchange {
	public static void main(String[] args) throws Exception{
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("192.168.244.11");
		connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/");
		connectionFactory.setHandshakeTimeout(20000);
		connectionFactory.setAutomaticRecoveryEnabled(true);
		connectionFactory.setNetworkRecoveryInterval(3000);
		
		Connection connection = connectionFactory.newConnection();
		
		Channel channel = connection.createChannel();
		//聲明
		String exchangeName = "test_direct_exchange";
		String exchangeType = "direct";
		String queueName = "test_direct_queue";
		String routingKey = "test.direct";
		//表示聲明了一個交換機
		channel.exchangeDeclare(exchangeName, exchangeType,true,false,false,null);
		//表示聲明了一個隊列
		channel.queueDeclare(queueName,false,false,false,null);
		//建立一個綁定關系
		channel.queueBind(queueName, exchangeName, routingKey);
		
		//durable 是否持久化消息
		QueueingConsumer consumer = new QueueingConsumer(channel);
		//參數:隊列名稱,是否自動ACK,Consumer
		channel.basicConsume(queueName, true, consumer);
		
		//循環獲取消息
		while(true){
			//獲取消息,如果沒有消息,這一步將會一直阻塞
			Delivery delivery = consumer.nextDelivery();
			String msg = new String(delivery.getBody());
			System.out.println("收到消息:" + msg);
		}
	}
}

 

Topic Exchange

所有發送到Topic Exchange的消息被轉發到所有關心RouteKey中指定Topic的Queue上

Exchange將RouteKey和某Topic進行模糊匹配,此時隊列需要綁定一個Topic

注意:可以使用通配符進行匹配

符號 # 匹配一個或多個詞

符號 * 匹配不多不少一個詞

例如: "log.#" 能夠匹配到 “log.info.oa”

​"log.*" 只會匹配到 "log.err"

public class ProducerTopicExchange {
	public static void main(String[] args) throws Exception {
		//1.創建ConnectionFactory
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("192.168.244.11");
		connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/");
		connectionFactory.setHandshakeTimeout(20000);

		//2.創建Connection
		Connection connection = connectionFactory.newConnection();
		//3.創建Channel
		Channel channel = connection.createChannel();
		//4.聲明
		String exchangeName = "test_topic_exchange";
		String routingKey1 = "user.save";
		String routingKey2 = "user.update";
		String routingKey3 = "user.delete.abc";
		//5.發送
		String msg = "Hello World RabbitMQ4 Direct Exchange Message";
		channel.basicPublish(exchangeName, routingKey1, null, msg.getBytes());
		channel.basicPublish(exchangeName, routingKey2, null, msg.getBytes());
		channel.basicPublish(exchangeName, routingKey3, null, msg.getBytes());
	}
}

 

public class ConsumerTopicExchange {
	public static void main(String[] args) throws Exception{
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("192.168.244.11");
		connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/");
		connectionFactory.setHandshakeTimeout(20000);
		connectionFactory.setAutomaticRecoveryEnabled(true);
		connectionFactory.setNetworkRecoveryInterval(3000);
		
		Connection connection = connectionFactory.newConnection();
		
		Channel channel = connection.createChannel();
		//聲明
		String exchangeName = "test_topic_exchange";
		String exchangeType = "topic";
		String queueName = "test_topic_queue";
		String routingKey = "user.#";
		//表示聲明了一個交換機
		channel.exchangeDeclare(exchangeName, exchangeType,true,false,false,null);
		//表示聲明了一個隊列
		channel.queueDeclare(queueName,false,false,false,null);
		//建立一個綁定關系
		channel.queueBind(queueName, exchangeName, routingKey);
		
		//durable 是否持久化消息
		QueueingConsumer consumer = new QueueingConsumer(channel);
		//參數:隊列名稱,是否自動ACK,Consumer
		channel.basicConsume(queueName, true, consumer);
		
		//循環獲取消息
		while(true){
			//獲取消息,如果沒有消息,這一步將會一直阻塞
			Delivery delivery = consumer.nextDelivery();
			String msg = new String(delivery.getBody());
			System.out.println("收到消息:" + msg);
		}
	}
}

 

Fanout Exchange

不處理路由鍵,只需要簡單的將隊列綁定到交換機上,發送到交換機的消息都會被轉發到與該交換機綁定的所有隊列上 所以Fanout交換機轉發消息是最快的

 

public class ProducerFanoutExchange {
	public static void main(String[] args) throws Exception {
		//1.創建ConnectionFactory
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("192.168.244.11");
		connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/");
		connectionFactory.setHandshakeTimeout(20000);

		//2.創建Connection
		Connection connection = connectionFactory.newConnection();
		//3.創建Channel
		Channel channel = connection.createChannel();
		//4.聲明
		String exchangeName = "test_fanout_exchange";
		//5.發送
		for(int i = 0; i < 10 ; i++){
			String msg = "Hello World RabbitMQ4 Direct Exchange Message";
			channel.basicPublish(exchangeName, "", null, msg.getBytes());
		}
		channel.close();
		connection.close();
	}
}

 

public class ConsumerFanoutExchange {
	public static void main(String[] args) throws Exception{
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("192.168.244.11");
		connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/");
		connectionFactory.setHandshakeTimeout(20000);
		connectionFactory.setAutomaticRecoveryEnabled(true);
		connectionFactory.setNetworkRecoveryInterval(3000);
		
		Connection connection = connectionFactory.newConnection();
		
		Channel channel = connection.createChannel();
		//聲明
		String exchangeName = "test_fanout_exchange";
		String exchangeType = "fanout";
		String queueName = "test_topic_queue";
		//無需指定路由key 
		String routingKey = "";
		//表示聲明了一個交換機
		channel.exchangeDeclare(exchangeName, exchangeType,true,false,false,null);
		//表示聲明了一個隊列
		channel.queueDeclare(queueName,false,false,false,null);
		//建立一個綁定關系
		channel.queueBind(queueName, exchangeName, routingKey);
		
		//durable 是否持久化消息
		QueueingConsumer consumer = new QueueingConsumer(channel);
		//參數:隊列名稱,是否自動ACK,Consumer
		channel.basicConsume(queueName, true, consumer);
		
		//循環獲取消息
		while(true){
			//獲取消息,如果沒有消息,這一步將會一直阻塞
			Delivery delivery = consumer.nextDelivery();
			String msg = new String(delivery.getBody());
			System.out.println("收到消息:" + msg);
		}
	}
}

 

Message 消息

服務器與應用程序之間傳遞的數據,本質上就是一段數據,由Properties和Body組成

常用屬性:delivery mode、headers (自定義屬性)

其他屬性:content_type、content_encoding、priority、expiration

消息的properties屬性用法示例:

public class Procuder {
	public static void main(String[] args) throws Exception {
		
		//1.創建一個ConnectionFactory 並進行配置
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("192.168.244.11");
		connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/");
		connectionFactory.setHandshakeTimeout(20000);
		//2.通過連接工廠創建連接
		Connection connection = connectionFactory.newConnection();
		
		//3.通過Connection 創建一個 Channel
		Channel channel = connection.createChannel();
	
		Map<String,Object> headers = new HashMap<>();
		headers.put("my1", "111");
		headers.put("my2", "222");
		
		//10秒不消費 消息過期移除消息隊列
		AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
				.deliveryMode(2)
				.contentEncoding("utf-8")
				.expiration("10000")
				.headers(headers)
				.build();
		
		//4.通過Channel發送數據
		for(int i = 0; i < 5; i++){
		  System.out.println("生產消息:" + i);
		  String msg = "Hello RabbitMQ" + i;
	      channel.basicPublish("", "test", properties, msg.getBytes());
		}
		
		
		//5.記得關閉相關的連接
		channel.close();
		connection.close();
	}
}

 

public class Consumer {
	public static void main(String[] args) throws Exception{
				//1.創建一個ConnectionFactory 並進行配置
				ConnectionFactory connectionFactory = new ConnectionFactory();
				connectionFactory.setHost("192.168.244.11");
				connectionFactory.setPort(5672);
				connectionFactory.setVirtualHost("/");
				connectionFactory.setHandshakeTimeout(20000);
				//2.通過連接工廠創建連接
				Connection connection = connectionFactory.newConnection();
				
				//3.通過Connection 創建一個 Channel
				Channel channel = connection.createChannel();
				
				//4. 聲明創建一個隊列
				String queueName = "test";
				channel.queueDeclare(queueName,true,false,false,null);
				
				//5.創建消費者
				QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
				
				//6.設置channel
				channel.basicConsume(queueName, true, queueingConsumer);
				
				//7.獲取消息
				while(true){
					Delivery delivery = queueingConsumer.nextDelivery();
					String msg = new String(delivery.getBody());
					System.err.println("消費端:" + msg);
					
					Map<String, Object> headers = delivery.getProperties().getHeaders();
					System.err.println("headers value:" + headers.get("my1"));
				}
	}
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM