一、簡單模式
原理:生產者將消息交給默認的交換機,交換機獲取消息后交給綁定這個生產者的隊列(投遞規則為隊列名稱和routing key 相同的隊列),監聽當前隊列的消費者獲取信息並執行消費邏輯。
場景:有一個oa系統,用戶通過接收手機驗證碼進行注冊,頁面上點擊獲取驗證碼后,將驗證碼放到消息隊列,然后短信服務從隊列中獲取到驗證碼,並發送給用戶。
實現:
生產者:
public class Producter {
public static void main(String[] args) throws Exception {
// 1. 創建出鏈接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/");
// 2. 通過鏈接工廠創建鏈接對象
Connection connection = factory.newConnection();
// 3. 通過鏈接對象創建出channel
Channel channel = connection.createChannel();
// 4. 通過channel發布消息
/**
* 四個參數:
* 第一個參數是交換機的名稱
* 第二個參數是路由鍵
* 第三個參數標識消息的一些額外的屬性
* 第四個是消息的具體的內容
*/
String message = "字節";
for(int i = 0;i < 5;i ++){
channel.basicPublish("","byte001",null,message.getBytes());
}
// 5. 釋放資源,釋放channel 和 鏈接對象
channel.close();
connection.close();
}
}
消費者:
public class Consumer {
public static void main(String[] args) throws Exception {
// 1. 創建出鏈接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/");
// 2. 通過鏈接工廠創建鏈接對象
Connection connection = factory.newConnection();
// 3. 通過鏈接對象創建出channel
Channel channel = connection.createChannel();
// 4. 創建出消息隊列
/**
* 第一個參數是消息隊列的名稱
* 第二個參數表示消息是否持久化
* 第三個參數標識消息隊列是否被channel獨占
* 第四個參數標識是否自動刪除消息隊列,當消息隊列沒有綁定交換機后是否自動刪除
* 第五個是消息隊列擴展參數
*/
String queueName = "byte001";
channel.queueDeclare(queueName, true, false, false, null);
// 5. 創建消費者,對消息進行處理
DefaultConsumer consumer = new DefaultConsumer(channel) {
/**
* consumerTag 用來標識.可以再監聽隊列時候設置
* envelope 信封,通過envelope可以通過這個獲取到很多東西
* properties 額外的消息屬性
* body:消息體
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body, "UTF-8");
System.out.println("獲取到的消息:"+s);
}
};
// 6. 通過channel消費者和消息隊列關聯
/**
* 第一個參數是消息隊列的名字
* 第二個參數是否自動簽收(即消費消息后告知服務器已被消費)
* 第三個參數是消費者
*/
channel.basicConsume(queueName, true, consumer);
}
}
二、工作模式
原理:生產者將消息交給交換機,交換機交給綁定的隊列,隊列有多個消費者監聽,一條消息只能由一個消費者消費,這樣就形成了資源競爭,誰的資源空閑大,爭搶到的可能性就大。
場景:有一個電商平台,有兩個訂單服務,用戶下單的時候,任意一個訂單服務消費用戶的下單請求生成訂單即可。不用兩個訂單服務同時消費用戶的下單請求。
實現:
生產者:
public class Producter {
public static final String QUEUE_NAME = "byte002";
public static void main(String[] args) throws Exception {
// 1. 創建出鏈接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/");
// 2. 通過鏈接工廠創建鏈接對象
Connection connection = factory.newConnection();
// 3. 通過鏈接對象創建出channel
Channel channel = connection.createChannel();
// 申明隊列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// 4. 通過channel發布消息
/**
* 四個參數:
* 第一個參數是交換機的名稱
* 第二個參數是路由鍵
* 第三個參數標識消息的一些額外的屬性
* 第四個是消息的具體的內容
*/
String message = "字節";
for(int i = 0;i < 100;i ++){
channel.basicPublish("",QUEUE_NAME,null,(message+i).getBytes());
}
// 5. 釋放資源,釋放channel 和 鏈接對象
channel.close();
connection.close();
}
}
消費者1:
public class Consumer {
public static final String QUEUE_NAME = "byte002";
public static void main(String[] args) throws Exception {
// 1. 創建出鏈接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/");
// 2. 通過鏈接工廠創建鏈接對象
Connection connection = factory.newConnection();
// 3. 通過鏈接對象創建出channel
Channel channel = connection.createChannel();
// 4. 創建出消息隊列
/**
* 第一個參數是消息隊列的名稱
* 第二個參數表示消息是否持久化
* 第三個參數標識消息隊列是否被channel獨占
* 第四個參數標識是否自動刪除消息隊列,當消息隊列沒有綁定交換機后是否自動刪除
* 第五個是消息隊列擴展參數
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1); // 告訴服務器,在我們沒有確認當前消息時不要給我們發送新的消息
// 5. 創建消費者,對消息進行處理
DefaultConsumer consumer = new DefaultConsumer(channel) {
/**
* consumerTag 用來標識.可以再監聽隊列時候設置
* envelope 信封,通過envelope可以通過這個獲取到很多東西
* properties 額外的消息屬性
* body:消息體
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body, "UTF-8");
System.out.println("消費者1收到的內容:"+s);
try {
Thread.sleep(10); // 模擬消費耗時
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(),false); // 參數2false為確認收到消息,true為拒絕收到消息
}
};
// 6. 通過channel消費者和消息隊列關聯
/**
* 第一個參數是消息隊列的名字
* 第二個參數是否自動簽收(即消費消息后告知服務器已被消費)
* 第三個參數是消費者
*/
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
消費者2:
public class Consumer2 {
public static final String QUEUE_NAME = "byte002";
public static void main(String[] args) throws Exception {
// 1. 創建出鏈接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/");
// 2. 通過鏈接工廠創建鏈接對象
Connection connection = factory.newConnection();
// 3. 通過鏈接對象創建出channel
Channel channel = connection.createChannel();
// 4. 創建出消息隊列
/**
* 第一個參數是消息隊列的名稱
* 第二個參數表示消息是否持久化
* 第三個參數標識消息隊列是否被channel獨占
* 第四個參數標識是否自動刪除消息隊列,當消息隊列沒有綁定交換機后是否自動刪除
* 第五個是消息隊列擴展參數
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1); // 告訴服務器,在我們沒有確認當前消息時不要給我們發送新的消息
// 5. 創建消費者,對消息進行處理
DefaultConsumer consumer = new DefaultConsumer(channel) {
/**
* consumerTag 用來標識.可以再監聽隊列時候設置
* envelope 信封,通過envelope可以通過這個獲取到很多東西
* properties 額外的消息屬性
* body:消息體
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body, "UTF-8");
System.out.println("消費者2收到的內容:"+s);
try {
Thread.sleep(500); // 模擬消費耗時
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(),false); // 參數2false為確認收到消息,true為拒絕收到消息
}
};
// 6. 通過channel消費者和消息隊列關聯
/**
* 第一個參數是消息隊列的名字
* 第二個參數是否自動簽收(即消費消息后告知服務器已被消費)
* 第三個參數是消費者
*/
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
保證資源競爭的代碼就是這一行channel.basicQos(1);如果不加這一行,我們會發現兩個消費者是輪詢消費消息的。
三、發布訂閱模式
原理:生產者將消息扔給交換機,交換機類型是fanout,不同的隊列注冊到交換機上,不同的消費注冊在不同的隊列上。所有消費者都會收到消息。
場景:有一個商城,我們新添加一個商品后,可能同時需要去更新緩存和數據庫。
實現:
生產者:
public class Producter {
// 定義交換機的名字
public static final String EXCHANGE_NAME="byte003";
public static void main(String[] args) throws Exception {
// 1. 創建出鏈接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/");
// 2. 通過鏈接工廠創建鏈接對象
Connection connection = factory.newConnection();
// 3. 通過鏈接對象創建出channel
Channel channel = connection.createChannel();
// 定義一個交換機,類型是fanout
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
// 因為消息先發到交換機,交換機沒有保存功能,所以如果沒有消費者,消息會丟失
channel.basicPublish(EXCHANGE_NAME,"",null,"發布訂閱模式的消息".getBytes());
channel.close();
connection.close();
}
}
消費者1:
public class Consumer1 {
public static final String EXCHANGE_NAME="byte003";
public static void main(String[] args) throws Exception {
// 1. 創建出鏈接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/");
// 2. 通過鏈接工廠創建鏈接對象
Connection connection = factory.newConnection();
// 3. 通過鏈接對象創建出channel
Channel channel = connection.createChannel();
String queueName = "queue003";
channel.queueDeclare(queueName,false,false,false,null);
// 綁定隊列到交換機
channel.queueBind(queueName,EXCHANGE_NAME,"");
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel) {
/**
* consumerTag 用來標識.可以再監聽隊列時候設置
* envelope 信封,通過envelope可以通過這個獲取到很多東西
* properties 額外的消息屬性
* body:消息體
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body, "UTF-8");
System.out.println("消費者1:"+s);
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume(queueName,false,consumer);
}
}
消費者2:
public class Consumer2 {
public static final String EXCHANGE_NAME="byte003";
public static void main(String[] args) throws Exception {
// 1. 創建出鏈接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/");
// 2. 通過鏈接工廠創建鏈接對象
Connection connection = factory.newConnection();
// 3. 通過鏈接對象創建出channel
Channel channel = connection.createChannel();
String queueName = "queue004";
channel.queueDeclare(queueName,false,false,false,null);
// 綁定隊列到交換機
channel.queueBind(queueName,EXCHANGE_NAME,"");
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel) {
/**
* consumerTag 用來標識.可以再監聽隊列時候設置
* envelope 信封,通過envelope可以通過這個獲取到很多東西
* properties 額外的消息屬性
* body:消息體
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body, "UTF-8");
System.out.println("消費者2:"+s);
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume(queueName,false,consumer);
}
}
需要注意的一點就是交換機沒有保存功能,如果沒有消費者,則消息會丟失。
四、路由模式
原理:生產者將消息發送給交換機,消息攜帶具體的routingkey。交換機類型是direct,接收到消息中的routingkey,比對與之綁定的隊列的routingkey,分發到不同的隊列上。
場景:還是一樣,有一個商城,新添加了一個商品,實時性不是很高,只需要添加到數據庫即可,不用刷新緩存。
實現:
生產者:
public class Producter {
// 定義交換機的名字
public static final String EXCHANGE_NAME="byte004";
public static void main(String[] args) throws Exception {
// 1. 創建出鏈接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/");
// 2. 通過鏈接工廠創建鏈接對象
Connection connection = factory.newConnection();
// 3. 通過鏈接對象創建出channel
Channel channel = connection.createChannel();
// 定義一個交換機,類型是direct
channel.exchangeDeclare(EXCHANGE_NAME,"direct");
// 因為消息先發到交換機,交換機沒有保存功能,所以如果沒有消費者,消息會丟失
channel.basicPublish(EXCHANGE_NAME,"key1",null,"發布路由模式的消息".getBytes());
channel.close();
connection.close();
}
}
消費者1:
public class Consumer1 {
public static final String EXCHANGE_NAME="byte004";
public static void main(String[] args) throws Exception {
// 1. 創建出鏈接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/");
// 2. 通過鏈接工廠創建鏈接對象
Connection connection = factory.newConnection();
// 3. 通過鏈接對象創建出channel
Channel channel = connection.createChannel();
String queueName = "queue005";
channel.queueDeclare(queueName,false,false,false,null);
// 綁定隊列到交換機
/**
* 參數3是routingkey,只有和它一樣的routingkey的消息才會被當前消費者收到
*/
channel.queueBind(queueName,EXCHANGE_NAME,"key1");
// 如果要接收多個routingkey的消息,在執行一次上面的代碼即可,如下
channel.queueBind(queueName,EXCHANGE_NAME,"key2");
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel) {
/**
* consumerTag 用來標識.可以再監聽隊列時候設置
* envelope 信封,通過envelope可以通過這個獲取到很多東西
* properties 額外的消息屬性
* body:消息體
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body, "UTF-8");
System.out.println("消費者1:"+s);
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume(queueName,false,consumer);
}
}
消費者2:
public class Consumer2 {
public static final String EXCHANGE_NAME="byte004";
public static void main(String[] args) throws Exception {
// 1. 創建出鏈接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/");
// 2. 通過鏈接工廠創建鏈接對象
Connection connection = factory.newConnection();
// 3. 通過鏈接對象創建出channel
Channel channel = connection.createChannel();
String queueName = "queue006";
channel.queueDeclare(queueName,false,false,false,null);
// 綁定隊列到交換機
channel.queueBind(queueName,EXCHANGE_NAME,"key2");
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel) {
/**
* consumerTag 用來標識.可以再監聽隊列時候設置
* envelope 信封,通過envelope可以通過這個獲取到很多東西
* properties 額外的消息屬性
* body:消息體
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body, "UTF-8");
System.out.println("消費者2:"+s);
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume(queueName,false,consumer);
}
}
五、主題模式
原理:路由模式的一種,路由功能添加了模糊匹配。星號(*)代表1個單詞,#號(#)代表一個或多個單詞。具體可參考路由模式。
場景:還是一樣,有一個商城,新添加了一個商品,實時性不是很高,只需要添加到數據庫即可,數據庫包含了主數據庫mysql1和從數據庫mysql2的內容,不用刷新緩存。
實現:
生產者:
public class Producter {
// 定義交換機的名字
public static final String EXCHANGE_NAME="byte004";
public static void main(String[] args) throws Exception {
// 1. 創建出鏈接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/");
// 2. 通過鏈接工廠創建鏈接對象
Connection connection = factory.newConnection();
// 3. 通過鏈接對象創建出channel
Channel channel = connection.createChannel();
// 定義一個交換機,類型是topic
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
// 因為消息先發到交換機,交換機沒有保存功能,所以如果沒有消費者,消息會丟失
channel.basicPublish(EXCHANGE_NAME,"key.1.2",null,"發布路由模式的消息".getBytes());
channel.close();
connection.close();
}
}
消費者1:
public class Consumer1 {
public static final String EXCHANGE_NAME="byte004";
public static void main(String[] args) throws Exception {
// 1. 創建出鏈接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/");
// 2. 通過鏈接工廠創建鏈接對象
Connection connection = factory.newConnection();
// 3. 通過鏈接對象創建出channel
Channel channel = connection.createChannel();
String queueName = "queue005";
channel.queueDeclare(queueName,false,false,false,null);
// 綁定隊列到交換機
/**
* 參數3是routingkey,只有和它一樣的routingkey的消息才會被當前消費者收到
*/
channel.queueBind(queueName,EXCHANGE_NAME,"key.*");
// 如果要接收多個routingkey的消息,在執行一次上面的代碼即可,如下
channel.queueBind(queueName,EXCHANGE_NAME,"abc.#");
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel) {
/**
* consumerTag 用來標識.可以再監聽隊列時候設置
* envelope 信封,通過envelope可以通過這個獲取到很多東西
* properties 額外的消息屬性
* body:消息體
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body, "UTF-8");
System.out.println("消費者1:"+s);
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume(queueName,false,consumer);
}
}
消費者2:
public class Consumer2 {
public static final String EXCHANGE_NAME="byte004";
public static void main(String[] args) throws Exception {
// 1. 創建出鏈接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/");
// 2. 通過鏈接工廠創建鏈接對象
Connection connection = factory.newConnection();
// 3. 通過鏈接對象創建出channel
Channel channel = connection.createChannel();
String queueName = "queue006";
channel.queueDeclare(queueName,false,false,false,null);
// 綁定隊列到交換機
channel.queueBind(queueName,EXCHANGE_NAME,"key.#");
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel) {
/**
* consumerTag 用來標識.可以再監聽隊列時候設置
* envelope 信封,通過envelope可以通過這個獲取到很多東西
* properties 額外的消息屬性
* body:消息體
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body, "UTF-8");
System.out.println("消費者2:"+s);
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume(queueName,false,consumer);
}
}
代碼已上傳:
github地址: https://github.com/binzh303/zhixie-code-example
gitee地址: https://gitee.com/javaXiaoCaiJi/zhixie-code-example
> 如果文章對您有幫助,請記得點贊關注喲~ > 歡迎大家關注我的公眾號:字節傳說,每日推送技術文章供大家學習參考。