5.1 簡單模式
5.1.1 消息生產者
-
創建Maven項目
-
添加RabbitMQ連接所需要的依賴
-
<!-- RabbitMQ Java client -->
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>4.10.0</version>
</dependency>

<!-- SLF4J binding to log4j.
     No <scope>test</scope> here: the examples are started from main(), so the
     binding has to be on the runtime classpath, otherwise the log4j.properties
     placed under resources is never used. -->
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.25</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.9</version>
</dependency>
在resources目錄下創建log4j.properties
-
# Root logger: DEBUG level, output goes to appender A1.
log4j.rootLogger=DEBUG,A1
log4j.logger.com.taotao = DEBUG
log4j.logger.org.mybatis = DEBUG
# A1 prints to the console using the pattern below.
log4j.appender.A1=org.apache.log4j.ConsoleAppender
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
log4j.appender.A1.layout.ConversionPattern=%-d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c]-[%p] %m%n
-
創建MQ連接幫助類
-
package com.qfedu.mq.utils; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class ConnectionUtil { public static Connection getConnection() throws IOException, TimeoutException { //1.創建連接工廠 ConnectionFactory factory = new ConnectionFactory(); //2.在工廠對象中設置MQ的連接信息(ip,port,virtualhost,username,password) factory.setHost("47.96.11.185"); factory.setPort(5672); factory.setVirtualHost("host1"); factory.setUsername("ytao"); factory.setPassword("admin123"); //3.通過工廠對象獲取與MQ的鏈接 Connection connection = factory.newConnection(); return connection; } }
消息生產者發送消息
-
package com.qfedu.mq.service; import com.qfedu.mq.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class SendMsg { public static void main(String[] args) throws Exception{ String msg = "Hello HuangDaoJun!"; Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); //定義隊列(使用Java代碼在MQ中新建一個隊列) //參數1:定義的隊列名稱 //參數2:隊列中的數據是否持久化(如果選擇了持久化) //參數3: 是否排外(當前隊列是否為當前連接私有) //參數4:自動刪除(當此隊列的連接數為0時,此隊列會銷毀(無論隊列中是否還有數據)) //參數5:設置當前隊列的參數 //channel.queueDeclare("queue7",false,false,false,null); //參數1:交換機名稱,如果直接發送信息到隊列,則交換機名稱為"" //參數2:目標隊列名稱 //參數3:設置當前這條消息的屬性(設置過期時間 10) //參數4:消息的內容 channel.basicPublish("","queue7",null,msg.getBytes()); System.out.println("發送:" + msg); channel.close(); connection.close(); } }
5.1.2 消息消費者
-
創建Maven項目
-
-
log4j.properties
-
ConnectionUtil.java
-
消費者消費消息
-
package com.qfedu.mq.service; import com.qfedu.mq.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class ReceiveMsg { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body就是從隊列中獲取的數據 String msg = new String(body); System.out.println("接收:"+msg); } }; channel.basicConsume("queue1",true,consumer); } }
5.2 工作模式
5.2.1 發送者
-
/**
 * Producer that reads lines from standard input and publishes each one to queue2
 * until the user types "quit" or the input ends.
 */
public class SendMsg {

    /**
     * Opens one connection and channel, then publishes every entered line.
     *
     * @param args unused
     * @throws Exception if connecting or publishing fails
     */
    public static void main(String[] args) throws Exception {
        // One connection and channel serve the whole session instead of reconnecting per message.
        Connection connection = ConnectionUtil.getConnection();
        try {
            Channel channel = connection.createChannel();

            System.out.println("請輸入消息:");
            Scanner scanner = new Scanner(System.in);

            // hasNextLine() ends the loop cleanly at end of input instead of throwing.
            while (scanner.hasNextLine()) {
                String msg = scanner.nextLine();
                if ("quit".equals(msg)) {
                    break;
                }
                // "" exchange: the routing key is the name of the target queue.
                channel.basicPublish("", "queue2", null, msg.getBytes());
                System.out.println("發送:" + msg);
            }

            channel.close();
        } finally {
            // Release the connection even when publishing failed.
            if (connection.isOpen()) {
                connection.close();
            }
        }
    }
}
5.2.2 消費者1
-
public class ReceiveMsg { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body就是從隊列中獲取的數據 String msg = new String(body); System.out.println("Consumer1接收:"+msg); if("wait".equals(msg)){ try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } } } }; channel.basicConsume("queue2",true,consumer); } }
5.2.3 消費者2
-
public class ReceiveMsg { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body就是從隊列中獲取的數據 String msg = new String(body); System.out.println("Consumer2接收:"+msg); } }; channel.basicConsume("queue2",true,consumer); } }
5.3 訂閱模式
5.3.1 發送者 發送消息到交換機
-
/**
 * Producer that reads lines from standard input and publishes each one to the
 * exchange ex1 (with an empty routing key) until the user types "quit" or the input ends.
 */
public class SendMsg {

    /**
     * Opens one connection and channel, then publishes every entered line to the exchange.
     *
     * @param args unused
     * @throws Exception if connecting or publishing fails
     */
    public static void main(String[] args) throws Exception {
        // One connection and channel serve the whole session instead of reconnecting per message.
        Connection connection = ConnectionUtil.getConnection();
        try {
            Channel channel = connection.createChannel();

            System.out.println("請輸入消息:");
            Scanner scanner = new Scanner(System.in);

            // hasNextLine() ends the loop cleanly at end of input instead of throwing.
            while (scanner.hasNextLine()) {
                String msg = scanner.nextLine();
                if ("quit".equals(msg)) {
                    break;
                }
                // Publish to the exchange; no routing key is given.
                channel.basicPublish("ex1", "", null, msg.getBytes());
                System.out.println("發送:" + msg);
            }

            channel.close();
        } finally {
            // Release the connection even when publishing failed.
            if (connection.isOpen()) {
                connection.close();
            }
        }
    }
}
5.3.2 消費者1
-
public class ReceiveMsg1 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body就是從隊列中獲取的數據 String msg = new String(body); System.out.println("Consumer1接收:"+msg); if("wait".equals(msg)){ try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } } } }; channel.basicConsume("queue3",true,consumer); } }
5.3.3 消費者2
-
public class ReceiveMsg2 { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body就是從隊列中獲取的數據 String msg = new String(body); System.out.println("Consumer2接收:"+msg); } }; channel.basicConsume("queue4",true,consumer); } }
5.4 路由模式
5.4.1 發送者
-
/**
 * Producer that reads lines from standard input and publishes each one to the
 * exchange ex2, using "a" or "b" as the routing key depending on the first letter
 * of the message. Stops when the user types "quit" or the input ends.
 */
public class SendMsg {

    /**
     * Opens one connection and channel, then routes and publishes every entered line.
     *
     * @param args unused
     * @throws Exception if connecting or publishing fails
     */
    public static void main(String[] args) throws Exception {
        // One connection and channel serve the whole session instead of reconnecting per message.
        Connection connection = ConnectionUtil.getConnection();
        try {
            Channel channel = connection.createChannel();

            System.out.println("請輸入消息:");
            Scanner scanner = new Scanner(System.in);

            // hasNextLine() ends the loop cleanly at end of input instead of throwing.
            while (scanner.hasNextLine()) {
                String msg = scanner.nextLine();
                if ("quit".equals(msg)) {
                    break;
                }

                String routingKey = routingKeyFor(msg);
                if (routingKey == null) {
                    // Nothing is published for other messages, so do not report them as sent.
                    System.out.println("未發送(只發送以 a 或 b 開頭的消息):" + msg);
                    continue;
                }

                channel.basicPublish("ex2", routingKey, null, msg.getBytes());
                System.out.println("發送:" + msg);
            }

            channel.close();
        } finally {
            // Release the connection even when publishing failed.
            if (connection.isOpen()) {
                connection.close();
            }
        }
    }

    /** Returns "a" or "b" when msg starts with that letter, otherwise null. */
    private static String routingKeyFor(String msg) {
        if (msg.startsWith("a")) {
            return "a";
        }
        if (msg.startsWith("b")) {
            return "b";
        }
        return null;
    }
}
5.4.2 消費者1
-
public class ReceiveMsg1 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body就是從隊列中獲取的數據 String msg = new String(body); System.out.println("Consumer1接收:"+msg); if("wait".equals(msg)){ try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } } } }; channel.basicConsume("queue5",true,consumer); } }
5.4.3 消費者2
-
public class ReceiveMsg2 { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //body就是從隊列中獲取的數據 String msg = new String(body); System.out.println("Consumer2接收:"+msg); } }; channel.basicConsume("queue6",true,consumer); } }
-