如果是高並發下,rabbitmq服務器上收到成千上萬條消息,那么當打開消費端時,這些消息必定噴涌而來,導致消費端消費不過來甚至掛掉都有可能。
在非自動確認的模式下,可以采用限流模式,rabbitmq 提供了服務質量保障qos機制來控制一次消費消息數量。
下面直接上代碼:
生產端:
1 package com.zxy.demo.rabbitmq; 2 3 import java.io.IOException; 4 import java.util.concurrent.TimeoutException; 5 6 import com.rabbitmq.client.AMQP; 7 import com.rabbitmq.client.Channel; 8 import com.rabbitmq.client.ConfirmListener; 9 import com.rabbitmq.client.Connection; 10 import com.rabbitmq.client.ConnectionFactory; 11 import com.rabbitmq.client.ReturnListener; 12 import com.rabbitmq.client.AMQP.BasicProperties; 13 14 public class Producter { 15 16 public static void main(String[] args) throws IOException, TimeoutException { 17 // TODO Auto-generated method stub 18 ConnectionFactory factory = new ConnectionFactory(); 19 factory.setHost("192.168.10.110"); 20 factory.setPort(5672); 21 factory.setUsername("guest"); 22 factory.setPassword("guest"); 23 factory.setVirtualHost("/"); 24 Connection conn = factory.newConnection(); 25 Channel channel = conn.createChannel(); 26 String exchange001 = "exchange_001"; 27 String queue001 = "queue_001"; 28 String routingkey = "mq.topic"; 29 String body = "hello rabbitmq!===============限流策略"; 30 // 開啟確認模式 31 channel.confirmSelect(); 32 // 循環發送多條消息 33 for(int i = 0 ;i<10;i++){ 34 channel.basicPublish(exchange001, routingkey, null, body.getBytes()); 35 } 36 37 // 添加一個返回監聽========消息返回模式重要添加 38 channel.addConfirmListener(new ConfirmListener() { 39 40 @Override 41 public void handleNack(long deliveryTag, boolean multiple) throws IOException { 42 System.out.println("===========NACK============"); 43 44 } 45 46 @Override 47 public void handleAck(long deliveryTag, boolean multiple) throws IOException { 48 System.out.println("===========ACK============"); 49 50 } 51 }); 52 } 53 54 }
消費端:
1 package com.zxy.demo.rabbitmq; 2 3 import java.io.IOException; 4 import java.util.concurrent.TimeoutException; 5 6 import com.rabbitmq.client.Channel; 7 import com.rabbitmq.client.Connection; 8 import com.rabbitmq.client.ConnectionFactory; 9 10 public class Receiver { 11 12 public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { 13 // TODO Auto-generated method stub 14 ConnectionFactory factory = new ConnectionFactory(); 15 factory.setHost("192.168.10.110"); 16 factory.setPort(5672); 17 factory.setUsername("guest"); 18 factory.setPassword("guest"); 19 factory.setVirtualHost("/"); 20 Connection conn = factory.newConnection(); 21 Channel channel = conn.createChannel(); 22 String exchange001 = "exchange_001"; 23 String queue001 = "queue_001"; 24 String routingkey = "mq.*"; 25 channel.exchangeDeclare(exchange001, "topic", true, false, null); 26 channel.queueDeclare(queue001, true, false, false, null); 27 channel.queueBind(queue001, exchange001, routingkey); 28 // 設置限流策略 29 // channel.basicQos(獲取消息最大數[0-無限制], 依次獲取數量, 作用域[true作用於整個channel,false作用於具體消費者]); 30 channel.basicQos(0, 2, false); 31 // 自定義消費者 32 MyConsumer myConsumer = new MyConsumer(channel); 33 // 進行消費,簽收模式一定要為手動簽收 34 Thread.sleep(3000); 35 channel.basicConsume(queue001, false, myConsumer); 36 } 37 38 }
自定義消費者:
1 package com.zxy.demo.rabbitmq; 2 3 import java.io.IOException; 4 5 import com.rabbitmq.client.AMQP.BasicProperties; 6 import com.rabbitmq.client.Channel; 7 import com.rabbitmq.client.DefaultConsumer; 8 import com.rabbitmq.client.Envelope; 9 10 /** 11 * 可以繼承,可以實現,實現的話要覆寫的方法比較多,所以這里用了繼承 12 * 13 */ 14 public class MyConsumer extends DefaultConsumer{ 15 private Channel channel; 16 public MyConsumer(Channel channel) { 17 super(channel); 18 // TODO Auto-generated constructor stub 19 this.channel=channel; 20 } 21 22 @Override 23 public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) 24 throws IOException { 25 System.out.println("消費標簽:"+consumerTag); 26 System.out.println("envelope.getDeliveryTag():==="+envelope.getDeliveryTag()); 27 System.out.println("envelope.getExchange():==="+envelope.getExchange()); 28 System.out.println("envelope.getRoutingKey():==="+envelope.getRoutingKey()); 29 System.out.println("body:==="+new String(body)); 30 // 手動簽收,一定要有消費者簽收,如果沒有如下代碼,則限流模式下,僅能打印出來channel.basicQos(0, 2, false);第二參數的2條信息 31 channel.basicAck(envelope.getDeliveryTag(), false); 32 } 33 34 35 }
重回隊列模式,是當投遞消息失敗時,讓該消息重新回到隊列的模式,該模式需要手動簽收,並需要在消費者中進行判斷,調用重回隊列的確認模式
代碼如下
生產端:
1 package com.zxy.demo.rabbitmq; 2 3 import java.io.IOException; 4 import java.util.HashMap; 5 import java.util.Map; 6 import java.util.concurrent.TimeoutException; 7 8 import org.springframework.amqp.core.Message; 9 10 import com.rabbitmq.client.AMQP.BasicProperties; 11 import com.rabbitmq.client.Channel; 12 import com.rabbitmq.client.ConfirmListener; 13 import com.rabbitmq.client.Connection; 14 import com.rabbitmq.client.ConnectionFactory; 15 import com.rabbitmq.client.ReturnListener; 16 17 public class Producter { 18 19 public static void main(String[] args) throws IOException, TimeoutException { 20 // TODO Auto-generated method stub 21 ConnectionFactory factory = new ConnectionFactory(); 22 factory.setHost("192.168.10.110"); 23 factory.setPort(5672); 24 factory.setUsername("guest"); 25 factory.setPassword("guest"); 26 factory.setVirtualHost("/"); 27 Connection conn = factory.newConnection(); 28 Channel channel = conn.createChannel(); 29 String exchange001 = "exchange_001"; 30 String queue001 = "queue_001"; 31 String routingkey = "mq.topic"; 32 33 // 循環發送多條消息 34 for(int i = 0 ;i<5;i++){ 35 String body = "hello rabbitmq!===============ACK&重回隊列,第"+i+"條"; 36 Map<String,Object> head = new HashMap<>(); 37 head.put("n", i); 38 BasicProperties properties = new BasicProperties(null, "utf-8", head, 2, 1, null, null, null, null, null, null, null, null, null); 39 40 channel.basicPublish(exchange001, routingkey, properties, body.getBytes()); 41 } 42 43 } 44 45 }
消費端:
1 package com.zxy.demo.rabbitmq; 2 3 import java.io.IOException; 4 import java.util.concurrent.TimeoutException; 5 6 import com.rabbitmq.client.Channel; 7 import com.rabbitmq.client.Connection; 8 import com.rabbitmq.client.ConnectionFactory; 9 10 public class Receiver { 11 12 public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { 13 // TODO Auto-generated method stub 14 ConnectionFactory factory = new ConnectionFactory(); 15 factory.setHost("192.168.10.110"); 16 factory.setPort(5672); 17 factory.setUsername("guest"); 18 factory.setPassword("guest"); 19 factory.setVirtualHost("/"); 20 Connection conn = factory.newConnection(); 21 Channel channel = conn.createChannel(); 22 String exchange001 = "exchange_001"; 23 String queue001 = "queue_001"; 24 String routingkey = "mq.*"; 25 channel.exchangeDeclare(exchange001, "topic", true, false, null); 26 channel.queueDeclare(queue001, true, false, false, null); 27 channel.queueBind(queue001, exchange001, routingkey); 28 // 自定義消費者 29 MyConsumer myConsumer = new MyConsumer(channel); 30 // 進行消費,簽收模式一定要為手動簽收 31 channel.basicConsume(queue001, false, myConsumer); 32 } 33 34 }
自定義消費者:
1 package com.zxy.demo.rabbitmq; 2 3 import java.io.IOException; 4 5 import com.rabbitmq.client.AMQP.BasicProperties; 6 import com.rabbitmq.client.Channel; 7 import com.rabbitmq.client.DefaultConsumer; 8 import com.rabbitmq.client.Envelope; 9 10 /** 11 * 可以繼承,可以實現,實現的話要覆寫的方法比較多,所以這里用了繼承 12 * 13 */ 14 public class MyConsumer extends DefaultConsumer{ 15 private Channel channel; 16 public MyConsumer(Channel channel) { 17 super(channel); 18 // TODO Auto-generated constructor stub 19 this.channel=channel; 20 } 21 22 @Override 23 public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) 24 throws IOException { 25 System.out.println("消費標簽:"+consumerTag); 26 System.out.println("envelope.getDeliveryTag():==="+envelope.getDeliveryTag()); 27 System.out.println("envelope.getExchange():==="+envelope.getExchange()); 28 System.out.println("envelope.getRoutingKey():==="+envelope.getRoutingKey()); 29 System.out.println("body:==="+new String(body)); 30 System.out.println("===================休眠以便查看==============="); 31 try { 32 Thread.sleep(2000); 33 } catch (InterruptedException e) { 34 // TODO Auto-generated catch block 35 e.printStackTrace(); 36 } 37 // 手動簽收 38 Integer i = (Integer) properties.getHeaders().get("n"); 39 System.out.println("iiiiiiiiiiiiiiiii======================================================"+i); 40 if(i==1) { 41 channel.basicNack(envelope.getDeliveryTag(),false, true);//第三個參數為是否重返隊列 42 }else { 43 channel.basicAck(envelope.getDeliveryTag(), false); 44 } 45 } 46 47 48 }
下面是重回隊列執行結果,可以看到當消費完后第一條不斷的被扔回隊列然后消費再扔回。
1 消費標簽:amq.ctag-eeeaa9jgrJ3tDhtQR2XReg 2 envelope.getDeliveryTag():===1 3 envelope.getExchange():===exchange_001 4 envelope.getRoutingKey():===mq.topic 5 body:===hello rabbitmq!===============ACK&重回隊列,第0條 6 ===================休眠以便查看=============== 7 iiiiiiiiiiiiiiiii======================================================0 8 消費標簽:amq.ctag-eeeaa9jgrJ3tDhtQR2XReg 9 envelope.getDeliveryTag():===2 10 envelope.getExchange():===exchange_001 11 envelope.getRoutingKey():===mq.topic 12 body:===hello rabbitmq!===============ACK&重回隊列,第1條 13 ===================休眠以便查看=============== 14 iiiiiiiiiiiiiiiii======================================================1 15 消費標簽:amq.ctag-eeeaa9jgrJ3tDhtQR2XReg 16 envelope.getDeliveryTag():===3 17 envelope.getExchange():===exchange_001 18 envelope.getRoutingKey():===mq.topic 19 body:===hello rabbitmq!===============ACK&重回隊列,第2條 20 ===================休眠以便查看=============== 21 iiiiiiiiiiiiiiiii======================================================2 22 消費標簽:amq.ctag-eeeaa9jgrJ3tDhtQR2XReg 23 envelope.getDeliveryTag():===4 24 envelope.getExchange():===exchange_001 25 envelope.getRoutingKey():===mq.topic 26 body:===hello rabbitmq!===============ACK&重回隊列,第3條 27 ===================休眠以便查看=============== 28 iiiiiiiiiiiiiiiii======================================================3 29 消費標簽:amq.ctag-eeeaa9jgrJ3tDhtQR2XReg 30 envelope.getDeliveryTag():===5 31 envelope.getExchange():===exchange_001 32 envelope.getRoutingKey():===mq.topic 33 body:===hello rabbitmq!===============ACK&重回隊列,第4條 34 ===================休眠以便查看=============== 35 iiiiiiiiiiiiiiiii======================================================4 36 消費標簽:amq.ctag-eeeaa9jgrJ3tDhtQR2XReg 37 envelope.getDeliveryTag():===6 38 envelope.getExchange():===exchange_001 39 envelope.getRoutingKey():===mq.topic 40 body:===hello rabbitmq!===============ACK&重回隊列,第1條 41 ===================休眠以便查看=============== 42 iiiiiiiiiiiiiiiii======================================================1 43 消費標簽:amq.ctag-eeeaa9jgrJ3tDhtQR2XReg 44 envelope.getDeliveryTag():===7 45 envelope.getExchange():===exchange_001 46 envelope.getRoutingKey():===mq.topic 47 body:===hello rabbitmq!===============ACK&重回隊列,第1條 48 ===================休眠以便查看=============== 49 iiiiiiiiiiiiiiiii======================================================1 50 消費標簽:amq.ctag-eeeaa9jgrJ3tDhtQR2XReg 51 envelope.getDeliveryTag():===8 52 envelope.getExchange():===exchange_001 53 envelope.getRoutingKey():===mq.topic 54 body:===hello rabbitmq!===============ACK&重回隊列,第1條 55 ===================休眠以便查看=============== 56 iiiiiiiiiiiiiiiii======================================================1 57 消費標簽:amq.ctag-eeeaa9jgrJ3tDhtQR2XReg 58 envelope.getDeliveryTag():===9 59 envelope.getExchange():===exchange_001 60 envelope.getRoutingKey():===mq.topic 61 body:===hello rabbitmq!===============ACK&重回隊列,第1條 62 ===================休眠以便查看===============
