一.Calinfornium簡介
Californium 是一款基於Java實現的Coap技術框架,該項目實現了Coap協議的各種請求響應定義,支持CON/NON不同的可靠性傳輸模。
在同等的Coap協議的技術實現中,Calinfornium框架在眾多框架中以性能最優。如下:
二.Californium的項目結構
目前Californium 項目的托管地址在:https://github.com/eclipse/californium
Californium 主要由如下幾個模塊組成:
(1)californium-core
californium 核心模塊,定義了一系列協議棧核心接口,並提供了Coap協議棧的完整實現。
(2)element-connector
從core模塊剝離的連接器模塊,用於抽象網絡傳輸層的接口,使得coap可以同時運行於udp和tcp多種傳輸協議之上。
(3)scandium-core
Coap over DTLS 支持模塊,提供了DTLS 傳輸的Connector實現。
(4)californium-osgi
californium 的osgi 封裝模塊。
(5)californium-proxy
coap 代理模塊,用於支持coap2coap、coap2http、http2coap的轉換。
注:californium-core和element-connector是coap技術實現最關鍵的模塊,后面的分析將圍繞這兩個模塊進行。
三.Californium的分層設計
Californium 定義了三層結構:
第一層:網絡層,負責處理端口監聽,網絡數據收發;
第二層: 協議層,負責Coap協議數據包解析及封裝,實現消息的路由、可靠性傳輸、Token處理、觀察者模型等等;
第三層: 邏輯層,負責 Resource定義和映射,一個Resource 對應一個URL,可獨立實現Coap 請求處理。
Californium 異步線程池
三層架構中都可以支持獨立的線程池,其中網絡層與協議層的線程池保持獨立;
邏輯層可為每個Resource指定獨立的線程池,並支持父級繼承的機制,即當前Resource若沒有定義則沿用父級Resource線程池;若邏輯層未指定線程池,則默認使用協議層的線程池。
四.Californium包結構
(1)Californium core包結構
Californium Core 模塊定義了協議棧相關的所有關鍵接口,根據功能職責的不同拆分為多個子 package;
根級 package定義的是Coap應用的一些入口類,如Client/Server實現、包括應用層CoapResource的定義。
Package Coap包:
主要實現了RFC7252 實體定義,包括消息類型、消息頭、Observe機制等。
Coap 消息划分為Request/Response/EmptyMessage 三類;MessageObserver 接口用於實現消息的狀態跟蹤,如重傳、確認等。
Package network包:
network 是協議棧核心機制實現的關鍵模塊,其涵蓋了網絡傳輸及協議層的定義及實現;
模塊實現了一些關鍵接口定義,如將網絡傳輸端點抽象為Endpoint,根據請求響應的關聯模型定義了Exchange等。
協議棧的分層定義、消息編解碼、攔截處理也由network包提供。
Endpoint 定義為一個端點,通常與一個IP和端口對應,其屏蔽了client和server交互時的網絡傳輸細節。對於client來說,Endpoint代表通訊的服務端地址端口;而對於server來說則代表了綁定監聽的地址及端口。CoapEndpoint實現了Endpoint接口,通過RawDataChannel(見elements-connector部分)接口實現消息接收,通過Outbox接口實現消息發送。通常CoapEndpoint 會關聯一個Connector,以實現傳輸層的收發;CoapStack對應了協議棧接口,用於處理CoapEndpoint上層的消息鏈路;除此之外,CoapEndpoint 還應該包括消息編解碼、攔截處理等功能。
Exchange描述了請求-響應模型,一個Exchange會對應一個Request,相應的Response,以及當前的Endpoint;ExchangeObserver用於實現對Exchange狀態的變更監聽;
Exchange 通常存在於兩種場景:
(1)發送請求后初始化並存儲,當接收到對應的響應之后變更為completed(執行清理工作)。
(2)接收請求后初始化並存儲,當發送響應時執行清理;
Matcher 是用於實現Exchange 生成及銷毀的模塊,提供了幾個收發接口;用於消息在進入協議棧CoapStack處理之前完成配對處理;
MessageExchangeStore 實現了Exchange的查詢、存儲;MessageIdProvider 用於提供Coap消息的MID,一個MID代表了一個唯一的消息(在消息生命周期內);TokenProvider 用於提供Coap消息的Token,而Request及Response通過Token實現匹配;
network子模塊:
package-config
提供網絡參數配置定義
package-deduplication
提供消息去重機制的實現
package-interceptors
提供消息傳輸攔截器定義
package-serialization
提供消息包的解析及編碼實現
package-stack
提供協議棧分層定義及實現
Package Server包:
應用層 server端實現的一些定義,包括Server接口、Resource定義。
CoapServer 可包含多個Endpoint,體現為一個Coap服務可架設在多個傳輸端口之上;MessageDeliverer 是消息路由的接口,ServerMessageDelivery 實現了根據uri 查找Resource的功能;ConcurrentCoapResource則為Resource 提供了一個獨立線程池的執行方式。
ObserveRelation 定義一個觀察關系,對應一個觀察者即觀察目標Resource;
ObserveEndpoint 定義了一個觀察者端點,並包含一個關系列表(一個觀察者可以觀察多個Resource);
ObserveManager 由CoapServer持有,用於管理觀察者端點列表;
CoapResource 也會持有一個Relation集合以實現跟蹤;其通過ObserveRelationFilter接口決定是否接受來自觀察者的注冊請求;
connector 模塊由core模塊剝離,用於實現網絡傳輸層的抽象,這使得Coap協議可以運行於UDP、TCP、DTLS等多種協議之上。
Connector定義了連接器需實現的相關方法,包括啟動停止、數據的收發;
RawData包含了網絡消息包的原始字節數據,其解析和編碼需要交由上層協議實現;
CorrelationContext 描述了上下文,用於支持傳輸協議的一些會話數據讀寫,如DTLS會話。
三.核心接口
Californium框架分為 transport 傳輸層、protocol 協議層、logic 邏輯層。
transport 傳輸層,由Connector 提供傳輸端口的抽象,UDPConnector是其主要實現;
數據包通過RawData對象封裝;該層還提供了CorrelationContext 實現傳輸層會話數據的讀寫支持。
protocol 協議層,提供了Coap 協議棧機制的完整實現;CoapEndpoint是核心的操作類,數據的編解碼通過
DataSerializer、DataParser實現,MessageInterceptor提供了消息收發的攔截功能,Request/Response的映射處理
由 Matcher實現,Exchange 描述了映射模型;協議棧CoapStack 是一個分層的內核實現,在這里完成分塊、重傳等機制。
logic 邏輯層,定義了CoapClient、CoapServer的入口,包括消息的路由機制,Resource的繼承機制;
Observe機制的關系維護、狀態管理由ObserveManager提供入口。
四.關鍵機制
4.1 協議棧
californium-core 采用了分層接口來定義協議棧,其中CoapStack 描述整個棧對象,Layer則對應分層的處理;
這相當於采用了過濾器模式,分層的定義使得特性間互不影響,子模塊可保持獨立的關注點;
CoapStack定義如下:
public interface CoapStack { // delegate to top void sendRequest(Request request); // delegate to top void sendResponse(Exchange exchange, Response response); ... // delegate to bottom void receiveRequest(Exchange exchange, Request request); // delegate to bottom void receiveResponse(Exchange exchange, Response response);
接口包括了幾個消息收發函數,而Layer也定義了一樣的接口。
一個CoapUdpStack 包括的分層如下圖:
CoapUdpStack 構造函數與此對應:
public CoapUdpStack(final NetworkConfig config, final Outbox outbox) { ... Layer layers[] = new Layer[] { new ExchangeCleanupLayer(), new ObserveLayer(config), new BlockwiseLayer(config), reliabilityLayer }; setLayers(layers); }
StackTopLayer和StackBottomLayer由基礎類BaseCoapStack提供,實現了協議棧頂層和底層邏輯;
MessageDeliver是膠合應用層的接口,其從StackTopLayer收到Coap消息之后將繼續分發到Resource;
StackBottomLayer則膠合了傳輸層,通過Inbox/Outbox接口實現與Connector的交互。
其他Layer的功能
ExchangeCleanLayer 提供Exchange清理功能,當取消請求時觸發Exchange的清理功能;
ObserveLayer 提供Coap Observe機制實現;
BlockwiseLayer 提供Coap 分塊傳輸機制實現;
ReliabilityLayer 提供可靠性傳輸,實現自動重傳機制;
4.2 Exchange生命周期
Exchange對應於請求/響應模型,其生命周期也由交互模型決定,一般在響應結束之后Exchange便不再存活;
然而在Observe場景下例外,一旦啟動了Observe請求,Exchange會一直存活直到Observe被取消或中斷。
(1)LocalExchange,即本地的Exchange, 對應於本地請求對方響應的交互。
BaseCoapStack.StackTopLayer實現了初始化:
public void sendRequest(final Request request) { Exchange exchange = new Exchange(request, Origin.LOCAL); ...
當接收響應時進行銷毀,observe類型的請求在這里被忽略:
public void receiveResponse(final Exchange exchange, final Response response) { if (!response.getOptions().hasObserve()) { exchange.setComplete(); }
UdpMatcher 實現了銷毀動作:
public void sendRequest(final Exchange exchange, final Request request) { exchange.setObserver(exchangeObserver); exchangeStore.registerOutboundRequest(exchange); if (LOGGER.isLoggable(Level.FINER)) {
這是在發送請求時為Exchange添加觀察者接口,當exchange執行complete操作時觸發具體的銷毀工作:
UdpMatcher.ExchangeObserverImpl-- if (exchange.getOrigin() == Origin.LOCAL) { // this endpoint created the Exchange by issuing a request KeyMID idByMID = KeyMID.fromOutboundMessage(exchange.getCurrentRequest()); KeyToken idByToken = KeyToken.fromOutboundMessage(exchange.getCurrentRequest()); exchangeStore.remove(idByToken); // in case an empty ACK was lost exchangeStore.remove(idByMID); ...
californium大量采用了觀察者設計模式,這種方法在設計異步消息機制時非常有用.
此外,request的取消、中斷操作(RST信號)、傳輸的超時都會導致exchange生命周期結束。
LocalExchange的生命周期如下圖:
(2)RemoteExchange,即遠程的Exchange,對應於本地接收請求並返回響應的交互。
UdpMatcher實現了遠程Exchange的初始化:
public Exchange receiveRequest(final Request request) { ... KeyMID idByMID = KeyMID.fromInboundMessage(request); if (!request.getOptions().hasBlock1() && !request.getOptions().hasBlock2()) { Exchange exchange = new Exchange(request, Origin.REMOTE); Exchange previous = exchangeStore.findPrevious(idByMID, exchange); if (previous == null) { exchange.setObserver(exchangeObserver); ...
在發送響應時,Exchange被銷毀,仍然由UdpMatcher實現:
public void sendResponse(final Exchange exchange, final Response response) { response.setToken(exchange.getCurrentRequest().getToken()); ... // Only CONs and Observe keep the exchange active (CoAP server side) if (response.getType() != Type.CON && response.isLast()) { exchange.setComplete(); }
注意到這里對response進行了last屬性的判斷,該屬性默認為true,而ObserveLayer將其置為false,使得observe響應不會導致Exchange結束:
public void sendResponse(final Exchange exchange, Response response) { ... response.setLast(false);
連接中斷(RST信號)、傳輸超時會導致Exchange的結束,此外由客戶端發起的observe取消請求也會產生一樣的結果。
RemoteExchange的生命周期如下圖所示:
4.3 分塊傳輸
分塊傳輸一般用於發送較大的請求體或接受較大的響應體,比如上傳下載固件包場景,由於受到MTU的限制,需要實現分塊傳輸;
Coap定義了分塊傳輸的方式,采用Block1/Block2機制
Option選項
BlockOption是用於描述分塊信息的選項類型,選項值為0-3個字節,編碼包含了3個字段:當前分塊編號;是否結束;當前分塊大小。
為區分請求和響應的不同,分別有block1和block2 兩個選項:
block1:用於發送POST/PUT請求時傳輸較大的內容體;
block2:用於響應GET/POST/PUT請求時傳輸較大的內容體;
size1:指示請求體的總大小;
size2:指示響應體的總大小;
配置選項
maxMessageSize:消息大小閾值,當發送的消息大於該閾值時需采用分塊傳輸,該值必須小於MTU;
preferredBlockSize:用於指示分塊的大小;
maxResourceBodySize:最大資源內容體大小,用於限制接收的請求或響應的總大小,若超過將提示錯誤或取消處理;
blockLifeTime:分塊傳輸的生命周期時長,若超過該時長分塊傳輸未完成則視為失敗;
BlockwiseLayer實現了分塊傳輸的完整邏輯,其中sendRequest的代碼片段:
public void sendRequest(final Exchange exchange, final Request request) { BlockOption block2 = request.getOptions().getBlock2(); if (block2 != null && block2.getNum() > 0) { //應用層指定的分塊.. } else if (requiresBlockwise(request)) { //自動計算分塊 startBlockwiseUpload(exchange, request); } else { //不需要分塊 exchange.setCurrentRequest(request); lower().sendRequest(exchange, request); } } ... //實現分塊閾值判斷 private boolean requiresBlockwise(final Request request) { boolean blockwiseRequired = false; if (request.getCode() == Code.PUT || request.getCode() == Code.POST) { blockwiseRequired = request.getPayloadSize() > maxMessageSize; } ... //startBlockwiseUpload實現了request分塊邏輯,通過在請求的Option中加入Block1作為標識 private void startBlockwiseUpload(final Exchange exchange, final Request request) { BlockwiseStatus status = findRequestBlockStatus(exchange, request); final Request block = getNextRequestBlock(request, status); block.getOptions().setSize1(request.getPayloadSize()); ... lower().sendRequest(exchange, block); }
接收端檢測Request的Block1選項,返回continue響應碼,直到所有分塊傳輸完成后進行組裝交由上層處理:
private void handleInboundBlockwiseUpload(final BlockOption block1, final Exchange exchange, final Request request) { //檢查是否超過限制 if (requestExceedsMaxBodySize(request)) { Response error = Response.createResponse(request, ResponseCode.REQUEST_ENTITY_TOO_LARGE); error.setPayload(String.format("body too large, can process %d bytes max", maxResourceBodySize)); error.getOptions().setSize1(maxResourceBodySize); lower().sendResponse(exchange, error); } else { ... if (block1.getNum() == status.getCurrentNum()) { if (status.hasContentFormat(request.getOptions().getContentFormat())) { status.addBlock(request.getPayload()); status.setCurrentNum(status.getCurrentNum() + 1); if ( block1.isM() ) { //存在后面的block,返回Continue響應 Response piggybacked = Response.createResponse(request, ResponseCode.CONTINUE); piggybacked.getOptions().setBlock1(block1.getSzx(), true, block1.getNum()); piggybacked.setLast(false); exchange.setCurrentResponse(piggybacked); lower().sendResponse(exchange, piggybacked); } else { ... //已經完成,組裝后交由上層處理 Request assembled = new Request(request.getCode()); assembled.setSenderIdentity(request.getSenderIdentity()); assembleMessage(status, assembled); upper().receiveRequest(exchange, assembled); }
一個請求體分塊傳輸流程如下圖所示:
響應體分塊傳輸的邏輯與此類似,交互流程如下圖:
4.4 消息重傳
Coap消息支持重傳機制,當發送CON類型的消息時,要求接收端響應對應的ACK消息;如果在指定時間內沒有收到響應,則進行重傳。
基礎消息重傳由ReliabilityLayer實現,sendRequest 代碼片段:
if (request.getType() == null) { request.setType(Type.CON); } if (request.getType() == Type.CON) { prepareRetransmission(exchange, new RetransmissionTask(exchange, request) { public void retransmit() { sendRequest(exchange, request); } }); } lower().sendRequest(exchange, request);
當發送CON類型消息時,通過 prepareRetransmission函數實現重傳准備:
int timeout; if (exchange.getFailedTransmissionCount() == 0) { timeout = getRandomTimeout(ack_timeout, (int) (ack_timeout * ack_random_factor)); } else { timeout = (int) (ack_timeout_scale * exchange.getCurrentTimeout()); } exchange.setCurrentTimeout(timeout); ScheduledFuture<?> f = executor.schedule(task, timeout, TimeUnit.MILLISECONDS); exchange.setRetransmissionHandle(f);
exchange.getFailedTransmissionCount() 返回0 代表第一次傳輸,采用的超時時間是:
**timeout = random(ack_timeout, act_timeout*ack_random_factor)**
//其中ack_timeout(超時起始值)、ack_random_factor(隨機因子)由配置文件提供;
后續的重傳時間將由上一次的timeout和ack_timeout_scale系數決定:
timeout = timeout * ack_timeout_scale
當接收ACK時,有必要取消重傳處理,看看receiveResponse的實現:
@Override public void receiveResponse(final Exchange exchange, final Response response) { exchange.setFailedTransmissionCount(0); exchange.getCurrentRequest().setAcknowledged(true); exchange.setRetransmissionHandle(null); ...
可以看到,接收到響應之后,將Request標記為ack狀態,exchange.setRestransmissionHandler會導致上一次的重傳schedu任務被取消。
最終重傳任務由RetransmissionTask實現:
int failedCount = exchange.getFailedTransmissionCount() + 1; exchange.setFailedTransmissionCount(failedCount); if (message.isAcknowledged()) { return; } else if (message.isRejected()) { return; } else if (message.isCanceled()) { return; } else if (failedCount <= max_retransmit) { // Trigger MessageObservers message.retransmitting(); // MessageObserver might have canceled if (!message.isCanceled()) { retransmit(); } } else { exchange.setTimedOut(); message.setTimedOut(true); }
滿足重傳的條件:
1 消息未被確認(收到ACK)或拒絕(收到RST)
2 消息未被取消;
3 消息未超過重傳次數限制;
其中重傳次數max_retransmit由配置提供,當超過該次數限制時消息將發生傳輸超時。
默認參數配置
ack_timeout=2sack_random_factor=1.5ack_timeout_scale=2max_retransmit=4
4.5 防止重復包;
由於存在重傳機制,加上UDP傳輸的不穩定性,傳輸兩端很可能會受到重復的消息包;
通常重復消息的檢測要求實現消息容器以記錄和匹配重復消息ID,然而執行時間越長,消息會越來越多,
因此消息容器必須具備清除機制,基於此點不同,californium 提供了兩種實現機制:
4.5.1 標記清除
清除器維持一個消息容器,每個消息都保持一個初始的時間戳;
清除器定時進行掃描,發現太老的消息則將其清除。
SweepDeduplicator 提供了實現,清除代碼片段:
private void sweep() { final long oldestAllowed = System.currentTimeMillis() - exchangeLifetime; final long start = System.currentTimeMillis(); for (Map.Entry<?, Exchange> entry : incomingMessages.entrySet()) { Exchange exchange = entry.getValue(); if (exchange.getTimestamp() < oldestAllowed) { incomingMessages.remove(entry.getKey()); } } ...
其中incomingMessage采用了ConcurrentHashMap數據結構,這是一個並發性良好的線程安全集合;
然而從上面的代碼也可以發現,sweep在這里是一個遍歷操作,定時清除的老化時間默認為247s,假設1s內處理1000條消息,
那么每次清除時駐留的消息數量為247000,即需要遍歷這么多的次數,對於CPU來說存在一定的開銷。
采用這種方式,消息的存活時間基本上由exchangeLifetime參數和掃描間隔決定。
4.5.2 翻轉清除
清除器維持三個消息容器,保持1、2、3三個索引分別指向相應消息容器,其中索引1、2代表了活動的消息容器,
索引3 代表老化的消息容器,如下圖所示
消息索引首次會往 I1容器寫入,同時也會往 I2容器存入拷貝;
查找消息時主要從I1 容器查找;
每個周期會執行一次翻轉,幾個容器指針發生置換(I1->I2,I2->I3,I3->I1),之后I3 指向的容器會被清理;
CropRotation 實現了翻轉的邏輯,代碼如下:
private void rotation() { synchronized (maps) { int third = first; first = second; second = (second+1)%3; maps[third].clear(); }
基於上述的算法分析,I2容器的消息存活時間會小於一個周期,I1容器的消息則存活一個周期到兩個周期之間,I3 容器則超過2個周期,是最老的容器;
基於這樣的邏輯,翻轉清除機制的消息存活時間是1-2個周期之間,而該機制相比標記清除的優點在於清除機制是整個容器一塊清除,而不需要遍歷操作,然而缺點是增加了存儲開銷。