Californium 源碼分析
1. Californium 項目簡介
Californium 是一款基於Java實現的Coap技術框架,該項目實現了Coap協議的各種請求響應定義,支持CON/NON不同的可靠性傳輸模式。
Californium 基於分層設計且高度可擴展,其內部模塊設計及接口定義存在許多學習之處;
值得一提的是,在同類型的 Coap技術實現中,Californium的性能表現是比較突出的,如下圖:
更多的數據可以參考Californium-可擴展雲服務白皮書
本文以框架的源碼分析為主,其他內容不做展開。
2. 項目結構
目前Californium 項目穩定版本為 2.0.0-M2,項目的托管地址在:
https://github.com/eclipse/californium
模塊說明
~.californium-core
californium 核心模塊,定義了一系列協議棧核心接口,並提供了Coap協議棧的完整實現,
~.element-connector
從core模塊剝離的連接器模塊,用於抽象網絡傳輸層的接口,使得coap可以同時運行於udp和tcp多種傳輸協議之上;
~.scandium-core
Coap over DTLS 支持模塊,提供了DTLS 傳輸的Connector實現;
~.californium-osgi
californium 的osgi 封裝模塊;
~.californium-proxy
coap 代理模塊,用於支持coap2coap、coap2http、http2coap的轉換;
~.demo-xxx
樣例程序;
其中,californium-core和element-connector是coap技術實現最關鍵的模塊,后面的分析將圍繞這兩個模塊進行。
3. 分層設計
Californiium 定義了三層架構:
1 網絡層,負責處理端口監聽,網絡數據收發;
2 協議層,負責Coap協議數據包解析及封裝,實現消息的路由、可靠性傳輸、Token處理、觀察者模型等等;
3 邏輯層,負責 Resource定義和映射,一個Resource 對應一個URL,可獨立實現Coap 請求處理。
異步線程池
三層架構中都可以支持獨立的線程池,其中網絡層與協議層的線程池保持獨立;
邏輯層可為每個Resource指定獨立的線程池,並支持父級繼承的機制,即當前Resource若沒有定義則沿用父級Resource線程池;
若邏輯層未指定線程池,則默認使用協議層的線程池。
4. 包結構分析
4.1 californium-core
core 模塊定義了協議棧相關的所有關鍵接口,根據功能職責的不同拆分為多個子 package;
根級 package定義的是Coap應用的一些入口類,如Client/Server實現、包括應用層CoapResource的定義。
4.1.1 package-coap
實現 coap協議 RFC7252 實體定義,包括消息類型、消息頭、Observe機制等。
具體定義見下圖
Coap 消息划分為Request/Response/EmptyMessage 三類;
MessageObserver 接口用於實現消息的狀態跟蹤,如重傳、確認等。
4.1.2 package-network
network 是協議棧核心機制實現的關鍵模塊,其涵蓋了網絡傳輸及協議層的定義及實現;
模塊實現了一些關鍵接口定義,如將網絡傳輸端點抽象為Endpoint,根據請求響應的關聯模型定義了Exchange等。
協議棧的分層定義、消息編解碼、攔截處理也由network包提供。
endpoins定義
Endpoint 定義為一個端點,通常與一個IP和端口對應,其屏蔽了client和server交互時的網絡傳輸細節。
對於client來說,Endpoint代表通訊的服務端地址端口;而對於server來說則代表了綁定監聽的地址及端口。
CoapEndpoint實現了Endpoint接口,通過RawDataChannel(見elements-connector部分)接口實現消息接收,通過Outbox接口實現消息發送。
通常CoapEndpoint 會關聯一個Connector,以實現傳輸層的收發;
CoapStack對應了協議棧接口,用於處理CoapEndpoint上層的消息鏈路;
除此之外,CoapEndpoint 還應該包括消息編解碼、攔截處理等功能。
exchange定義
Exchange描述了請求-響應模型,一個Exchange會對應一個Request,相應的Response,以及當前的Endpoint;
ExchangeObserver用於實現對Exchange狀態的變更監聽;
Exchange 通常存在於兩種場景:
1 發送請求后初始化並存儲,當接收到對應的響應之后變更為completed(執行清理工作)。
2 接收請求后初始化並存儲,當發送響應時執行清理;
matcher定義
Matcher 是用於實現Exchange 生成及銷毀的模塊,提供了幾個收發接口;
用於消息在進入協議棧CoapStack處理之前完成配對處理;
messagetool定義
MessageExchangeStore 實現了Exchange的查詢、存儲;
MessageIdProvider 用於提供Coap消息的MID,一個MID代表了一個唯一的消息(在消息生命周期內);
TokenProvider 用於提供Coap消息的Token,而Request及Response通過Token實現匹配;
network子模塊
package-config
提供網絡參數配置定義
package-deduplication
提供消息去重機制的實現
package-interceptors
提供消息傳輸攔截器定義
package-serialization
提供消息包的解析及編碼實現
package-stack
提供協議棧分層定義及實現
4.1.3 package-server
應用層 server端實現的一些定義,包括Server接口、Resource定義。
CoapServer 可包含多個Endpoint,體現為一個Coap服務可架設在多個傳輸端口之上;
MessageDeliverer 是消息路由的接口,ServerMessageDelivery 實現了根據uri 查找Resource的功能;
ConcurrentCoapResource則為Resource 提供了一個獨立線程池的執行方式。
4.1.4 package-observe
應用層 observe機制的定義,如下圖
ObserveRelation 定義一個觀察關系,對應一個觀察者即觀察目標Resource;
ObserveEndpoint 定義了一個觀察者端點,並包含一個關系列表(一個觀察者可以觀察多個Resource);
ObserveManager 由CoapServer持有,用於管理觀察者端點列表;
CoapResource 也會持有一個Relation集合以實現跟蹤;其通過ObserveRelationFilter接口決定是否接受來自觀察者的注冊請求;
4.2 elements-connector
connector 模塊由core模塊剝離,用於實現網絡傳輸層的抽象,這使得Coap協議可以運行於UDP、TCP、DTLS等多種協議之上。
Connector定義了連接器需實現的相關方法,包括啟動停止、數據的收發;
RawData包含了網絡消息包的原始字節數據,其解析和編碼需要交由上層協議實現;
CorrelationContext 描述了上下文,用於支持傳輸協議的一些會話數據讀寫,如DTLS會話。
4.3. 核心接口
下面擬用一張關系圖概括Californium 框架的全貌(部分內容未體現):
與分層設計對應,框架分為 transport 傳輸層、protocol 協議層、logic 邏輯層。
transport 傳輸層,由Connector 提供傳輸端口的抽象,UDPConnector是其主要實現;
數據包通過RawData對象封裝;該層還提供了CorrelationContext 實現傳輸層會話數據的讀寫支持。
protocol 協議層,提供了Coap 協議棧機制的完整實現;CoapEndpoint是核心的操作類,數據的編解碼通過
DataSerializer、DataParser實現,MessageInterceptor提供了消息收發的攔截功能,Request/Response的映射處理
由 Matcher實現,Exchange 描述了映射模型;協議棧CoapStack 是一個分層的內核實現,在這里完成分塊、重傳等機制。
logic 邏輯層,定義了CoapClient、CoapServer的入口,包括消息的路由機制,Resource的繼承機制;
Observe機制的關系維護、狀態管理由ObserveManager提供入口。
5. 關鍵機制
5.1 協議棧;
californium-core 采用了分層接口來定義協議棧,其中CoapStack 描述整個棧對象,Layer則對應分層的處理;
這相當於采用了過濾器模式,分層的定義使得特性間互不影響,子模塊可保持獨立的關注點;
CoapStack定義如下:
public interface CoapStack {
// delegate to top
void sendRequest(Request request);
// delegate to top
void sendResponse(Exchange exchange, Response response);
...
// delegate to bottom
void receiveRequest(Exchange exchange, Request request);
// delegate to bottom
void receiveResponse(Exchange exchange, Response response);
接口包括了幾個消息收發函數,而Layer也定義了一樣的接口。
一個CoapUdpStack 包括的分層如下圖:
CoapUdpStack 構造函數與此對應:
public CoapUdpStack(final NetworkConfig config, final Outbox outbox) {
...
Layer layers[] = new Layer[] {
new ExchangeCleanupLayer(),
new ObserveLayer(config),
new BlockwiseLayer(config),
reliabilityLayer };
setLayers(layers);
}
StackTopLayer和StackBottomLayer由基礎類BaseCoapStack提供,實現了協議棧頂層和底層邏輯;
MessageDeliver是膠合應用層的接口,其從StackTopLayer收到Coap消息之后將繼續分發到Resource;
StackBottomLayer則膠合了傳輸層,通過Inbox/Outbox接口實現與Connector的交互。
其他Layer的功能
ExchangeCleanLayer 提供Exchange清理功能,當取消請求時觸發Exchange的清理功能;
ObserveLayer 提供Coap Observe機制實現;
BlockwiseLayer 提供Coap 分塊傳輸機制實現;
ReliabilityLayer 提供可靠性傳輸,實現自動重傳機制;
5.2 Exchange生命周期
Exchange對應於請求/響應模型,其生命周期也由交互模型決定,一般在響應結束之后Exchange便不再存活;
然而在Observe場景下例外,一旦啟動了Observe請求,Exchange會一直存活直到Observe被取消或中斷。
1 LocalExchange,即本地的Exchange, 對應於本地請求對方響應的交互。
BaseCoapStack.StackTopLayer實現了初始化:
public void sendRequest(final Request request) {
Exchange exchange = new Exchange(request, Origin.LOCAL);
...
當接收響應時進行銷毀,observe類型的請求在這里被忽略:
public void receiveResponse(final Exchange exchange, final Response response) {
if (!response.getOptions().hasObserve()) {
exchange.setComplete();
}
UdpMatcher 實現了銷毀動作:
UdpMatcher--
public void sendRequest(final Exchange exchange, final Request request) {
exchange.setObserver(exchangeObserver);
exchangeStore.registerOutboundRequest(exchange);
if (LOGGER.isLoggable(Level.FINER)) {
這是在發送請求時為Exchange添加觀察者接口,當exchange執行complete操作時觸發具體的銷毀工作:
UdpMatcher.ExchangeObserverImpl--
if (exchange.getOrigin() == Origin.LOCAL) {
// this endpoint created the Exchange by issuing a request
KeyMID idByMID = KeyMID.fromOutboundMessage(exchange.getCurrentRequest());
KeyToken idByToken = KeyToken.fromOutboundMessage(exchange.getCurrentRequest());
exchangeStore.remove(idByToken);
// in case an empty ACK was lost
exchangeStore.remove(idByMID);
...
值得一說的是,californium大量采用了觀察者設計模式,這種方法在設計異步消息機制時非常有用.
此外,request的取消、中斷操作(RST信號)、傳輸的超時都會導致exchange生命周期結束。
LocalExchange的生命周期如下圖:
2 RemoteExchange,即遠程的Exchange,對應於本地接收請求並返回響應的交互。
UdpMatcher實現了遠程Exchange的初始化:
UdpMatcher--
public Exchange receiveRequest(final Request request) {
...
KeyMID idByMID = KeyMID.fromInboundMessage(request);
if (!request.getOptions().hasBlock1() && !request.getOptions().hasBlock2()) {
Exchange exchange = new Exchange(request, Origin.REMOTE);
Exchange previous = exchangeStore.findPrevious(idByMID, exchange);
if (previous == null) {
exchange.setObserver(exchangeObserver);
...
在發送響應時,Exchange被銷毀,仍然由UdpMatcher實現:
UdpMatcher--
public void sendResponse(final Exchange exchange, final Response response) {
response.setToken(exchange.getCurrentRequest().getToken());
...
// Only CONs and Observe keep the exchange active (CoAP server side)
if (response.getType() != Type.CON && response.isLast()) {
exchange.setComplete();
}
注意到這里對response進行了last屬性的判斷,該屬性默認為true,而ObserveLayer將其置為false,使得observe響應不會導致Exchange結束:
ObserveLayer--
public void sendResponse(final Exchange exchange, Response response) {
...
response.setLast(false);
連接中斷(RST信號)、傳輸超時會導致Exchange的結束,此外由客戶端發起的observe取消請求也會產生一樣的結果。
RemoteExchange的生命周期如下圖所示:
5.3 分塊傳輸;
分塊傳輸一般用於發送較大的請求體或接受較大的響應體,比如上傳下載固件包場景,由於受到MTU的限制,需要實現分塊傳輸;
Coap定義了分塊傳輸的方式,采用Block1/Block2機制
Option選項
BlockOption是用於描述分塊信息的選項類型,選項值為0-3個字節,編碼包含了3個字段:當前分塊編號;是否結束;當前分塊大小。
為區分請求和響應的不同,分別有block1和block2 兩個選項:
block1:用於發送POST/PUT請求時傳輸較大的內容體;
block2:用於響應GET/POST/PUT請求時傳輸較大的內容體;
size1:指示請求體的總大小;
size2:指示響應體的總大小;
配置選項
maxMessageSize:消息大小閾值,當發送的消息大於該閾值時需采用分塊傳輸,該值必須小於MTU;
preferredBlockSize:用於指示分塊的大小;
maxResourceBodySize:最大資源內容體大小,用於限制接收的請求或響應的總大小,若超過將提示錯誤或取消處理;
blockLifeTime:分塊傳輸的生命周期時長,若超過該時長分塊傳輸未完成則視為失敗;
BlockwiseLayer實現了分塊傳輸的完整邏輯,其中sendRequest的代碼片段:
public void sendRequest(final Exchange exchange, final Request request) {
BlockOption block2 = request.getOptions().getBlock2();
if (block2 != null && block2.getNum() > 0) {
//應用層指定的分塊..
} else if (requiresBlockwise(request)) {
//自動計算分塊
startBlockwiseUpload(exchange, request);
} else {
//不需要分塊
exchange.setCurrentRequest(request);
lower().sendRequest(exchange, request);
}
}
...
//實現分塊閾值判斷
private boolean requiresBlockwise(final Request request) {
boolean blockwiseRequired = false;
if (request.getCode() == Code.PUT || request.getCode() == Code.POST) {
blockwiseRequired = request.getPayloadSize() > maxMessageSize;
}
...
//startBlockwiseUpload實現了request分塊邏輯,通過在請求的Option中加入Block1作為標識
private void startBlockwiseUpload(final Exchange exchange, final Request request) {
BlockwiseStatus status = findRequestBlockStatus(exchange, request);
final Request block = getNextRequestBlock(request, status);
block.getOptions().setSize1(request.getPayloadSize());
...
lower().sendRequest(exchange, block);
}
接收端檢測Request的Block1選項,返回continue響應碼,直到所有分塊傳輸完成后進行組裝交由上層處理:
private void handleInboundBlockwiseUpload(final BlockOption block1, final Exchange exchange, final Request request) {
//檢查是否超過限制
if (requestExceedsMaxBodySize(request)) {
Response error = Response.createResponse(request, ResponseCode.REQUEST_ENTITY_TOO_LARGE);
error.setPayload(String.format("body too large, can process %d bytes max", maxResourceBodySize));
error.getOptions().setSize1(maxResourceBodySize);
lower().sendResponse(exchange, error);
} else {
...
if (block1.getNum() == status.getCurrentNum()) {
if (status.hasContentFormat(request.getOptions().getContentFormat())) {
status.addBlock(request.getPayload());
status.setCurrentNum(status.getCurrentNum() + 1);
if ( block1.isM() ) {
//存在后面的block,返回Continue響應
Response piggybacked = Response.createResponse(request, ResponseCode.CONTINUE);
piggybacked.getOptions().setBlock1(block1.getSzx(), true, block1.getNum());
piggybacked.setLast(false);
exchange.setCurrentResponse(piggybacked);
lower().sendResponse(exchange, piggybacked);
} else {
...
//已經完成,組裝后交由上層處理
Request assembled = new Request(request.getCode());
assembled.setSenderIdentity(request.getSenderIdentity());
assembleMessage(status, assembled);
upper().receiveRequest(exchange, assembled);
}
因此,一個請求體分塊傳輸流程如下圖所示:
響應體分塊傳輸的邏輯與此類似,交互流程如下圖:
5.4 消息重傳;
Coap消息支持重傳機制,當發送CON類型的消息時,要求接收端響應對應的ACK消息;如果在指定時間內沒有收到響應,則進行重傳。
基礎消息重傳由ReliabilityLayer實現,sendRequest 代碼片段:
if (request.getType() == null) {
request.setType(Type.CON);
}
if (request.getType() == Type.CON) {
prepareRetransmission(exchange, new RetransmissionTask(exchange, request) {
public void retransmit() {
sendRequest(exchange, request);
}
});
}
lower().sendRequest(exchange, request);
當發送CON類型消息時,通過 prepareRetransmission函數實現重傳准備:
int timeout;
if (exchange.getFailedTransmissionCount() == 0) {
timeout = getRandomTimeout(ack_timeout, (int) (ack_timeout * ack_random_factor));
} else {
timeout = (int) (ack_timeout_scale * exchange.getCurrentTimeout());
}
exchange.setCurrentTimeout(timeout);
ScheduledFuture<?> f = executor.schedule(task, timeout, TimeUnit.MILLISECONDS);
exchange.setRetransmissionHandle(f);
exchange.getFailedTransmissionCount() 返回0 代表第一次傳輸,采用的超時時間是:
timeout = random(ack_timeout, act_timeout*ack_random_factor)
//其中ack_timeout(超時起始值)、ack_random_factor(隨機因子)由配置文件提供;
后續的重傳時間將由上一次的timeout和ack_timeout_scale系數決定:
timeout = timeout * ack_timeout_scale
當接收ACK時,有必要取消重傳處理,看看receiveResponse的實現:
@Override
public void receiveResponse(final Exchange exchange, final Response response) {
exchange.setFailedTransmissionCount(0);
exchange.getCurrentRequest().setAcknowledged(true);
exchange.setRetransmissionHandle(null);
...
可以看到,接收到響應之后,將Request標記為ack狀態,exchange.setRestransmissionHandler會導致上一次的重傳schedu任務被取消。
最終重傳任務由RetransmissionTask實現:
int failedCount = exchange.getFailedTransmissionCount() + 1;
exchange.setFailedTransmissionCount(failedCount);
if (message.isAcknowledged()) {
return;
} else if (message.isRejected()) {
return;
} else if (message.isCanceled()) {
return;
} else if (failedCount <= max_retransmit) {
// Trigger MessageObservers
message.retransmitting();
// MessageObserver might have canceled
if (!message.isCanceled()) {
retransmit();
}
} else {
exchange.setTimedOut();
message.setTimedOut(true);
}
滿足重傳的條件:
1 消息未被確認(收到ACK)或拒絕(收到RST)
2 消息未被取消;
3 消息未超過重傳次數限制;
其中重傳次數max_retransmit由配置提供,當超過該次數限制時消息將發生傳輸超時。
默認參數配置
ack_timeout=2s
ack_random_factor=1.5
ack_timeout_scale=2
max_retransmit=4
5.5 防止重復包;
由於存在重傳機制,加上UDP傳輸的不穩定性,傳輸兩端很可能會受到重復的消息包;
通常重復消息的檢測要求實現消息容器以記錄和匹配重復消息ID,然而執行時間越長,消息會越來越多,
因此消息容器必須具備清除機制,基於此點不同,californium 提供了兩種實現機制:
5.5.1 標記清除
清除器維持一個消息容器,每個消息都保持一個初始的時間戳;
清除器定時進行掃描,發現太老的消息則將其清除。
SweepDeduplicator 提供了實現,清除代碼片段:
private void sweep() {
final long oldestAllowed = System.currentTimeMillis() - exchangeLifetime;
final long start = System.currentTimeMillis();
for (Map.Entry<?, Exchange> entry : incomingMessages.entrySet()) {
Exchange exchange = entry.getValue();
if (exchange.getTimestamp() < oldestAllowed) {
incomingMessages.remove(entry.getKey());
}
}
...
其中incomingMessage采用了ConcurrentHashMap數據結構,這是一個並發性良好的線程安全集合;
然而從上面的代碼也可以發現,sweep在這里是一個遍歷操作,定時清除的老化時間默認為247s,假設1s內處理1000條消息,
那么每次清除時駐留的消息數量為247000,即需要遍歷這么多的次數,對於CPU來說存在一定的開銷。
采用這種方式,消息的存活時間基本上由exchangeLifetime參數和掃描間隔決定。
5.5.2 翻轉清除
清除器維持三個消息容器,保持1、2、3三個索引分別指向相應消息容器,其中索引1、2代表了活動的消息容器,
索引3 代表老化的消息容器,如下圖所示
消息索引首次會往 I1容器寫入,同時也會往 I2容器存入拷貝;
查找消息時主要從I1 容器查找;
每個周期會執行一次翻轉,幾個容器指針發生置換(I1->I2,I2->I3,I3->I1),之后I3 指向的容器會被清理;
CropRotation 實現了翻轉的邏輯,代碼如下:
private void rotation() {
synchronized (maps) {
int third = first;
first = second;
second = (second+1)%3;
maps[third].clear();
}
基於上述的算法分析,I2容器的消息存活時間會小於一個周期,I1容器的消息則存活一個周期到兩個周期之間,I3 容器則超過2個周期,是最老的容器;
基於這樣的邏輯,翻轉清除機制的消息存活時間是1-2個周期之間,而該機制相比標記清除的優點在於清除機制是整個容器一塊清除,而不需要遍歷操作,然而缺點是增加了存儲開銷。
JVM的垃圾回收機制也存在類似的設計,相信californium的開發者借鑒了一些思路。
至此,Californium框架的基本全貌已經分析完畢。如果希望對框架有更深入的理解,那么建議你直接在項目中直接使用它,並針對自己感興趣的幾個問題進行源碼分析或調試,相信收獲會更多。
6. 擴展閱讀
RFC關於分塊傳輸的定義
https://tools.ietf.org/html/draft-ietf-core-block-21
Hands on with Coap(需要翻牆)
https://docs.google.com/presentation/d/1dDZ7VTdjBZxnqcIt6qoX742d6dHbzap-D_H8Frf3LRE/edit#slide=id.p
Californium項目早期的介紹文檔
https://people.inf.ethz.ch/mkovatsc/resources/californium/cf-thesis.pdf
Californium 項目源碼
https://github.com/eclipse/californium
后記
往往我們在使用優秀開源框架的時候都是信手拈來,知其一則止步。
這或許跟環境有着極大的關系,試想如果公司讓你天天陷於加班趕改的狀態,項目上不合理分配資源,只要結果卻不關心個人的成長。長此以往,誰還能回歸到技術的路上?
然而,改變不了環境的結果只能改變自己,路漫漫其修遠兮,無論你的選擇如何,努力過的世界終究是精彩的。