[RabbitMQ]Java客戶端:源碼概覽


本文簡要介紹RabbitMQ提供的Java客戶端中最基本的功能性接口/類及相關源碼。

Mavan依賴:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.13.1</version>
</dependency>

0 AMQP

com.rabbitmq.client.AMQP接口將AMQP(Advanced Message Queue Protocol,高級消息隊列協議)中的方法和消息屬性封裝成Java對象,便於以面向對象的思維模式進行編程。

該接口類圖簡要如下:

AMQP

AMQP接口中包含許多內部類,大體可以分成三類:

0.1 協議信息

PROTOCOL內部類,保存了AMQP的協議版本等信息。

public static class PROTOCOL {
    public static final int MAJOR = 0;
    public static final int MINOR = 9;
    public static final int REVISION = 1;
    public static final int PORT = 5672;
}

0.2 方法

包括ConnectionChannelAccessExchangeQueueBasicTxConfirm內部類,分別封裝了向Broker發送的不同方法的基本數據格式和內容。

它們都實現了com.rabbitmq.client.impl.Method抽象類(后續會介紹),在發送請求時,通過Method抽象類的toFrame()方法可以轉換成Frame(幀),然后com.rabbitmq.client.impl.AMQConnection將其以二進制數據的方式通過TCP協議發送給Broker

它們都提供了各自的Builder,便於實例化方法對象(建造者模式)。例如,最常用的Publish方法類圖簡要如下:

Publish

通過如下代碼實例化出Publish對象:

AMQP.Basic.Publish publish = new AMQP.Basic.Publish.Builder().exchange("myExchange").routingKey("myRoutingKey").mandatory(true).build();

在發送給Broker前可以通過如下代碼將Publish對象轉成幀:

Method method = (Method) publish;
Frame frame = method.toFrame(1);

com.rabbitmq.client.impl.AMQConnection對象管理着與Broker的連接,它通過如下代碼將方法發送給Broker

AMQConnection connection = channel.getConnection();
connection.writeFrame(frame);

0.3 消息屬性

BasicProperties內部類,封裝了消息的基本屬性。

它也提供了Builder,我們在發送消息時可以使用BasicProperties實例攜帶消息頭信息,類圖如下:

BasicProperties

通過如下代碼實例化出BasicProperties對象,並發送消息:

Map<String, Object> headers = new HashMap<>();
headers.put("color", "blue");
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().headers(headers).expiration("10000").build();
channel.basicPublish("myExchange", "myRoutingKey", true, properties, "hello".getBytes());

BasicProperties對象在發送前最終會被轉換成com.rabbitmq.client.impl.AMQContentHeader對象,代表AMQ消息內容的頭。

BasicProperties2

AMQContentHeadertoFrame()方法也可以將其轉換成Frame(幀),然后com.rabbitmq.client.impl.AMQConnection將其以二進制數據的方式通過TCP協議發送給Broker

AMQConnection connection = channel.getConnection();
Frame headerFrame = contentHeader.toFrame(channelNumber, body.length);
connection.writeFrame(headerFrame);

1 ConnectionFactory

com.rabbitmq.client.ConnectionFactory類是用來創建與RabbitMQ服務器連接(com.rabbitmq.client.Connection)的工廠類。簡要類圖如下:

ConnectionFactory

ConnectionFactory內部封裝了許多屬性,用來設置與ConnectionSocket相關的連接信息。

它還提供了一套默認配置:

public static final String DEFAULT_VHOST = "/";
public static final String DEFAULT_HOST = "localhost";
public static final int    DEFAULT_AMQP_PORT = AMQP.PROTOCOL.PORT; // 5672
public static final int    DEFAULT_AMQP_OVER_SSL_PORT = 5671;
public static final String DEFAULT_PASS = "guest";	// CredentialsProvider.username
public static final String DEFAULT_USER = "guest";	// CredentialsProvider.password
private boolean nio = false;
private boolean automaticRecovery               = true;

ConnectionFactory的基本使用如下:

ConnectionFactory connectionFactory = new ConnectionFactory();
Connection connection = connectionFactory.newConnection();//返回RecoveryAwareAMQConnection或AMQConnection對象

底層會創建出java.net.Socketjava.nio.channels.SocketChannel,代表與RabbitMQ服務器的TCP連接:

Socket socket = SocketFactory.getDefault().createSocket();
SocketChannel channel = SocketChannel.open();

SocketSocketChannel會被封裝到com.rabbitmq.client.impl.FrameHandler中:

