首先看一下RemotingCommand的幾個重要屬性:
private int code;
private LanguageCode language = LanguageCode.JAVA;
private int version = 0;
private int opaque = requestId.getAndIncrement();
private int flag = 0;
private String remark;
private HashMap<String, String> extFields;
private transient CommandCustomHeader customHeader;
private SerializeType serializeTypeCurrentRPC = serializeTypeConfigInThisServer;
private transient byte[] body;
除了static之外,還有body、extfields是transitent,除此之外都是要直接進行序列化的,默認用fastjson直接序列化。
這里面的extfields跟customHeader是互相轉換的,也就是序列化的時候用前者傳入,在代碼里面用反序列化后的customer對象做操作。customHeader是一個接口,他有很多實現類,不同的request請求體除了body不同之外,還有一個不同就在於customHeader不同。
比如注冊broker的:
final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
requestHeader.setBrokerAddr(brokerAddr);
requestHeader.setBrokerId(brokerId);
requestHeader.setBrokerName(brokerName);
requestHeader.setClusterName(clusterName);
requestHeader.setHaServerAddr(haServerAddr);
requestHeader.setCompressed(compressed);
RegisterBrokerBody requestBody = new RegisterBrokerBody();
requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
requestBody.setFilterServerList(filterServerList);
final byte[] body = requestBody.encode(compressed);
final int bodyCrc32 = UtilAll.crc32(body);
這里的requestHeader屬於RegisterBrokerRequestHeader,就是一種特殊的commandheader,他有自己獨特的屬性,比如brokeraddr、brokerId等等。統統塞入header里面。
再構造需要的body體,把header跟序列化后的body一起:
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader); request.setBody(body);
public static RemotingCommand createRequestCommand(int code, CommandCustomHeader customHeader) {
RemotingCommand cmd = new RemotingCommand();
cmd.setCode(code);
cmd.customHeader = customHeader;
setCmdVersion(cmd);
return cmd;
}
把header、body都放入RemotingCommand里面,回到command的屬性,只有code、version、remark、opaque才是通用的屬性,其他統統通過header、body實現個性化。對於注冊broker來說,個性化的body就是注冊的具體內容。
拿到一個完整的command以后,就是如何在netty里面進行序列化和反序列化了,這里面的body已經被序列化了,還剩下通用屬性和header屬性。
回到encode方法:
@Override
public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
throws Exception {
try {
ByteBuffer header = remotingCommand.encodeHeader();
out.writeBytes(header);
byte[] body = remotingCommand.getBody();
if (body != null) {
out.writeBytes(body);
}
} catch (Exception e) {
log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
if (remotingCommand != null) {
log.error(remotingCommand.toString());
}
RemotingUtil.closeChannel(ctx.channel());
}
}
簡單來看就是header跟body一起放入out里面。進入encodeHeader方法:
public ByteBuffer encodeHeader() {
return encodeHeader(this.body != null ? this.body.length : 0);
}
public ByteBuffer encodeHeader(final int bodyLength) {
// 1> header length size
int length = 4;
// 2> header data length
byte[] headerData;
headerData = this.headerEncode();
length += headerData.length;
// 3> body data length
length += bodyLength;
ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength);
// length
result.putInt(length);
// header length
result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
// header data
result.put(headerData);
result.flip();
return result;
}
使用LengthFieldBasedFrameDecoder的時候,第一個put的數據就是總體長度:所有header(包括公共的和extrafields的)+ body+markprotocol得到的長度(所有header長度),這里的總體長度不需要包括總體長度本身,也就是putint(length)本身不需要額外算4個字節,這里面多出來的4是由於markprotocol的長度。
private byte[] headerEncode() {
this.makeCustomHeaderToNet();
if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) {
return RocketMQSerializable.rocketMQProtocolEncode(this);
} else {
return RemotingSerializable.encode(this);
}
}
在makeCustomHeaderToNet方法里面,通過反射把header的內容全部放入extrafields里面,這樣再通過encode(this)就可以直接把header的內容(已經轉換成extrafields)和公共屬性全部序列化成result。
再看看反序列化:
public static RemotingCommand decode(final ByteBuffer byteBuffer) {
int length = byteBuffer.limit();
int oriHeaderLen = byteBuffer.getInt();
int headerLength = getHeaderLength(oriHeaderLen);
byte[] headerData = new byte[headerLength];
byteBuffer.get(headerData);
RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));
int bodyLength = length - 4 - headerLength;
byte[] bodyData = null;
if (bodyLength > 0) {
bodyData = new byte[bodyLength];
byteBuffer.get(bodyData);
}
cmd.body = bodyData;
return cmd;
}
雖然我們在encode的時候塞入了總體length,但是LengthFieldBasedFrameDecoder已經幫我們解析了去掉了,我們第一個取數據的時候是后面的markeprotocolType得到的header長度,
根據header長度以后拿到header,他就是除了body之外所有的數據。body可以通過后面的buff拿到,header反序列化后得到的extrafields可以進一步轉換成commandHeader,由於在公共屬性里面我們塞入了code,於是我們可以知道應該轉換成具體哪一種commandHeader。
