消息模式實例
視頻教程:https://ke.qq.com/course/304104
編寫代碼前,最好先添加好用戶並設置virtual hosts
一、簡單模式
1.導入jar包
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>4.5.0</version> </dependency>
2.創建連接
import com.idelan.rabbitmq.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Sender { private final static String QUEUE = "testhello"; //隊列名字 public static void main(String[] args) throws Exception{ //獲取連接 Connection connection = ConnectionUtil.getConnection(); //創建通道 Channel channel = connection.createChannel(); //聲明隊列,如果隊列存在則什么都不做,如果隊列不存在才創建 //參數一: 隊列的名字 //參數二: 是否持久化隊列,我們的隊列模式是在內存中的,如果rabbit重啟會丟失,如果我們設置為true 則會保存到erlng自帶的數據庫中,重啟會重新獲取 //參數三: 是否排外,有兩個作用,第一個當我們的鏈接關閉后是否會自動刪除隊列,作用二,是否私有當前隊列,如果私有了,其他通道不可以訪問當前隊列,如果為true 一般適合一個隊列消費者的時候 //參數四: 是否自動刪除 //參數五 我們的一些其他的參數 channel.queueDeclare(QUEUE, false, false, false, null); //發送內容 channel.basicPublish("", QUEUE, null, "hello world".getBytes()); //關閉連接 channel.close(); connection.close(); } }
3.消費者
import com.idelan.rabbitmq.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; public class Receiver { private final static String QUEUE = "testhello"; //隊列名字 public static void main(String[] args) throws Exception{ Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE, false, false, false, null); QueueingConsumer consumer = new QueueingConsumer(channel); //接收消息,參數二 是自動確認 channel.basicConsume(QUEUE, true, consumer); while (true) { //獲取消息 如果沒有消息會等待,有的話就獲取執行然后銷毀,是一次性的 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println("message:"+message); } } }
二、工作模式
1.生產者
import com.idelan.rabbitmq.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Sender { private final static String QUEUE = "testwork"; //隊列名字 public static void main(String[] args) throws Exception{ //獲取連接 Connection connection = ConnectionUtil.getConnection(); //創建通道 Channel channel = connection.createChannel(); //聲明隊列,如果隊列存在則什么都不做,如果隊列不存在才創建 //參數一: 隊列的名字 //參數二: 是否持久化隊列,我們的隊列模式是在內存中的,如果rabbit重啟會丟失,如果我們設置為true 則會保存到erlng自帶的數據庫中,重啟會重新獲取 //參數三: 是否排外,有兩個作用,第一個當我們的鏈接關閉后是否會自動刪除隊列,作用二,是否私有當前隊列,如果私有了,其他通道不可以訪問當前隊列,如果為true 一般適合一個隊列消費者的時候 //參數四: 是否自動刪除 //參數五 我們的一些其他的參數 channel.queueDeclare(QUEUE, false, false, false, null); for (int i = 0; i < 20; i++){ //發送內容 channel.basicPublish("", QUEUE, null, ("hello world "+i).getBytes()); } //關閉連接 channel.close(); connection.close(); } }
2.消費者1
import com.idelan.rabbitmq.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; public class Receiver1 { private final static String QUEUE = "testwork"; //隊列名字 public static void main(String[] args) throws Exception{ Connection connection = ConnectionUtil.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE, false, false, false, null); //告訴服務器,在我沒有確認當前消息完成之前,不要給我發新消息 channel.basicQos(1); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } //當我們收到消息的時候調用 System.out.println("消費者1 收到的消息內容是:" + new String(body)); //確認 參數2,false為確認收到消息,true 為拒絕接收 channel.basicAck(envelope.getDeliveryTag(), false); } }; //注冊消費者,參數2 收到確認,代表我們收到消息后需要手動告訴服務器,我們收到消息了 channel.basicConsume(QUEUE, false, consumer); } }
3.消費者2
import com.idelan.rabbitmq.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; public class Receiver2 { private final static String QUEUE = "testwork"; //隊列名字 public static void main(String[] args) throws Exception{ Connection connection = ConnectionUtil.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE, false, false, false, null); //告訴服務器,在我沒有確認當前消息完成之前,不要給我發新消息 channel.basicQos(1); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //當我們收到消息的時候調用 System.out.println("消費者2 收到的消息內容是:" + new String(body)); //確認 參數2,false為確認收到消息,true 為拒絕接收 channel.basicAck(envelope.getDeliveryTag(), false); } }; //注冊消費者,參數2 收到確認,代表我們收到消息后需要手動告訴服務器,我們收到消息了 channel.basicConsume(QUEUE, false, consumer); } }
三、發布訂閱模式
1.生產者
import com.idelan.rabbitmq.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Sender { private final static String EXCHANGE_NAME = "testexchange"; //定義交換機名字 public static void main(String[] args) throws Exception{ Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); //聲明交換機 channel.exchangeDeclare(EXCHANGE_NAME, "fanout");//定義一個交換機,類型是fanout //發布訂閱模式,因為消息是先發布到交換機中,而交換機是沒有保存功能的,所以如果沒有消費者,消息則會丟失 channel.basicPublish(EXCHANGE_NAME, "", null, "發布訂閱模式的消息".getBytes()); channel.close(); connection.close(); } }
2.消費者1
import com.idelan.rabbitmq.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; public class Receiver1 { private final static String EXCHANGE_NAME = "testexchange"; //定義交換機名字 public static void main(String[] args) throws Exception{ Connection connection = ConnectionUtil.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare("testpubQueue1", false, false, false, null); //綁定隊列到交換機 channel.queueBind("testpubQueue1", EXCHANGE_NAME, ""); channel.basicQos(1); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消費者1:"+new String(body)); channel.basicAck(envelope.getDeliveryTag(),false); } }; channel.basicConsume("testpubQueue1", false, consumer); } }
3.消費者2
import com.idelan.rabbitmq.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; public class Receiver2 { private final static String EXCHANGE_NAME = "testexchange"; //定義交換機名字 public static void main(String[] args) throws Exception{ Connection connection = ConnectionUtil.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare("testpubQueue2", false, false, false, null); //綁定隊列到交換機 channel.queueBind("testpubQueue2", EXCHANGE_NAME, ""); channel.basicQos(1); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消費者2:"+new String(body)); channel.basicAck(envelope.getDeliveryTag(),false); } }; channel.basicConsume("testpubQueue2", false, consumer); } }
四、路由模式
1.生產者
import com.idelan.rabbitmq.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Sender { private final static String EXCHANGE_NAME = "testexroute"; //定義交換機名字 public static void main(String[] args) throws Exception{ Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); //聲明交換機 channel.exchangeDeclare(EXCHANGE_NAME, "direct");//定義一個路由格式的交換機 //發布訂閱模式,因為消息是先發布到交換機中,而交換機是沒有保存功能的,所以如果沒有消費者,消息則會丟失 // routingKey 為key1 channel.basicPublish(EXCHANGE_NAME, "key3", null, "路由模式的消息".getBytes()); channel.close(); connection.close(); } }
2.消費者1
import com.idelan.rabbitmq.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; public class Receiver1 { private final static String EXCHANGE_NAME = "testexroute"; //定義交換機名字 public static void main(String[] args) throws Exception{ Connection connection = ConnectionUtil.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare("testRouteQueue1", false, false, false, null); //綁定隊列到交換機 //參數3標記,綁定到交換機的時候會指定一個標記,只有和它一樣的標記的消息才會被當前消費者接收到 channel.queueBind("testRouteQueue1", EXCHANGE_NAME, "key1"); //如果需要綁定多個標記 在執行一次即可 channel.queueBind("testRouteQueue1", EXCHANGE_NAME, "key3"); channel.basicQos(1); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消費者1:"+new String(body)); channel.basicAck(envelope.getDeliveryTag(),false); } }; channel.basicConsume("testRouteQueue1", false, consumer); } }
3.消費者2
import com.idelan.rabbitmq.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; public class Receiver2 { private final static String EXCHANGE_NAME = "testexroute"; //定義交換機名字 public static void main(String[] args) throws Exception{ Connection connection = ConnectionUtil.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare("testRouteQueue2", false, false, false, null); //綁定隊列到交換機 //參數3標記,綁定到交換機的時候會指定一個標記,只有和它一樣的標記的消息才會被當前消費者接收到 channel.queueBind("testRouteQueue2", EXCHANGE_NAME, "key1"); //如果需要綁定多個標記 在執行一次即可 channel.queueBind("testRouteQueue2", EXCHANGE_NAME, "key2"); channel.basicQos(1); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消費者2:"+new String(body)); channel.basicAck(envelope.getDeliveryTag(),false); } }; channel.basicConsume("testRouteQueue2", false, consumer); } }
五、主題模式
1.生產者
import com.idelan.rabbitmq.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Sender { private final static String EXCHANGE_NAME = "testexchangetopic"; //定義交換機名字 public static void main(String[] args) throws Exception{ Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); //聲明交換機 channel.exchangeDeclare(EXCHANGE_NAME, "topic");//定義一個topic 格式的交換機 //發布訂閱模式,因為消息是先發布到交換機中,而交換機是沒有保存功能的,所以如果沒有消費者,消息則會丟失 // routingKey 為key1 // * 只能匹配一個字符 # 可以匹配多個字符 channel.basicPublish(EXCHANGE_NAME, "abc.1.3", null, "topic模式的消息".getBytes()); channel.close(); connection.close(); } }
2.消費者1
import com.idelan.rabbitmq.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; public class Receiver1 { private final static String EXCHANGE_NAME = "testexchangetopic"; //定義交換機名字 public static void main(String[] args) throws Exception{ Connection connection = ConnectionUtil.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare("testTopicQueue1", false, false, false, null); //綁定隊列到交換機 //參數3標記,綁定到交換機的時候會指定一個標記,只有和它一樣的標記的消息才會被當前消費者接收到 channel.queueBind("testTopicQueue1", EXCHANGE_NAME, "key.*"); //如果需要綁定多個標記 在執行一次即可 channel.queueBind("testTopicQueue1", EXCHANGE_NAME, "abc.#"); channel.basicQos(1); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消費者1:"+new String(body)); channel.basicAck(envelope.getDeliveryTag(),false); } }; channel.basicConsume("testTopicQueue1", false, consumer); } }
3.消費者2
import com.idelan.rabbitmq.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; public class Receiver2 { private final static String EXCHANGE_NAME = "testexchangetopic"; //定義交換機名字 public static void main(String[] args) throws Exception{ Connection connection = ConnectionUtil.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare("testTopicQueue2", false, false, false, null); //綁定隊列到交換機 //參數3標記,綁定到交換機的時候會指定一個標記,只有和它一樣的標記的消息才會被當前消費者接收到 channel.queueBind("testTopicQueue2", EXCHANGE_NAME, "key.#"); //如果需要綁定多個標記 在執行一次即可 channel.queueBind("testTopicQueue2", EXCHANGE_NAME, "abc.*"); channel.basicQos(1); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消費者2:"+new String(body)); channel.basicAck(envelope.getDeliveryTag(),false); } }; channel.basicConsume("testTopicQueue2", false, consumer); } }