以下轉自:http://blog.csdn.net/yangbutao/article/details/10395599
rabbitMQ中consumer通過建立到queue的連接,創建channel對象,通過channel通道獲取message, Consumer可以聲明式的以API輪詢poll的方式主動從queue的獲取消息,也可以通過訂閱的方式被動的從Queue中消費消息, 最近翻閱了基於java的客戶端的相關源碼,簡單做個分析。 編程模型偽代碼如下: ConnectionFactory factory = new ConnectionFactory(); Connection conn = factory.newConnection(); Channel channel=conn.createChannel(); 創建Connection需要指定MQ的物理地址和端口,是socket tcp物理連接,而channel是一個邏輯的概念,支持在tcp連接上創建多個MQ channel 以下是基於channel上的兩種消費方式。
1、Subscribe訂閱方式 boolean autoAck = false; channel.basicConsume(queueName, autoAck, "myConsumerTag", new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String routingKey = envelope.getRoutingKey(); String contentType = properties.contentType; long deliveryTag = envelope.getDeliveryTag(); // (process the message components here ...) channel.basicAck(deliveryTag, false); } });
訂閱方式其實是向queue注冊consumer,通過rpc向queue server發送注冊consumer的消息,rabbitMQ Server在收到消息后,根據消息的內容類型判斷這是一個訂閱消息, 這樣當MQ 中queue有消息時,會自動把消息通過該socket(長連接)通道發送出去。 參見ChannelN中的方法 public String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, final Consumer callback) throws IOException { ...... rpc((Method) new Basic.Consume.Builder() .queue(queue) .consumerTag(consumerTag) .noLocal(noLocal) .noAck(autoAck) .exclusive(exclusive) .arguments(arguments) .build(), k);
try { return k.getReply(); } catch(ShutdownSignalException ex) { throw wrap(ex); } }
Consumer接收消息的過程: 創建Connection后,會啟動MainLoop后台線程,循環從socket(FrameHandler)中獲取數據包(Frame),調用channel.handleFrame(Frame frame)處理消息, public void handleFrame(Frame frame) throws IOException { AMQCommand command = _command; if (command.handleFrame(frame)) { // 對消息進行協議assemble _command = new AMQCommand(); // prepare for the next one handleCompleteInboundCommand(command);//對消息消費處理 } } ChannelN.handleCompleteInboundCommand ---ChannelN.processAsync ----dispatcher.handleDelivery ---QueueingConsumer.handleDelivery ---this._queue.add(new Delivery(envelope, properties, body));//消息最終放到隊列中 每個Consumer都有一個BlockQueue,用於緩存從socket中獲取的消息。 接下來,Consumer對象就可以調用api來從客戶端緩存的_queue中依次獲取消息,進行消費,參見QueueingConsumer.nextDelivery()
對於這種長連接的方式,沒看到心跳功能,以防止長連接的因網絡等原因連接失效
2、poll API方式 ChannelN: GetResponse basicGet(String queue, boolean autoAck) 這種方式比較簡單,直接通過RPC從MQ Server端獲取隊列中的消息
