來某廠接近半年了,幾乎沒寫過C++代碼,說實話還真的有點手生。最近剛好有一個需求,然而我感覺我也沒有辦法用C++以外的語言去實現它。於是還是花了幾天時間用C++完成編碼,這是一個簡單的任務執行引擎,它被我稱作panguan。寫這 篇文章主要記錄一下開發過程中的一些思路和想法。不足之處,勞煩大家給予指出。
1.寫在前面
1.1需求來源
最近有一個需求,假設我們有很多任務需要定時執行甚至定時重復運行,並且其中有一些任務之間可能存在控制依賴和/或數據依賴,甚至我們希望可以利用一些原子任務去組成一個更大的任務。
所以我們需要有一個工具可以將這些任務管理起來:1)解決依賴問題;2)將能並發的任務最大化地並發起來;3)使用簡單,只需填寫配置文件,不需要寫任何代碼即可讓一堆任務有序地Run起來;4)能夠正確調度這些任務的順序;5)may be more。
我調研了一些已經存在的任務執行引擎,包括FStack。然而試用了一下FStack,發現我不會用,這就很尷尬了。
報着“殺雞焉用牛刀”的想法,於是最終花了幾天自己寫了一個可實際運行的工具,即panguan(判官)。
1.2名字來源
這個任務執行引擎被命名為“判官”,就是想給人一種不明覺厲的裝X氣質。
據百度百科記載,傳說中陰曹地府中的判官判處人的輪回生死,對壞人進行懲罰,對好人進行獎勵。而該任務執行引擎的主要工作是調度任務的順序和生存周期,對執行成功和失敗的任務進行處理。感覺這個名字很有這個場景的氣質,於是配置任務屬性的配置文件就順便被命名為shengsibu.xml了。
1.3目前支持的特性
- 通過XML文件配置任務。
- 支持多任務同時運行並且互不干擾。
- 支持單個任務由多個存在數據依賴和控制依賴或者可以並行執行的子任務組成。子任務可以繼續由多個子任務組成,由此可以遞歸成為一棵復雜的任務樹。
- 能夠調度各種復雜的任務場景的任務的順序,使其並發最大化。
- 支持立即執行、定時和重復執行。
- 對任務進行超時檢測。
- 展示每一個任務的當前任務狀態和運行時間。
- 支持原子任務去調用python和shell的腳本。另外可以不實際去執行腳本而讓程序模擬結果,目前返回時間是二項分布~(100,0.05)的隨機結果,是否執行成功是p為0.88的伯努利分布的隨機結果。
2.工程實踐
以下主要簡要介紹一些實踐中的想法和思路。
2.1異步引擎加線程池
為了支持定時機制、多任務、任務組裝、任務依賴等特性,同步的做法顯得力不從心,當然也可能是我水平差導致寫不出來。於是在這里優先采用了異步引擎加線程池的方案。相對於同步的做法,異步的做法開銷更小;然而會導致程序的實現難度增加,也更加難以理解。
我們需要:1)一個異步引擎;2)兩個不同方向的線程安全的消息隊列,一個是請求MQ,一個是返回MQ;3)一個工作線程池。
異步引擎將每個原子任務派發到請求MQ中,線程中的工作線程監聽請求MQ,執行完后插入另一個返回MQ中。異步引擎從返回取回結果。
如下圖所示。
我們可以繼續推廣至一個多機的版本,如下圖。
Agent同時監聽兩個消息隊列(一個自身的MQ和一個Center的請求MQ):Agent從Center拉取消息的模式,可以自動實現負載均衡;Center推給Agent的模式,可以指定將任務派發至某個機器,以此提高執行效率。原因可能是這個任務距離數據目標端的物理距離更近。
2.2對任務進行抽象
2.2.1使用DAG去定義任務
對任務依賴進行抽象,可能最常見的做法是DAG(有向無環圖)。像Linkedin開源的Azkaban,定義一個任務做法像下圖中這樣。
2.2.2使用多叉樹去定義任務
開發過程中,我感覺使用多叉樹去表達任務會顯得更加自然一些。
在一棵多叉樹中,可以存在兩種結點,即葉子結點和非葉結點。於是可以使用葉子結點去表示一個原子任務,非葉結點去表示由一些原子任務組合而成的任務,這時它更像一個控制結點。這樣就可以實現任務的組裝,並且去定義它們之間的依賴關系。
假設有這么一個任務,我們用多叉樹來表示它,如下圖所示。
L是這根樹的葉子,它代表一個原子任務。非葉結點標P表示它的子任務們是並行的,非葉結點標S表示它的子任務們是串行的。事實上我們還可以將任務類型進行擴展。舉個例子,比如子任務是串行的,我們可以不需要所有子任務都運行結束,只需要遇到第一個執行成功的子任務即可。在這種模型下,我們可以將任務的組合推廣至無限復雜的組合,其中包括樹的層級和子結點個數的擴增、結點類型的擴展(可以超越並行和串行這兩個基本類型)。
這樣的多叉樹結構可以很容易用xml格式去定義它。我在實現中使用了tinyxml這個parser去解析文本,實現上它只有六個cpp/h文件,直接放在自己的代碼目錄下參與編譯即可。
在tinyxml中,類TiXmlNode定義了多叉樹的數據結構。而TiXmlElement繼承了TiXmlNode,增加了一些關於屬性的字段的解析代碼。
我在這里定義一個TaskNode的類也去繼承了TiXmlNode類,復用了它的多叉樹的代碼。tinyxml的parser解析文本並生成一棵結點類型為TiXmlElement多叉樹之后,我們就遞歸遍歷這棵多叉樹,調用它的屬性解析函數解析出每個屬性的值,建立一棵對應的類型為TaskNode類的多叉樹。這兩棵樹它的樹型結構是一致,區別在於將XML中定義中每個格點的屬性解析出來賦值給了對應的TaskNode類的成員變量。
TaskNode類可以同時用來定義任務的葉子結點和非葉結點,而它們之間使用一些指針去連接成一棵多叉樹的結構。
2.2.3多叉樹加有限狀態機
有限狀態機,是表示有限個狀態以及在這些狀態之間的轉移和動作等行為的數學模型。
我們為每個任務結點再加上一個狀態,表示它的當前運行狀態,使用{未初始化、等待、錯誤和完成}四個狀態。可以發現父結點的狀態是依賴於子結點們的狀態的。任務結點在不同的狀態做不同的動作。
再進一步統一葉子結點和非葉結點的接口。通過查閱設計模式的書,會發現這種場景可以用“組合模式”去套用。它可以使得對葉子結點和非葉結點的訪問具有一致性,具有一致的接口使得在使用時可以不去特別區別這兩種不同的對象。
統一接口后的偽代碼如下。所有結點的運行入口都是相同一個接口,從Run函數進入。根據自身不同的狀態調用不同的函數。而相同一個函數如RunUninit(),經過重載,葉子結點和非葉結點也可以調用不一樣的代碼。這樣的做法可以使上層接口完全一致,屏蔽掉異構結點在不同狀態 下的細節差異。
void TaskNode::Run()
{
switch(結點狀態)
{
case 未初始化:
RunUninit();
break;
case 等待:
RunWait();
break;
case 錯誤:
RunError();
break;
case 完成:
RunSuccess();
break;
default:
break;
}
}
2.2.4任務順序調度算法
在任務樹已經存在的情況下,我們需要有一個調度算法來決定這些任務的順序。它能夠正確調度這些任務的順序,將能夠並行的任務都並行起來。這里的並行起來的實際含義是同時(實際是在很短的時間間隔內)推至請求MQ中,由線程池來消費它。並且能夠在運行完的任務亂序返回之后,在任務樹中找到下一個可以運行的任務。我們將這個函數命名為TaskNode *TaskNode::NextAvailTask(TaskNode *cur);
作為一個異步的引擎,它應該要能做到在一次執行中,將一棵任務樹從一個可運行狀態一直推至一個不可運行狀態才停止。這時的做法就是將所有的可執行的原子任務推至請求MQ中,並且在這過程中處理所有結點的狀態。
執行過程偽代碼如下,它表示的是任務樹在一次執行過程所進行的所有操作。
void TaskNode::Execute(TaskNode *cur)
{
if(!cur) return;
TaskNode *task = cur;
do
{
task->Run();
if(狀態是成功或者失敗)
{
//立即執行多余任務
task->Run();
}
//找出下一個可以執行的任務
task = TaskNode::NextAvailTask(task);
}while(task);
}
所以我們需要NextAvailTask函數,它要滿足:
目標:找出一棵復雜任務樹中,下一個可以執行的任務結點
輸入:任意一個任務結點
輸出:下一個可以執行的任務結點
分析之后,可以發現輸入雖然可以為任何一個TaskNode,但是可以分為兩種情況:
- 在任務樹新建時,從根結點往下執行,這時WAIT狀態的傳播是自上而下的,這時只需要自上而下地不斷找到最左邊的結點即可,直到第一次遍歷到葉子結點。這時如果該葉子結點是父結點的屬性是並行的,就可以繼續向右遍歷,直到最右。然而關鍵問題來了,如果其它可以同時並行的任務結點和該結點不在同一個父結點下時怎么處理?如上圖中那棵任務樹,遍歷到最左的葉子之后,下一個結點應該是第二層第二個結點的最左的那個葉子結點。然而我們該怎么才到找到它?這時我們給每個結點一個關鍵的Flag,布爾量 isChildOfParallelChildTask,它的含義是,標記該結點的爺爺結點及以上的結點是不是一個並行任務結點。這時通過判斷這個flag,可以將結點遍歷回溯到更高的層次。然后再自上而下去遍歷,就像一開始任務新建時做的那樣。
- 亂序返回的結點,ERROR和SUCCESS的傳播是自下而上的,其余的規則也和1中一致。
使用這樣的遍歷規則,就可以在這棵任務樹中准確地遍歷,不管實際上這棵樹的組成有多么的復雜。
以下是偽代碼,它是一個靜態函數,它的邏輯有些小復雜,該代碼中我們可以很容易地擴充新的任務類型,不局限於只有並行和串行。這個算法的時間度復雜度為O(1)O(h),h為樹的高度,而且大部份時候為O(1),只在需要向上回溯時才處在O(1)O(h)之間。
TaskNode *TaskNode::NextAvailTask(TaskNode *cur)
{
TaskNode *task = cur;
while(task) //當task不為空,循環繼續執行
{
if(該結點是非葉結點,且未初始化的子任務數量為0)
{
return 該結點第一個子結點;
}
if(父結點存在,且子任務們是串行的)
{
if(狀態是錯誤)
{
return 父結點;
}
else if(狀態是成功的)
{
return 下一個結點(優先)或者父結點;
}
}
if(父結點存在,且子任務們是並行的)
{
if(子任務全部完成或者狀態是錯誤)
{
return 父結點;
}
if(存在未初始化的兄弟結點)
{
return 下一個兄弟結點;
}
}
if(爺爺結點及以上的結點是不是一個並行任務結點)
{
task = 該結點的父結點;
//這時會跳回while的開頭
}
else
{
return NULL;
}
}
return NULL;
}
2.3異步引擎的主流程
這個異步的任務執行引擎的主流程要同時處理定時生成新任務進行執行、超時檢測、從返回隊列獲取返回值並進行任務執行。實現的目標是要高效,不浪費不必要的CPU時間。
關鍵問題有:
- 不能被返回隊列阻塞導致不返回而錯過了開始定時任務和超時檢測;
- 也不能過於頻繁地喚醒自己,做沒必要的輪詢。
解決方法是在每次從之前從返回MQ讀取數據前,計算最大可容忍阻塞時間,時間為下次最近一次定時任務開始時間和下一個即將超時的任務的時間二者取小,將該時間作為阻塞超時時間。
可以看到整個流程最多會陷入三層while循環:
- 主流程的while最外層的循環,它處理時處理定時生成新任務進行執行、超時檢測、從返回隊列獲取返回值並進行任務執行。
- 第二層循環是void TaskNode::Execute(TaskNode *task)里的循環。它會將某個任務樹從可執行的狀態一直推至不可執行狀態。
- 最里層的循環是TaskNode *TaskNode::NextAvailTask(TaskNode *cur)里的循環,它在任務樹里不斷遍歷以找出下一個可執行的任務,幫助TaskNode::Execute(task)的狀態不斷向前。
主流程簡化完的偽代碼如下。
void TaskEngine::Start()
{
//初始化
//從配置文件讀取的所有任務的參數,生成一堆任務模板以及它們的任務屬性,比如何時啟動,重復執行次數,執行間隔,超時時間等。
if(!_Init())
return;
while(1)
{
if(任務模板中有當前時刻可以運行的新任務)
{
for(所有可以生成的新任務的任務模板)
{
//從任務模板中fork一個新的任務task,加入執行任務隊列。
//為避免沒必要地析構多叉樹,造成大量沒必要的new和delete
//任務模板中有Buffer池,buffer池沒有現成的多叉樹時,
//才真正去構造,否則直接從池中去獲得
TaskNode::Execute(task);
}
}
//計算最大可容忍阻塞時間
//如該值為一個無限大的值,表示已經沒有任何任務,可以退出程序
if(最大可容忍阻塞時間為無限大)
{
do
{
//回收所有線程
}while(還有線程未結束);
break;
}
StRes res;//返回數據
if(在最大可容忍阻塞時間內從返回MQ獲得數據)
{
//找出該返回數據對應的任務結點的ID號
if(如果該任務結點的ID號還處在執行任務隊列)
{
TaskNode *thisTask = 該任務結點的指針;
//任務結點獲取返回數據
thisTask->GetRes(res);
//從返回的位置進入執行代碼
TaskNode::Execute(thisTask);
//處理整個任務的成功或者失敗的情況
//對應的措施是將這棵多叉樹放回對應的模板的buffer池
_ProcessSuccessOrError(該任務結點對應的ID號);
}
}
else if(最大可容忍阻塞時間 >= 下一個即將超時的任務的時間)
{
//去處理已經超時的任務
//一樣是把多叉樹放回對應的任務模板的buffer池中
_ProcessTimeout(當前時間);
}
}
}
3.舉例
其實感覺還是有些抽象,也可能是我的文字水平有限不能將它講明白。
下面的例子是形如上面那個任務樹的一個執行過程演示。
我們將這棵樹的邏輯結構用xml的格式寫入shengsibu.xml,如下。
<Task Name="task0" StartTime="0" ExecuteTimes="1" TimeInterval="0" Timeout="3600" >
<TaskNode Name="root_task_parallel" Type="parallel">
<TaskNode Name="child_task_serial" Type="serial">
<TaskNode Name="py_success" Type="leaf" ScriptType="python" ScriptAddr="script/py_success.py" Arguments=""/>
<TaskNode Name="sh_success" Type="leaf" ScriptType="shell" ScriptAddr="script/sh_success.sh" Arguments=""/>
<TaskNode Name="random" Type="leaf" ScriptType="random" ScriptAddr="" Arguments=""/>
</TaskNode>
<TaskNode Name="child_task_parallel" Type="parallel">
<TaskNode Name="py_success" Type="leaf" ScriptType="python" ScriptAddr="script/py_success.py" Arguments=""/>
<TaskNode Name="sh_success" Type="leaf" ScriptType="shell" ScriptAddr="script/sh_success.sh" Arguments=""/>
<TaskNode Name="random" Type="leaf" ScriptType="random" ScriptAddr="" Arguments=""/>
</TaskNode>
<TaskNode Name="random" Type="leaf" ScriptType="random" ScriptAddr="" Arguments=""/>
</TaskNode>
</Task>
以下展示任務的執行過程:
圖1,初始狀態,所有任務都處理未運行狀態。縮進代表樹的層級和父子關系。
圖2,第一次開始執行之后。可以並行的任務全部已經並行起來,處於WAIT等待返回結果的狀態。
圖3,child_task_parallel下的sh_success返回,不能推進整個任務前進。
圖4,child_task_serial下的py_success返回,推進了chile_task_serial下sh_success執行。
圖5,又兩個任務返回后,其中child_task_serial下的sh_success返回推進了hild_task_serial下的random執行。
圖6,所有任務紛紛返回之后,在最后一個葉子任務運行結束后,把整個任務組推至成功運行。