// nio==false,使用Socket(默認值)
SocketFrameHandler frameHandler = new SocketFrameHandler(sock, this.shutdownExecutor);
// nio==true,使用SocketChannel
SocketChannelFrameHandlerState state = new SocketChannelFrameHandlerState(channel, nioLoopContext, nioParams, sslEngine);
SocketChannelFrameHandler frameHandler = new SocketChannelFrameHandler(state);

FrameHandler中提供了readFrame()writeFrame(),分別可以從Socket/SocketChannel中讀取或寫入數據。

FrameHandler

FrameHandler又會被封裝到com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectioncom.rabbitmq.client.impl.AMQConnection中:

// automaticRecovery==true,默認值
FrameHandler frameHandler = factory.create(addr, connectionName());
RecoveryAwareAMQConnection conn = createConnection(params, frameHandler, metricsCollector);
// automaticRecovery==false
FrameHandler handler = fhFactory.create(addr, clientProvidedName);
AMQConnection conn = createConnection(params, handler, metricsCollector);

因此,我們可以使用返回的Connection對象與RabbitMQ服務器進行交互。

2 Connection

com.rabbitmq.client.Connection接口代表與RabbitMQ服務器的TCP連接,類圖簡要如下:

Connection

Connection主要提供了createChannel()openChannel()方法,用來創建Channel。后者提供了幾乎所有與RabbitMQ進行交互的方法,是項目中使用頻率最高的一個接口。

Connection的基本使用如下:

Channe channel = connection.createChannel();

Connection的實現類主要包括以下幾種,分別代表不同類型的連接:

Connection2

  • AMQConnection類:代表最基本的與RabbitMQ服務器的連接。內部持有FrameHandler等成員變量,用來與服務器交互。
  • RecoveryAwareAMQConnection接口:代表自動重連的連接,內部沒有方法,類似與標志性接口。
  • AutorecoveringConnection類:自動重連Connection的實現類,在非正常斷開情況下會自動重連,例如I/O異常。它持有RecoveryAwareAMQConnection對象作為代理,從而間接可以使用FrameHandler對象與服務器進行交互。重連時,內部組件也會按如下順序自動重連:
    • Exchanges
    • Queues
    • Bindings (both queue and exchange-to-exchange)
    • Consumers
  • RecoveryAwareAMQConnection:它是對AMQConnection的修改,主要用作AutorecoveringConnection的成員變量。它與AMQConnection主要區別在於它內部使用com.rabbitmq.client.impl.recovery.RecoveryAwareChannelN作為Channel

在項目中使用的實現類主要為AMQConnectionAutorecoveringConnection(根據ConnectionFactoryautomaticRecovery成員變量進行選擇)。

AMQConnection.createChannel()方法會使用ChannelManager創建出ChannelN類型的通道:

ChannelManager cm = _channelManager;
Channel channel = cm.createChannel(this);
// 底層:
new ChannelN(connection, channelNumber, workService, this.metricsCollector);

AutorecoveringConnection.createChannel()方法會使用RecoveryAwareAMQConnection創建出RecoveryAwareChannelN類型的通道,並使用AutorecoveringChannel包裝:

RecoveryAwareChannelN ch = (RecoveryAwareChannelN) delegate.createChannel();
final AutorecoveringChannel channel = new AutorecoveringChannel(this, delegateChannel);

ChannelManagerAMQConnection中一個十分重要的成員變量,它管理着AMQConnection對象所屬的所有Channel對象(key為通道編號,取值范圍為1~_channelMax):

private final Map<Integer, ChannelN> _channelMap = new HashMap<Integer, ChannelN>();

AMQConnection中有一個十分重要的方法writeFrame(),可以將數據發送給RabbitMQ服務器:

public void writeFrame(Frame f) throws IOException {
    _frameHandler.writeFrame(f);
    _heartbeatSender.signalActivity();
}

// SocketFrameHandler
public void writeFrame(Frame frame) throws IOException {
    synchronized (_outputStream) {
        frame.writeTo(_outputStream);
    }
}
public void writeTo(DataOutputStream os) throws IOException {
    os.writeByte(type);
    os.writeShort(channel);
    if (accumulator != null) {
        os.writeInt(accumulator.size());
        accumulator.writeTo(os);
    } else {
        os.writeInt(payload.length);
        os.write(payload);
    }
    os.write(AMQP.FRAME_END);
}

AMQConnection中有一個十分重要的方法startMainLoop(),可以創建新線程監聽RabbitMQ服務器發送來的消息:

public void startMainLoop() {
    MainLoop loop = new MainLoop();
    final String name = "AMQP Connection " + getHostAddress() + ":" + getPort();
    mainLoopThread = Environment.newThread(threadFactory, loop, name);
    mainLoopThread.start();
}

其核心在於MainLoop內部類,核心步驟如下:

  1. 調用Frame frame = _frameHandler.readFrame()讀取RabbitMQ服務器發送來的消息。
  2. 調用readFrame(frame)處理消息。
private class MainLoop implements Runnable {
    @Override
    public void run() {
        boolean shouldDoFinalShutdown = true;
        try {
            while (_running) {
                Frame frame = _frameHandler.readFrame();
                readFrame(frame);
            }
        } catch (Throwable ex) {
            if (ex instanceof InterruptedException) {
                shouldDoFinalShutdown = false;
            } else {
                handleFailure(ex);
            }
        } finally {
            if (shouldDoFinalShutdown) {
                doFinalShutdown();
            }
        }
    }
}

readFrame(frame)方法會從ChannelManager成員變量中獲取該消息對應的通道,然后調用channel.handleFrame(frame)方法進行業務處理(最終會調用channel.processAsync(Command command)方法):

private void readFrame(Frame frame) throws IOException {
    if (frame != null) {
        _missedHeartbeats = 0;
        if (frame.type == AMQP.FRAME_HEARTBEAT) {
        } else {
            if (frame.channel == 0) {
                _channel0.handleFrame(frame);
            } else {
                if (isOpen()) {
                    ChannelManager cm = _channelManager;
                    if (cm != null) {
                        ChannelN channel;
                        try {
                            channel = cm.getChannel(frame.channel);
                        } catch(UnknownChannelException e) {
                            LOGGER.info("Received a frame on an unknown channel, ignoring it");
                            return;
                        }
                        channel.handleFrame(frame);	// 業務處理
                    }
                }
            }
        }
    } else {
        handleSocketTimeout();
    }
}

3 Channel

com.rabbitmq.client.Channel中封裝了與RabbitMQ服務器交互的API,簡要類圖如下:

Channel

Channel的基本使用方式如下:

// 聲明交換機
channel.exchangeDeclare("myExchange", BuiltinExchangeType.DIRECT);
// 聲明隊列
channel.queueDeclare("myQueue", true, false, false, null);
// 聲明綁定
channel.exchangeBind("myQueue", "myExchange", "myRoutingKey");
// 發送消息
channel.basicPublish("myExchange", "myRoutingKey", true, null, "hello".getBytes());
// 訂閱消息
channel.basicConsume("myQueue", new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body));
            }
        });
// 拉取消息
channel.basicGet("myQueue", true);

Channel是實現類包括以下幾種:

Channel2

  • ChannelNAMQP協議功能API的主要實現類。
  • RecoveryAwareChannelN:重寫了basicAck()basicReject()basicNack()方法,對ChannelN功能進行擴展,實時跟蹤delivery tag,對最新的tag進行響應。
  • AutorecoveringChannel:在connection重連時會自動恢復的通道,內部通過持有RecoveryAwareChannelN代理對象來實現具體操作。

3.1 ChannelN

ChannelN

com.rabbitmq.client.impl.ChannelN是對AMQP協議功能性API的主要實現類,它除了實現Channel中定義的AMQP協議功能性API,還繼承了AMQChannel抽象類,通過其_connection成員變量可以在底層調用到SocketSocketChannel向RabbitMQ服務器進行讀寫操作。

除此之外,為了實現AMQP協議的特定功能,如消息確認機制。ChannelN內部封裝了如下成員變量:

  • _consumers:消息消費者,以consumerTag作為key,用於監聽消息。
  • returnListeners:監聽RabbitMQ服務器找不到對應交換機時的返回消息(basicPublish方法發送消息時設置mandatoryimmediate)。
  • confirmListeners:監聽RabbitMQ服務器的確認消息(acknack)。
  • defaultConsumer:默認的消息消費者。
  • dispatcher:啟動線程執行_consumers中的任務。

ChannelN中監聽消息的核心源碼如下:

public boolean processAsync(Command command) throws IOException {
    Method method = command.getMethod();
    if (method instanceof Channel.Close) {
        asyncShutdown(command);
        return true;
    }
    if (isOpen()) {
        // 根據不同方法類型調用對應的處理方法
        if (method instanceof Basic.Deliver) {
            processDelivery(command, (Basic.Deliver) method);
            return true;
        } else if (method instanceof Basic.Return) {
            callReturnListeners(command, (Basic.Return) method);
            return true;
        } else if (method instanceof Channel.Flow) {
            Channel.Flow channelFlow = (Channel.Flow) method;
            synchronized (_channelMutex) {
                _blockContent = !channelFlow.getActive();
                transmit(new Channel.FlowOk(!_blockContent));
                _channelMutex.notifyAll();
            }
            return true;
        } else if (method instanceof Basic.Ack) {
            Basic.Ack ack = (Basic.Ack) method;
            callConfirmListeners(command, ack);
            handleAckNack(ack.getDeliveryTag(), ack.getMultiple(), false);
            return true;
        } else if (method instanceof Basic.Nack) {
            Basic.Nack nack = (Basic.Nack) method;
            callConfirmListeners(command, nack);
            handleAckNack(nack.getDeliveryTag(), nack.getMultiple(), true);
            return true;
        } else if (method instanceof Basic.RecoverOk) {
            for (Map.Entry<String, Consumer> entry : Utility.copy(_consumers).entrySet()) {
                this.dispatcher.handleRecoverOk(entry.getValue(), entry.getKey());
            }
            return false;
        } else if (method instanceof Basic.Cancel) {
            Basic.Cancel m = (Basic.Cancel)method;
            String consumerTag = m.getConsumerTag();
            Consumer callback = _consumers.remove(consumerTag);
            if (callback == null) {
                callback = defaultConsumer;
            }
            if (callback != null) {
                try {
                    this.dispatcher.handleCancel(callback, consumerTag);
                } catch (WorkPoolFullException e) {
                    throw e;
                } catch (Throwable ex) {
                    getConnection().getExceptionHandler().handleConsumerException(this,
                                                                                  ex,
                                                                                  callback,
                                                                                  consumerTag,
                                                                                  "handleCancel");
                }
            } else {
                LOGGER.warn("Could not cancel consumer with unknown tag {}", consumerTag);
            }
            return true;
        } else {
            return false;
        }
    } else {
        if (method instanceof Channel.CloseOk) {
            return false;
        } else {
            return true;
        }
    }
}

可見,該方法類似於SpringMVC中的DispatcherServlet,它會根據監聽到Command對象的方法類型進行分發處理。接下來介紹的各成員變量方法調用的入口都在這個方法中。

3.1.1 ConsumerDispatcher

ConsumerDispatcher

com.rabbitmq.client.impl.ConsumerDispatcher的作用是從線程池中獲取空閑線程處理消息。它的主要作用是開啟線程,而實際處理消息的業務邏輯在具體Consumer代理對象中處理。

例如,在處理生產者發布的消息時,ConsumerDispatcher會進行如下處理:

public void handleDelivery(final Consumer delegate,
                           final String consumerTag,
                           final Envelope envelope,
                           final AMQP.BasicProperties properties,
                           final byte[] body) throws IOException {
    executeUnlessShuttingDown(
    new Runnable() {
        @Override
        public void run() {
            try {
                delegate.handleDelivery(consumerTag,
                        envelope,
                        properties,
                        body);
            } catch (Throwable ex) {
                connection.getExceptionHandler().handleConsumerException(
                        channel,
                        ex,
                        delegate,
                        consumerTag,
                        "handleDelivery");
            }
        }
    });
}

3.1.2 Consumer

Consumer

com.rabbitmq.client.Consumer接口中定義了不同消息的處理方法,實例對象則表示消息消費者。

com.rabbitmq.client.DefaultConsumer是默認實現類,它實現了接口中的所有方法(空方法)。我們可以采取匿名內部類的方式,實現具體某個需要的方法,而不是實現所有方法。

我們可以使用如下代碼添加消費者:

channel.basicConsume("myQueue", new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println(new String(body));
    }
});

ChannelN中調用消費者處理消息方法(handleDelivery())的源碼如下:

protected void processDelivery(Command command, Basic.Deliver method) {
    Basic.Deliver m = method;
    Consumer callback = _consumers.get(m.getConsumerTag());
    if (callback == null) {
        if (defaultConsumer == null) {
            throw new IllegalStateException("Unsolicited delivery -" +
                    " see Channel.setDefaultConsumer to handle this" +
                    " case.");
        }
        else {
            callback = defaultConsumer;
        }
    }

    Envelope envelope = new Envelope(m.getDeliveryTag(),
                                     m.getRedelivered(),
                                     m.getExchange(),
                                     m.getRoutingKey());
    try {
        metricsCollector.consumedMessage(this, m.getDeliveryTag(), m.getConsumerTag());
        this.dispatcher.handleDelivery(callback,
                                       m.getConsumerTag(),
                                       envelope,
                                       (BasicProperties) command.getContentHeader(),
                                       command.getContentBody());
    } catch (WorkPoolFullException e) {
        throw e;
    } catch (Throwable ex) {
        getConnection().getExceptionHandler().handleConsumerException(this,
            ex,
            callback,
            m.getConsumerTag(),
            "handleDelivery");
    }
}

