鴻蒙分布式軟總線技術研究


鴻蒙分布式軟總線技術研究

1、HarmonyOS概述

1.1 系統定義

HarmonyOS 是一款“面向未來”、面向全場景(移動辦公、運動健康、社交通 信、媒體娛樂等)的分布式操作系統。在傳統的單設備系統能力的基礎上,HarmonyOS 提出了基於同一套系統能力、適配多種終端形態的分布式理念,能夠支持手機、平板、智能穿戴、智慧屏、車機等多種終端設備。

1.2 系統架構

HarmonyOS 整體遵從分層設計,從下向上依次為:內核層、系統服務層、框架層和應用層。系統功能按照“系統 > 子系統 > 功能/模塊”逐級展開,在多設備部署場景下,支持根據實際需求裁剪某些非必要的子系統或功能/模塊。HarmonyOS 技術架構如下所示。

image-20210117200859231

1.3 分布式技術特性

HarmonyOS 中,多種設備之間能夠實現硬件互助、資源共享,依賴的關鍵技術 包括分布式軟總線、分布式設備虛擬化、分布式數據管理、分布式任務調度等。

image-20210117200843365

2、分布式軟總線模塊解析

2.1 分布式軟總線的功能

在鴻蒙系統中,分布式軟總線是手機、平板、智能穿戴、智慧屏、車機等分布式 設備的通信基座,為設備之間的互聯互通提供了統一的分布式通信能力,為設備之 間的無感發現和零等待傳輸創造了條件。依托軟總線技術,可以輕松實現多台設備 共同協作完成一項任務,任務也可以由一台設備傳遞至另一台設備繼續執行。對於用戶而言,無需關注多台設備的組網,軟總線可以實現自發現、自組網。對於開發 者而言,也無需針對不同設備開發不同版本的軟件、適配不同的網絡協議和標准規 范。

2.2 分布式軟總線的原理

相較於傳統計算機中的硬總線,鴻蒙系統中的分布式軟總線是一條虛擬的、“無形”的總線。可以連接同處於一個局域網內部的所有鴻蒙設備(1+8+N,如下圖所示), 並且具有自發現、自組網、高帶寬和低時延等特點。

image-20210117200814365

除了連接處於同樣網絡協議中的硬件設備,軟總線技術還支持對不同協議的異構網絡進行組網。傳統場景下,需要藍牙傳輸的兩台設備必須都具有藍牙,需要 WiFi 傳輸的設備必須都具有 WiFi。而藍牙/WiFi 之間是無法進行數據通信的。軟總線提出 藍牙/WiFi 融合網絡組網技術(架構如下圖所示),解決了不同協議設備進行數據通 信的問題。使得多個鴻蒙設備能夠自動構建一個邏輯全連接網絡,用戶或者業務開發者無需關心組網方式與物理協議。

image-20210117200753531

傳統協議的傳輸速率差異較大,多設備交互式時延和可靠性也難以保證。軟總線傳輸提出三個目標:高帶寬、低時延、高可靠。相較於傳統網絡的 7 層模型,軟總 線提出了 4 層的“極簡協議”(如下圖所示),將中間的 4 層協議精簡為一層以提 升有效載荷,有效帶寬提升 20%。設備間基於 UDP 協議進行數據傳輸,摒棄傳統滑 動窗口機制,實現丟包快速回復,且具有智能網絡變化感知功能,可以自適應流量 控制和擁塞控制。

image-20210117200935822

2.3 分布式軟總線源碼結構分析

分布式軟中線代碼倉庫地址如下:

  • communication_interfaces_kits_softbuskit_lite

https://gitee.com/openharmony/communication_interfaces_kits_softbuskit_lite

  • communication_services_softbus_lite

https://gitee.com/openharmony/communication_services_softbus_lite

顧名思義,分別對應它的接口和實現;而communication_services_softbus_lite源碼結構中,又分為authmanager、discovery、trans_service、 和為兼容系統差別而生的os_adapter四大目錄。

  1. discover:提供基於 COAP 協議的設備發現機制;
  2. authmanager:提供設備認證機制和知識庫管理功能;
  3. trans_service:提供身份驗證和數據傳輸通道;
  4. os_adapter:檢測運行設備性能,決定部分功能是否執行。

image-20210117201256661

2.3.1 discover

作為鴻蒙 OS 分布式軟總線重要組成單元,discovery 單元提供了基於 coap(Constrained Application Protocol,受限應用協議,RFC7252)協議的設備發現機制。 為什么使用 coap 協議?是因為考慮到運行 harmonyOS 的設備除了硬件性能較好的手 機、電腦等設備,還有資源受限的物聯網設備,這些設備的 ram、rom 相對較小。coap 協議支持輕量的可靠傳輸,采用 coap 協議,可以擴大組網范圍。

discovery 的實現前提是確保發現端設備與接收端設備在同一個局域網內且能互 相收到對方的報文。流程為以下三步:

  1. 發現端設備,使用 coap 協議在局域網內發送廣播;
  2. 接收端設備使用 PublishService 接口發布服務,接收端收到廣播后,發送 coap 協議單播給發現端;
  3. 發現端設備收到回復單播報文,更新設備信息。

discovery 部分代碼由兩部分組成(目錄如下圖所示)。其中 coap 部分是 coap 協 議的封裝實現, discovery_service 是基於 coap 協議的設備間發現流程的實現。

image-20210117202121033

coap 目錄中:

  1. coap_def.h:定義 coap 協議包的格式、報文結構,且使用 UDP 協議傳輸;
  2. coap_adapter.c:實現 coap 協議的編碼、解碼函數;
  3. coap_socket.c:實現 coap 包的發現、接收服務;
  4. Coap_discovery.c:實現基於 coap 協議的設備發現功能。本文件定義了 socket通訊過程

discovery_service 目錄中:

  1. comman_info_manager.h:定義了鴻蒙系統當前支持的設備類型與級別;
  2. Discovery_service.c:實現了設備暴露、發現和連接流程。這里需要注意的 是,考慮到同一局域網下,主設備發出連接請求廣播后,多個物聯網設備都會回復 單播應答報文從而造成信道沖突。為避免此情況發生,每個物聯網設備均維護一套 信號量機制,實現多設備的有序等待。
2.3.2 authmanager

作為軟總線代碼執行流程中的第二部分:authmanager 單元提供了設備認證機制。設備通過加密和解密的方式,互相建立信任關系,確保在互聯場景下,用戶數據在 對的設備之間進行流轉,實現用戶數據的加密傳輸。軟總線中定義加密和解密算法 的內容在 trans_service/utils/aes_gcm.h 中,authmanager 中的處理流程如下圖所示:

image-20210117202542224

image-20210117202612222

authmanager 目錄中:

  1. auth_conn.c:提供發送、接收、認證和獲取密鑰的功能;
  2. auth_interface.c:管理會話、鏈接、密鑰節點,提供增刪改查功能;
  3. msg_get_deviceid.c:以 cJSON 格式獲取各個設備的信息,包括設備 id、鏈接信息、設備名、設備類型等;
  4. bus_manager.c:創建不同的 listen,用以監聽系統上有哪些 device 並創建新的 device 節點,以及節點數據的處理。bus_manager.c 主要由 discovery 單元調用, 通過判斷本文件中 flag 標志位決定是否啟動總線(start_bus()函數)或關閉當前總線 (stop_bus()函數)。discovery 調用后,bus_manager 執行流程如圖 10:
  5. wifi_auth_manager.c:實現了鏈接管理和數據接收功能。

image-20210117202828554

2.3.3 trans_service

經過第一階段協議確定、設備發現,第二階段設備鏈接,軟總線模塊執行到了第三階段:數據傳輸階段,即目錄中 trans_service 單元。trans_service 模塊依賴於 harmonyOS 提供的網絡 socket 服務,向認證模塊提供認證通道管理和認證數據的收 發;向業務模塊提供 session 管理和基於 session 的數據收發功能,並且通過 GCM 模 塊的加密功能提供收發報文的加密/解密保護。如下圖所示為 trans_service 模塊在系統架構中的位置:

image-20210117202924463

trans_service 目錄下源碼的結構及其功能如下:

image-20210117202941456

3、編譯

3.1 環境

image-20201231014441938

3.2 編譯過程

配置Makefile

image-20201231014938382

image-20201231015019799

執行編譯

image-20201231014802094

make install 后 softbus include目錄拷貝到/usr/local/softbus/include 整合依賴打包后的softbus_lite.so拷貝到/usr/local/softbus/lib/ 目錄下

image-20201231020137830

4、測試運行

4.1 准備

前面編譯生成softbus動態庫時需要-g 選項,可以在運行時輸出更多的信息

image-20210101221822038

涉及ipc相關以及部分線程參數設置需要sudo權限,故調試和執行時應在sudo權限下進行。

未在sudo權限下運行出錯:

image-20201231022228200

4.2 測試demo

#include <discovery_service.h>
#include <stdio.h>
#include <string.h>
#include <session.h>
#include <tcp_session_manager.h>
#include <nstackx.h>
#include <coap_discover.h>

// 定義業務自身的業務名稱,會話名稱及相關回調
const char *g_pkgName = "BUSINESS_NAME";
const char *g_sessionName = "SESSION_NAME";
struct ISessionListener * g_sessionCallback= NULL;
#define NAME_LENGTH 64
#define TRANS_FAILED -1
// 回調實現:接收對方通過SendBytes發送的數據,此示例實現是接收到對端發送的數據后回復固定消息
void OnBytesReceivedTest(int sessionId, const void* data, unsigned int dataLen)
{
    printf("OnBytesReceivedTest\n");
    printf("Recv Data: %s\n", (char *)data);
    printf("Recv Data dataLen: %d\n", dataLen);
    char *testSendData = "Hello World, Hello!";
    SendBytes(sessionId, testSendData, strlen(testSendData));
    return;
}
// 回調實現:用於處理會話關閉后的相關業務操作,如釋放當前會話相關的業務資源,會話無需業務主動釋放
void OnSessionClosedEventTest(int sessionId)
{
    printf("Close session successfully, sessionId=%d\n", sessionId);
}
// 回調實現:用於處理會話打開后的相關業務操作。返回值為0,表示接收;反之,非0表示拒絕。此示例表示只接受其他設備的同名會話連接
int OnSessionOpenedEventTest(int sessionId)
{
    char sessionNameBuffer[NAME_LENGTH+1];
    if(GetPeerSessionName(sessionId,sessionNameBuffer,NAME_LENGTH) == TRANS_FAILED) {
        printf("GetPeerSessionName faild, which sessionId = %d\n",sessionId);
        return -1;
    }
    if (strcmp(sessionNameBuffer,g_sessionName) != 0) {
        printf("Reject the session which name is different from mine, sessionId=%d\n", sessionId);
        return -1;
    }
    printf("Open session successfully, sessionId=%d\n", sessionId);
    return 0;
}
// 向SoftBus注冊業務會話服務及其回調
int StartSessionServer()
{
    if (g_sessionCallback == NULL) {
        g_sessionCallback = (struct ISessionListener*)malloc(sizeof(struct ISessionListener));
    }
    if (g_sessionCallback == NULL) {
        printf("Failed to malloc g_sessionCallback!\n");
        return -1;
    }
    g_sessionCallback->onBytesReceived = OnBytesReceivedTest;
    g_sessionCallback->onSessionOpened = OnSessionOpenedEventTest;
    g_sessionCallback->onSessionClosed = OnSessionClosedEventTest;
    int ret = CreateSessionServer(g_pkgName, g_sessionName, g_sessionCallback);
    if (ret < 0) {
        printf("Failed to create session server!\n");
        free(g_sessionCallback);
        g_sessionCallback = NULL;
    }
    return ret;
}
// 從SoftBus中刪除業務會話服務及其回調
void StopSessionServer(int x)
{
    int ret = RemoveSessionServer(g_pkgName, g_sessionName);
    if (ret < 0) {
        printf("Failed to remove session server!\n");
        return;
    }
    if (g_sessionCallback != NULL) {
        free(g_sessionCallback);
        g_sessionCallback = NULL;
    }
}

// 服務發布接口使用
void onSuccess(int publishId)
{
    printf("publish succeeded, publishId = %d\r\n", publishId);
    char ipbuff[NSTACKX_MAX_IP_STRING_LEN] = {"0.0.0.0"};
    CoapGetIp(ipbuff,NSTACKX_MAX_IP_STRING_LEN,0);
    printf("CoapGetIp = %s\n",ipbuff);
    if(StartSessionServer()!=-1)
        printf("StartSessionServer successed!\n");
}
void onFail(int publishId, PublishFailReason reason)
{
    printf("publish failed, publishId = %d, reason = %d\r\n", publishId, reason);
}


int main()
{
    PublishInfo info = {0};
    IPublishCallback cb = {0};
    cb.onPublishSuccess = onSuccess;
    cb.onPublishFail = onFail;
    char a[] = "01";
    info.capabilityData = a;
    info.capability = "ddmpCapability";
    info.dataLen = strlen(a);
    info.medium = 2;
    info.publishId = 1;
    PublishService("cxx", &info, &cb);
    sleep(100000);
}

4.3 編譯運行

編譯時需要鏈接所需庫,正確鏈接動態庫需要自行配置動態庫路徑 LD_LIBRARY_PATH

-lsoftbus_lite softbus源碼編譯生成的動態庫
-lrt mqueue相關
-lpthread 線程相關

image-20201231022159081

5、源碼分析

5.1 發現機制

用戶使用發現功能時,需要保證發現端設備與被發現端設備在同一個局域網內,並且互相能收到對方以下流程的報文。

(1) 發現端設備,發起discover請求后,使用coap協議在局域網內發送廣播。報文如下:

img

(2)被發現端設備使用PublishService接口發布服務,接收端收到廣播后,發送coap協議單播給發現端。報文格式如下:

img

(3)發現端設備收到報文會更新設備信息。

5.2 PublishService

  • SoftBusCheckPermission 權限檢查
  • SemCreate 信號量
  • SemWait 對應SemPost,即PV操作
  • InitService 初始化服務 [重點]
  • AddPublishModule 將PublishInfo結構體內容加入到g_publishModule全局數組
  • CoapRegisterDefaultService 注冊Coap服務
  • PublishCallback 回調Publish結果 對應測試demo里的onSuccess和onFail

image-20201231022723209

PublishService執行流程

image-20210101231211481

img

InitService執行流程

img

5.3 SoftbusCheckPermission

L0對應非linux版本,僅檢查permissionName

int SoftBusCheckPermission(const char* permissionName)
{
    if (permissionName == NULL) {
        return -1;
    }
    return 0;
}

L1對應linux版本,檢查permissionName后再去調用CheckPermission作進一步的檢查

int SoftBusCheckPermission(const char* permissionName)
{
    if (permissionName == NULL) {
        return -1;
    }

    if (CheckPermission(0, permissionName) != GRANTED) {
        SOFTBUS_PRINT("[SOFTBUS] CheckPermission fail\n");
        return -1;
    }
    return 0;
}

5.4 SemCreate

​ 考慮到同一局域網下,主設備發出連接請求廣播后,多個物聯網設備都會回復 單播應答報文從而造成信道沖突。為避免此情況發生,每個物聯網設備均維護一套 信號量機制,實現多設備的有序等待。

​ SemCreate()在LiteOS中使用了LOS_SemCreate()創建信號量,在Linux上用sem_init()這個Posix標准接口創建信號量。

int SemCreate(unsigned short count, unsigned long *semHandle)
{
    if (semHandle == NULL) {
        return -1;
    }

    (void)count;
    int ret = sem_init((sem_t *)semHandle, 1, 0);
    if (ret == 0) {
        return sem_post((sem_t *)semHandle);
    }
    return ret;
}

5.5 InitService

在InitService中

  1. 判斷是否已經初始化過了,如果是,則直接返回
  2. 調用InitCommonManager
  3. 為g_publishModule分配空間(保存所有發布服務的模塊的信息數組)
  4. 為g_capabilityData分配空間
  5. 注冊wificallback
  6. 調用CoapInit 初始化TCPIP協議棧
  7. 調用CoapRegisterDeviceInfo
  8. 調用BusManager 啟動軟總線

image-20210101235502334

InitCommonManager
int InitCommonManager(void)
{
  //調用InitLocalDeviceInfo
    if (InitLocalDeviceInfo() != 0) {
        SOFTBUS_PRINT("[DISCOVERY] InitCommonManager fail\n");
        return ERROR_FAIL;
    }
    return ERROR_SUCCESS;
}
int InitLocalDeviceInfo(void)
{
    char deviceId[DEVICEID_MAX_NUM] = {0};
//初始化g_deviceInfo
    if (g_deviceInfo != NULL) {
        memset_s(g_deviceInfo, sizeof(DeviceInfo), 0, sizeof(DeviceInfo));
    } else {
        g_deviceInfo = (DeviceInfo *)calloc(1, sizeof(DeviceInfo));
        if (g_deviceInfo == NULL) {
            return ERROR_FAIL;
        }
    }
 /*
 獲取IP
 在CoapGetIp中循環調用CoapGetWifiIp來從宏定義的eth或wlan設備中通過調用ioctl函數獲取ip地址
 */
#if defined(__LITEOS_A__) || defined(__LINUX__)
    CoapGetIp(g_deviceInfo->deviceIp, NSTACKX_MAX_IP_STRING_LEN, 1);
#endif
    g_deviceInfo->devicePort = -1;
    g_deviceInfo->isAccountTrusted = 1;

/*
獲取deviceID
通過函數GetDeviceIdFromFile()調用取得。這個函數會從"/storage/data/softbus/deviceid"文件中讀取,如果讀取不到,那么使用隨機數字符串組成deviceId,然后再寫入到上面的文件中。
*/
    unsigned int ret;
    ret = GetDeviceIdFromFile(deviceId, MAX_VALUE_SIZE);
    if (ret != ERROR_SUCCESS) {
        SOFTBUS_PRINT("[DISCOVERY] Get device fail\n");
        return ERROR_FAIL;
    }

  //給g_deviceInfo結構體賦值
#if defined(__LITEOS_M__) || defined(__LITEOS_RISCV__)
    g_deviceInfo->deviceType = L0;
    ret = (unsigned int)strcpy_s(g_deviceInfo->deviceName, sizeof(g_deviceInfo->deviceName), L0_DEVICE_NAME);
#else
    g_deviceInfo->deviceType = L1;
    ret = (unsigned int)strcpy_s(g_deviceInfo->deviceName, sizeof(g_deviceInfo->deviceName), L1_DEVICE_NAME);
#endif

    ret |= (unsigned int)strcpy_s(g_deviceInfo->deviceId, sizeof(g_deviceInfo->deviceId), deviceId);
    ret |= (unsigned int)strcpy_s(g_deviceInfo->version, sizeof(g_deviceInfo->version), "1.0.0");
    if (ret != 0) {
        return ERROR_FAIL;
    }

    SOFTBUS_PRINT("[DISCOVERY] InitLocalDeviceInfo ok\n");
    return ERROR_SUCCESS;
}
g_publishModule

這個全局變量保存所有發布服務的模塊的信息數組,定義如下

typedef struct {
    char package[MAX_PACKAGE_NAME];
    int publishId;
    unsigned short medium;
    unsigned short capabilityBitmap;
    char *capabilityData; //需要分配空間
    unsigned short dataLength;
    unsigned short used;
} PublishModule;
RegisterWifiCallback
void RegisterWifiCallback(WIFI_PROC_FUNC callback)
{
    g_wifiCallback = callback;
}
CoapInit

COAP初始化,注冊TCP/IP協議棧的處理,注冊session的底層socket的處理【重點】

int CoapInit(void)
{
  //調用了NSTACKX_Init 初始化TCPIP協議棧
    int ret = NSTACKX_Init();
    if (ret != 0) {
        SOFTBUS_PRINT("[DISCOVERY] CoapInit NSTACKX_Init fail\n");
        return ERROR_FAIL;
    }
    return ERROR_SUCCESS;
}

查看NSTACKX_Init的代碼

int NSTACKX_Init()
{
    int ret;
  //判斷g_nstackInitState是否為NSTACKX_INIT_STATE_START
    if (g_nstackInitState != NSTACKX_INIT_STATE_START) {
        return NSTACKX_EOK;
    }
	//將g_nstackInitState置為ONGOING狀態
    g_nstackInitState = NSTACKX_INIT_STATE_ONGOING;
    cJSON_InitHooks(NULL);

  //調用CoapInitDiscovery
    ret = CoapInitDiscovery();
    if (ret != NSTACKX_EOK) {
        goto L_ERR_INIT;
    }
    g_nstackInitState = NSTACKX_INIT_STATE_DONE;
    return NSTACKX_EOK;

L_ERR_INIT:
    ret = NSTACKX_Deinit();
    if (ret != NSTACKX_EOK) {
        SOFTBUS_PRINT("[DISCOVERY] deinit fail\n");
    }
    return NSTACKX_EFAILED;
}

繼續查看CoapInitDiscovery的代碼

int CoapInitDiscovery(void)
{
  //調用CoapInitSocket初始化Socket
    int ret = CoapInitSocket();
    if (ret != NSTACKX_EOK) {
        SOFTBUS_PRINT("[DISCOVERY] Init socket fail\n");
        return ret;
    }
#if defined(__LITEOS_M__) || defined(__LITEOS_RISCV__)
    int rtn = CoapInitWifiEvent();
    if (rtn != NSTACKX_EOK) {
        SOFTBUS_PRINT("[DISCOVERY] Init wifi event fail\n");
        return rtn;
    }
#endif
  //調用CreateCoapListenThread 創建監聽線程
    return CreateCoapListenThread();
}

查看CoapInitSocket

int CoapInitSocket(void)
{
  //判斷是否已經初始化過g_serverFd了
    if (g_serverFd >= 0) {
        return NSTACKX_EOK;
    }
  //初始化sockaddr_in
    struct sockaddr_in sockAddr;
    (void)memset_s(&sockAddr, sizeof(sockAddr), 0, sizeof(sockAddr));
    sockAddr.sin_port = htons(COAP_DEFAULT_PORT);
  //調用CoapCreateUdpServer來創建UDP socket並bind,COAP_DEFAULT_PORT(5684)端口,返回sockFd
    g_serverFd = CoapCreateUdpServer(&sockAddr);
    if (g_serverFd < 0) {
        return NSTACKX_OVERFLOW;
    }
  
  //初始化g_msgId
    COAP_SoftBusInitMsgId();
    return NSTACKX_EOK;
}

查看CreateCoapListenThread

int CreateCoapListenThread(void)
{
    g_terminalFlag = 1;
    if (g_coapTaskId != -1) {
        return NSTACKX_EOK;
    }

  //設置線程相關參數
  /*
struct ThreadAttr {
    const char *name;
    uint32_t stackSize;
    uint8_t priority;
    uint8_t reserved1;
    uint16_t reserved2;
};
*/
    ThreadAttr attr = {"coap_listen_task", 0x800, 20, 0, 0};
  
  //創建線程,線程中執行CoapReadHandle
    int error = CreateThread((Runnable)CoapReadHandle, NULL, &attr, (unsigned int*)&g_coapTaskId);
    if (error != 0) {
        g_terminalFlag = 0;
        SOFTBUS_PRINT("[DISCOVERY] create task fail\n");
        return NSTACKX_EFAILED;
    }
  
    return NSTACKX_EOK;
}

查看CreateThread

int CreateThread(Runnable run, void *argv, const ThreadAttr *attr, unsigned int *threadId)
{
    pthread_attr_t threadAttr; //線程屬性
    pthread_attr_init(&threadAttr);//init
    pthread_attr_setstacksize(&threadAttr, (attr->stackSize | MIN_STACK_SIZE));
    struct sched_param sched = {attr->priority};//線程優先級
    pthread_attr_setschedparam(&threadAttr, &sched); //設置線程優先級
    int errCode = pthread_create((pthread_t *)threadId, &threadAttr, run, argv); //創建線程,run為傳入的CoapReadHandle函數指針
    return errCode;
}

查看CoapReadHandle

static void CoapReadHandle(unsigned int uwParam1, unsigned int uwParam2, unsigned int uwParam3, unsigned int uwParam4)
{
    (void)uwParam1;
    (void)uwParam2;
    (void)uwParam3;
    (void)uwParam4;
    int ret;
    fd_set readSet;
    int serverFd = GetCoapServerSocket();//獲取serverFd
    SOFTBUS_PRINT("[DISCOVERY] CoapReadHandle coin select begin\n");
  
  /*
  使用了io多路復用的select, 對於io多路復用還有改進版的poll和epoll
  */
    while (g_terminalFlag) {
      //select維護了一個bitset,每輪循環要先都設置為0
        FD_ZERO(&readSet);
      //將serverFd加入select監聽集合中
        FD_SET(serverFd, &readSet);
      //select函數 成功時返回事件的個數 
        ret = select(serverFd + 1, &readSet, NULL, NULL, NULL);//timeval設置為Null時會阻塞等待,直到有描述符准備好IO后才返回
        if (ret > 0) {
            if (FD_ISSET(serverFd, &readSet)) { //判斷serverFd是否可讀
                HandleReadEvent(serverFd);//處理IO事件
            }
        } else {
            SOFTBUS_PRINT("[DISCOVERY]ret:%d,error:%d\n", ret, errno);
        }
    }
    SOFTBUS_PRINT("[DISCOVERY] CoapReadHandle exit\n");
}

繼續查看HandleReadEvent

static void HandleReadEvent(int fd)
{
    int socketFd = fd;
    unsigned char *recvBuffer = calloc(1, COAP_MAX_PDU_SIZE + 1);
    if (recvBuffer == NULL) {
        return;
    }
    ssize_t nRead;
  //調用CoapSocketRecv來接收數據,讀入recvBuffer中
    nRead = CoapSocketRecv(socketFd, recvBuffer, COAP_MAX_PDU_SIZE);
    if ((nRead == 0) || (nRead < 0 && errno != EAGAIN &&
        errno != EWOULDBLOCK && errno != EINTR)) {
        free(recvBuffer);
        return;
    }
  
    COAP_Packet decodePacket;
    (void)memset_s(&decodePacket, sizeof(COAP_Packet), 0, sizeof(COAP_Packet));
    decodePacket.protocol = COAP_UDP;
  //調用COAP_SoftBusDecode()函數對COAP協議進行解析,解析的內容放在decodePacket結構中
    COAP_SoftBusDecode(&decodePacket, recvBuffer, nRead);
  //最后調用PostServiceDiscover()發現端設備發送的DISCOVER消息進行回應
    PostServiceDiscover(&decodePacket);
    free(recvBuffer);
}

繼續查看PostServiceDiscover

void PostServiceDiscover(COAP_Packet *pkt)
{
    char *remoteUrl = NULL;
    DeviceInfo deviceInfo;

    if (pkt == NULL) {
        return;
    }
 //獲取deviceInfo
    (void)memset_s(&deviceInfo, sizeof(deviceInfo), 0, sizeof(deviceInfo));
    if (GetServiceDiscoverInfo(pkt->payload.buffer, pkt->payload.len, &deviceInfo, &remoteUrl) != NSTACKX_EOK) {
        return;
    }
 //獲取wifiIpAddr
    char wifiIpAddr[NSTACKX_MAX_IP_STRING_LEN];
    (void)memset_s(wifiIpAddr, sizeof(wifiIpAddr), 0, sizeof(wifiIpAddr));
    (void)inet_ntop(AF_INET, &deviceInfo.netChannelInfo.wifiApInfo.ip, wifiIpAddr, sizeof(wifiIpAddr));

    if (remoteUrl != NULL) {
      //調用CoapResponseService
        CoapResponseService(pkt, remoteUrl, wifiIpAddr);
        free(remoteUrl);
    }
}

繼續查看CoapResponseService

static int CoapResponseService(const COAP_Packet *pkt, const char* remoteUrl, const char* remoteIp)
{
    int ret;
    CoapRequest coapRequest;
    (void)memset_s(&coapRequest, sizeof(coapRequest), 0, sizeof(coapRequest));
    coapRequest.remoteUrl = remoteUrl;
    coapRequest.remoteIp = remoteIp;
    char *payload = PrepareServiceDiscover();
    if (payload == NULL) {
        return NSTACKX_EFAILED;
    }

    COAP_ReadWriteBuffer sndPktBuff = {0};
    sndPktBuff.readWriteBuf = calloc(1, COAP_MAX_PDU_SIZE);
    if (sndPktBuff.readWriteBuf == NULL) {
        free(payload);
        return NSTACKX_EFAILED;
    }
    sndPktBuff.size = COAP_MAX_PDU_SIZE;
    sndPktBuff.len = 0;

    ret = BuildSendPkt(pkt, remoteIp, payload, &sndPktBuff);
    free(payload);
    if (ret != DISCOVERY_ERR_SUCCESS) {
        free(sndPktBuff.readWriteBuf);
        sndPktBuff.readWriteBuf = NULL;
        return ret;
    }
    coapRequest.data = sndPktBuff.readWriteBuf;
    coapRequest.dataLength = sndPktBuff.len;
  
  //在這之前是發送前准備,調用CoapSendRequest
    ret = CoapSendRequest(&coapRequest);
    free(sndPktBuff.readWriteBuf);
    sndPktBuff.readWriteBuf = NULL;

    return ret;
}

繼續查看CoapSendRequest

static int CoapSendRequest(const CoapRequest *coapRequest)
{
    if (coapRequest == NULL || coapRequest->remoteUrl == NULL) {
        return NSTACKX_EFAILED;
    }

    struct sockaddr_in sockAddr = {0};
    if (coapRequest->remoteIp == NULL) {
        return NSTACKX_EFAILED;
    }

    sockAddr.sin_addr.s_addr = inet_addr(coapRequest->remoteIp);
    sockAddr.sin_port = htons(COAP_DEFAULT_PORT);
    sockAddr.sin_family = AF_INET;

    int ret = CoapCreatUdpClient(&sockAddr);
    if (ret != NSTACKX_EOK) {
        return NSTACKX_EFAILED;
    }
    SocketInfo socket = {0};
    socket.cliendFd = GetCoapClientSocket();
    socket.dstAddr = sockAddr;
  //定位到CoapSocketSend CoapSocketSend最終調用sendto進行數據發送
    if (CoapSocketSend(&socket, (uint8_t *)coapRequest->data, coapRequest->dataLength) == -1) {
        SOFTBUS_PRINT("[DISCOVERY]reponse coap failed.\r\n");
        return NSTACKX_EFAILED;
    }
    return NSTACKX_EOK;
}
CoapRegisterDeviceInfo

注冊設備信息

int CoapRegisterDeviceInfo(void)
{
    NSTACKX_LocalDeviceInfo localDeviceInfo;
    int ret;

    (void)memset_s(&localDeviceInfo, sizeof(NSTACKX_LocalDeviceInfo), 0, sizeof(NSTACKX_LocalDeviceInfo));
  //獲取localDeviceInfo
  /*
  typedef struct {
    char name[NSTACKX_MAX_DEVICE_NAME_LEN];
    char deviceId[NSTACKX_MAX_DEVICE_ID_LEN];
    char btMacAddr[NSTACKX_MAX_MAC_STRING_LEN];
    char wifiMacAddr[NSTACKX_MAX_MAC_STRING_LEN];
    char networkIpAddr[NSTACKX_MAX_IP_STRING_LEN];
    char networkName[NSTACKX_MAX_INTERFACE_NAME_LEN];
    uint8_t is5GHzBandSupported;
    int deviceType;
    char version[NSTACKX_MAX_HICOM_VERSION];
} NSTACKX_LocalDeviceInfo;
  */
    ret = CoapGetLocalDeviceInfo(&localDeviceInfo);
    if (ret != 0) {
        return ERROR_FAIL;
    }

  //注冊設備信息
    ret = NSTACKX_RegisterDeviceAn(&localDeviceInfo, DEV_HASH_ID);
    if (ret != 0) {
        SOFTBUS_PRINT("[DISCOVERY] CoapRegisterDeviceInfo RegisterDeviceAn fail\n");
        return ERROR_FAIL;
    }

    return ERROR_SUCCESS;
}
BusManage
int BusManager(unsigned int startFlag)
{
    if (startFlag == 1) {
        return StartBus();
    } else {
        return StopBus();
    }
}

繼續查看StartBus

int StartBus(void)
{
    if (g_busStartFlag == 1) {
        return 0;
    }
    DeviceInfo *info = GetCommonDeviceInfo();
    if (info == NULL) {
        return ERROR_FAIL;
    }
	//OnConnectEvent()函數中完成對新連接的處理, OnDataEvent()函數中完成對新數據的處理。
    g_baseLister.onConnectEvent = OnConnectEvent;
    g_baseLister.onDataEvent = OnDataEvent;
  
  //StartListener()函數負責為認證模塊提供通道完成初始化
    int authPort = StartListener(&g_baseLister, info->deviceIp);
    if (authPort < 0) {
        SOFTBUS_PRINT("[AUTH] StartBus StartListener fail\n");
        return ERROR_FAIL;
    }
    info->devicePort = authPort;

  //StartSession()函數負責初始化業務的session管理
    int sessionPort = StartSession(info->deviceIp);
    if (sessionPort < 0) {
        SOFTBUS_PRINT("[AUTH] StartBus StartSession fail\n");
        StopListener();
        return ERROR_FAIL;
    }

    AuthMngInit(authPort, sessionPort);
    g_busStartFlag = 1;

    SOFTBUS_PRINT("[AUTH] StartBus ok\n");
    return 0;
}

繼續查看StartListener

int StartListener(BaseListener *callback, const char *ip)
{
    if (callback == NULL || ip == NULL) {
        return -DBE_BAD_PARAM;
    }

    g_callback = callback;

    //StartListener()調用InitListenFd()函數完成監聽TCP socket的創建和監聽
    int rc = InitListenFd(ip, SESSIONPORT);
    if (rc != DBE_SUCCESS) {
        return -DBE_BAD_PARAM;
    }

    signal(SIGPIPE, SIG_IGN);
    ThreadAttr attr = {"auth", 0x800, 20, 0, 0};
  
  //Linux下 AuthCreate()會調用POSIX的pthread_create()完成線程的創建,線程的入口函數為static void WaitProcess(void)
    register ThreadId threadId = (ThreadId)AuthCreate((Runnable)WaitProcess, &attr);
    if (threadId == NULL) {
        SOFTBUS_PRINT("[TRANS] StartListener AuthCreate fail\n");
        return -1;
    }
    return GetSockPort(g_listenFd);
}

繼續查看WaitProcess

static void WaitProcess(void)
{
    SOFTBUS_PRINT("[TRANS] WaitProcess begin\n");
    fd_set readSet;
    fd_set exceptfds;

    while (1) {
        //與CoapInit中CoapReadHandle類似,同樣使用io多路復用的select來實現對io事件的處理
        FD_ZERO(&readSet);
        FD_ZERO(&exceptfds);
        FD_SET(g_listenFd, &readSet);
        if (g_dataFd >= 0) {
            FD_SET(g_dataFd, &readSet);
            FD_SET(g_dataFd, &exceptfds);
        }
        int ret = select(g_maxFd + 1, &readSet, NULL, &exceptfds, NULL);
        if (ret > 0) {
          /*WaitProcess()使用忙等方式,調用select()來監聽listenFd和數據g_dataFd的信息,如果監聽到有數據可讀,則進入ProcessAuthData來處理。
          無論是新連接請求,還是已有連接中有數據到來,均會進入ProcessAuthData
          函數通過FD_ISSET()判斷是否是listenFd上存在消息,是的話說明有新連接,則調用onConnectEvent來處理新到來的連接請求,並將新創建的fd和client的IP地址告知認證模塊。與此同時,創建g_dataFd時候需要刷新g_maxFd,以保證在WaitProcess()中的下一次select()操作時中,會監聽到g_dataFd上的事件
          如果FD_ISSET()判斷出g_dataFd上存在消息,則說明已完成握手的連接向本節點發送了數據,這時函數回調onDataEvent,以處理接收到的數據
          */
            if (!ProcessAuthData(g_listenFd, &readSet)) {
                SOFTBUS_PRINT("[TRANS] WaitProcess ProcessAuthData fail\n");
                StopListener();
                break;
            }
        } else if (ret < 0) {
          //如果發現g_dataFd有異常信息,則將其關閉。其中g_dataFd是由listenFd監聽到連接時創建的socket
            if (errno == EINTR || (g_dataFd > 0 && FD_ISSET(g_dataFd, &exceptfds))) {
                SOFTBUS_PRINT("[TRANS] errno == EINTR or g_dataFd is in exceptfds set.\n");
                CloseAuthSessionFd(g_dataFd);
                continue;
            }
            SOFTBUS_PRINT("[TRANS] WaitProcess select fail, stop listener\n");
            StopListener();
            break;
        }
    }
}

查看StartSession

int StartSession(const char *ip)
{
    int port = CreateTcpSessionMgr(true, ip);
    return port;
}

繼續查看CreateTcpSessionMgr

int CreateTcpSessionMgr(bool asServer, const char* localIp)
{
    if (g_sessionMgr != NULL || localIp == NULL) {
        return TRANS_FAILED;
    }
  //初始化g_sessionMgr
    g_sessionMgr = malloc(sizeof(TcpSessionMgr));
    if (g_sessionMgr == NULL) {
        return TRANS_FAILED;
    }
    (void)memset_s(g_sessionMgr, sizeof(TcpSessionMgr), 0, sizeof(TcpSessionMgr));
    g_sessionMgr->asServer = asServer;
    g_sessionMgr->listenFd = -1;
    g_sessionMgr->isSelectLoopRunning = false;

    if (InitTcpMgrLock() != 0 || GetTcpMgrLock() != 0) {
        FreeSessionMgr();
        return TRANS_FAILED;
    }

    for (int i = 0; i < MAX_SESSION_SUM_NUM; i++) {
        g_sessionMgr->sessionMap_[i] = NULL;
    }

    for (int i = 0; i < MAX_SESSION_SERVER_NUM; i++) {
        g_sessionMgr->serverListenerMap[i] = NULL;
    }

    if (ReleaseTcpMgrLock() != 0) {
        FreeSessionMgr();
        return TRANS_FAILED;
    }
	//創建OpenTcpServer完成了socket的創建和bind,返回listenFd
    int listenFd = OpenTcpServer(localIp, DEFAULT_TRANS_PORT);
    if (listenFd < 0) {
        SOFTBUS_PRINT("[TRANS] CreateTcpSessionMgr OpenTcpServer fail\n");
        FreeSessionMgr();
        return TRANS_FAILED;
    }
  //listen,返回sessionId
    int rc = listen(listenFd, LISTEN_BACKLOG);
    if (rc != 0) {
        SOFTBUS_PRINT("[TRANS] CreateTcpSessionMgr listen fail\n");
        CloseSession(listenFd);
        FreeSessionMgr();
        return TRANS_FAILED;
    }
    g_sessionMgr->listenFd = listenFd;

    signal(SIGPIPE, SIG_IGN);
  //StartSelectLoop
    if (StartSelectLoop(g_sessionMgr) != 0) {
        SOFTBUS_PRINT("[TRANS] CreateTcpSessionMgr StartSelectLoop fail\n");
        CloseSession(listenFd);
        FreeSessionMgr();
        return TRANS_FAILED;
    }
    return GetSockPort(listenFd);
}

繼續查看StartSelectLoop

int StartSelectLoop(TcpSessionMgr *tsm)
{
    if (tsm == NULL) {
        return TRANS_FAILED;
    }
    if (tsm->isSelectLoopRunning) {
        return 0;
    }
    ThreadAttr attr = {"tcp", 0x800, 20, 0, 0};
    register ThreadId threadId = (ThreadId)TcpCreate((Runnable)SelectSessionLoop, tsm, &attr);
    if (threadId == NULL) {
        return TRANS_FAILED;
    }
    tsm->isSelectLoopRunning = true;
    return 0;
}

//同樣是用select實現
static void SelectSessionLoop(TcpSessionMgr *tsm)
{
    if (tsm == NULL) {
        return;
    }
    SOFTBUS_PRINT("[TRANS] SelectSessionLoop begin\n");
    tsm->isSelectLoopRunning = true;
    while (true) {
        fd_set readfds;
        fd_set exceptfds;
        int maxFd = InitSelectList(tsm, &readfds, &exceptfds);
        if (maxFd < 0) {
            break;
        }

        errno = 0;
        int ret = select(maxFd + 1, &readfds, NULL, &exceptfds, NULL);
        if (ret < 0) {
            SOFTBUS_PRINT("RemoveExceptSessionFd\r\n");
            if (errno == EINTR || RemoveExceptSessionFd(tsm, &exceptfds) == 0) {
                continue;
            }
            SOFTBUS_PRINT("[TRANS] SelectSessionLoop close all Session\n");
            CloseAllSession(tsm);
            break;
        } else if (ret == 0) {
            continue;
        } else {
          //對事件的處理
            ProcessData(tsm, &readfds);
        }
    }
    tsm->isSelectLoopRunning = false;
}

5.6 AddPublishModule

AddPublishModule()函數,將把moduleName和info(PublishInfo結構)中的內容加入到g_publishModule全局數組中

PublishModule *AddPublishModule(const char *packageName, const PublishInfo *info)
{
    if (packageName == NULL || g_publishModule == NULL || info == NULL) {
        return NULL;
    }

    if (info->dataLen > MAX_SERVICE_DATA_LEN) {
        return NULL;
    }

    if (FindExistModule(packageName, info->publishId) != NULL) {
        return NULL;
    }

    if (FindFreeModule() == NULL) {
        return NULL;
    }
    int ret;
    for (int i = 0; i < MAX_MODULE_COUNT; i++) {
        if (g_publishModule[i].used == 1) {
            continue;
        }

        if (ParseCapability(info->capability, &g_publishModule[i].capabilityBitmap)) {
            return NULL;
        }

        g_publishModule[i].used = 1;
        g_publishModule[i].capabilityData = calloc(1, info->dataLen + 1);
        if (g_publishModule[i].capabilityData == NULL) {
            memset_s(&g_publishModule[i], sizeof(g_publishModule[i]), 0, sizeof(g_publishModule[i]));
            return NULL;
        }
        g_publishModule[i].dataLength = info->dataLen + 1;
        ret = memcpy_s(g_publishModule[i].capabilityData,
                       g_publishModule[i].dataLength,
                       info->capabilityData, info->dataLen);
        if (ret != 0) {
            free(g_publishModule[i].capabilityData);
            g_publishModule[i].capabilityData = NULL;
            memset_s(&g_publishModule[i], sizeof(g_publishModule[i]), 0, sizeof(g_publishModule[i]));
            return NULL;
        }
        g_publishModule[i].medium = info->medium;
        g_publishModule[i].publishId = info->publishId;
        ret = memcpy_s(g_publishModule[i].package, MAX_PACKAGE_NAME, packageName, strlen(packageName));
        if (ret != 0) {
            free(g_publishModule[i].capabilityData);
            g_publishModule[i].capabilityData = NULL;
            memset_s(&g_publishModule[i], sizeof(g_publishModule[i]), 0, sizeof(g_publishModule[i]));
            return NULL;
        }
        return &g_publishModule[i];
    }
    return NULL;
}

5.7 CoapRegisterDefaultService

int CoapRegisterDefualtService(void)
{
    DeviceInfo *info = GetCommonDeviceInfo();
    if (info == NULL) {
        return ERROR_FAIL;
    }

    char serviceData[MAX_DEFAULT_SERVICE_DATA_LEN] = {0};
 // 代碼中的 info->devicePort 就是基於TCP的認證服務的socket綁定的端口號(在StartBus()函數中賦值的)。而serviceData就是 “port:%d”的子串
    if (sprintf_s(serviceData, sizeof(serviceData), "port:%d", info->devicePort) == -1) {
        return ERROR_FAIL;
    }

    return NSTACKX_RegisterServiceData(serviceData);
}

//把g_localDeviceInfo.serverData賦值成 “port:auth_port”這樣的子串
int NSTACKX_RegisterServiceData(const char* serviceData)
{
    if (serviceData == NULL) {
        return NSTACKX_EINVAL;
    }

    if (g_nstackInitState != NSTACKX_INIT_STATE_DONE) {
        return NSTACKX_EFAILED;
    }
    unsigned int serviceLen = strlen(serviceData);
    if (serviceLen >= NSTACKX_MAX_SERVICE_DATA_LEN) {
        return NSTACKX_EINVAL;
    }

    if (RegisterServiceData(serviceData, serviceLen + 1) != NSTACKX_EOK) {
        return NSTACKX_EINVAL;
    }
    return NSTACKX_EOK;
}

int RegisterServiceData(const char* serviceData, int length)
{
    if (serviceData == NULL) {
        return NSTACKX_EINVAL;
    }

    (void)memset_s(g_localDeviceInfo.serviceData, sizeof(g_localDeviceInfo.serviceData),
        0, sizeof(g_localDeviceInfo.serviceData));
    if (strcpy_s(g_localDeviceInfo.serviceData, NSTACKX_MAX_SERVICE_DATA_LEN, serviceData) != EOK)  {
        return NSTACKX_EFAILED;
    }

    (void)length;
    return NSTACKX_EOK;
}

6、編譯和調試過程中遇到的問題

6.1 uint16_t unknown type

image-20210102025115778

解決:

在tcp_socket.h中加上

typedef unsigned short uint16_t;

或者用第三方musl-gcc編譯

6.2 CoapGetIp獲取IP地址失敗的問題

Coap底層調用ioctl獲取ip地址

解決:

ipconfig查看網絡設備和地址

image-20210102032821776

discovery/coap/source/coap_discover.c中修改宏定義

image-20210102033008754

6.3 sem_init 在linux semaphore實現中越界讀寫的問題

在調試過程中發現在linux下會有異常修改lisenFD的現象。原因在於在discovery_service.c中將g_serviceSemID定義為一個unsigned long類型,在后面強制類型轉換為sem_t,對應sem_t的結構體。在linux下的posix semaphore實現的信號量中訪問了越界的地址區域,導致了listenFd被異常修改。

代碼的g_serviceSemId定義 后面被強制類型轉換為信號量sem_t

image-20210101233700628

linux下的posix標准semaphore中關於信號量sem_t的定義

image-20210101234601883

由於listenFd被異常修改導致InitService BusManager出錯

image-20210102021742794

gdb調試

image-20210102021955948

解決方法是為g_serviceSemId開辟與sem_t同樣大小的靜態變量空間

static unsigned long g_serviceSemId[sizeof(sem_t)/sizeof(long)]= {INVALID_SEM_ID};

6.4 在linux下多線程創建中遇到的問題

代碼路徑trans_service/source/libdistbus/tcp_session_manager.c

//源代碼實現
ThreadId TcpCreate(Runnable run, void *argv, const ThreadAttr *attr)
{
    if (attr == NULL) {
        return NULL;
    }
    int ret;
    pthread_attr_t threadAttr;

    ret = pthread_attr_init(&threadAttr);
    if (ret != 0) {
        return NULL;
    }
    ret = pthread_attr_setstacksize(&threadAttr, (attr->stackSize | MIN_STACK_SIZE));
    if (ret != 0) {
        return NULL;
    }

    struct sched_param sched = {attr->priority};

    ret = pthread_attr_setschedparam(&threadAttr, &sched); //
    if (ret != 0) {
        return NULL;
    }
  
    pthread_t threadId = 0;
    ret = pthread_create(&threadId, &threadAttr, run, argv);
    if (ret != 0) {
        return NULL;
    }
    if (attr->name != NULL) {
        ret = pthread_setname_np(threadId, attr->name);
        if (ret != 0) {
            SOFTBUS_PRINT("[TRANS] TcpCreate setname fail\n");
        }
    }

    return (ThreadId)threadId;
}

執行出錯

image-20210102023408441

gdb調試,是位於TcpCreate/pthread_attr_setschedparam 設置線程優先級時失敗導致錯誤

image-20210102023553822

查看函數棧

image-20210102023801590

解決方法

//在設置優先級前
//必需設置inher的屬性為 PTHREAD_EXPLICIT_SCHED,否則設置線程的優先級會被忽略  
ret = pthread_attr_setinheritsched(&threadAttr,PTHREAD_EXPLICIT_SCHED);
if (ret != 0) {
  return NULL;
}
//設置線程調度策略  
/*
linux內核的三種調度方法:
1,SCHED_OTHER 分時調度策略,
2,SCHED_FIFO實時調度策略,先到先服務
3,SCHED_RR實時調度策略,時間片輪轉
*/
ret = pthread_attr_setschedpolicy(&threadAttr,SCHED_RR); //需要在sudo權限下
if (ret != 0) {
  return NULL;
}
//設置線程優先級
struct sched_param sched = {attr->priority};
ret = pthread_attr_setschedparam(&threadAttr, &sched);
if (ret != 0) {
  return NULL;
}

或者采用third_party/musl的第三方線程庫進行源碼編譯 用musl-gcc編譯

7、參考

軟總線調研報告-朱浩-SA20225646

編譯構建子系統README

分布式通信子系統README

鴻蒙子系統解讀-分布式軟總線子系統初步研究


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM