消費端限流:
什么是消費端限流?
場景:
我們RabbitMQ服務器有上萬條未處理的消息,我們隨便打開一個消費者客戶端,會出現下面情況:
巨量的消息瞬間全部推送過來,但是我們單個客戶端無法同時處理這么多數據。(導致服務器崩潰,線上故障)
生產端一次推送幾百條數據庫,客戶端只接收一兩條,在高並發的情況下,不能再生產端做限流,只能在消費端處理。
解決方法:
RabbitMQ提供了一種qos(服務質量保證)功能,在非自動確認消息的前提下,
如果一定數據的消息(通過基於consumer或者channel設置qos的值)未被確認前,不進行消費新的消息。減壓減負
void BasicQos(uint prefetchSize,ushort prefetchCount,bool global);
消費端體現,一次最多能處理多少條消息(基本上為1),限流策略在什么上應用(channel--true,consumer---false)
prefetchSize:0
prefetchCount:會告訴RabbitMQ不要同時給一個消費者推送多余n個消息,
一旦有n個消息還沒有ack,則該consumer將block調,知道有消息ack
global:true\false是否將上面設置應用於channel,簡單的說就是上面限制是channel
級別的還是consumer級別,基本使用false。
注意:prefetchSize和global這兩項,rabbitmq沒有實現,暫不研究
prefetch_count在no_ack=false的情況下生效,在自動應答的情況下兩個值不生效。
//生產端代碼 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String exchange = "test_qos_exchange"; String routingKey = "qos.save"; String msg = "Hello RabbitMQ QOS Message"; for(int i =0; i<5; i ++){ channel.basicPublish(exchange, routingKey, true, null, msg.getBytes()); }
//消費端代碼 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String exchangeName = "test_qos_exchange"; String queueName = "test_qos_queue"; String routingKey = "qos.#"; channel.exchangeDeclare(exchangeName, "topic", true, false, null); channel.queueDeclare(queueName, true, false, false, null); channel.queueBind(queueName, exchangeName, routingKey); //1 限流方式 第一件事就是 autoAck設置為 false //接收1條消息, channel.basicQos(0, 1, false); channel.basicConsume(queueName, false, new MyConsumer(channel));
//自定義消息端 private Channel channel ; public MyConsumer(Channel channel) { super(channel);
//接收ack進行消息發送 this.channel = channel; } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.err.println("-----------consume message----------"); System.err.println("consumerTag: " + consumerTag); System.err.println("envelope: " + envelope); System.err.println("properties: " + properties); System.err.println("body: " + new String(body)); //false不支持批量簽收 channel.basicAck(envelope.getDeliveryTag(), false); }