需要注意的是,在調用 this.dispatcher.handleDelivery()之前,會首先調用Consumer callback = _consumers.get(m.getConsumerTag())根據consumerTag獲取對應的消費者。因此,消費者處理消息是一對一的。

消費者其他方法的調用也可以在ChannelN.processAsync()中找到。

3.1.3 ReturnListener

ReturnListener

com.rabbitmq.client.ReturnListener接口中定義了監聽返回消息的通用方法handleReturn(),主要用於消息發布者監聽返回消息。

消息發布者通過basicPublish方法發送消息時設置mandatoryimmediate,但RabbitMQ服務器找不到對應交換機時會返回消息。消息發布者通過往Channel對象中添加ReturnListener實現類,即可監聽到返回消息:

channel.addReturnListener(new ReturnListener() {
    @Override
    public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("return message: " + new String(body));
    }
});

ChannelN中處理返回消息的源碼如下:

private void callReturnListeners(Command command, Basic.Return basicReturn) {
    try {
        for (ReturnListener l : this.returnListeners) {
            l.handleReturn(basicReturn.getReplyCode(),
                basicReturn.getReplyText(),
                basicReturn.getExchange(),
                basicReturn.getRoutingKey(),
                (BasicProperties) command.getContentHeader(),
                command.getContentBody());
        }
    } catch (Throwable ex) {
        getConnection().getExceptionHandler().handleReturnListenerException(this, ex);
    } finally {
        metricsCollector.basicPublishUnrouted(this);
    }
}

ReturnListener是針對ChannelN級別的。接收到返回消息后,所有添加到ChannelN對象的ReturnListener監聽器都會被調用。

3.1.4 ConfirmListener

ConfirmListener

com.rabbitmq.client.ConfirmListener接口中定義的監聽RabbitMQ服務器確認消息(acknack)的回調方法,主要用於消息發布者使用。

基本使用代碼如下:

channel.addConfirmListener(new ConfirmListener() {
    @Override
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        // 業務處理
    }

    @Override
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        // 業務處理
    }
});

ChannelN中處理返回消息的源碼如下:

private void callConfirmListeners(@SuppressWarnings("unused") Command command, Basic.Ack ack) {
    try {
        for (ConfirmListener l : this.confirmListeners) {
            l.handleAck(ack.getDeliveryTag(), ack.getMultiple());
        }
    } catch (Throwable ex) {
        getConnection().getExceptionHandler().handleConfirmListenerException(this, ex);
    } finally {
        metricsCollector.basicPublishAck(this, ack.getDeliveryTag(), ack.getMultiple());
    }
}

private void callConfirmListeners(@SuppressWarnings("unused") Command command, Basic.Nack nack) {
    try {
        for (ConfirmListener l : this.confirmListeners) {
            l.handleNack(nack.getDeliveryTag(), nack.getMultiple());
        }
    } catch (Throwable ex) {
        getConnection().getExceptionHandler().handleConfirmListenerException(this, ex);
    } finally {
        metricsCollector.basicPublishNack(this, nack.getDeliveryTag(), nack.getMultiple());
    }
}

ConfirmListener是針對ChannelN級別的。接收到確認消息后,所有添加到ChannelN對象的ConfirmListener監聽器都會被調用。

3.1.5 basicPublish()

前幾小節講述的都是RabbitMQ客戶端監聽從服務器響應的消息,本小節簡要分析客戶端發送消息的流程。

發送消息的基本方式如下:

channel.basicPublish("myExchange", "myRoutingKey", null, "hello".getBytes());
  1. ChannelN中的basicPublish()方法中執行如下代碼,核心步驟如下:
    1. 將形參轉換成AMQCommand對象中的CommandAssembler成員變量:exchangeroutingKeyBasic.Publish方法對象(Method),propertiesAMQContentHeader對象,bodyList<byte[]>對象。
    2. 調用transmit(command)方法,發送命令。
public void basicPublish(String exchange, String routingKey,
                         boolean mandatory, boolean immediate,
                         BasicProperties props, byte[] body)
    throws IOException
{
    if (nextPublishSeqNo > 0) {
        unconfirmedSet.add(getNextPublishSeqNo());
        nextPublishSeqNo++;
    }
    if (props == null) {
        props = MessageProperties.MINIMAL_BASIC;
    }
    AMQCommand command = new AMQCommand(
        new Basic.Publish.Builder()
            .exchange(exchange)
            .routingKey(routingKey)
            .mandatory(mandatory)
            .immediate(immediate)
            .build(), props, body);
    try {
        transmit(command);
    } catch (IOException e) {
        metricsCollector.basicPublishFailure(this, e);
        throw e;
    }
    metricsCollector.basicPublish(this);
}
  1. ChannelN中執行transmit()quiescingTransmit()方法,最終會調用AMQCommand.transmit()方法:
public void transmit(AMQCommand c) throws IOException {
    synchronized (_channelMutex) {
        ensureIsOpen();
        quiescingTransmit(c);
    }
}

public void quiescingTransmit(AMQCommand c) throws IOException {
    synchronized (_channelMutex) {
        if (c.getMethod().hasContent()) {
            while (_blockContent) {
                try {
                    _channelMutex.wait();
                } catch (InterruptedException ignored) {
                    Thread.currentThread().interrupt();
                }
                ensureIsOpen();
            }
        }
        this._trafficListener.write(c);
        c.transmit(this);
    }
}
  1. AMQCommand中執行transmit()方法,核心步驟如下:
    1. 獲取AMQConnection對象。
    2. 分別將AMQContentHeaderMethodList<byte[]>對象轉換成Frame對象。
    3. 通過AMQConnection對象發送數據。
public void transmit(AMQChannel channel) throws IOException {
    int channelNumber = channel.getChannelNumber();
    AMQConnection connection = channel.getConnection();

    synchronized (assembler) {
        Method m = this.assembler.getMethod();
        if (m.hasContent()) {
            byte[] body = this.assembler.getContentBody();

            Frame headerFrame = this.assembler.getContentHeader().toFrame(channelNumber, body.length);

            int frameMax = connection.getFrameMax();
            boolean cappedFrameMax = frameMax > 0;
            int bodyPayloadMax = cappedFrameMax ? frameMax - EMPTY_FRAME_SIZE : body.length;
            if (cappedFrameMax && headerFrame.size() > frameMax) {
                String msg = String.format("Content headers exceeded max frame size: %d > %d", headerFrame.size(), frameMax);
                throw new IllegalArgumentException(msg);
            }
            connection.writeFrame(m.toFrame(channelNumber));
            connection.writeFrame(headerFrame);
            for (int offset = 0; offset < body.length; offset += bodyPayloadMax) {
                int remaining = body.length - offset;
                int fragmentLength = (remaining < bodyPayloadMax) ? remaining
                        : bodyPayloadMax;
                Frame frame = Frame.fromBodyFragment(channelNumber, body,
                        offset, fragmentLength);
                connection.writeFrame(frame);
            }
        } else {
            connection.writeFrame(m.toFrame(channelNumber));
        }
    }

    connection.flush();
}

3.1.6 basicGet()

除了添加Comsumer監聽器,我們還可以主動調用basicGet()向RabbitMQ服務器“拉取”消息。

basicGet()方法本質上是向RabbitMQ服務器發送一個Basic.Get請求,然后等待響應。

basicGet()

basicGet()方法的基本使用如下:

