本文簡要介紹RabbitMQ提供的Java客戶端中最基本的功能性接口/類及相關源碼。
Mavan依賴:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.13.1</version>
</dependency>
0 AMQP
com.rabbitmq.client.AMQP
接口將AMQP(Advanced Message Queue Protocol,高級消息隊列協議)中的方法和消息屬性封裝成Java對象,便於以面向對象的思維模式進行編程。
該接口類圖簡要如下:
AMQP
接口中包含許多內部類,大體可以分成三類:
0.1 協議信息
PROTOCOL
內部類,保存了AMQP的協議版本等信息。
public static class PROTOCOL {
public static final int MAJOR = 0;
public static final int MINOR = 9;
public static final int REVISION = 1;
public static final int PORT = 5672;
}
0.2 方法
包括Connection
、Channel
、Access
、Exchange
、Queue
、Basic
、Tx
和Confirm
內部類,分別封裝了向Broker
發送的不同方法的基本數據格式和內容。
它們都實現了com.rabbitmq.client.impl.Method
抽象類(后續會介紹),在發送請求時,通過Method
抽象類的toFrame()
方法可以轉換成Frame
(幀),然后com.rabbitmq.client.impl.AMQConnection
將其以二進制數據的方式通過TCP
協議發送給Broker
。
它們都提供了各自的Builder
,便於實例化方法對象(建造者模式)。例如,最常用的Publish
方法類圖簡要如下:
通過如下代碼實例化出Publish
對象:
AMQP.Basic.Publish publish = new AMQP.Basic.Publish.Builder().exchange("myExchange").routingKey("myRoutingKey").mandatory(true).build();
在發送給Broker
前可以通過如下代碼將Publish
對象轉成幀:
Method method = (Method) publish;
Frame frame = method.toFrame(1);
com.rabbitmq.client.impl.AMQConnection
對象管理着與Broker
的連接,它通過如下代碼將方法發送給Broker
:
AMQConnection connection = channel.getConnection();
connection.writeFrame(frame);
0.3 消息屬性
BasicProperties
內部類,封裝了消息的基本屬性。
它也提供了Builder
,我們在發送消息時可以使用BasicProperties
實例攜帶消息頭信息,類圖如下:
通過如下代碼實例化出BasicProperties
對象,並發送消息:
Map<String, Object> headers = new HashMap<>();
headers.put("color", "blue");
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().headers(headers).expiration("10000").build();
channel.basicPublish("myExchange", "myRoutingKey", true, properties, "hello".getBytes());
BasicProperties
對象在發送前最終會被轉換成com.rabbitmq.client.impl.AMQContentHeader
對象,代表AMQ
消息內容的頭。
AMQContentHeader
的toFrame()
方法也可以將其轉換成Frame
(幀),然后com.rabbitmq.client.impl.AMQConnection
將其以二進制數據的方式通過TCP
協議發送給Broker
。
AMQConnection connection = channel.getConnection();
Frame headerFrame = contentHeader.toFrame(channelNumber, body.length);
connection.writeFrame(headerFrame);
1 ConnectionFactory
com.rabbitmq.client.ConnectionFactory
類是用來創建與RabbitMQ服務器連接(com.rabbitmq.client.Connection
)的工廠類。簡要類圖如下:
ConnectionFactory
內部封裝了許多屬性,用來設置與Connection
或Socket
相關的連接信息。
它還提供了一套默認配置:
public static final String DEFAULT_VHOST = "/";
public static final String DEFAULT_HOST = "localhost";
public static final int DEFAULT_AMQP_PORT = AMQP.PROTOCOL.PORT; // 5672
public static final int DEFAULT_AMQP_OVER_SSL_PORT = 5671;
public static final String DEFAULT_PASS = "guest"; // CredentialsProvider.username
public static final String DEFAULT_USER = "guest"; // CredentialsProvider.password
private boolean nio = false;
private boolean automaticRecovery = true;
ConnectionFactory
的基本使用如下:
ConnectionFactory connectionFactory = new ConnectionFactory();
Connection connection = connectionFactory.newConnection();//返回RecoveryAwareAMQConnection或AMQConnection對象
底層會創建出java.net.Socket
或java.nio.channels.SocketChannel
,代表與RabbitMQ服務器的TCP
連接:
Socket socket = SocketFactory.getDefault().createSocket();
SocketChannel channel = SocketChannel.open();
Socket
或SocketChannel
會被封裝到com.rabbitmq.client.impl.FrameHandler
中:
// nio==false,使用Socket(默認值)
SocketFrameHandler frameHandler = new SocketFrameHandler(sock, this.shutdownExecutor);
// nio==true,使用SocketChannel
SocketChannelFrameHandlerState state = new SocketChannelFrameHandlerState(channel, nioLoopContext, nioParams, sslEngine);
SocketChannelFrameHandler frameHandler = new SocketChannelFrameHandler(state);
FrameHandler
中提供了readFrame()
和writeFrame()
,分別可以從Socket/SocketChannel
中讀取或寫入數據。
FrameHandler
又會被封裝到com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnection
或com.rabbitmq.client.impl.AMQConnection
中:
// automaticRecovery==true,默認值
FrameHandler frameHandler = factory.create(addr, connectionName());
RecoveryAwareAMQConnection conn = createConnection(params, frameHandler, metricsCollector);
// automaticRecovery==false
FrameHandler handler = fhFactory.create(addr, clientProvidedName);
AMQConnection conn = createConnection(params, handler, metricsCollector);
因此,我們可以使用返回的Connection
對象與RabbitMQ服務器進行交互。
2 Connection
com.rabbitmq.client.Connection
接口代表與RabbitMQ服務器的TCP連接,類圖簡要如下:
Connection
主要提供了createChannel()
和openChannel()
方法,用來創建Channel
。后者提供了幾乎所有與RabbitMQ進行交互的方法,是項目中使用頻率最高的一個接口。
Connection
的基本使用如下:
Channe channel = connection.createChannel();
Connection
的實現類主要包括以下幾種,分別代表不同類型的連接:
AMQConnection
類:代表最基本的與RabbitMQ服務器的連接。內部持有FrameHandler
等成員變量,用來與服務器交互。RecoveryAwareAMQConnection
接口:代表自動重連的連接,內部沒有方法,類似與標志性接口。AutorecoveringConnection
類:自動重連Connection
的實現類,在非正常斷開情況下會自動重連,例如I/O
異常。它持有RecoveryAwareAMQConnection
對象作為代理,從而間接可以使用FrameHandler
對象與服務器進行交互。重連時,內部組件也會按如下順序自動重連:Exchanges
Queues
Bindings
(both queue and exchange-to-exchange)Consumers
RecoveryAwareAMQConnection
:它是對AMQConnection
的修改,主要用作AutorecoveringConnection
的成員變量。它與AMQConnection
主要區別在於它內部使用com.rabbitmq.client.impl.recovery.RecoveryAwareChannelN
作為Channel
。
在項目中使用的實現類主要為AMQConnection
和AutorecoveringConnection
(根據ConnectionFactory
的automaticRecovery
成員變量進行選擇)。
AMQConnection.createChannel()
方法會使用ChannelManager
創建出ChannelN
類型的通道:
ChannelManager cm = _channelManager;
Channel channel = cm.createChannel(this);
// 底層:
new ChannelN(connection, channelNumber, workService, this.metricsCollector);
而AutorecoveringConnection.createChannel()
方法會使用RecoveryAwareAMQConnection
創建出RecoveryAwareChannelN
類型的通道,並使用AutorecoveringChannel
包裝:
RecoveryAwareChannelN ch = (RecoveryAwareChannelN) delegate.createChannel();
final AutorecoveringChannel channel = new AutorecoveringChannel(this, delegateChannel);
ChannelManager
是AMQConnection
中一個十分重要的成員變量,它管理着AMQConnection
對象所屬的所有Channel
對象(key
為通道編號,取值范圍為1~_channelMax
):
private final Map<Integer, ChannelN> _channelMap = new HashMap<Integer, ChannelN>();
AMQConnection
中有一個十分重要的方法writeFrame()
,可以將數據發送給RabbitMQ服務器:
public void writeFrame(Frame f) throws IOException {
_frameHandler.writeFrame(f);
_heartbeatSender.signalActivity();
}
// SocketFrameHandler
public void writeFrame(Frame frame) throws IOException {
synchronized (_outputStream) {
frame.writeTo(_outputStream);
}
}
public void writeTo(DataOutputStream os) throws IOException {
os.writeByte(type);
os.writeShort(channel);
if (accumulator != null) {
os.writeInt(accumulator.size());
accumulator.writeTo(os);
} else {
os.writeInt(payload.length);
os.write(payload);
}
os.write(AMQP.FRAME_END);
}
AMQConnection
中有一個十分重要的方法startMainLoop()
,可以創建新線程監聽RabbitMQ服務器發送來的消息:
public void startMainLoop() {
MainLoop loop = new MainLoop();
final String name = "AMQP Connection " + getHostAddress() + ":" + getPort();
mainLoopThread = Environment.newThread(threadFactory, loop, name);
mainLoopThread.start();
}
其核心在於MainLoop
內部類,核心步驟如下:
- 調用
Frame frame = _frameHandler.readFrame()
讀取RabbitMQ服務器發送來的消息。 - 調用
readFrame(frame)
處理消息。
private class MainLoop implements Runnable {
@Override
public void run() {
boolean shouldDoFinalShutdown = true;
try {
while (_running) {
Frame frame = _frameHandler.readFrame();
readFrame(frame);
}
} catch (Throwable ex) {
if (ex instanceof InterruptedException) {
shouldDoFinalShutdown = false;
} else {
handleFailure(ex);
}
} finally {
if (shouldDoFinalShutdown) {
doFinalShutdown();
}
}
}
}
readFrame(frame)
方法會從ChannelManager
成員變量中獲取該消息對應的通道,然后調用channel.handleFrame(frame)
方法進行業務處理(最終會調用channel.processAsync(Command command)
方法):
private void readFrame(Frame frame) throws IOException {
if (frame != null) {
_missedHeartbeats = 0;
if (frame.type == AMQP.FRAME_HEARTBEAT) {
} else {
if (frame.channel == 0) {
_channel0.handleFrame(frame);
} else {
if (isOpen()) {
ChannelManager cm = _channelManager;
if (cm != null) {
ChannelN channel;
try {
channel = cm.getChannel(frame.channel);
} catch(UnknownChannelException e) {
LOGGER.info("Received a frame on an unknown channel, ignoring it");
return;
}
channel.handleFrame(frame); // 業務處理
}
}
}
}
} else {
handleSocketTimeout();
}
}
3 Channel
com.rabbitmq.client.Channel
中封裝了與RabbitMQ服務器交互的API,簡要類圖如下:
Channel
的基本使用方式如下:
// 聲明交換機
channel.exchangeDeclare("myExchange", BuiltinExchangeType.DIRECT);
// 聲明隊列
channel.queueDeclare("myQueue", true, false, false, null);
// 聲明綁定
channel.exchangeBind("myQueue", "myExchange", "myRoutingKey");
// 發送消息
channel.basicPublish("myExchange", "myRoutingKey", true, null, "hello".getBytes());
// 訂閱消息
channel.basicConsume("myQueue", new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body));
}
});
// 拉取消息
channel.basicGet("myQueue", true);
Channel
是實現類包括以下幾種:
ChannelN
:AMQP
協議功能API
的主要實現類。RecoveryAwareChannelN
:重寫了basicAck()
、basicReject()
和basicNack()
方法,對ChannelN
功能進行擴展,實時跟蹤delivery tag
,對最新的tag
進行響應。AutorecoveringChannel
:在connection
重連時會自動恢復的通道,內部通過持有RecoveryAwareChannelN
代理對象來實現具體操作。
3.1 ChannelN
com.rabbitmq.client.impl.ChannelN
是對AMQP
協議功能性API
的主要實現類,它除了實現Channel
中定義的AMQP
協議功能性API
,還繼承了AMQChannel
抽象類,通過其_connection
成員變量可以在底層調用到Socket
或SocketChannel
向RabbitMQ服務器進行讀寫操作。
除此之外,為了實現AMQP
協議的特定功能,如消息確認機制。ChannelN
內部封裝了如下成員變量:
_consumers
:消息消費者,以consumerTag
作為key
,用於監聽消息。returnListeners
:監聽RabbitMQ服務器找不到對應交換機時的返回消息(basicPublish
方法發送消息時設置mandatory
或immediate
)。confirmListeners
:監聽RabbitMQ服務器的確認消息(ack
或nack
)。defaultConsumer
:默認的消息消費者。dispatcher
:啟動線程執行_consumers
中的任務。
ChannelN
中監聽消息的核心源碼如下:
public boolean processAsync(Command command) throws IOException {
Method method = command.getMethod();
if (method instanceof Channel.Close) {
asyncShutdown(command);
return true;
}
if (isOpen()) {
// 根據不同方法類型調用對應的處理方法
if (method instanceof Basic.Deliver) {
processDelivery(command, (Basic.Deliver) method);
return true;
} else if (method instanceof Basic.Return) {
callReturnListeners(command, (Basic.Return) method);
return true;
} else if (method instanceof Channel.Flow) {
Channel.Flow channelFlow = (Channel.Flow) method;
synchronized (_channelMutex) {
_blockContent = !channelFlow.getActive();
transmit(new Channel.FlowOk(!_blockContent));
_channelMutex.notifyAll();
}
return true;
} else if (method instanceof Basic.Ack) {
Basic.Ack ack = (Basic.Ack) method;
callConfirmListeners(command, ack);
handleAckNack(ack.getDeliveryTag(), ack.getMultiple(), false);
return true;
} else if (method instanceof Basic.Nack) {
Basic.Nack nack = (Basic.Nack) method;
callConfirmListeners(command, nack);
handleAckNack(nack.getDeliveryTag(), nack.getMultiple(), true);
return true;
} else if (method instanceof Basic.RecoverOk) {
for (Map.Entry<String, Consumer> entry : Utility.copy(_consumers).entrySet()) {
this.dispatcher.handleRecoverOk(entry.getValue(), entry.getKey());
}
return false;
} else if (method instanceof Basic.Cancel) {
Basic.Cancel m = (Basic.Cancel)method;
String consumerTag = m.getConsumerTag();
Consumer callback = _consumers.remove(consumerTag);
if (callback == null) {
callback = defaultConsumer;
}
if (callback != null) {
try {
this.dispatcher.handleCancel(callback, consumerTag);
} catch (WorkPoolFullException e) {
throw e;
} catch (Throwable ex) {
getConnection().getExceptionHandler().handleConsumerException(this,
ex,
callback,
consumerTag,
"handleCancel");
}
} else {
LOGGER.warn("Could not cancel consumer with unknown tag {}", consumerTag);
}
return true;
} else {
return false;
}
} else {
if (method instanceof Channel.CloseOk) {
return false;
} else {
return true;
}
}
}
可見,該方法類似於SpringMVC中的DispatcherServlet
,它會根據監聽到Command
對象的方法類型進行分發處理。接下來介紹的各成員變量方法調用的入口都在這個方法中。
3.1.1 ConsumerDispatcher
com.rabbitmq.client.impl.ConsumerDispatcher
的作用是從線程池中獲取空閑線程處理消息。它的主要作用是開啟線程,而實際處理消息的業務邏輯在具體Consumer
代理對象中處理。
例如,在處理生產者發布的消息時,ConsumerDispatcher
會進行如下處理:
public void handleDelivery(final Consumer delegate,
final String consumerTag,
final Envelope envelope,
final AMQP.BasicProperties properties,
final byte[] body) throws IOException {
executeUnlessShuttingDown(
new Runnable() {
@Override
public void run() {
try {
delegate.handleDelivery(consumerTag,
envelope,
properties,
body);
} catch (Throwable ex) {
connection.getExceptionHandler().handleConsumerException(
channel,
ex,
delegate,
consumerTag,
"handleDelivery");
}
}
});
}
3.1.2 Consumer
com.rabbitmq.client.Consumer
接口中定義了不同消息的處理方法,實例對象則表示消息消費者。
com.rabbitmq.client.DefaultConsumer
是默認實現類,它實現了接口中的所有方法(空方法)。我們可以采取匿名內部類的方式,實現具體某個需要的方法,而不是實現所有方法。
我們可以使用如下代碼添加消費者:
channel.basicConsume("myQueue", new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body));
}
});
在ChannelN
中調用消費者處理消息方法(handleDelivery()
)的源碼如下:
protected void processDelivery(Command command, Basic.Deliver method) {
Basic.Deliver m = method;
Consumer callback = _consumers.get(m.getConsumerTag());
if (callback == null) {
if (defaultConsumer == null) {
throw new IllegalStateException("Unsolicited delivery -" +
" see Channel.setDefaultConsumer to handle this" +
" case.");
}
else {
callback = defaultConsumer;
}
}
Envelope envelope = new Envelope(m.getDeliveryTag(),
m.getRedelivered(),
m.getExchange(),
m.getRoutingKey());
try {
metricsCollector.consumedMessage(this, m.getDeliveryTag(), m.getConsumerTag());
this.dispatcher.handleDelivery(callback,
m.getConsumerTag(),
envelope,
(BasicProperties) command.getContentHeader(),
command.getContentBody());
} catch (WorkPoolFullException e) {
throw e;
} catch (Throwable ex) {
getConnection().getExceptionHandler().handleConsumerException(this,
ex,
callback,
m.getConsumerTag(),
"handleDelivery");
}
}
需要注意的是,在調用 this.dispatcher.handleDelivery()
之前,會首先調用Consumer callback = _consumers.get(m.getConsumerTag())
根據consumerTag
獲取對應的消費者。因此,消費者處理消息是一對一的。
消費者其他方法的調用也可以在ChannelN.processAsync()
中找到。
3.1.3 ReturnListener
com.rabbitmq.client.ReturnListener
接口中定義了監聽返回消息的通用方法handleReturn()
,主要用於消息發布者監聽返回消息。
消息發布者通過basicPublish
方法發送消息時設置mandatory
或immediate
,但RabbitMQ服務器找不到對應交換機時會返回消息。消息發布者通過往Channel
對象中添加ReturnListener
實現類,即可監聽到返回消息:
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("return message: " + new String(body));
}
});
在ChannelN
中處理返回消息的源碼如下:
private void callReturnListeners(Command command, Basic.Return basicReturn) {
try {
for (ReturnListener l : this.returnListeners) {
l.handleReturn(basicReturn.getReplyCode(),
basicReturn.getReplyText(),
basicReturn.getExchange(),
basicReturn.getRoutingKey(),
(BasicProperties) command.getContentHeader(),
command.getContentBody());
}
} catch (Throwable ex) {
getConnection().getExceptionHandler().handleReturnListenerException(this, ex);
} finally {
metricsCollector.basicPublishUnrouted(this);
}
}
ReturnListener
是針對ChannelN
級別的。接收到返回消息后,所有添加到ChannelN
對象的ReturnListener
監聽器都會被調用。
3.1.4 ConfirmListener
com.rabbitmq.client.ConfirmListener
接口中定義的監聽RabbitMQ服務器確認消息(ack
或nack
)的回調方法,主要用於消息發布者使用。
基本使用代碼如下:
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
// 業務處理
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
// 業務處理
}
});
在ChannelN
中處理返回消息的源碼如下:
private void callConfirmListeners(@SuppressWarnings("unused") Command command, Basic.Ack ack) {
try {
for (ConfirmListener l : this.confirmListeners) {
l.handleAck(ack.getDeliveryTag(), ack.getMultiple());
}
} catch (Throwable ex) {
getConnection().getExceptionHandler().handleConfirmListenerException(this, ex);
} finally {
metricsCollector.basicPublishAck(this, ack.getDeliveryTag(), ack.getMultiple());
}
}
private void callConfirmListeners(@SuppressWarnings("unused") Command command, Basic.Nack nack) {
try {
for (ConfirmListener l : this.confirmListeners) {
l.handleNack(nack.getDeliveryTag(), nack.getMultiple());
}
} catch (Throwable ex) {
getConnection().getExceptionHandler().handleConfirmListenerException(this, ex);
} finally {
metricsCollector.basicPublishNack(this, nack.getDeliveryTag(), nack.getMultiple());
}
}
ConfirmListener
是針對ChannelN
級別的。接收到確認消息后,所有添加到ChannelN
對象的ConfirmListener
監聽器都會被調用。
3.1.5 basicPublish()
前幾小節講述的都是RabbitMQ客戶端監聽從服務器響應的消息,本小節簡要分析客戶端發送消息的流程。
發送消息的基本方式如下:
channel.basicPublish("myExchange", "myRoutingKey", null, "hello".getBytes());
- 在
ChannelN
中的basicPublish()
方法中執行如下代碼,核心步驟如下:- 將形參轉換成
AMQCommand
對象中的CommandAssembler
成員變量:exchange
和routingKey
→Basic.Publish
方法對象(Method
),properties
→AMQContentHeader
對象,body
→List<byte[]>
對象。 - 調用
transmit(command)
方法,發送命令。
- 將形參轉換成
public void basicPublish(String exchange, String routingKey,
boolean mandatory, boolean immediate,
BasicProperties props, byte[] body)
throws IOException
{
if (nextPublishSeqNo > 0) {
unconfirmedSet.add(getNextPublishSeqNo());
nextPublishSeqNo++;
}
if (props == null) {
props = MessageProperties.MINIMAL_BASIC;
}
AMQCommand command = new AMQCommand(
new Basic.Publish.Builder()
.exchange(exchange)
.routingKey(routingKey)
.mandatory(mandatory)
.immediate(immediate)
.build(), props, body);
try {
transmit(command);
} catch (IOException e) {
metricsCollector.basicPublishFailure(this, e);
throw e;
}
metricsCollector.basicPublish(this);
}
- 在
ChannelN
中執行transmit()
和quiescingTransmit()
方法,最終會調用AMQCommand.transmit()
方法:
public void transmit(AMQCommand c) throws IOException {
synchronized (_channelMutex) {
ensureIsOpen();
quiescingTransmit(c);
}
}
public void quiescingTransmit(AMQCommand c) throws IOException {
synchronized (_channelMutex) {
if (c.getMethod().hasContent()) {
while (_blockContent) {
try {
_channelMutex.wait();
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
}
ensureIsOpen();
}
}
this._trafficListener.write(c);
c.transmit(this);
}
}
- 在
AMQCommand
中執行transmit()
方法,核心步驟如下:- 獲取
AMQConnection
對象。 - 分別將
AMQContentHeader
、Method
和List<byte[]>
對象轉換成Frame
對象。 - 通過
AMQConnection
對象發送數據。
- 獲取
public void transmit(AMQChannel channel) throws IOException {
int channelNumber = channel.getChannelNumber();
AMQConnection connection = channel.getConnection();
synchronized (assembler) {
Method m = this.assembler.getMethod();
if (m.hasContent()) {
byte[] body = this.assembler.getContentBody();
Frame headerFrame = this.assembler.getContentHeader().toFrame(channelNumber, body.length);
int frameMax = connection.getFrameMax();
boolean cappedFrameMax = frameMax > 0;
int bodyPayloadMax = cappedFrameMax ? frameMax - EMPTY_FRAME_SIZE : body.length;
if (cappedFrameMax && headerFrame.size() > frameMax) {
String msg = String.format("Content headers exceeded max frame size: %d > %d", headerFrame.size(), frameMax);
throw new IllegalArgumentException(msg);
}
connection.writeFrame(m.toFrame(channelNumber));
connection.writeFrame(headerFrame);
for (int offset = 0; offset < body.length; offset += bodyPayloadMax) {
int remaining = body.length - offset;
int fragmentLength = (remaining < bodyPayloadMax) ? remaining
: bodyPayloadMax;
Frame frame = Frame.fromBodyFragment(channelNumber, body,
offset, fragmentLength);
connection.writeFrame(frame);
}
} else {
connection.writeFrame(m.toFrame(channelNumber));
}
}
connection.flush();
}
3.1.6 basicGet()
除了添加Comsumer
監聽器,我們還可以主動調用basicGet()
向RabbitMQ服務器“拉取”消息。
basicGet()
方法本質上是向RabbitMQ服務器發送一個Basic.Get
請求,然后等待響應。
basicGet()
方法的基本使用如下:
// 自動確認模式
GetResponse message = channel.basicGet("myQueue", true);
System.out.println(new String(message.getBody()));
// 手動確認模式
GetResponse myQueue = channel.basicGet("myQueue", false);
System.out.println(new String(message.getBody()));
channel.basicAck(myQueue.getEnvelope().getDeliveryTag(), false);
-
在
ChannelN
中的basicGet()
方法中執行如下代碼,核心步驟如下:- 將形參轉換成
AMQCommand
對象中的CommandAssembler
成員變量:queue
→Basic.Get
方法對象(Method
),properties
→AMQContentHeader
對象,body
→List<byte[]>
對象。 - 調用
exnWrappingRpc(command)
方法,發送命令。 - 等待響應
replyCommand
,並封裝成GetResponse
對象返回,
- 將形參轉換成
public GetResponse basicGet(String queue, boolean autoAck) throws IOException{
validateQueueNameLength(queue);
AMQCommand replyCommand = exnWrappingRpc(new Basic.Get.Builder()
.queue(queue)
.noAck(autoAck)
.build());
Method method = replyCommand.getMethod();
if (method instanceof Basic.GetOk) {
Basic.GetOk getOk = (Basic.GetOk)method;
Envelope envelope = new Envelope(getOk.getDeliveryTag(),
getOk.getRedelivered(),
getOk.getExchange(),
getOk.getRoutingKey());
BasicProperties props = (BasicProperties)replyCommand.getContentHeader();
byte[] body = replyCommand.getContentBody();
int messageCount = getOk.getMessageCount();
metricsCollector.consumedMessage(this, getOk.getDeliveryTag(), autoAck);
return new GetResponse(envelope, props, body, messageCount);
} else if (method instanceof Basic.GetEmpty) {
return null;
} else {
throw new UnexpectedMethodError(method);
}
}
public AMQCommand exnWrappingRpc(Method m) throws IOException {
try {
return privateRpc(m);
} catch (AlreadyClosedException ace) {
throw ace;
} catch (ShutdownSignalException ex) {
throw wrap(ex);
}
}
-
在
ChannelN
中的privateRpc()
方法中執行如下代碼,核心步驟如下:- 實例化
SimpleBlockingRpcContinuation
對象,用於獲取響應。 - 調用
rpc(m, k)
方法,發送Basic.Get
請求。 - 調用
k.getReply()
方法,等待響應並返回。
- 實例化
private AMQCommand privateRpc(Method m) throws IOException, ShutdownSignalException{
SimpleBlockingRpcContinuation k = new SimpleBlockingRpcContinuation(m);
rpc(m, k); // 發送請求
if(_rpcTimeout == NO_RPC_TIMEOUT) {
return k.getReply(); // 等待響應
} else {
try {
return k.getReply(_rpcTimeout);
} catch (TimeoutException e) {
throw wrapTimeoutException(m, e);
}
}
}
public void rpc(Method m, RpcContinuation k) throws IOException {
synchronized (_channelMutex) {
ensureIsOpen();
quiescingRpc(m, k);
}
}
public void quiescingRpc(Method m, RpcContinuation k) throws IOException {
synchronized (_channelMutex) {
enqueueRpc(k);
quiescingTransmit(m);
}
}
public void enqueueRpc(RpcContinuation k) {
doEnqueueRpc(() -> new RpcContinuationRpcWrapper(k));
}
public void quiescingTransmit(Method m) throws IOException {
synchronized (_channelMutex) {
quiescingTransmit(new AMQCommand(m));
}
}
public void quiescingTransmit(AMQCommand c) throws IOException {
synchronized (_channelMutex) {
if (c.getMethod().hasContent()) {
while (_blockContent) {
try {
_channelMutex.wait();
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
}
ensureIsOpen();
}
}
this._trafficListener.write(c);
c.transmit(this);
}
}
-
最終,在
AMQCommand
中執行transmit()
方法,核心步驟如下:- 獲取
AMQConnection
對象。 - 分別將
AMQContentHeader
、Method
和List<byte[]>
對象轉換成Frame
對象。 - 通過
AMQConnection
對象發送數據。
- 獲取
public void transmit(AMQChannel channel) throws IOException {
int channelNumber = channel.getChannelNumber();
AMQConnection connection = channel.getConnection();
synchronized (assembler) {
Method m = this.assembler.getMethod();
if (m.hasContent()) {
byte[] body = this.assembler.getContentBody();
Frame headerFrame = this.assembler.getContentHeader().toFrame(channelNumber, body.length);
int frameMax = connection.getFrameMax();
boolean cappedFrameMax = frameMax > 0;
int bodyPayloadMax = cappedFrameMax ? frameMax - EMPTY_FRAME_SIZE : body.length;
if (cappedFrameMax && headerFrame.size() > frameMax) {
String msg = String.format("Content headers exceeded max frame size: %d > %d", headerFrame.size(), frameMax);
throw new IllegalArgumentException(msg);
}
connection.writeFrame(m.toFrame(channelNumber));
connection.writeFrame(headerFrame);
for (int offset = 0; offset < body.length; offset += bodyPayloadMax) {
int remaining = body.length - offset;
int fragmentLength = (remaining < bodyPayloadMax) ? remaining
: bodyPayloadMax;
Frame frame = Frame.fromBodyFragment(channelNumber, body,
offset, fragmentLength);
connection.writeFrame(frame);
}
} else {
connection.writeFrame(m.toFrame(channelNumber));
}
}
connection.flush();
}
4 AMQCommand
com.rabbitmq.client.impl.AMQCommand
類實現了com.rabbitmq.client.Command
接口,其成員變量CommandAssembler
對象是AMQP
規范中method
、header
和body
的容器。
AMQCommand
中提供了一個十分重要的方法:transmit(AMQChannel)
。調用該方法能夠將method
、header
和body
通過Connection
發送給RabbitMQ服務器。該方法在前幾個小節都有介紹,核心步驟如下:
- 獲取
AMQConnection
對象。 - 分別將
AMQContentHeader
、Method
和List<byte[]>
對象轉換成Frame
對象。 - 通過
AMQConnection
對象發送數據。
public void transmit(AMQChannel channel) throws IOException {
int channelNumber = channel.getChannelNumber();
AMQConnection connection = channel.getConnection();
synchronized (assembler) {
Method m = this.assembler.getMethod();
if (m.hasContent()) {
byte[] body = this.assembler.getContentBody();
Frame headerFrame = this.assembler.getContentHeader().toFrame(channelNumber, body.length);
int frameMax = connection.getFrameMax();
boolean cappedFrameMax = frameMax > 0;
int bodyPayloadMax = cappedFrameMax ? frameMax - EMPTY_FRAME_SIZE : body.length;
if (cappedFrameMax && headerFrame.size() > frameMax) {
String msg = String.format("Content headers exceeded max frame size: %d > %d", headerFrame.size(), frameMax);
throw new IllegalArgumentException(msg);
}
connection.writeFrame(m.toFrame(channelNumber));
connection.writeFrame(headerFrame);
for (int offset = 0; offset < body.length; offset += bodyPayloadMax) {
int remaining = body.length - offset;
int fragmentLength = (remaining < bodyPayloadMax) ? remaining
: bodyPayloadMax;
Frame frame = Frame.fromBodyFragment(channelNumber, body,
offset, fragmentLength);
connection.writeFrame(frame);
}
} else {
connection.writeFrame(m.toFrame(channelNumber));
}
}
connection.flush();
}
4.1 CommandAssembler
com.rabbitmq.client.impl.CommandAssembler
類中封裝了AMQP
規范中method
、header
和body
。
4.1.1 Method
com.rabbitmq.client.impl.Method
抽象類代表AMQP
規范中method
,我們平常所使用的com.rabbitmq.client.AMQP
接口中的Connection
、Channel
、Access
、Exchange
、Queue
、Basic
、Tx
和Confirm
等內部類都實現了該抽象類。
我們調用channel.basicPublish()
等方法向RabbitMQ服務器發送消息,或者從通過注冊Consumer
監聽RabbitMQ服務器的消息時,都會將method
數據段轉換成Method
實現類進行處理。
Method.toFrame()
方法則能將自己轉換成Frame
對象,進行發送。
4.1.2 AMQContentHeader
com.rabbitmq.client.impl.AMQContentHeader
抽象類代表AMQP
規范中header
。channel.basicPublish()
方法形參BasicProperties
實現了該抽象類,我們可以通過該對象為消息設置屬性。
我們調用channel.basicPublish()
等方法向RabbitMQ服務器發送消息,或者從通過注冊Consumer
監聽RabbitMQ服務器的消息時,都會將method
數據段轉換成AMQContentHeader
實現類進行處理。
AMQContentHeader.toFrame()
方法則能將自己轉換成Frame
對象,進行發送。
5 Frame
com.rabbitmq.client.impl.Frame
代表AMQP wire-protocol frame
(幀),主要包含以下成員變量:
type
:幀類型。channel
:所屬通道。payload
:輸入載荷。accumulator
:輸出載荷。
Frame
還提供了靜態方法readFrom()
,可以從輸入流中讀取到Frame
對象,主要提供給FrameHandler.readFrame()
方法調用:
public static Frame readFrom(DataInputStream is) throws IOException {
int type;
int channel;
try {
type = is.readUnsignedByte();
} catch (SocketTimeoutException ste) {
return null; // failed
}
if (type == 'A') {
protocolVersionMismatch(is);
}
channel = is.readUnsignedShort();
int payloadSize = is.readInt();
byte[] payload = new byte[payloadSize];
is.readFully(payload);
int frameEndMarker = is.readUnsignedByte();
if (frameEndMarker != AMQP.FRAME_END) {
throw new MalformedFrameException("Bad frame end marker: " + frameEndMarker);
}
return new Frame(type, channel, payload);
}