官網介紹:https://www.rabbitmq.com/getstarted.html
五種工作模式的主要特點
- 簡單模式:一個生產者,一個消費者
- work模式:一個生產者,多個消費者,每個消費者獲取到的消息唯一(消費者彼此競爭成為接收者)。
- 訂閱模式:一個生產者發送的消息會被多個消費者獲取。
- 路由模式:發送消息到交換機並且要指定路由key ,消費者將隊列綁定到交換機時需要指定路由key
- topic模式:將路由鍵和某模式進行匹配,此時隊列需要綁定在一個模式上,“#”匹配一個詞或多個詞,“*”只匹配一個詞。
簡單模式(一個生產者,一個消費者)
這種模式下不需要將Exchange進行任何綁定(binding)操作
public static final String QUEUE_NAME= "myqueue";
public static void test() throws Exception {
//定義連接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
//設置Virtual Host
factory.setVirtualHost("/ld");
factory.setUsername("ld");
factory.setPassword("aaa");
//通過工廠獲取連接
Connection connection = factory.newConnection();
//創建隊列,發送消息
public void producer () {
//創建通道
Channel channel = connection.createChannel();
//聲明創建隊列
/**
隊列名
是否持久化
是否排外 即只允許該channel訪問該隊列 一般等於true的話用於一個隊列只能有一個消費者來消費的場景
是否自動刪除(自動刪除的前提是: 至少有一個消費者連接到這個隊列,之后所有與這個隊列連接的消費者都斷開時,才會 自動刪除。)
其他屬性
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//消息內容
String message = "Hello World!";
//發布消息
/**
交換機
隊列名
其他屬性 路由
消息body
*/
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
//關閉連接和通道
channel.close();
connection.close();
}
//消費者消費消息
public void consumer () {
//創建通道
Channel channel = connection.createChannel();
//聲明通道
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//定義消費者
QueueingConsumer consumer = new QueueingConsumer(channel);
//監聽隊列
//autoAck 是否自動確認消息,true自動確認
channel.basicConsume(QUEUE_NAME, true, consumer);
while (true) {
//這個方法會阻塞住,直到獲取到消息
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("接收到消息:" + message);
}
}
}
work模式(一個生產者,一個隊列,多個消費者,每個消費者獲取到的消息唯一)
public static final String QUEUE_NAME= "myqueue";
//消息生產者
public void producer{
//獲取連接和通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//聲明隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "";
for (int i = 0; i < 100; i++) {
message = "" + i;
channel.basicPublish("",QUEUE_NAME, null, message.getBytes());
Thread.sleep(i);
}
channel.close();
connection.close();
}
//消費者1
public void consumer1{
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//同一時刻服務器只發送一條消息給消費端
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
//false:手動確認
channel.basicConsume(QUEUE_NAME, false, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("recive1:" + message);
Thread.sleep(100);
//消息消費完給服務器返回確認狀態,表示該消息已被消費
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
channel.basicPublish
channel.basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
mandatory:
true:如果exchange根據自身類型和消息routeKey無法找到一個符合條件的queue,
那么會調用basic.return方法將消息返還給生產者。
false:出現上述情形broker會直接將消息扔掉
immediate:
true:如果exchange在將消息route到queue(s)時發現對應的queue上沒有消費者,那么這條消息不會放入隊列中。
當與消息routeKey關聯的所有queue(一個或多個)都沒有消費者時,該消息會通過basic.return方法返還給生產者。
BasicProperties :
需要注意的是BasicProperties.deliveryMode:
0:不持久化 1:持久化
這里指的是消息的持久化,配合channel(durable=true),queue(durable)可以實現,即使服務器宕機,消息仍然保留
fanout訂閱模式(一個生產者,多個隊列,多個消費者)
這種模式需要提前將Exchange與Queue進行綁定,
一個Exchange可以綁定多個Queue,一個Queue可以同多個Exchange進行綁定。
一個生產者發送的消息會被多個消費者獲取。
生產者:可以將消息發送到隊列或者是交換機。
消費者:只能從隊列中獲取消息。
如果消息發送到沒有隊列綁定的交換機上,那么消息將丟失。
public static final String EXCHANGE_NAME = "exchange_fanout";
//生產者
public void producer() {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//聲明交換機 fanout
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String message = "Hello World!";
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
channel.close();
connection.close();
}
//消費者1
public final static String QUEUE_NAME = "queue_fanout_1";
public void consumer1() {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//綁定隊列到交換機上
channel.queueBind(QUEUE_NAME, Send.EXCHANGE_NAME, "");
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, true, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(message);
}
}
direct路由模式(完全匹配、單播的模式)
1. 發送消息到交換機並且要指定路由key
2. 消費者將隊列綁定到交換機時需要指定路由key
3. 完全匹配,只有匹配到的消費者才能消費消息
4. 一個隊列可以綁定多個路由
public static final String EXCHANGE_NAME = "exchange_direct";
//生產者
public void producer() {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//聲明交換機 direct
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String message = "Hello World!";
channel.basicPublish(EXCHANGE_NAME, "key", null, message.getBytes());
channel.close();
connection.close();
}
//消費者1
public final static String QUEUE_NAME = "queue_direct_1";
public void consumer1() {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//綁定隊列到交換機上,並制定路由鍵為"key"
channel.queueBind(QUEUE_NAME, com.bw.rabbitmq.routing.Send.EXCHANGE_NAME, "key");
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, true, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(message);
}
}
topic通配符模式
兩個通配符:符號“#”和符號“*”。#匹配0個或多個單詞,*匹配一個單詞
//生產者
public static final String EXCHANGE_NAME = "exchange_topic";
public void producer() {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//聲明交換機 topic:交換機類型
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String message = "Hello World!";
channel.basicPublish(EXCHANGE_NAME, "key.1", null, message.getBytes());
System.out.println(message);
channel.close();
connection.close();
}
//消費者1
public final static String QUEUE_NAME = "queue_topic_1";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//綁定隊列到交換機上,並制定路由鍵匹配規則為"key.*"
channel.queueBind(QUEUE_NAME, com.bw.rabbitmq.topics.Send.EXCHANGE_NAME, "key.*");
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, true, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(message);
}
}