一、概述
前面學過ActiveMQ。ActiveMQ主要是實現的JMS規范,而RabbitMQ就是AMQP的一個具體實現。
RabbitMQ里面有幾個概念:生產者、消費者、消息、交換器、路由鍵、隊列、綁定、虛擬主機
1.生產者角度
生產者產生數據,然后根據指定交換器和路由鍵將數據發送到消息隊列RabbitMQ。為了保證交換器的存在,我們每次在初始化生產者的時候都要嘗試去創建一個交換器。
交換器總共有4種類型:
- direct 路由鍵完全匹配
- fanout 消息廣播,將忽略路由鍵
- topic 通過“*”和“#”的通配符進行綁定。注意:”.”將路由鍵分為了幾個標識符,“*”匹配1個,“#”匹配一個或多個
- headers 和direct類似,很少使用
2.消費者角度
消費者主要就是獲取並消費數據,因此需要創建一個隊列,同時需要創建一個交換器(交換器在消費者和生產者都可以創建),然后將隊列和交換器通過路由鍵進行綁定。最后就可以根據隊列進行數據的消費了。
二、Java代碼
1.pom.xml
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>4.0.2</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </exclusion> </exclusions> </dependency>
2.生產者代碼
package cn.duanjt; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * 生產者 * @author 段江濤 * @date 2018-11-30 */ public class Productor { public static void main(String[] args) throws IOException, TimeoutException { String ROUTE_KEY = "rabbitmq-duanjt";// 路由鍵名稱 String EXCHANGE_NAME = "exchange-duanjt";// 交換器名稱 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.23.24"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); factory.setVirtualHost("/");//虛擬主機,可通過控制台查看 //創建連接和信道 Connection conn = factory.newConnection(); Channel channel = conn.createChannel(); // 創建一個交換器,參數為:交互器名稱和交換器類型 // 注意:其實這個交換器只需要聲明一次就可以,但是由於無法保證交換器已經存在了,所以我們每次都要聲明 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); for (int i = 0; i < 5; i++) { String msg = "Hello world.I love you forever ===>" + i; // 發布消息,需要參數:交換器,路由鍵。最后一個參數為消息內容 // 注意:RabbitMQ的消息類型只有一種,那就是byte[] channel.basicPublish(EXCHANGE_NAME, ROUTE_KEY, null, msg.getBytes("utf-8")); System.out.println("send:" + msg); } //關閉信道和連接 channel.close(); conn.close(); } }
3.消費者代碼
package cn.duanjt; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.BuiltinExchangeType; public class Consumer { public static void main(String[] args) throws IOException, TimeoutException { String QUEUE_NAME = "queue-duanjt";// 隊列名稱 String ROUTE_KEY = "rabbitmq-duanjt";// 路由鍵名稱 String EXCHANGE_NAME = "exchange-duanjt";// 交換器名稱 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.23.24"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); factory.setVirtualHost("/"); Connection conn = factory.newConnection(); Channel channel = conn.createChannel(); // 創建一個隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 創建交換器 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); // 將隊列和交換器通過路由鍵進行綁定 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTE_KEY); //開始消費,第二個參數表示自動確認 channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) { // 當消息到達時執行回調方法 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "utf-8"); System.out.println("[Receive]:" + message); } }); } }
注意:
1.為了保證交換器的存在,所以消費者和生產者都要創建,因為不知道是消費者先啟動還是生產者先啟動
2.可以通過http://ip:15672 查看交換器、路由鍵和隊列之間的關系
3.一個連接(Connection)可以創建多個信道(Channel)。每個信道也可以在獨立的一個線程里面
4.一個隊列可以有多個消費者,這種情況下,消息將在消費者之間進行輪詢