RabbitMQ Consumer獲取消息的兩種方式(poll,subscribe)解析


以下轉自:http://blog.csdn.net/yangbutao/article/details/10395599

rabbitMQ中consumer通過建立到queue的連接,創建channel對象,通過channel通道獲取message, Consumer可以聲明式的以API輪詢poll的方式主動從queue的獲取消息,也可以通過訂閱的方式被動的從Queue中消費消息, 最近翻閱了基於java的客戶端的相關源碼,簡單做個分析。 編程模型偽代碼如下: ConnectionFactory factory = new ConnectionFactory(); Connection conn = factory.newConnection(); Channel channel=conn.createChannel(); 創建Connection需要指定MQ的物理地址和端口,是socket tcp物理連接,而channel是一個邏輯的概念,支持在tcp連接上創建多個MQ channel 以下是基於channel上的兩種消費方式。

1、Subscribe訂閱方式 boolean autoAck = false; channel.basicConsume(queueName, autoAck, "myConsumerTag",      new DefaultConsumer(channel) {          @Override          public void handleDelivery(String consumerTag,                                     Envelope envelope,                                     AMQP.BasicProperties properties,                                     byte[] body)              throws IOException          {              String routingKey = envelope.getRoutingKey();              String contentType = properties.contentType;              long deliveryTag = envelope.getDeliveryTag();              // (process the message components here ...)              channel.basicAck(deliveryTag, false);          }      });

訂閱方式其實是向queue注冊consumer,通過rpc向queue server發送注冊consumer的消息,rabbitMQ Server在收到消息后,根據消息的內容類型判斷這是一個訂閱消息, 這樣當MQ 中queue有消息時,會自動把消息通過該socket(長連接)通道發送出去。 參見ChannelN中的方法     public String basicConsume(String queue, boolean autoAck, String consumerTag,                                boolean noLocal, boolean exclusive, Map<String, Object> arguments,                                final Consumer callback)         throws IOException     {     ......         rpc((Method)             new Basic.Consume.Builder()              .queue(queue)              .consumerTag(consumerTag)              .noLocal(noLocal)              .noAck(autoAck)              .exclusive(exclusive)              .arguments(arguments)             .build(),             k);

        try {             return k.getReply();         } catch(ShutdownSignalException ex) {             throw wrap(ex);         }     }

Consumer接收消息的過程: 創建Connection后,會啟動MainLoop后台線程,循環從socket(FrameHandler)中獲取數據包(Frame),調用channel.handleFrame(Frame frame)處理消息,     public void handleFrame(Frame frame) throws IOException {         AMQCommand command = _command;         if (command.handleFrame(frame)) { // 對消息進行協議assemble             _command = new AMQCommand(); // prepare for the next one             handleCompleteInboundCommand(command);//對消息消費處理         }     } ChannelN.handleCompleteInboundCommand        ---ChannelN.processAsync            ----dispatcher.handleDelivery                  ---QueueingConsumer.handleDelivery                      ---this._queue.add(new Delivery(envelope, properties, body));//消息最終放到隊列中 每個Consumer都有一個BlockQueue,用於緩存從socket中獲取的消息。 接下來,Consumer對象就可以調用api來從客戶端緩存的_queue中依次獲取消息,進行消費,參見QueueingConsumer.nextDelivery()

對於這種長連接的方式,沒看到心跳功能,以防止長連接的因網絡等原因連接失效

2、poll API方式 ChannelN: GetResponse basicGet(String queue, boolean autoAck) 這種方式比較簡單,直接通過RPC從MQ Server端獲取隊列中的消息


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM