californium 框架設計分析


Californium 源碼分析

1. Californium 項目簡介

Californium 是一款基於Java實現的Coap技術框架,該項目實現了Coap協議的各種請求響應定義,支持CON/NON不同的可靠性傳輸模式。
Californium 基於分層設計且高度可擴展,其內部模塊設計及接口定義存在許多學習之處;

值得一提的是,在同類型的 Coap技術實現中,Californium的性能表現是比較突出的,如下圖:

更多的數據可以參考Californium-可擴展雲服務白皮書
本文以框架的源碼分析為主,其他內容不做展開。

2. 項目結構

目前Californium 項目穩定版本為 2.0.0-M2,項目的托管地址在:
https://github.com/eclipse/californium

模塊說明

~.californium-core
californium 核心模塊,定義了一系列協議棧核心接口,並提供了Coap協議棧的完整實現,

~.element-connector
從core模塊剝離的連接器模塊,用於抽象網絡傳輸層的接口,使得coap可以同時運行於udp和tcp多種傳輸協議之上;

~.scandium-core
Coap over DTLS 支持模塊,提供了DTLS 傳輸的Connector實現;

~.californium-osgi
californium 的osgi 封裝模塊;

~.californium-proxy
coap 代理模塊,用於支持coap2coap、coap2http、http2coap的轉換;

~.demo-xxx
樣例程序;

其中,californium-core和element-connector是coap技術實現最關鍵的模塊,后面的分析將圍繞這兩個模塊進行。

3. 分層設計

Californiium 定義了三層架構
1 網絡層,負責處理端口監聽,網絡數據收發;
2 協議層,負責Coap協議數據包解析及封裝,實現消息的路由、可靠性傳輸、Token處理、觀察者模型等等;
3 邏輯層,負責 Resource定義和映射,一個Resource 對應一個URL,可獨立實現Coap 請求處理。

異步線程池

三層架構中都可以支持獨立的線程池,其中網絡層與協議層的線程池保持獨立;
邏輯層可為每個Resource指定獨立的線程池,並支持父級繼承的機制,即當前Resource若沒有定義則沿用父級Resource線程池;
若邏輯層未指定線程池,則默認使用協議層的線程池。

4. 包結構分析

4.1 californium-core

core 模塊定義了協議棧相關的所有關鍵接口,根據功能職責的不同拆分為多個子 package;
根級 package定義的是Coap應用的一些入口類,如Client/Server實現、包括應用層CoapResource的定義。

4.1.1 package-coap

實現 coap協議 RFC7252 實體定義,包括消息類型、消息頭、Observe機制等。

具體定義見下圖

Coap 消息划分為Request/Response/EmptyMessage 三類;
MessageObserver 接口用於實現消息的狀態跟蹤,如重傳、確認等。

4.1.2 package-network

network 是協議棧核心機制實現的關鍵模塊,其涵蓋了網絡傳輸及協議層的定義及實現;
模塊實現了一些關鍵接口定義,如將網絡傳輸端點抽象為Endpoint,根據請求響應的關聯模型定義了Exchange等。
協議棧的分層定義、消息編解碼、攔截處理也由network包提供。

endpoins定義

Endpoint 定義為一個端點,通常與一個IP和端口對應,其屏蔽了client和server交互時的網絡傳輸細節。
對於client來說,Endpoint代表通訊的服務端地址端口;而對於server來說則代表了綁定監聽的地址及端口。
CoapEndpoint實現了Endpoint接口,通過RawDataChannel(見elements-connector部分)接口實現消息接收,通過Outbox接口實現消息發送。
通常CoapEndpoint 會關聯一個Connector,以實現傳輸層的收發;

CoapStack對應了協議棧接口,用於處理CoapEndpoint上層的消息鏈路;
除此之外,CoapEndpoint 還應該包括消息編解碼、攔截處理等功能。

exchange定義

Exchange描述了請求-響應模型,一個Exchange會對應一個Request,相應的Response,以及當前的Endpoint;
ExchangeObserver用於實現對Exchange狀態的變更監聽;

Exchange 通常存在於兩種場景:
1 發送請求后初始化並存儲,當接收到對應的響應之后變更為completed(執行清理工作)。
2 接收請求后初始化並存儲,當發送響應時執行清理;

matcher定義

Matcher 是用於實現Exchange 生成及銷毀的模塊,提供了幾個收發接口;
用於消息在進入協議棧CoapStack處理之前完成配對處理;

messagetool定義

MessageExchangeStore 實現了Exchange的查詢、存儲;
MessageIdProvider 用於提供Coap消息的MID,一個MID代表了一個唯一的消息(在消息生命周期內);
TokenProvider 用於提供Coap消息的Token,而Request及Response通過Token實現匹配;

network子模塊

package-config
提供網絡參數配置定義

package-deduplication
提供消息去重機制的實現

package-interceptors
提供消息傳輸攔截器定義

package-serialization
提供消息包的解析及編碼實現

package-stack
提供協議棧分層定義及實現

4.1.3 package-server

應用層 server端實現的一些定義,包括Server接口、Resource定義。

CoapServer 可包含多個Endpoint,體現為一個Coap服務可架設在多個傳輸端口之上;
MessageDeliverer 是消息路由的接口,ServerMessageDelivery 實現了根據uri 查找Resource的功能;
ConcurrentCoapResource則為Resource 提供了一個獨立線程池的執行方式。

4.1.4 package-observe

應用層 observe機制的定義,如下圖

ObserveRelation 定義一個觀察關系,對應一個觀察者即觀察目標Resource;
ObserveEndpoint 定義了一個觀察者端點,並包含一個關系列表(一個觀察者可以觀察多個Resource);
ObserveManager 由CoapServer持有,用於管理觀察者端點列表;
CoapResource 也會持有一個Relation集合以實現跟蹤;其通過ObserveRelationFilter接口決定是否接受來自觀察者的注冊請求;

4.2 elements-connector

connector 模塊由core模塊剝離,用於實現網絡傳輸層的抽象,這使得Coap協議可以運行於UDP、TCP、DTLS等多種協議之上。
Connector定義了連接器需實現的相關方法,包括啟動停止、數據的收發;
RawData包含了網絡消息包的原始字節數據,其解析和編碼需要交由上層協議實現;
CorrelationContext 描述了上下文,用於支持傳輸協議的一些會話數據讀寫,如DTLS會話。

4.3. 核心接口

下面擬用一張關系圖概括Californium 框架的全貌(部分內容未體現):

與分層設計對應,框架分為 transport 傳輸層、protocol 協議層、logic 邏輯層
transport 傳輸層,由Connector 提供傳輸端口的抽象,UDPConnector是其主要實現;
數據包通過RawData對象封裝;該層還提供了CorrelationContext 實現傳輸層會話數據的讀寫支持。

protocol 協議層,提供了Coap 協議棧機制的完整實現;CoapEndpoint是核心的操作類,數據的編解碼通過
DataSerializer、DataParser實現,MessageInterceptor提供了消息收發的攔截功能,Request/Response的映射處理
由 Matcher實現,Exchange 描述了映射模型;協議棧CoapStack 是一個分層的內核實現,在這里完成分塊、重傳等機制。

logic 邏輯層,定義了CoapClient、CoapServer的入口,包括消息的路由機制,Resource的繼承機制;
Observe機制的關系維護、狀態管理由ObserveManager提供入口。

5. 關鍵機制

5.1 協議棧;

californium-core 采用了分層接口來定義協議棧,其中CoapStack 描述整個棧對象,Layer則對應分層的處理;
這相當於采用了過濾器模式,分層的定義使得特性間互不影響,子模塊可保持獨立的關注點;

CoapStack定義如下:

public interface CoapStack {
    // delegate to top
    void sendRequest(Request request);
    // delegate to top
    void sendResponse(Exchange exchange, Response response);
    ...
    // delegate to bottom
    void receiveRequest(Exchange exchange, Request request);
    // delegate to bottom
    void receiveResponse(Exchange exchange, Response response);

接口包括了幾個消息收發函數,而Layer也定義了一樣的接口。

一個CoapUdpStack 包括的分層如下圖:

CoapUdpStack 構造函數與此對應:

public CoapUdpStack(final NetworkConfig config, final Outbox outbox) {
        ...
      Layer layers[] = new Layer[] {
                new ExchangeCleanupLayer(),
                new ObserveLayer(config),
                new BlockwiseLayer(config),
                reliabilityLayer };
        setLayers(layers);
    }  

StackTopLayer和StackBottomLayer由基礎類BaseCoapStack提供,實現了協議棧頂層和底層邏輯;
MessageDeliver是膠合應用層的接口,其從StackTopLayer收到Coap消息之后將繼續分發到Resource;
StackBottomLayer則膠合了傳輸層,通過Inbox/Outbox接口實現與Connector的交互。

其他Layer的功能
ExchangeCleanLayer 提供Exchange清理功能,當取消請求時觸發Exchange的清理功能;
ObserveLayer 提供Coap Observe機制實現;
BlockwiseLayer 提供Coap 分塊傳輸機制實現;
ReliabilityLayer 提供可靠性傳輸,實現自動重傳機制;

5.2 Exchange生命周期

Exchange對應於請求/響應模型,其生命周期也由交互模型決定,一般在響應結束之后Exchange便不再存活;
然而在Observe場景下例外,一旦啟動了Observe請求,Exchange會一直存活直到Observe被取消或中斷。

1 LocalExchange,即本地的Exchange, 對應於本地請求對方響應的交互。
BaseCoapStack.StackTopLayer實現了初始化:

public void sendRequest(final Request request) {
   Exchange exchange = new Exchange(request, Origin.LOCAL); 
      ...

當接收響應時進行銷毀,observe類型的請求在這里被忽略:

    public void receiveResponse(final Exchange exchange, final Response response) {
        if (!response.getOptions().hasObserve()) {
            exchange.setComplete();
        } 

UdpMatcher 實現了銷毀動作:

UdpMatcher--
    public void sendRequest(final Exchange exchange, final Request request) {
        exchange.setObserver(exchangeObserver);
        exchangeStore.registerOutboundRequest(exchange);
        if (LOGGER.isLoggable(Level.FINER)) {  

這是在發送請求時為Exchange添加觀察者接口,當exchange執行complete操作時觸發具體的銷毀工作:

UdpMatcher.ExchangeObserverImpl--
            if (exchange.getOrigin() == Origin.LOCAL) {
                // this endpoint created the Exchange by issuing a request
                KeyMID idByMID = KeyMID.fromOutboundMessage(exchange.getCurrentRequest());
                KeyToken idByToken = KeyToken.fromOutboundMessage(exchange.getCurrentRequest());
                exchangeStore.remove(idByToken);
                // in case an empty ACK was lost
                exchangeStore.remove(idByMID);
                            ...

值得一說的是,californium大量采用了觀察者設計模式,這種方法在設計異步消息機制時非常有用.

此外,request的取消、中斷操作(RST信號)、傳輸的超時都會導致exchange生命周期結束。
LocalExchange的生命周期如下圖:

2 RemoteExchange,即遠程的Exchange,對應於本地接收請求並返回響應的交互。

UdpMatcher實現了遠程Exchange的初始化:

UdpMatcher--
    public Exchange receiveRequest(final Request request) {
        ...        
        KeyMID idByMID = KeyMID.fromInboundMessage(request);
        if (!request.getOptions().hasBlock1() && !request.getOptions().hasBlock2()) {
            Exchange exchange = new Exchange(request, Origin.REMOTE);
            Exchange previous = exchangeStore.findPrevious(idByMID, exchange);
            if (previous == null) {
                exchange.setObserver(exchangeObserver);  
                            ...

在發送響應時,Exchange被銷毀,仍然由UdpMatcher實現:

UdpMatcher--
    public void sendResponse(final Exchange exchange, final Response response) {
        response.setToken(exchange.getCurrentRequest().getToken());
        ...
        // Only CONs and Observe keep the exchange active (CoAP server side)
        if (response.getType() != Type.CON && response.isLast()) {
            exchange.setComplete();
        }  

注意到這里對response進行了last屬性的判斷,該屬性默認為true,而ObserveLayer將其置為false,使得observe響應不會導致Exchange結束:

ObserveLayer--
    public void sendResponse(final Exchange exchange, Response response) {
            ...
            response.setLast(false);  

連接中斷(RST信號)、傳輸超時會導致Exchange的結束,此外由客戶端發起的observe取消請求也會產生一樣的結果。
RemoteExchange的生命周期如下圖所示:

5.3 分塊傳輸;

分塊傳輸一般用於發送較大的請求體或接受較大的響應體,比如上傳下載固件包場景,由於受到MTU的限制,需要實現分塊傳輸;
Coap定義了分塊傳輸的方式,采用Block1/Block2機制

Option選項
BlockOption是用於描述分塊信息的選項類型,選項值為0-3個字節,編碼包含了3個字段:當前分塊編號;是否結束;當前分塊大小。
為區分請求和響應的不同,分別有block1和block2 兩個選項:
block1:用於發送POST/PUT請求時傳輸較大的內容體;
block2:用於響應GET/POST/PUT請求時傳輸較大的內容體;
size1:指示請求體的總大小;
size2:指示響應體的總大小;

配置選項
maxMessageSize:消息大小閾值,當發送的消息大於該閾值時需采用分塊傳輸,該值必須小於MTU;
preferredBlockSize:用於指示分塊的大小;
maxResourceBodySize:最大資源內容體大小,用於限制接收的請求或響應的總大小,若超過將提示錯誤或取消處理;
blockLifeTime:分塊傳輸的生命周期時長,若超過該時長分塊傳輸未完成則視為失敗;

BlockwiseLayer實現了分塊傳輸的完整邏輯,其中sendRequest的代碼片段:

public void sendRequest(final Exchange exchange, final Request request) {
        BlockOption block2 = request.getOptions().getBlock2();
        if (block2 != null && block2.getNum() > 0) {
            //應用層指定的分塊..
        } else if (requiresBlockwise(request)) {
            //自動計算分塊
            startBlockwiseUpload(exchange, request);
        } else {
            //不需要分塊
            exchange.setCurrentRequest(request);
            lower().sendRequest(exchange, request);
        }
    }  
...
//實現分塊閾值判斷
private boolean requiresBlockwise(final Request request) {
        boolean blockwiseRequired = false;
        if (request.getCode() == Code.PUT || request.getCode() == Code.POST) {
            blockwiseRequired = request.getPayloadSize() > maxMessageSize;
        }  
...
//startBlockwiseUpload實現了request分塊邏輯,通過在請求的Option中加入Block1作為標識
private void startBlockwiseUpload(final Exchange exchange, final Request request) {
        BlockwiseStatus status = findRequestBlockStatus(exchange, request);
        final Request block = getNextRequestBlock(request, status);
        block.getOptions().setSize1(request.getPayloadSize());
        ...
        lower().sendRequest(exchange, block);
    }  

接收端檢測Request的Block1選項,返回continue響應碼,直到所有分塊傳輸完成后進行組裝交由上層處理:

private void handleInboundBlockwiseUpload(final BlockOption block1, final Exchange exchange, final Request request) {
        //檢查是否超過限制
        if (requestExceedsMaxBodySize(request)) {
            Response error = Response.createResponse(request, ResponseCode.REQUEST_ENTITY_TOO_LARGE);
            error.setPayload(String.format("body too large, can process %d bytes max", maxResourceBodySize));
            error.getOptions().setSize1(maxResourceBodySize);
            lower().sendResponse(exchange, error);
        } else {
            ...
            if (block1.getNum() == status.getCurrentNum()) {
                if (status.hasContentFormat(request.getOptions().getContentFormat())) {
                    status.addBlock(request.getPayload());
                    status.setCurrentNum(status.getCurrentNum() + 1);
                    
                    if ( block1.isM() ) {
                        //存在后面的block,返回Continue響應
                        Response piggybacked = Response.createResponse(request, ResponseCode.CONTINUE);
                        piggybacked.getOptions().setBlock1(block1.getSzx(), true, block1.getNum());
                        piggybacked.setLast(false);
                        exchange.setCurrentResponse(piggybacked);
                        lower().sendResponse(exchange, piggybacked);
                    } else {
                        ...
                        //已經完成,組裝后交由上層處理
                        Request assembled = new Request(request.getCode());
                        assembled.setSenderIdentity(request.getSenderIdentity());
                        assembleMessage(status, assembled);
                        upper().receiveRequest(exchange, assembled);
                    }  

因此,一個請求體分塊傳輸流程如下圖所示:

響應體分塊傳輸的邏輯與此類似,交互流程如下圖:

5.4 消息重傳;

Coap消息支持重傳機制,當發送CON類型的消息時,要求接收端響應對應的ACK消息;如果在指定時間內沒有收到響應,則進行重傳。
基礎消息重傳由ReliabilityLayer實現,sendRequest 代碼片段:

        if (request.getType() == null) {
            request.setType(Type.CON);
        }
        if (request.getType() == Type.CON) {
            prepareRetransmission(exchange, new RetransmissionTask(exchange, request) {
                public void retransmit() {
                    sendRequest(exchange, request);
                }
            });
        }
        lower().sendRequest(exchange, request);  

當發送CON類型消息時,通過 prepareRetransmission函數實現重傳准備:

        int timeout;
        if (exchange.getFailedTransmissionCount() == 0) {
            timeout = getRandomTimeout(ack_timeout, (int) (ack_timeout * ack_random_factor));
        } else {
            timeout = (int) (ack_timeout_scale * exchange.getCurrentTimeout());
        }
        exchange.setCurrentTimeout(timeout);
        ScheduledFuture<?> f = executor.schedule(task, timeout, TimeUnit.MILLISECONDS);
        exchange.setRetransmissionHandle(f);  

exchange.getFailedTransmissionCount() 返回0 代表第一次傳輸,采用的超時時間是:
timeout = random(ack_timeout, act_timeout*ack_random_factor)
//其中ack_timeout(超時起始值)、ack_random_factor(隨機因子)由配置文件提供;

后續的重傳時間將由上一次的timeout和ack_timeout_scale系數決定:
timeout = timeout * ack_timeout_scale

當接收ACK時,有必要取消重傳處理,看看receiveResponse的實現:

    @Override
    public void receiveResponse(final Exchange exchange, final Response response) {
        exchange.setFailedTransmissionCount(0);
        exchange.getCurrentRequest().setAcknowledged(true);
        exchange.setRetransmissionHandle(null); 
              ... 

可以看到,接收到響應之后,將Request標記為ack狀態,exchange.setRestransmissionHandler會導致上一次的重傳schedu任務被取消。
最終重傳任務由RetransmissionTask實現:

                int failedCount = exchange.getFailedTransmissionCount() + 1;
                exchange.setFailedTransmissionCount(failedCount);
                if (message.isAcknowledged()) {
                    return;
                } else if (message.isRejected()) {
                    return;
                } else if (message.isCanceled()) {
                    return;
                } else if (failedCount <= max_retransmit) {
                    // Trigger MessageObservers
                    message.retransmitting();
                    // MessageObserver might have canceled
                    if (!message.isCanceled()) {
                        retransmit();
                    }
                } else {
                    exchange.setTimedOut();
                    message.setTimedOut(true);
                }  

滿足重傳的條件
1 消息未被確認(收到ACK)或拒絕(收到RST)
2 消息未被取消;
3 消息未超過重傳次數限制;
其中重傳次數max_retransmit由配置提供,當超過該次數限制時消息將發生傳輸超時。

默認參數配置

ack_timeout=2s
ack_random_factor=1.5
ack_timeout_scale=2
max_retransmit=4

5.5 防止重復包;

由於存在重傳機制,加上UDP傳輸的不穩定性,傳輸兩端很可能會受到重復的消息包;
通常重復消息的檢測要求實現消息容器以記錄和匹配重復消息ID,然而執行時間越長,消息會越來越多,
因此消息容器必須具備清除機制,基於此點不同,californium 提供了兩種實現機制:

5.5.1 標記清除

清除器維持一個消息容器,每個消息都保持一個初始的時間戳;
清除器定時進行掃描,發現太老的消息則將其清除。

SweepDeduplicator 提供了實現,清除代碼片段:

private void sweep() {
        final long oldestAllowed = System.currentTimeMillis() - exchangeLifetime;
        final long start = System.currentTimeMillis();
        for (Map.Entry<?, Exchange> entry : incomingMessages.entrySet()) {
            Exchange exchange = entry.getValue();
            if (exchange.getTimestamp() < oldestAllowed) {
                incomingMessages.remove(entry.getKey());
            }
        }
           ...

其中incomingMessage采用了ConcurrentHashMap數據結構,這是一個並發性良好的線程安全集合;
然而從上面的代碼也可以發現,sweep在這里是一個遍歷操作,定時清除的老化時間默認為247s,假設1s內處理1000條消息,
那么每次清除時駐留的消息數量為247000,即需要遍歷這么多的次數,對於CPU來說存在一定的開銷。
采用這種方式,消息的存活時間基本上由exchangeLifetime參數和掃描間隔決定。

5.5.2 翻轉清除

清除器維持三個消息容器,保持1、2、3三個索引分別指向相應消息容器,其中索引1、2代表了活動的消息容器,
索引3 代表老化的消息容器,如下圖所示

消息索引首次會往 I1容器寫入,同時也會往 I2容器存入拷貝;
查找消息時主要從I1 容器查找;
每個周期會執行一次翻轉,幾個容器指針發生置換(I1->I2,I2->I3,I3->I1),之后I3 指向的容器會被清理;

CropRotation 實現了翻轉的邏輯,代碼如下:

private void rotation() {
    synchronized (maps) {
        int third = first;
        first = second;
        second = (second+1)%3;
        maps[third].clear();
    } 

基於上述的算法分析,I2容器的消息存活時間會小於一個周期,I1容器的消息則存活一個周期到兩個周期之間,I3 容器則超過2個周期,是最老的容器;
基於這樣的邏輯,翻轉清除機制的消息存活時間是1-2個周期之間,而該機制相比標記清除的優點在於清除機制是整個容器一塊清除,而不需要遍歷操作,然而缺點是增加了存儲開銷。

JVM的垃圾回收機制也存在類似的設計,相信californium的開發者借鑒了一些思路。

至此,Californium框架的基本全貌已經分析完畢。如果希望對框架有更深入的理解,那么建議你直接在項目中直接使用它,並針對自己感興趣的幾個問題進行源碼分析或調試,相信收獲會更多。

6. 擴展閱讀

RFC關於分塊傳輸的定義
https://tools.ietf.org/html/draft-ietf-core-block-21

Hands on with Coap(需要翻牆)
https://docs.google.com/presentation/d/1dDZ7VTdjBZxnqcIt6qoX742d6dHbzap-D_H8Frf3LRE/edit#slide=id.p

Californium項目早期的介紹文檔
https://people.inf.ethz.ch/mkovatsc/resources/californium/cf-thesis.pdf

Californium 項目源碼
https://github.com/eclipse/californium


后記
往往我們在使用優秀開源框架的時候都是信手拈來,知其一則止步。
這或許跟環境有着極大的關系,試想如果公司讓你天天陷於加班趕改的狀態,項目上不合理分配資源,只要結果卻不關心個人的成長。長此以往,誰還能回歸到技術的路上?
然而,改變不了環境的結果只能改變自己,路漫漫其修遠兮,無論你的選擇如何,努力過的世界終究是精彩的。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM