1. 不使用Exchange交換機(默認交換機)
工具類
package com.lemon.rabbitmq.utils; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * rabbit mq 工具類 : 獲取連接 */ public class ConnectionUtil { public static Connection getConnection() throws Exception { //創建連接工廠 ConnectionFactory connectionFactory = new ConnectionFactory(); //主機地址;默認為 localhost connectionFactory.setHost("127.0.0.1"); //連接端口;默認為 5672 connectionFactory.setPort(5672); //虛擬主機名稱;默認為 / connectionFactory.setVirtualHost("/lemon"); //連接用戶名;默認為guest connectionFactory.setUsername("lemon"); //連接密碼;默認為guest connectionFactory.setPassword("lemon"); //創建連接 Connection connection = connectionFactory.newConnection(); return connection; } }
package cn.lemon.rabbitmq.utils; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import java.io.IOException; /** * 消費者工具類:生產消息消費者,方便調用 */ public class ConsumerUtil { public static DefaultConsumer getConsumer(Channel channel) { DefaultConsumer consumer = new DefaultConsumer(channel) { /** * consumerTag 消費者標簽,在channel.basicConsume時候可以指定 * envelope 消息包的內容,可從中獲取消息id,路由key,交換機等信息 * properties 消息屬性信息 * body 消息內容 */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //消息id System.out.println("消息id: " + envelope.getDeliveryTag()); //交換機 System.out.println("交換機: " + envelope.getExchange()); //路由key System.out.println("路由key: " + envelope.getRoutingKey()); //接受到的消息 System.out.println("收到的消息: " + new String(body, "utf-8")); System.out.println("---------------------------------"); } }; return consumer; } }
a. simple簡單模式:一個生產者發送消息到隊列中由一個消費者接收。
package cn.lemon.rabbitmq.simple; import cn.lemon.rabbitmq.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; /* 消息模式,簡單模式: 一個生產者、一個消費者,不需要設置交換機 */ public class Producer { public static final String QUEUE_NAME = "simple_queue"; public static void main(String[] args) throws Exception { //1. 獲取連接 Connection connection = ConnectionUtil.getConnection(); //2. 創建頻道 Channel channel = connection.createChannel(); /** * 3. 聲明(創建)隊列 * 參數1:隊列名稱 * 參數2:是否定義持久化隊列 * 參數3:是否獨占本次連接 * 參數4:是否在不使用的時候自動刪除隊列 * 參數5:隊列其它參數 */ channel.queueDeclare(QUEUE_NAME, true, false, false, null); //4. 發送消息 String message = "你好,小兔子"; /** * 參數1:交換機名稱,如果沒有指定則使用默認Default Exchange * 參數2:路由key,簡單模式可以傳遞隊列名稱 * 參數3:消息其它屬性 * 參數4:消息內容 */ channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println("已發送消息:" + message); // 5. 關閉資源 channel.close(); connection.close(); } }
package cn.lemon.rabbitmq.simple; import cn.lemon.rabbitmq.utils.ConnectionUtil; import cn.lemon.rabbitmq.utils.ConsumerUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; //消息消費者 public class Consumer { public static void main(String[] args) throws Exception { //1. 調用工具類,獲取連接 Connection connection = ConnectionUtil.getConnection(); //2. 創建頻道 Channel channel = connection.createChannel(); //3. 聲明隊列 這里可以不用申明隊列,因為生產者哪里已經創建了 // channel.queueDeclare(Producer.QUEUE_NAME,true,false,false,null); //4. 調用工具類,獲取消費者,消費隊列中的消息 DefaultConsumer consumer = ConsumerUtil.getConsumer(channel); //監聽消息 /** * 參數1:隊列名稱 * 參數2:是否自動確認,設置為true為表示消息接收到自動向mq回復接收到了, * mq接收到回復會刪除消息,設置為false則需要手動確認 * 參數3:消息接收到后回調 */ channel.basicConsume(Producer.QUEUE_NAME, true, consumer); //不關閉資源,應該一直監聽消息 } }
b. work工作隊列模式:一個生產者發送消息到隊列中可由多個消費者接收;多個消費者之間消息是競爭接收。
package cn.lemon.rabbitmq.work; import cn.lemon.rabbitmq.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; /* 消息生產者 : 發送30個消息到隊列 創建兩個消費者去監聽同一個隊列,查看兩個消費者接收到的消息是否存在重復。 */ public class Producer { public static final String QUEUE_NAME = "simple_queue"; public static void main(String[] args) throws Exception { //1. 創建連接 Connection connection = ConnectionUtil.getConnection(); //2. 創建頻道 Channel channel = connection.createChannel(); //3. 聲明隊列 channel.queueDeclare(QUEUE_NAME, true, false, false, null); //4. 發送消息 for (int i = 1; i <= 30; i++) { String message = "你好,小兔子! " + i; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println("已發送消息: " + message); } //5. 關閉資源 channel.close(); connection.close(); } }
package cn.itcast.rabbitmq.work; import cn.lemon.rabbitmq.utils.ConnectionUtil; import cn.lemon.rabbitmq.utils.ConsumerUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; //消息消費者 public class Consumer1 { public static void main(String[] args) throws Exception { //1. 創建連接 Connection connection = ConnectionUtil.getConnection(); //2. 創建頻道 Channel channel = connection.createChannel(); //3. 聲明隊列 隊列已經存在,可以不用創建 //channel.queueDeclare(Producer.QUEUE_NAME,true,false,false,null); //4. 創建消息消費者 DefaultConsumer consumer = ConsumerUtil.getConsumer(channel); //監聽消息 /** * 參數1:隊列名稱 * 參數2:是否自動確認,設置為true為表示消息接收到自動向mq回復接收到了, * mq接收到回復會刪除消息,設置為false則需要手動確認 * 參數3:消息接收到后回調 */ channel.basicConsume(Producer.QUEUE_NAME, true, consumer); //不關閉資源,應該一直監聽消息 } }
package cn.lemon.rabbitmq.work; import cn.lemon.rabbitmq.utils.ConnectionUtil; import cn.lemon.rabbitmq.utils.ConsumerUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; //消息消費者 public class Consumer2 { public static void main(String[] args) throws Exception { //1. 創建連接 Connection connection = ConnectionUtil.getConnection(); //2. 創建頻道 Channel channel = connection.createChannel(); //3. 聲明隊列 隊列已經存在,不用在創建 //channel.queueDeclare(Producer.QUEUE_NAME,true,false,false,null); //4. 創建消息消費者 DefaultConsumer consumer = ConsumerUtil.getConsumer(channel); //監聽消息 channel.basicConsume(Producer.QUEUE_NAME, true, consumer); //不關閉資源,應該一直監聽消息 } }
2. 使用Exchange交換機;訂閱模式(廣播fanout,定向direct,通配符topic)
a. 發布與訂閱模式:使用了fanout類型的交換機,可以將一個消息發送到所有與交換機綁定的隊列並被消費者接收。
package cn.lemon.rabbitmq.ps; import cn.lemon.rabbitmq.utils.ConnectionUtil; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; /* 消息生產者:發布訂閱模式 發布訂閱模式Publish/subscribe 1.需要設置類型為fanout的交換機 2.並且交換機和隊列進行綁定,當發送消息到交換機后,交換機會將消息發送到綁定的隊列 */ public class Producer { // 交換機名稱 public static final String FANOUT_EXCHANGE = "fanout_exchange"; // 隊列名稱1 public static final String FANOUT_QUEUE_1 = "fanout_queue_1"; // 隊列名稱2 public static final String FANOUT_QUEUE_2 = "fanout_queue_2"; public static void main(String[] args) throws Exception { // 1.創建連接 Connection connection = ConnectionUtil.getConnection(); // 2.創建頻道 Channel channel = connection.createChannel(); /** * 3.聲明交換機 * 參數1:交換機名稱 * 參數2:交換機類型:fanout、direct、topic、headers */ channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT); /** * 4.聲明(創建)隊列 * 參數1:隊列名稱 * 參數2:是否定義持久化隊列 * 參數3:是否獨占本次連接 * 參數4:是否在不使用的時候自動刪除隊列 * 參數5:隊列其它參數 */ channel.queueDeclare(FANOUT_QUEUE_1, true, false, false, null); channel.queueDeclare(FANOUT_QUEUE_2, true, false, false, null); // 5.隊列綁定交換機 channel.queueBind(FANOUT_QUEUE_1, FANOUT_EXCHANGE, ""); channel.queueBind(FANOUT_QUEUE_2, FANOUT_EXCHANGE, ""); // 6. 發送多個消息 for (int i = 1; i <= 10; i++) { // 要發送的信息 String message = "你好;小兔子!" + i; /** * 參數1:交換機名稱,如果沒有指定則使用默認Default Exchage * 參數2:路由key,簡單模式可以傳遞隊列名稱 * 參數3:消息其它屬性 * 參數4:消息內容 */ channel.basicPublish(FANOUT_EXCHANGE, "", null, message.getBytes()); System.out.println("已發送消息:" + message); } // 7. 關閉資源 channel.close(); connection.close(); } }
package cn.lemon.rabbitmq.ps; import cn.lemon.rabbitmq.utils.ConnectionUtil; import cn.lemon.rabbitmq.utils.ConsumerUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; /** * 消息的消費者,通過設置監聽自動獲取隊列中的消息,實現消費 */ public class Consumer1 { public static void main(String[] args) throws Exception { // 1.創建連接 Connection connection = ConnectionUtil.getConnection(); // 2.創建頻道 Channel channel = connection.createChannel(); // 3.申明(創建)交換機 // 隊列綁定到交換機,只要在生產者綁定,消費者可以不用再綁定 //channel.exchangeDeclare(Producer.FANOUT_EXCHANGE,BuiltinExchangeType.FANOUT); // 4.聲明(創建)隊列 //channel.queueDeclare(Producer.FANOUT_QUEUE_1, true, false, false, null); // 5.隊列綁定到交換機 //channel.queueBind(Producer.FANOUT_QUEUE_1,Producer.FANOUT_EXCHANGE,""); // 6.創建消費者;並設置消息處理 DefaultConsumer consumer = ConsumerUtil.getConsumer(channel); // 7.監聽消息 channel.basicConsume(Producer.FANOUT_QUEUE_1, true, consumer); } }
package cn.lemon.rabbitmq.ps; import cn.lemon.rabbitmq.utils.ConnectionUtil; import cn.lemon.rabbitmq.utils.ConsumerUtil; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; /** * 消息的消費者,通過設置監聽自動獲取隊列中的消息,實現消費 */ public class Consumer2 { public static void main(String[] args) throws Exception { // 1.創建連接 Connection connection = ConnectionUtil.getConnection(); // 2.創建頻道 Channel channel = connection.createChannel(); // 3.創建交換機 channel.exchangeDeclare(Producer.FANOUT_EXCHANGE,BuiltinExchangeType.FANOUT); // 4.聲明(創建)隊列 channel.queueDeclare(Producer.FANOUT_QUEUE_2, true, false, false, null); // 5.隊列綁定到交換機 channel.queueBind(Producer.FANOUT_QUEUE_2,Producer.FANOUT_EXCHANGE,""); // 6.創建消費者;並設置消息處理 DefaultConsumer consumer = ConsumerUtil.getConsumer(channel); // 7.監聽消息 channel.basicConsume(Producer.FANOUT_QUEUE_2, true, consumer); } }
b. 路由模式:使用了direct類型的交換機,可以將一個消息發送到routing key相關的隊列並被消費者接收。
package cn.lemon.rabbitmq.routing; import cn.lemon.rabbitmq.utils.ConnectionUtil; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; /* 路由消息生產者:消息發送到交換機 生產者發送兩個消息(路由key分別為:insert、update) 創建兩個消費者,分別綁定的隊列中路由為(insert,update) */ public class Producer { //交換機名 public static final String DIRECT_EXCHANGE = "direct_exchange"; //隊列名 public static final String DIRECT_QUEUE_INSERT = "direct_queue_insert"; public static final String DIRECT_QUEUE_UPDATE = "direct_queue_update"; public static void main(String[] args) throws Exception { //1. 創建連接 Connection connection = ConnectionUtil.getConnection(); //2. 創建頻道 Channel channel = connection.createChannel(); /** * 3.聲明交換機 * 參數1:交換機名稱 * 參數2:交換機類型:fanout、direct、topic、headers */ channel.exchangeDeclare(DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT); /** * 4.聲明(創建)隊列 * 參數1:隊列名稱 * 參數2:是否定義持久化隊列 * 參數3:是否獨占本次連接 * 參數4:是否在不使用的時候自動刪除隊列 * 參數5:隊列其它參數 */ channel.queueDeclare(DIRECT_QUEUE_INSERT, true, false, false, null); channel.queueDeclare(DIRECT_QUEUE_UPDATE, true, false, false, null); /** * 5.隊列綁定交換機 * 參數1:隊列名 * 參數2:交換機名 * 參數3:路由key */ channel.queueBind(DIRECT_QUEUE_INSERT, DIRECT_EXCHANGE, "insert"); channel.queueBind(DIRECT_QUEUE_UPDATE, DIRECT_EXCHANGE, "update"); // 6.發送消息 String message = "新增了商品,路由模式;routing key 為 insert "; channel.basicPublish(DIRECT_EXCHANGE, "insert", null, message.getBytes()); System.out.println("已發送消息:" + message); message = "修改了商品,路由模式;routing key 為 update "; channel.basicPublish(DIRECT_EXCHANGE, "update", null, message.getBytes()); System.out.println("已發送消息:" + message); // 7.關閉資源 channel.close(); connection.close(); } }
package cn.lemon.rabbitmq.routing; import cn.lemon.rabbitmq.utils.ConnectionUtil; import cn.lemon.rabbitmq.utils.ConsumerUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; /** * 消息消費者: 消費隊列中的消息 * 消息路由為insert */ public class Consumer1 { public static void main(String[] args) throws Exception { // 1.創建連接 Connection connection = ConnectionUtil.getConnection(); // 2.創建頻道 Channel channel = connection.createChannel(); // 3.創建交換機 生產者已經創建了,可以不用在創建 //channel.exchangeDeclare(Producer.DIRECT_EXCHANGE,BuiltinExchangeType.DIRECT); // 4.聲明(創建)隊列 channel.queueDeclare(Producer.DIRECT_QUEUE_INSERT, true, false, false, null); // 5.隊列綁定到交換機 channel.queueBind(Producer.DIRECT_QUEUE_INSERT, Producer.DIRECT_EXCHANGE, "insert"); // 6.創建消費者,並設置消息處理 DefaultConsumer consumer = ConsumerUtil.getConsumer(channel); // 7.監聽消息 channel.basicConsume(Producer.DIRECT_QUEUE_INSERT, true, consumer); } }
package cn.lemon.rabbitmq.routing; import cn.lemon.rabbitmq.utils.ConnectionUtil; import cn.lemon.rabbitmq.utils.ConsumerUtil; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; /** * 消息消費者: 消費隊列中的消息 * 消息路由為update */ public class Consumer2 { public static void main(String[] args) throws Exception { // 1.創建連接 Connection connection = ConnectionUtil.getConnection(); // 2.創建頻道 Channel channel = connection.createChannel(); // 3.創建交換機 channel.exchangeDeclare(Producer.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT); // 4.聲明(創建)隊列 channel.queueDeclare(Producer.DIRECT_QUEUE_UPDATE, true, false, false, null); // 5.隊列綁定到交換機 channel.queueBind(Producer.DIRECT_QUEUE_UPDATE, Producer.DIRECT_EXCHANGE, "update"); // 6.創建消費者;並設置消息處理 DefaultConsumer consumer = ConsumerUtil.getConsumer(channel); // 7.監聽消息 channel.basicConsume(Producer.DIRECT_QUEUE_UPDATE, true, consumer); } }
c. 通配符模式:使用了topic類型的交換機,可以將一個消息發送到routing key(*,#)相關的隊列並被消費者接收。
package cn.lemon.rabbitmq.topic; import cn.lemon.rabbitmq.utils.ConnectionUtil; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; /** * Topic通配符模型消息生產者:消息發送到交換機 * 生產者:發送包含有item.insert,item.update,item.delete的3種路由key的消息 * RoutingKey 一般都是有一個或多個單詞組成,多個單詞之間以”.”分割 * 通配符規則: * # 匹配一個或多個詞 * * 匹配一個詞 */ public class Producer { // 交換機名稱 public static final String TOPIC_EXCHANGE = "topic_exchange"; // 隊列名稱 public static final String TOPIC_QUEUE_1 = "topic_queue_1"; // 隊列名稱 public static final String TOPIC_QUEUE_2 = "topic_queue_2"; public static void main(String[] args) throws Exception { //1. 創建連接 Connection connection = ConnectionUtil.getConnection(); //2. 創建頻道 Channel channel = connection.createChannel(); //3. 聲明交換機 參數1(交換機名) 參數2(交換機類型) channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC); //4. 聲明隊列 消費者已經聲明,生產者可以不用聲明 //channel.queueDeclare(TOPIC_QUEUE_1, true, false, false, null); // 5.隊列綁定到交換機 消費者已經綁定,生產者不用綁定 //channel.queueBind(Producer.TOPIC_QUEUE_1, Producer.TOPIC_EXCHANGE,"item.*"); //5.發送消息 String message = "新增了商品,Topic模式,路由key為item.insert"; channel.basicPublish(TOPIC_EXCHANGE, "item.insert", null, message.getBytes()); System.out.println("已發送消息:" + message); message = "修改了商品,Topic模式,路由key為item.update"; channel.basicPublish(TOPIC_EXCHANGE, "item.update", null, message.getBytes()); System.out.println("已發送消息:" + message); message = "刪除了商品,Topic模式,路由key為item.delete"; channel.basicPublish(TOPIC_EXCHANGE, "item.delete", null, message.getBytes()); System.out.println("已發送消息:" + message); // 6.關閉資源 channel.close(); connection.close(); } }
package cn.lemon.rabbitmq.topic; import cn.lemon.rabbitmq.utils.ConnectionUtil; import cn.lemon.rabbitmq.utils.ConsumerUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; public class Consumer1 { public static void main(String[] args) throws Exception { //1. 創建連接 Connection connection = ConnectionUtil.getConnection(); //2. 創建頻道 Channel channel = connection.createChannel(); //3. 聲明交換機 參數1(交換機名) 參數2(交換機類型) //channel.exchangeDeclare(Producer.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC); //4. 聲明隊列 生產者已經創建了,消費者可以不用創建 channel.queueDeclare(Producer.TOPIC_QUEUE_1, true, false, false, null); // 5.隊列綁定到交換機 channel.queueBind(Producer.TOPIC_QUEUE_1, Producer.TOPIC_EXCHANGE, "item.*"); // 6.創建消費者;並設置消息處理 DefaultConsumer consumer = ConsumerUtil.getConsumer(channel); // 7.監聽消息 channel.basicConsume(Producer.TOPIC_QUEUE_1, true, consumer); } }
package cn.lemon.rabbitmq.topic; import cn.lemon.rabbitmq.utils.ConnectionUtil; import cn.lemon.rabbitmq.utils.ConsumerUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; /** * 消息消費者: 消費隊列中的消息 */ public class Consumer2 { public static void main(String[] args) throws Exception { // 1.創建連接 Connection connection = ConnectionUtil.getConnection(); // 2.創建頻道 Channel channel = connection.createChannel(); // 3.創建交換機 //channel.exchangeDeclare(Producer.TOPIC_EXCHANGE,BuiltinExchangeType.TOPIC); // 4.聲明(創建)隊列 消費者創建,生產者沒有創建 channel.queueDeclare(Producer.TOPIC_QUEUE_2, true, false, false, null); // 5.隊列綁定到交換機 channel.queueBind(Producer.TOPIC_QUEUE_2, Producer.TOPIC_EXCHANGE, "item.update"); channel.queueBind(Producer.TOPIC_QUEUE_2, Producer.TOPIC_EXCHANGE, "item.delete"); // 6.創建消費者;並設置消息處理 DefaultConsumer consumer = ConsumerUtil.getConsumer(channel); // 7.監聽消息 channel.basicConsume(Producer.TOPIC_QUEUE_2, true, consumer); } }