// 自動確認模式
GetResponse message = channel.basicGet("myQueue", true);
System.out.println(new String(message.getBody()));
// 手動確認模式
GetResponse myQueue = channel.basicGet("myQueue", false);
System.out.println(new String(message.getBody()));
channel.basicAck(myQueue.getEnvelope().getDeliveryTag(), false);
  1. ChannelN中的basicGet()方法中執行如下代碼,核心步驟如下:

    1. 將形參轉換成AMQCommand對象中的CommandAssembler成員變量:queueBasic.Get方法對象(Method),propertiesAMQContentHeader對象,bodyList<byte[]>對象。
    2. 調用exnWrappingRpc(command)方法,發送命令。
    3. 等待響應replyCommand,並封裝成GetResponse對象返回,
   public GetResponse basicGet(String queue, boolean autoAck) throws IOException{
    validateQueueNameLength(queue);
    AMQCommand replyCommand = exnWrappingRpc(new Basic.Get.Builder()
                                              .queue(queue)
                                              .noAck(autoAck)
                                             .build());
    Method method = replyCommand.getMethod();
    if (method instanceof Basic.GetOk) {
        Basic.GetOk getOk = (Basic.GetOk)method;
        Envelope envelope = new Envelope(getOk.getDeliveryTag(),
                                         getOk.getRedelivered(),
                                         getOk.getExchange(),
                                         getOk.getRoutingKey());
        BasicProperties props = (BasicProperties)replyCommand.getContentHeader();
        byte[] body = replyCommand.getContentBody();
        int messageCount = getOk.getMessageCount();
        metricsCollector.consumedMessage(this, getOk.getDeliveryTag(), autoAck);
        return new GetResponse(envelope, props, body, messageCount);
    } else if (method instanceof Basic.GetEmpty) {
        return null;
    } else {
        throw new UnexpectedMethodError(method);
    }
}
public AMQCommand exnWrappingRpc(Method m) throws IOException {
    try {
        return privateRpc(m);
    } catch (AlreadyClosedException ace) {
        throw ace;
    } catch (ShutdownSignalException ex) {
        throw wrap(ex);
    }
}
  1. ChannelN中的privateRpc()方法中執行如下代碼,核心步驟如下:

    1. 實例化SimpleBlockingRpcContinuation對象,用於獲取響應。
    2. 調用rpc(m, k)方法,發送Basic.Get請求。
    3. 調用k.getReply()方法,等待響應並返回。
   private AMQCommand privateRpc(Method m) throws IOException, ShutdownSignalException{
    SimpleBlockingRpcContinuation k = new SimpleBlockingRpcContinuation(m);
    rpc(m, k);	// 發送請求
    if(_rpcTimeout == NO_RPC_TIMEOUT) {
        return k.getReply();	// 等待響應
    } else {
        try {
            return k.getReply(_rpcTimeout);
        } catch (TimeoutException e) {
            throw wrapTimeoutException(m, e);
        }
    }
}

public void rpc(Method m, RpcContinuation k) throws IOException {
    synchronized (_channelMutex) {
        ensureIsOpen();
        quiescingRpc(m, k);
    }
}

public void quiescingRpc(Method m, RpcContinuation k) throws IOException {
    synchronized (_channelMutex) {
        enqueueRpc(k);
        quiescingTransmit(m);
    }
}

public void enqueueRpc(RpcContinuation k) {
    doEnqueueRpc(() -> new RpcContinuationRpcWrapper(k));
}

public void quiescingTransmit(Method m) throws IOException {
    synchronized (_channelMutex) {
        quiescingTransmit(new AMQCommand(m));
    }
}

public void quiescingTransmit(AMQCommand c) throws IOException {
    synchronized (_channelMutex) {
        if (c.getMethod().hasContent()) {
           while (_blockContent) {
                try {
                    _channelMutex.wait();
                } catch (InterruptedException ignored) {
                    Thread.currentThread().interrupt();
                }
                ensureIsOpen();
            }
        }
        this._trafficListener.write(c);
        c.transmit(this);
    }
}
  1. 最終,在AMQCommand中執行transmit()方法,核心步驟如下:

    1. 獲取AMQConnection對象。
    2. 分別將AMQContentHeaderMethodList<byte[]>對象轉換成Frame對象。
    3. 通過AMQConnection對象發送數據。
   public void transmit(AMQChannel channel) throws IOException {
    int channelNumber = channel.getChannelNumber();
    AMQConnection connection = channel.getConnection();
    synchronized (assembler) {
        Method m = this.assembler.getMethod();
        if (m.hasContent()) {
            byte[] body = this.assembler.getContentBody();
            Frame headerFrame = this.assembler.getContentHeader().toFrame(channelNumber, body.length);
            int frameMax = connection.getFrameMax();
            boolean cappedFrameMax = frameMax > 0;
            int bodyPayloadMax = cappedFrameMax ? frameMax - EMPTY_FRAME_SIZE : body.length;

            if (cappedFrameMax && headerFrame.size() > frameMax) {
                String msg = String.format("Content headers exceeded max frame size: %d > %d", headerFrame.size(), frameMax);
                throw new IllegalArgumentException(msg);
            }
            connection.writeFrame(m.toFrame(channelNumber));
            connection.writeFrame(headerFrame);

            for (int offset = 0; offset < body.length; offset += bodyPayloadMax) {
                int remaining = body.length - offset;

                int fragmentLength = (remaining < bodyPayloadMax) ? remaining
                        : bodyPayloadMax;
                Frame frame = Frame.fromBodyFragment(channelNumber, body,
                        offset, fragmentLength);
                connection.writeFrame(frame);
            }
        } else {
            connection.writeFrame(m.toFrame(channelNumber));
        }
    }
    connection.flush();
}

4 AMQCommand

AMQCommand

com.rabbitmq.client.impl.AMQCommand類實現了com.rabbitmq.client.Command接口,其成員變量CommandAssembler對象是AMQP規范中methodheaderbody的容器。

AMQCommand中提供了一個十分重要的方法:transmit(AMQChannel)。調用該方法能夠將methodheaderbody通過Connection發送給RabbitMQ服務器。該方法在前幾個小節都有介紹,核心步驟如下:

  1. 獲取AMQConnection對象。
  2. 分別將AMQContentHeaderMethodList<byte[]>對象轉換成Frame對象。
  3. 通過AMQConnection對象發送數據。
public void transmit(AMQChannel channel) throws IOException {
    int channelNumber = channel.getChannelNumber();
    AMQConnection connection = channel.getConnection();

    synchronized (assembler) {
        Method m = this.assembler.getMethod();
        if (m.hasContent()) {
            byte[] body = this.assembler.getContentBody();
            Frame headerFrame = this.assembler.getContentHeader().toFrame(channelNumber, body.length);
            int frameMax = connection.getFrameMax();
            boolean cappedFrameMax = frameMax > 0;
            int bodyPayloadMax = cappedFrameMax ? frameMax - EMPTY_FRAME_SIZE : body.length;
            if (cappedFrameMax && headerFrame.size() > frameMax) {
                String msg = String.format("Content headers exceeded max frame size: %d > %d", headerFrame.size(), frameMax);
                throw new IllegalArgumentException(msg);
            }
            connection.writeFrame(m.toFrame(channelNumber));
            connection.writeFrame(headerFrame);
            for (int offset = 0; offset < body.length; offset += bodyPayloadMax) {
                int remaining = body.length - offset;
                int fragmentLength = (remaining < bodyPayloadMax) ? remaining
                        : bodyPayloadMax;
                Frame frame = Frame.fromBodyFragment(channelNumber, body,
                        offset, fragmentLength);
                connection.writeFrame(frame);
            }
        } else {
            connection.writeFrame(m.toFrame(channelNumber));
        }
    }
    connection.flush();
}

4.1 CommandAssembler

CommandAssembler

com.rabbitmq.client.impl.CommandAssembler類中封裝了AMQP規范中methodheaderbody

4.1.1 Method

Method

com.rabbitmq.client.impl.Method抽象類代表AMQP規范中method,我們平常所使用的com.rabbitmq.client.AMQP接口中的ConnectionChannelAccessExchangeQueueBasicTxConfirm等內部類都實現了該抽象類。

我們調用channel.basicPublish()等方法向RabbitMQ服務器發送消息,或者從通過注冊Consumer監聽RabbitMQ服務器的消息時,都會將method數據段轉換成Method實現類進行處理。

Method.toFrame()方法則能將自己轉換成Frame對象,進行發送。

4.1.2 AMQContentHeader

AMQContentHeader

com.rabbitmq.client.impl.AMQContentHeader抽象類代表AMQP規范中headerchannel.basicPublish()方法形參BasicProperties實現了該抽象類,我們可以通過該對象為消息設置屬性。

我們調用channel.basicPublish()等方法向RabbitMQ服務器發送消息,或者從通過注冊Consumer監聽RabbitMQ服務器的消息時,都會將method數據段轉換成AMQContentHeader實現類進行處理。

AMQContentHeader.toFrame()方法則能將自己轉換成Frame對象,進行發送。

5 Frame

Frame

com.rabbitmq.client.impl.Frame代表AMQP wire-protocol frame(幀),主要包含以下成員變量:

  • type:幀類型。
  • channel:所屬通道。
  • payload:輸入載荷。
  • accumulator:輸出載荷。

Frame還提供了靜態方法readFrom(),可以從輸入流中讀取到Frame對象,主要提供給FrameHandler.readFrame()方法調用:

public static Frame readFrom(DataInputStream is) throws IOException {
    int type;
    int channel;
    try {
        type = is.readUnsignedByte();
    } catch (SocketTimeoutException ste) {
        return null; // failed
    }
    if (type == 'A') {
        protocolVersionMismatch(is);
    }
    channel = is.readUnsignedShort();
    int payloadSize = is.readInt();
    byte[] payload = new byte[payloadSize];
    is.readFully(payload);
    int frameEndMarker = is.readUnsignedByte();
    if (frameEndMarker != AMQP.FRAME_END) {
        throw new MalformedFrameException("Bad frame end marker: " + frameEndMarker);
    }
    return new Frame(type, channel, payload);
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM