摘要:鴻蒙輕內核的任務排序鏈表,用於任務延遲到期/超時喚醒等業務場景,是一個非常重要、非常基礎的數據結構。
本文會繼續給讀者介紹鴻蒙輕內核源碼中重要的數據結構:任務排序鏈表TaskSortLinkAttr
。鴻蒙輕內核的任務排序鏈表,用於任務延遲到期/超時喚醒等業務場景,是一個非常重要、非常基礎的數據結構。本文中所涉及的源碼,以OpenHarmony LiteOS-M
內核為例,均可以在開源站點https://gitee.com/openharmony/kernel_liteos_m 獲取。
1 任務排序鏈表
我們先看下任務排序鏈接的數據結構。任務排序鏈表是一個環狀的雙向鏈表數組,任務排序鏈表屬性結構體TaskSortLinkAttr
作為雙向鏈表的頭結點,指向雙向鏈表數組的第一個元素,還維護游標信息,記錄當前的位置信息。我們先看下排序鏈表屬性的結構體的定義。
1.1 任務排序鏈表屬性結構體定義
在kernel\include\los_task.h
頭文件中定義了排序鏈表屬性的結構體TaskSortLinkAttr
。該結構體定義了排序鏈表的頭節點LOS_DL_LIST *sortLink
,游標UINT16 cursor
,還有一個保留字段,暫時沒有使用。
源碼如下:
typedef struct { LOS_DL_LIST *sortLink; UINT16 cursor; UINT16 reserved; } TaskSortLinkAttr;
在文件kernel\src\los_task.c
中定義了排序鏈表屬性結構體TaskSortLinkAttr
類型的全局變量g_taskSortLink
,該全局變量的成員變量sortLink
作為排序鏈表的頭結點,指向一個長度為32的環狀的雙向鏈表數組,成員變量cursor
作為游標記錄環狀數組的當前游標位置。源代碼如下。
LITE_OS_SEC_BSS TaskSortLinkAttr g_taskSortLink;
我們使用示意圖來講述一下。任務排序鏈表是環狀雙向鏈表數組,長度為32,每一個元素是一個雙向鏈表,掛載任務LosTaskCB
的鏈表節點timerList
。任務LosTaskCB
的成員變量idxRollNum
記錄數組的索引和滾動數。全局變量g_taskSortLink
的成員變量cursor
記錄當前游標位置,每過一個Tick
,游標指向下一個位置,轉一輪需要32 ticks
。當運行到的數組位置,雙向鏈表不為空,則把第一個節點維護的滾動數減1。這樣的數據結構類似鍾表表盤,也稱為時間輪
。
我們舉個例子來說明,基於時間輪實現的任務排序鏈表是如何管理任務延遲超時的。假如當前游標cursor
為1,當一個任務需要延時72 ticks
,72=2*32+8,表示排序索引sortIndex
為8,滾動數rollNum
為2。會把任務插入數組索引為sortIndex
+cursor
=9的雙向鏈表位置,索要9處的雙向鏈表維護節點的滾動為2。隨着Tick
時間的進行,從當前游標位置運行到數組索引位置9,歷時8 ticks
。運行到9時,如果滾動數大於0,則把滾動數減1。等再運行2輪,共需要72 ticks
,任務就會延遲到期,可以從排序鏈表移除。每個數組元素對應的雙向鏈表的第一個鏈表節點的滾動數表示需要轉多少輪,節點任務才到期。第二個鏈表節點的滾動數需要加上第一個節點的滾動數,表示第二個節點需要轉的輪數。依次類推。
示意圖如下:
1.2 任務排序鏈表宏定義
在OS_TSK_SORTLINK_LEN
頭文件中定義了一些和任務排序鏈表相關的宏定義。延遲任務雙向鏈表數組的長度定義為32,高階bit
位位數為5,低階bit
位位數為27。對於任務的超時時間,取其高27位作為滾動數,低5位作為數組索引。
源碼如下:
/** * 延遲任務雙向鏈表數組的數量(桶的數量):32 */ #define OS_TSK_SORTLINK_LEN 32 /** * 高階bit位數目:5 */ #define OS_TSK_HIGH_BITS 5U /** * 低階bit位數目:27 */ #define OS_TSK_LOW_BITS (32U - OS_TSK_HIGH_BITS) /** * 滾動數最大值:0xFFFF FFDF,1111 0111 1111 1111 1111 1111 1101 1111 */ #define OS_TSK_MAX_ROLLNUM (0xFFFFFFFFU - OS_TSK_SORTLINK_LEN) /** * 任務延遲時間數的位寬:5 */ #define OS_TSK_SORTLINK_LOGLEN 5 /** * 延遲任務的桶編號的掩碼:31、0001 1111 */ #define OS_TSK_SORTLINK_MASK (OS_TSK_SORTLINK_LEN - 1U) /** * 滾動數的高階掩碼:1111 1000 0000 0000 0000 0000 0000 0000 */ #define OS_TSK_HIGH_BITS_MASK (OS_TSK_SORTLINK_MASK << OS_TSK_LOW_BITS) /** * 滾動數的低階掩碼:0000 0111 1111 1111 1111 1111 1111 1111 */ #define OS_TSK_LOW_BITS_MASK (~OS_TSK_HIGH_BITS_MASK)
2 任務排序鏈表操作
我們分析下任務排序鏈表的操作,包含初始化,插入,刪除,滾動數更新,獲取下一個到期時間等。
2.1 初始化排序鏈表
在系系統內核初始化啟動階段,在函數UINT32 OsTaskInit(VOID)
中初始化任務排序鏈表。該函數的調用關系如下,main.c:main() --> kernel\src\los_init.c:LOS_KernelInit() --> kernel\src\los_task.c:OsTaskInit()
。
初始化排序鏈表函數的源碼如下:
LITE_OS_SEC_TEXT_INIT UINT32 OsTaskInit(VOID) { UINT32 size; UINT32 index; LOS_DL_LIST *listObject = NULL; ...... ⑴ size = sizeof(LOS_DL_LIST) * OS_TSK_SORTLINK_LEN; listObject = (LOS_DL_LIST *)LOS_MemAlloc(m_aucSysMem0, size); ⑵ if (listObject == NULL) { (VOID)LOS_MemFree(m_aucSysMem0, g_taskCBArray); return LOS_ERRNO_TSK_NO_MEMORY; } ⑶ (VOID)memset_s((VOID *)listObject, size, 0, size); ⑷ g_taskSortLink.sortLink = listObject; g_taskSortLink.cursor = 0; for (index = 0; index < OS_TSK_SORTLINK_LEN; index++, listObject++) { ⑸ LOS_ListInit(listObject); } return LOS_OK; }
⑴處代碼計算需要申請的雙向鏈表的內存大小,OS_TSK_SORTLINK_LEN
為32,即需要為32個雙向鏈表節點申請內存空間。然后申請內存,⑵處申請內存失敗時返回相應錯誤碼。⑶處初始化申請的內存區域為0等。⑷處把申請的雙向鏈表節點賦值給g_taskSortLink
的鏈表節點.sortLink
,作為排序鏈表的頭節點,游標.cursor
初始化為0。然后⑸處的循環,調用LOS_ListInit()
函數把雙向鏈表數組每個元素都初始化為雙向循環鏈表。
2.2 插入排序鏈表
插入排序鏈表的函數為OsTaskAdd2TimerList()
。在任務等待互斥鎖/信號量等資源時,都需要調用該函數將任務加入到對應的排序鏈表中。該函數包含兩個入參,第一個參數LosTaskCB *taskCB
用於指定要延遲的任務,第二個參數UINT32 timeout
指定超時等待時間。
源碼如下:
LITE_OS_SEC_TEXT VOID OsTaskAdd2TimerList(LosTaskCB *taskCB, UINT32 timeout) { LosTaskCB *taskDelay = NULL; LOS_DL_LIST *listObject = NULL; UINT32 sortIndex; UINT32 rollNum; ⑴ sortIndex = timeout & OS_TSK_SORTLINK_MASK; rollNum = (timeout >> OS_TSK_SORTLINK_LOGLEN); ⑵ (sortIndex > 0) ? 0 : (rollNum--); ⑶ EVALUATE_L(taskCB->idxRollNum, rollNum); ⑷ sortIndex = (sortIndex + g_taskSortLink.cursor); sortIndex = sortIndex & OS_TSK_SORTLINK_MASK; ⑸ EVALUATE_H(taskCB->idxRollNum, sortIndex); ⑹ listObject = g_taskSortLink.sortLink + sortIndex; ⑺ if (listObject->pstNext == listObject) { LOS_ListTailInsert(listObject, &taskCB->timerList); } else { ⑻ taskDelay = LOS_DL_LIST_ENTRY((listObject)->pstNext, LosTaskCB, timerList); do { ⑼ if (UWROLLNUM(taskDelay->idxRollNum) <= UWROLLNUM(taskCB->idxRollNum)) { UWROLLNUMSUB(taskCB->idxRollNum, taskDelay->idxRollNum); } else { ⑽ UWROLLNUMSUB(taskDelay->idxRollNum, taskCB->idxRollNum); break; } ⑾ taskDelay = LOS_DL_LIST_ENTRY(taskDelay->timerList.pstNext, LosTaskCB, timerList); } while (&taskDelay->timerList != (listObject)); ⑿ LOS_ListTailInsert(&taskDelay->timerList, &taskCB->timerList); } }
⑴處代碼計算等待時間timeout
的低5位作為數組索引,高27位作為滾動數rollNum
。這2行代碼數學上的意義,就是把等待時間處於32得到的商作為滾動數,余數作為數組索引。⑵處代碼,如果余數為0,可以整除時,滾動數減1。減1設計的原因是,在函數VOID OsTaskScan(VOID)
中,每一個tick
到來時,如果滾動數大於0,滾動數減1,並繼續滾動一圈。后文會分析該函數VOID OsTaskScan(VOID)
。
⑶處代碼把滾動數賦值給任務taskCB->idxRollNum
的低27位。⑷處把數組索引加上游標,然后執行⑸賦值給任務taskCB->idxRollNum
的高5位。⑹根據數組索引獲取雙向鏈表頭結點,⑺如果此處雙向鏈表為空,直接插入鏈表里。如果鏈表不為空,執行⑻獲取第一個鏈表節點對應的任務taskDelay
,然后遍歷循環雙向鏈表,把任務插入到合適的位置。⑼處如果待插入任務taskCB
的滾動數大於等於當前鏈表節點對應任務的滾動數,則從待插入任務taskCB
的滾動數中減去當前鏈表節點對應任務的滾動數,然后執行⑾獲取下一個節點繼續遍歷。⑽處如果待插入任務taskCB
的滾動數小於當前鏈表節點對應任務的滾動數,則從當前鏈表節點對應任務的滾動數中減去待插入任務taskCB
的滾動數,然后跳出循環。執行⑿,完成任務插入。插入過程,可以結合上文的示意圖進行理解。
2.3 從排序鏈表中刪除
從排序鏈表中刪除的函數為VOID OsTimerListDelete(LosTaskCB *taskCB)
。在任務恢復/刪除等場景中,需要調用該函數將任務從任務排序鏈表中刪除。該函數包含一個參數LosTaskCB *taskCB
,用於指定要從排序鏈表中刪除的任務。
源碼如下:
LITE_OS_SEC_TEXT VOID OsTimerListDelete(LosTaskCB *taskCB) { LOS_DL_LIST *listObject = NULL; LosTaskCB *nextTask = NULL; UINT32 sortIndex; ⑴ sortIndex = UWSORTINDEX(taskCB->idxRollNum); ⑵ listObject = g_taskSortLink.sortLink + sortIndex; ⑶ if (listObject != taskCB->timerList.pstNext) { ⑷ nextTask = LOS_DL_LIST_ENTRY(taskCB->timerList.pstNext, LosTaskCB, timerList); UWROLLNUMADD(nextTask->idxRollNum, taskCB->idxRollNum); } ⑸ LOS_ListDelete(&taskCB->timerList); }
⑴處代碼獲取待從排序鏈表中刪除的任務對應的數字索引。⑵處代碼獲取排序鏈表的頭節點listObject
。⑶處代碼判斷待刪除節點是否是最后一個節點,如果不是最后一個節點,執行執行⑷處代碼獲取待刪除節點的下一個節點對應的任務nextTask
,在下一個節點的滾動數中增加待刪除節點的滾動數,然后執行⑸處代碼執行刪除操作。如果是最后一個節點,直接執行⑸處代碼刪除該節點即可。
2.4 獲取下一個超時到期時間
獲取下一個超時到期時間的函數為OsTaskNextSwitchTimeGet()
,我們分析下其代碼。
源碼如下:
UINT32 OsTaskNextSwitchTimeGet(VOID) { LosTaskCB *taskCB = NULL; UINT32 taskSortLinkTick = LOS_WAIT_FOREVER; LOS_DL_LIST *listObject = NULL; UINT32 tempTicks; UINT32 index; ⑴ for (index = 0; index < OS_TSK_SORTLINK_LEN; index++) { ⑵ listObject = g_taskSortLink.sortLink + ((g_taskSortLink.cursor + index) % OS_TSK_SORTLINK_LEN); ⑶ if (!LOS_ListEmpty(listObject)) { ⑷ taskCB = LOS_DL_LIST_ENTRY((listObject)->pstNext, LosTaskCB, timerList); ⑸ tempTicks = (index == 0) ? OS_TSK_SORTLINK_LEN : index; ⑹ tempTicks += (UINT32)(UWROLLNUM((UINT32)taskCB->idxRollNum) * OS_TSK_SORTLINK_LEN); ⑺ if (taskSortLinkTick > tempTicks) { taskSortLinkTick = tempTicks; } } } return taskSortLinkTick; }
⑴處代碼循環遍歷雙向鏈表數組,⑵處代碼從當前游標位置開始獲取排序鏈表的頭節點listObject
。⑶處代碼判斷排序鏈表是否為空,如果排序鏈表為空,則繼續遍歷下一個數組。如果鏈表不為空,⑷處代碼獲取排序鏈表的第一個鏈表節點對應的任務。⑸處如果遍歷的數字索引為0,tick
數目使用32,否則使用具體的數字索引。⑹處獲取任務的滾動數,計算出需要的等待時間,加上⑸處計算出的不足滾動一圈的時間。⑺處計算出需要等待的最小時間,即下一個最快到期的時間。
3 排序鏈表和Tick時間的關系
任務加入到排序鏈表后,時間一個tick
一個tick
的逝去,排序鏈表中的滾動數該如何更新呢?
時間每走過一個tick
,系統就會調用Tick
中斷的處理函數OsTickHandler()
,該函數在kernel\src\los_tick.c
文件中實現。下面是該函數的代碼片段,⑴處代碼分別任務的超時到期情況。
LITE_OS_SEC_TEXT VOID OsTickHandler(VOID) { #if (LOSCFG_BASE_CORE_TICK_HW_TIME == 1) platform_tick_handler(); #endif g_ullTickCount++; #if (LOSCFG_BASE_CORE_TIMESLICE == 1) OsTimesliceCheck(); #endif ⑴ OsTaskScan(); // task timeout scan #if (LOSCFG_BASE_CORE_SWTMR == 1) (VOID)OsSwtmrScan(); #endif }
詳細分析下函數OsTaskScan()
,來了解排序鏈表和tick
時間的關系。函數在kernel\base\los_task.c
文件中實現,代碼片段如下:
LITE_OS_SEC_TEXT VOID OsTaskScan(VOID) { LosTaskCB *taskCB = NULL; BOOL needSchedule = FALSE; LOS_DL_LIST *listObject = NULL; UINT16 tempStatus; UINTPTR intSave; intSave = LOS_IntLock(); ⑴ g_taskSortLink.cursor = (g_taskSortLink.cursor + 1) % OS_TSK_SORTLINK_LEN; listObject = g_taskSortLink.sortLink + g_taskSortLink.cursor; ⑵ if (listObject->pstNext == listObject) { LOS_IntRestore(intSave); return; } ⑶ for (taskCB = LOS_DL_LIST_ENTRY((listObject)->pstNext, LosTaskCB, timerList); &taskCB->timerList != (listObject);) { tempStatus = taskCB->taskStatus; ⑷ if (UWROLLNUM(taskCB->idxRollNum) > 0) { UWROLLNUMDEC(taskCB->idxRollNum); break; } ⑸ LOS_ListDelete(&taskCB->timerList); ⑹ if (tempStatus & OS_TASK_STATUS_PEND) { taskCB->taskStatus &= ~(OS_TASK_STATUS_PEND); LOS_ListDelete(&taskCB->pendList); taskCB->taskSem = NULL; taskCB->taskMux = NULL; } ⑺ else if (tempStatus & OS_TASK_STATUS_EVENT) { taskCB->taskStatus &= ~(OS_TASK_STATUS_EVENT); } ⑻ else if (tempStatus & OS_TASK_STATUS_PEND_QUEUE) { LOS_ListDelete(&taskCB->pendList); taskCB->taskStatus &= ~(OS_TASK_STATUS_PEND_QUEUE); } else { taskCB->taskStatus &= ~(OS_TASK_STATUS_DELAY); } ⑼ if (!(tempStatus & OS_TASK_STATUS_SUSPEND)) { taskCB->taskStatus |= OS_TASK_STATUS_READY; OsHookCall(LOS_HOOK_TYPE_MOVEDTASKTOREADYSTATE, taskCB); OsPriqueueEnqueue(&taskCB->pendList, taskCB->priority); needSchedule = TRUE; } if (listObject->pstNext == listObject) { break; } taskCB = LOS_DL_LIST_ENTRY(listObject->pstNext, LosTaskCB, timerList); } LOS_IntRestore(intSave); ⑽ if (needSchedule) { LOS_Schedule(); } }
⑴處代碼更新全局變量g_taskSortLink
的游標,指向雙向鏈表數組下一個位置,然后獲取該位置的雙向鏈表頭結點listObject
。⑵如果鏈表為空,則返回。如果雙向鏈表不為空,則執行⑶循環遍歷每一個鏈表節點。⑷處如果鏈表節點的滾動數大於0,則滾動數減1,說明任務還需要繼續等待一輪。如果鏈表節點的滾動數等於0,說明任務超時到期,執行⑸從排序鏈表中刪除。接下來需要根據任務狀態分別處理,⑹處如果代碼是阻塞狀態,取消阻塞狀態,並從阻塞鏈表中刪除。⑺處如果任務阻塞在事件中,取消阻塞狀態。⑻如果任務阻塞在隊列,從阻塞鏈表中刪除,取消阻塞狀態,如果不是上述狀態,取消延遲狀態OS_TASK_STATUS_DELAY
。⑼處如果代碼是掛起狀態,設置任務為就緒狀態,加入任務就緒隊列,設置需要重新調度標記。⑽如果設置需要重新調度,調用調度函數觸發任務調度。
小結
掌握鴻蒙輕內核的排序鏈表TaskSortLinkAttr
這一重要的數據結構,會給進一步學習、分析鴻蒙輕內核源代碼打下了基礎,讓后續的學習更加容易。后續也會陸續推出更多的分享文章,敬請期待,也歡迎大家分享學習、使用鴻蒙輕內核的心得,有任何問題、建議,
都可以留言給我們: https://gitee.com/openharmony/kernel_liteos_m/issues 。為了更容易找到鴻蒙輕內核代碼倉,建議訪問 https://gitee.com/openharmony/kernel_liteos_m ,關注Watch
、點贊Star
、並Fork
到自己賬戶下,謝謝。