本文簡要介紹RabbitMQ提供的Java客戶端中最基本的功能性接口/類及相關源碼。
Mavan依賴:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.13.1</version>
</dependency>
0 AMQP
com.rabbitmq.client.AMQP接口將AMQP(Advanced Message Queue Protocol,高級消息隊列協議)中的方法和消息屬性封裝成Java對象,便於以面向對象的思維模式進行編程。
該接口類圖簡要如下:

AMQP接口中包含許多內部類,大體可以分成三類:
0.1 協議信息
PROTOCOL內部類,保存了AMQP的協議版本等信息。
public static class PROTOCOL {
public static final int MAJOR = 0;
public static final int MINOR = 9;
public static final int REVISION = 1;
public static final int PORT = 5672;
}
0.2 方法
包括Connection、Channel、Access、Exchange、Queue、Basic、Tx和Confirm內部類,分別封裝了向Broker發送的不同方法的基本數據格式和內容。
它們都實現了com.rabbitmq.client.impl.Method抽象類(后續會介紹),在發送請求時,通過Method抽象類的toFrame()方法可以轉換成Frame(幀),然后com.rabbitmq.client.impl.AMQConnection將其以二進制數據的方式通過TCP協議發送給Broker。
它們都提供了各自的Builder,便於實例化方法對象(建造者模式)。例如,最常用的Publish方法類圖簡要如下:

通過如下代碼實例化出Publish對象:
AMQP.Basic.Publish publish = new AMQP.Basic.Publish.Builder().exchange("myExchange").routingKey("myRoutingKey").mandatory(true).build();
在發送給Broker前可以通過如下代碼將Publish對象轉成幀:
Method method = (Method) publish;
Frame frame = method.toFrame(1);
com.rabbitmq.client.impl.AMQConnection對象管理着與Broker的連接,它通過如下代碼將方法發送給Broker:
AMQConnection connection = channel.getConnection();
connection.writeFrame(frame);
0.3 消息屬性
BasicProperties內部類,封裝了消息的基本屬性。
它也提供了Builder,我們在發送消息時可以使用BasicProperties實例攜帶消息頭信息,類圖如下:

通過如下代碼實例化出BasicProperties對象,並發送消息:
Map<String, Object> headers = new HashMap<>();
headers.put("color", "blue");
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().headers(headers).expiration("10000").build();
channel.basicPublish("myExchange", "myRoutingKey", true, properties, "hello".getBytes());
BasicProperties對象在發送前最終會被轉換成com.rabbitmq.client.impl.AMQContentHeader對象,代表AMQ消息內容的頭。

AMQContentHeader的toFrame()方法也可以將其轉換成Frame(幀),然后com.rabbitmq.client.impl.AMQConnection將其以二進制數據的方式通過TCP協議發送給Broker。
AMQConnection connection = channel.getConnection();
Frame headerFrame = contentHeader.toFrame(channelNumber, body.length);
connection.writeFrame(headerFrame);
1 ConnectionFactory
com.rabbitmq.client.ConnectionFactory類是用來創建與RabbitMQ服務器連接(com.rabbitmq.client.Connection)的工廠類。簡要類圖如下:

ConnectionFactory內部封裝了許多屬性,用來設置與Connection或Socket相關的連接信息。
它還提供了一套默認配置:
public static final String DEFAULT_VHOST = "/";
public static final String DEFAULT_HOST = "localhost";
public static final int DEFAULT_AMQP_PORT = AMQP.PROTOCOL.PORT; // 5672
public static final int DEFAULT_AMQP_OVER_SSL_PORT = 5671;
public static final String DEFAULT_PASS = "guest"; // CredentialsProvider.username
public static final String DEFAULT_USER = "guest"; // CredentialsProvider.password
private boolean nio = false;
private boolean automaticRecovery = true;
ConnectionFactory的基本使用如下:
ConnectionFactory connectionFactory = new ConnectionFactory();
Connection connection = connectionFactory.newConnection();//返回RecoveryAwareAMQConnection或AMQConnection對象
底層會創建出java.net.Socket或java.nio.channels.SocketChannel,代表與RabbitMQ服務器的TCP連接:
Socket socket = SocketFactory.getDefault().createSocket();
SocketChannel channel = SocketChannel.open();
Socket或SocketChannel會被封裝到com.rabbitmq.client.impl.FrameHandler中:
// nio==false,使用Socket(默認值)
SocketFrameHandler frameHandler = new SocketFrameHandler(sock, this.shutdownExecutor);
// nio==true,使用SocketChannel
SocketChannelFrameHandlerState state = new SocketChannelFrameHandlerState(channel, nioLoopContext, nioParams, sslEngine);
SocketChannelFrameHandler frameHandler = new SocketChannelFrameHandler(state);
FrameHandler中提供了readFrame()和writeFrame(),分別可以從Socket/SocketChannel中讀取或寫入數據。

FrameHandler又會被封裝到com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnection或com.rabbitmq.client.impl.AMQConnection中:
// automaticRecovery==true,默認值
FrameHandler frameHandler = factory.create(addr, connectionName());
RecoveryAwareAMQConnection conn = createConnection(params, frameHandler, metricsCollector);
// automaticRecovery==false
FrameHandler handler = fhFactory.create(addr, clientProvidedName);
AMQConnection conn = createConnection(params, handler, metricsCollector);
因此,我們可以使用返回的Connection對象與RabbitMQ服務器進行交互。
2 Connection
com.rabbitmq.client.Connection接口代表與RabbitMQ服務器的TCP連接,類圖簡要如下:

Connection主要提供了createChannel()和openChannel()方法,用來創建Channel。后者提供了幾乎所有與RabbitMQ進行交互的方法,是項目中使用頻率最高的一個接口。
Connection的基本使用如下:
Channe channel = connection.createChannel();
Connection的實現類主要包括以下幾種,分別代表不同類型的連接:

AMQConnection類:代表最基本的與RabbitMQ服務器的連接。內部持有FrameHandler等成員變量,用來與服務器交互。RecoveryAwareAMQConnection接口:代表自動重連的連接,內部沒有方法,類似與標志性接口。AutorecoveringConnection類:自動重連Connection的實現類,在非正常斷開情況下會自動重連,例如I/O異常。它持有RecoveryAwareAMQConnection對象作為代理,從而間接可以使用FrameHandler對象與服務器進行交互。重連時,內部組件也會按如下順序自動重連:ExchangesQueuesBindings(both queue and exchange-to-exchange)Consumers
RecoveryAwareAMQConnection:它是對AMQConnection的修改,主要用作AutorecoveringConnection的成員變量。它與AMQConnection主要區別在於它內部使用com.rabbitmq.client.impl.recovery.RecoveryAwareChannelN作為Channel。
在項目中使用的實現類主要為AMQConnection和AutorecoveringConnection(根據ConnectionFactory的automaticRecovery成員變量進行選擇)。
AMQConnection.createChannel()方法會使用ChannelManager創建出ChannelN類型的通道:
ChannelManager cm = _channelManager;
Channel channel = cm.createChannel(this);
// 底層:
new ChannelN(connection, channelNumber, workService, this.metricsCollector);
而AutorecoveringConnection.createChannel()方法會使用RecoveryAwareAMQConnection創建出RecoveryAwareChannelN類型的通道,並使用AutorecoveringChannel包裝:
RecoveryAwareChannelN ch = (RecoveryAwareChannelN) delegate.createChannel();
final AutorecoveringChannel channel = new AutorecoveringChannel(this, delegateChannel);
ChannelManager是AMQConnection中一個十分重要的成員變量,它管理着AMQConnection對象所屬的所有Channel對象(key為通道編號,取值范圍為1~_channelMax):
private final Map<Integer, ChannelN> _channelMap = new HashMap<Integer, ChannelN>();
AMQConnection中有一個十分重要的方法writeFrame(),可以將數據發送給RabbitMQ服務器:
public void writeFrame(Frame f) throws IOException {
_frameHandler.writeFrame(f);
_heartbeatSender.signalActivity();
}
// SocketFrameHandler
public void writeFrame(Frame frame) throws IOException {
synchronized (_outputStream) {
frame.writeTo(_outputStream);
}
}
public void writeTo(DataOutputStream os) throws IOException {
os.writeByte(type);
os.writeShort(channel);
if (accumulator != null) {
os.writeInt(accumulator.size());
accumulator.writeTo(os);
} else {
os.writeInt(payload.length);
os.write(payload);
}
os.write(AMQP.FRAME_END);
}
AMQConnection中有一個十分重要的方法startMainLoop(),可以創建新線程監聽RabbitMQ服務器發送來的消息:
public void startMainLoop() {
MainLoop loop = new MainLoop();
final String name = "AMQP Connection " + getHostAddress() + ":" + getPort();
mainLoopThread = Environment.newThread(threadFactory, loop, name);
mainLoopThread.start();
}
其核心在於MainLoop內部類,核心步驟如下:
- 調用
Frame frame = _frameHandler.readFrame()讀取RabbitMQ服務器發送來的消息。 - 調用
readFrame(frame)處理消息。
private class MainLoop implements Runnable {
@Override
public void run() {
boolean shouldDoFinalShutdown = true;
try {
while (_running) {
Frame frame = _frameHandler.readFrame();
readFrame(frame);
}
} catch (Throwable ex) {
if (ex instanceof InterruptedException) {
shouldDoFinalShutdown = false;
} else {
handleFailure(ex);
}
} finally {
if (shouldDoFinalShutdown) {
doFinalShutdown();
}
}
}
}
readFrame(frame)方法會從ChannelManager成員變量中獲取該消息對應的通道,然后調用channel.handleFrame(frame)方法進行業務處理(最終會調用channel.processAsync(Command command)方法):
private void readFrame(Frame frame) throws IOException {
if (frame != null) {
_missedHeartbeats = 0;
if (frame.type == AMQP.FRAME_HEARTBEAT) {
} else {
if (frame.channel == 0) {
_channel0.handleFrame(frame);
} else {
if (isOpen()) {
ChannelManager cm = _channelManager;
if (cm != null) {
ChannelN channel;
try {
channel = cm.getChannel(frame.channel);
} catch(UnknownChannelException e) {
LOGGER.info("Received a frame on an unknown channel, ignoring it");
return;
}
channel.handleFrame(frame); // 業務處理
}
}
}
}
} else {
handleSocketTimeout();
}
}
3 Channel
com.rabbitmq.client.Channel中封裝了與RabbitMQ服務器交互的API,簡要類圖如下:

Channel的基本使用方式如下:
// 聲明交換機
channel.exchangeDeclare("myExchange", BuiltinExchangeType.DIRECT);
// 聲明隊列
channel.queueDeclare("myQueue", true, false, false, null);
// 聲明綁定
channel.exchangeBind("myQueue", "myExchange", "myRoutingKey");
// 發送消息
channel.basicPublish("myExchange", "myRoutingKey", true, null, "hello".getBytes());
// 訂閱消息
channel.basicConsume("myQueue", new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body));
}
});
// 拉取消息
channel.basicGet("myQueue", true);
Channel是實現類包括以下幾種:

ChannelN:AMQP協議功能API的主要實現類。RecoveryAwareChannelN:重寫了basicAck()、basicReject()和basicNack()方法,對ChannelN功能進行擴展,實時跟蹤delivery tag,對最新的tag進行響應。AutorecoveringChannel:在connection重連時會自動恢復的通道,內部通過持有RecoveryAwareChannelN代理對象來實現具體操作。
3.1 ChannelN

com.rabbitmq.client.impl.ChannelN是對AMQP協議功能性API的主要實現類,它除了實現Channel中定義的AMQP協議功能性API,還繼承了AMQChannel抽象類,通過其_connection成員變量可以在底層調用到Socket或SocketChannel向RabbitMQ服務器進行讀寫操作。
除此之外,為了實現AMQP協議的特定功能,如消息確認機制。ChannelN內部封裝了如下成員變量:
_consumers:消息消費者,以consumerTag作為key,用於監聽消息。returnListeners:監聽RabbitMQ服務器找不到對應交換機時的返回消息(basicPublish方法發送消息時設置mandatory或immediate)。confirmListeners:監聽RabbitMQ服務器的確認消息(ack或nack)。defaultConsumer:默認的消息消費者。dispatcher:啟動線程執行_consumers中的任務。
ChannelN中監聽消息的核心源碼如下:
public boolean processAsync(Command command) throws IOException {
Method method = command.getMethod();
if (method instanceof Channel.Close) {
asyncShutdown(command);
return true;
}
if (isOpen()) {
// 根據不同方法類型調用對應的處理方法
if (method instanceof Basic.Deliver) {
processDelivery(command, (Basic.Deliver) method);
return true;
} else if (method instanceof Basic.Return) {
callReturnListeners(command, (Basic.Return) method);
return true;
} else if (method instanceof Channel.Flow) {
Channel.Flow channelFlow = (Channel.Flow) method;
synchronized (_channelMutex) {
_blockContent = !channelFlow.getActive();
transmit(new Channel.FlowOk(!_blockContent));
_channelMutex.notifyAll();
}
return true;
} else if (method instanceof Basic.Ack) {
Basic.Ack ack = (Basic.Ack) method;
callConfirmListeners(command, ack);
handleAckNack(ack.getDeliveryTag(), ack.getMultiple(), false);
return true;
} else if (method instanceof Basic.Nack) {
Basic.Nack nack = (Basic.Nack) method;
callConfirmListeners(command, nack);
handleAckNack(nack.getDeliveryTag(), nack.getMultiple(), true);
return true;
} else if (method instanceof Basic.RecoverOk) {
for (Map.Entry<String, Consumer> entry : Utility.copy(_consumers).entrySet()) {
this.dispatcher.handleRecoverOk(entry.getValue(), entry.getKey());
}
return false;
} else if (method instanceof Basic.Cancel) {
Basic.Cancel m = (Basic.Cancel)method;
String consumerTag = m.getConsumerTag();
Consumer callback = _consumers.remove(consumerTag);
if (callback == null) {
callback = defaultConsumer;
}
if (callback != null) {
try {
this.dispatcher.handleCancel(callback, consumerTag);
} catch (WorkPoolFullException e) {
throw e;
} catch (Throwable ex) {
getConnection().getExceptionHandler().handleConsumerException(this,
ex,
callback,
consumerTag,
"handleCancel");
}
} else {
LOGGER.warn("Could not cancel consumer with unknown tag {}", consumerTag);
}
return true;
} else {
return false;
}
} else {
if (method instanceof Channel.CloseOk) {
return false;
} else {
return true;
}
}
}
可見,該方法類似於SpringMVC中的DispatcherServlet,它會根據監聽到Command對象的方法類型進行分發處理。接下來介紹的各成員變量方法調用的入口都在這個方法中。
3.1.1 ConsumerDispatcher

com.rabbitmq.client.impl.ConsumerDispatcher的作用是從線程池中獲取空閑線程處理消息。它的主要作用是開啟線程,而實際處理消息的業務邏輯在具體Consumer代理對象中處理。
例如,在處理生產者發布的消息時,ConsumerDispatcher會進行如下處理:
public void handleDelivery(final Consumer delegate,
final String consumerTag,
final Envelope envelope,
final AMQP.BasicProperties properties,
final byte[] body) throws IOException {
executeUnlessShuttingDown(
new Runnable() {
@Override
public void run() {
try {
delegate.handleDelivery(consumerTag,
envelope,
properties,
body);
} catch (Throwable ex) {
connection.getExceptionHandler().handleConsumerException(
channel,
ex,
delegate,
consumerTag,
"handleDelivery");
}
}
});
}
3.1.2 Consumer

com.rabbitmq.client.Consumer接口中定義了不同消息的處理方法,實例對象則表示消息消費者。
com.rabbitmq.client.DefaultConsumer是默認實現類,它實現了接口中的所有方法(空方法)。我們可以采取匿名內部類的方式,實現具體某個需要的方法,而不是實現所有方法。
我們可以使用如下代碼添加消費者:
channel.basicConsume("myQueue", new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body));
}
});
在ChannelN中調用消費者處理消息方法(handleDelivery())的源碼如下:
protected void processDelivery(Command command, Basic.Deliver method) {
Basic.Deliver m = method;
Consumer callback = _consumers.get(m.getConsumerTag());
if (callback == null) {
if (defaultConsumer == null) {
throw new IllegalStateException("Unsolicited delivery -" +
" see Channel.setDefaultConsumer to handle this" +
" case.");
}
else {
callback = defaultConsumer;
}
}
Envelope envelope = new Envelope(m.getDeliveryTag(),
m.getRedelivered(),
m.getExchange(),
m.getRoutingKey());
try {
metricsCollector.consumedMessage(this, m.getDeliveryTag(), m.getConsumerTag());
this.dispatcher.handleDelivery(callback,
m.getConsumerTag(),
envelope,
(BasicProperties) command.getContentHeader(),
command.getContentBody());
} catch (WorkPoolFullException e) {
throw e;
} catch (Throwable ex) {
getConnection().getExceptionHandler().handleConsumerException(this,
ex,
callback,
m.getConsumerTag(),
"handleDelivery");
}
}
需要注意的是,在調用 this.dispatcher.handleDelivery()之前,會首先調用Consumer callback = _consumers.get(m.getConsumerTag())根據consumerTag獲取對應的消費者。因此,消費者處理消息是一對一的。
消費者其他方法的調用也可以在ChannelN.processAsync()中找到。
3.1.3 ReturnListener

com.rabbitmq.client.ReturnListener接口中定義了監聽返回消息的通用方法handleReturn(),主要用於消息發布者監聽返回消息。
消息發布者通過basicPublish方法發送消息時設置mandatory或immediate,但RabbitMQ服務器找不到對應交換機時會返回消息。消息發布者通過往Channel對象中添加ReturnListener實現類,即可監聽到返回消息:
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("return message: " + new String(body));
}
});
在ChannelN中處理返回消息的源碼如下:
private void callReturnListeners(Command command, Basic.Return basicReturn) {
try {
for (ReturnListener l : this.returnListeners) {
l.handleReturn(basicReturn.getReplyCode(),
basicReturn.getReplyText(),
basicReturn.getExchange(),
basicReturn.getRoutingKey(),
(BasicProperties) command.getContentHeader(),
command.getContentBody());
}
} catch (Throwable ex) {
getConnection().getExceptionHandler().handleReturnListenerException(this, ex);
} finally {
metricsCollector.basicPublishUnrouted(this);
}
}
ReturnListener是針對ChannelN級別的。接收到返回消息后,所有添加到ChannelN對象的ReturnListener監聽器都會被調用。
3.1.4 ConfirmListener

com.rabbitmq.client.ConfirmListener接口中定義的監聽RabbitMQ服務器確認消息(ack或nack)的回調方法,主要用於消息發布者使用。
基本使用代碼如下:
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
// 業務處理
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
// 業務處理
}
});
在ChannelN中處理返回消息的源碼如下:
private void callConfirmListeners(@SuppressWarnings("unused") Command command, Basic.Ack ack) {
try {
for (ConfirmListener l : this.confirmListeners) {
l.handleAck(ack.getDeliveryTag(), ack.getMultiple());
}
} catch (Throwable ex) {
getConnection().getExceptionHandler().handleConfirmListenerException(this, ex);
} finally {
metricsCollector.basicPublishAck(this, ack.getDeliveryTag(), ack.getMultiple());
}
}
private void callConfirmListeners(@SuppressWarnings("unused") Command command, Basic.Nack nack) {
try {
for (ConfirmListener l : this.confirmListeners) {
l.handleNack(nack.getDeliveryTag(), nack.getMultiple());
}
} catch (Throwable ex) {
getConnection().getExceptionHandler().handleConfirmListenerException(this, ex);
} finally {
metricsCollector.basicPublishNack(this, nack.getDeliveryTag(), nack.getMultiple());
}
}
ConfirmListener是針對ChannelN級別的。接收到確認消息后,所有添加到ChannelN對象的ConfirmListener監聽器都會被調用。
3.1.5 basicPublish()
前幾小節講述的都是RabbitMQ客戶端監聽從服務器響應的消息,本小節簡要分析客戶端發送消息的流程。
發送消息的基本方式如下:
channel.basicPublish("myExchange", "myRoutingKey", null, "hello".getBytes());
- 在
ChannelN中的basicPublish()方法中執行如下代碼,核心步驟如下:- 將形參轉換成
AMQCommand對象中的CommandAssembler成員變量:exchange和routingKey→Basic.Publish方法對象(Method),properties→AMQContentHeader對象,body→List<byte[]>對象。 - 調用
transmit(command)方法,發送命令。
- 將形參轉換成
public void basicPublish(String exchange, String routingKey,
boolean mandatory, boolean immediate,
BasicProperties props, byte[] body)
throws IOException
{
if (nextPublishSeqNo > 0) {
unconfirmedSet.add(getNextPublishSeqNo());
nextPublishSeqNo++;
}
if (props == null) {
props = MessageProperties.MINIMAL_BASIC;
}
AMQCommand command = new AMQCommand(
new Basic.Publish.Builder()
.exchange(exchange)
.routingKey(routingKey)
.mandatory(mandatory)
.immediate(immediate)
.build(), props, body);
try {
transmit(command);
} catch (IOException e) {
metricsCollector.basicPublishFailure(this, e);
throw e;
}
metricsCollector.basicPublish(this);
}
- 在
ChannelN中執行transmit()和quiescingTransmit()方法,最終會調用AMQCommand.transmit()方法:
public void transmit(AMQCommand c) throws IOException {
synchronized (_channelMutex) {
ensureIsOpen();
quiescingTransmit(c);
}
}
public void quiescingTransmit(AMQCommand c) throws IOException {
synchronized (_channelMutex) {
if (c.getMethod().hasContent()) {
while (_blockContent) {
try {
_channelMutex.wait();
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
}
ensureIsOpen();
}
}
this._trafficListener.write(c);
c.transmit(this);
}
}
- 在
AMQCommand中執行transmit()方法,核心步驟如下:- 獲取
AMQConnection對象。 - 分別將
AMQContentHeader、Method和List<byte[]>對象轉換成Frame對象。 - 通過
AMQConnection對象發送數據。
- 獲取
public void transmit(AMQChannel channel) throws IOException {
int channelNumber = channel.getChannelNumber();
AMQConnection connection = channel.getConnection();
synchronized (assembler) {
Method m = this.assembler.getMethod();
if (m.hasContent()) {
byte[] body = this.assembler.getContentBody();
Frame headerFrame = this.assembler.getContentHeader().toFrame(channelNumber, body.length);
int frameMax = connection.getFrameMax();
boolean cappedFrameMax = frameMax > 0;
int bodyPayloadMax = cappedFrameMax ? frameMax - EMPTY_FRAME_SIZE : body.length;
if (cappedFrameMax && headerFrame.size() > frameMax) {
String msg = String.format("Content headers exceeded max frame size: %d > %d", headerFrame.size(), frameMax);
throw new IllegalArgumentException(msg);
}
connection.writeFrame(m.toFrame(channelNumber));
connection.writeFrame(headerFrame);
for (int offset = 0; offset < body.length; offset += bodyPayloadMax) {
int remaining = body.length - offset;
int fragmentLength = (remaining < bodyPayloadMax) ? remaining
: bodyPayloadMax;
Frame frame = Frame.fromBodyFragment(channelNumber, body,
offset, fragmentLength);
connection.writeFrame(frame);
}
} else {
connection.writeFrame(m.toFrame(channelNumber));
}
}
connection.flush();
}
3.1.6 basicGet()
除了添加Comsumer監聽器,我們還可以主動調用basicGet()向RabbitMQ服務器“拉取”消息。
basicGet()方法本質上是向RabbitMQ服務器發送一個Basic.Get請求,然后等待響應。

basicGet()方法的基本使用如下:
// 自動確認模式
GetResponse message = channel.basicGet("myQueue", true);
System.out.println(new String(message.getBody()));
// 手動確認模式
GetResponse myQueue = channel.basicGet("myQueue", false);
System.out.println(new String(message.getBody()));
channel.basicAck(myQueue.getEnvelope().getDeliveryTag(), false);
-
在
ChannelN中的basicGet()方法中執行如下代碼,核心步驟如下:- 將形參轉換成
AMQCommand對象中的CommandAssembler成員變量:queue→Basic.Get方法對象(Method),properties→AMQContentHeader對象,body→List<byte[]>對象。 - 調用
exnWrappingRpc(command)方法,發送命令。 - 等待響應
replyCommand,並封裝成GetResponse對象返回,
- 將形參轉換成
public GetResponse basicGet(String queue, boolean autoAck) throws IOException{
validateQueueNameLength(queue);
AMQCommand replyCommand = exnWrappingRpc(new Basic.Get.Builder()
.queue(queue)
.noAck(autoAck)
.build());
Method method = replyCommand.getMethod();
if (method instanceof Basic.GetOk) {
Basic.GetOk getOk = (Basic.GetOk)method;
Envelope envelope = new Envelope(getOk.getDeliveryTag(),
getOk.getRedelivered(),
getOk.getExchange(),
getOk.getRoutingKey());
BasicProperties props = (BasicProperties)replyCommand.getContentHeader();
byte[] body = replyCommand.getContentBody();
int messageCount = getOk.getMessageCount();
metricsCollector.consumedMessage(this, getOk.getDeliveryTag(), autoAck);
return new GetResponse(envelope, props, body, messageCount);
} else if (method instanceof Basic.GetEmpty) {
return null;
} else {
throw new UnexpectedMethodError(method);
}
}
public AMQCommand exnWrappingRpc(Method m) throws IOException {
try {
return privateRpc(m);
} catch (AlreadyClosedException ace) {
throw ace;
} catch (ShutdownSignalException ex) {
throw wrap(ex);
}
}
-
在
ChannelN中的privateRpc()方法中執行如下代碼,核心步驟如下:- 實例化
SimpleBlockingRpcContinuation對象,用於獲取響應。 - 調用
rpc(m, k)方法,發送Basic.Get請求。 - 調用
k.getReply()方法,等待響應並返回。
- 實例化
private AMQCommand privateRpc(Method m) throws IOException, ShutdownSignalException{
SimpleBlockingRpcContinuation k = new SimpleBlockingRpcContinuation(m);
rpc(m, k); // 發送請求
if(_rpcTimeout == NO_RPC_TIMEOUT) {
return k.getReply(); // 等待響應
} else {
try {
return k.getReply(_rpcTimeout);
} catch (TimeoutException e) {
throw wrapTimeoutException(m, e);
}
}
}
public void rpc(Method m, RpcContinuation k) throws IOException {
synchronized (_channelMutex) {
ensureIsOpen();
quiescingRpc(m, k);
}
}
public void quiescingRpc(Method m, RpcContinuation k) throws IOException {
synchronized (_channelMutex) {
enqueueRpc(k);
quiescingTransmit(m);
}
}
public void enqueueRpc(RpcContinuation k) {
doEnqueueRpc(() -> new RpcContinuationRpcWrapper(k));
}
public void quiescingTransmit(Method m) throws IOException {
synchronized (_channelMutex) {
quiescingTransmit(new AMQCommand(m));
}
}
public void quiescingTransmit(AMQCommand c) throws IOException {
synchronized (_channelMutex) {
if (c.getMethod().hasContent()) {
while (_blockContent) {
try {
_channelMutex.wait();
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
}
ensureIsOpen();
}
}
this._trafficListener.write(c);
c.transmit(this);
}
}
-
最終,在
AMQCommand中執行transmit()方法,核心步驟如下:- 獲取
AMQConnection對象。 - 分別將
AMQContentHeader、Method和List<byte[]>對象轉換成Frame對象。 - 通過
AMQConnection對象發送數據。
- 獲取
public void transmit(AMQChannel channel) throws IOException {
int channelNumber = channel.getChannelNumber();
AMQConnection connection = channel.getConnection();
synchronized (assembler) {
Method m = this.assembler.getMethod();
if (m.hasContent()) {
byte[] body = this.assembler.getContentBody();
Frame headerFrame = this.assembler.getContentHeader().toFrame(channelNumber, body.length);
int frameMax = connection.getFrameMax();
boolean cappedFrameMax = frameMax > 0;
int bodyPayloadMax = cappedFrameMax ? frameMax - EMPTY_FRAME_SIZE : body.length;
if (cappedFrameMax && headerFrame.size() > frameMax) {
String msg = String.format("Content headers exceeded max frame size: %d > %d", headerFrame.size(), frameMax);
throw new IllegalArgumentException(msg);
}
connection.writeFrame(m.toFrame(channelNumber));
connection.writeFrame(headerFrame);
for (int offset = 0; offset < body.length; offset += bodyPayloadMax) {
int remaining = body.length - offset;
int fragmentLength = (remaining < bodyPayloadMax) ? remaining
: bodyPayloadMax;
Frame frame = Frame.fromBodyFragment(channelNumber, body,
offset, fragmentLength);
connection.writeFrame(frame);
}
} else {
connection.writeFrame(m.toFrame(channelNumber));
}
}
connection.flush();
}
4 AMQCommand

com.rabbitmq.client.impl.AMQCommand類實現了com.rabbitmq.client.Command接口,其成員變量CommandAssembler對象是AMQP規范中method、header和body的容器。
AMQCommand中提供了一個十分重要的方法:transmit(AMQChannel)。調用該方法能夠將method、header和body通過Connection發送給RabbitMQ服務器。該方法在前幾個小節都有介紹,核心步驟如下:
- 獲取
AMQConnection對象。 - 分別將
AMQContentHeader、Method和List<byte[]>對象轉換成Frame對象。 - 通過
AMQConnection對象發送數據。
public void transmit(AMQChannel channel) throws IOException {
int channelNumber = channel.getChannelNumber();
AMQConnection connection = channel.getConnection();
synchronized (assembler) {
Method m = this.assembler.getMethod();
if (m.hasContent()) {
byte[] body = this.assembler.getContentBody();
Frame headerFrame = this.assembler.getContentHeader().toFrame(channelNumber, body.length);
int frameMax = connection.getFrameMax();
boolean cappedFrameMax = frameMax > 0;
int bodyPayloadMax = cappedFrameMax ? frameMax - EMPTY_FRAME_SIZE : body.length;
if (cappedFrameMax && headerFrame.size() > frameMax) {
String msg = String.format("Content headers exceeded max frame size: %d > %d", headerFrame.size(), frameMax);
throw new IllegalArgumentException(msg);
}
connection.writeFrame(m.toFrame(channelNumber));
connection.writeFrame(headerFrame);
for (int offset = 0; offset < body.length; offset += bodyPayloadMax) {
int remaining = body.length - offset;
int fragmentLength = (remaining < bodyPayloadMax) ? remaining
: bodyPayloadMax;
Frame frame = Frame.fromBodyFragment(channelNumber, body,
offset, fragmentLength);
connection.writeFrame(frame);
}
} else {
connection.writeFrame(m.toFrame(channelNumber));
}
}
connection.flush();
}
4.1 CommandAssembler

com.rabbitmq.client.impl.CommandAssembler類中封裝了AMQP規范中method、header和body。
4.1.1 Method

com.rabbitmq.client.impl.Method抽象類代表AMQP規范中method,我們平常所使用的com.rabbitmq.client.AMQP接口中的Connection、Channel、Access、Exchange、Queue、Basic、Tx和Confirm等內部類都實現了該抽象類。
我們調用channel.basicPublish()等方法向RabbitMQ服務器發送消息,或者從通過注冊Consumer監聽RabbitMQ服務器的消息時,都會將method數據段轉換成Method實現類進行處理。
Method.toFrame()方法則能將自己轉換成Frame對象,進行發送。
4.1.2 AMQContentHeader

com.rabbitmq.client.impl.AMQContentHeader抽象類代表AMQP規范中header。channel.basicPublish()方法形參BasicProperties實現了該抽象類,我們可以通過該對象為消息設置屬性。
我們調用channel.basicPublish()等方法向RabbitMQ服務器發送消息,或者從通過注冊Consumer監聽RabbitMQ服務器的消息時,都會將method數據段轉換成AMQContentHeader實現類進行處理。
AMQContentHeader.toFrame()方法則能將自己轉換成Frame對象,進行發送。
5 Frame

com.rabbitmq.client.impl.Frame代表AMQP wire-protocol frame(幀),主要包含以下成員變量:
type:幀類型。channel:所屬通道。payload:輸入載荷。accumulator:輸出載荷。
Frame還提供了靜態方法readFrom(),可以從輸入流中讀取到Frame對象,主要提供給FrameHandler.readFrame()方法調用:
public static Frame readFrom(DataInputStream is) throws IOException {
int type;
int channel;
try {
type = is.readUnsignedByte();
} catch (SocketTimeoutException ste) {
return null; // failed
}
if (type == 'A') {
protocolVersionMismatch(is);
}
channel = is.readUnsignedShort();
int payloadSize = is.readInt();
byte[] payload = new byte[payloadSize];
is.readFully(payload);
int frameEndMarker = is.readUnsignedByte();
if (frameEndMarker != AMQP.FRAME_END) {
throw new MalformedFrameException("Bad frame end marker: " + frameEndMarker);
}
return new Frame(type, channel, payload);
}
