用Crontab打造簡易工作流引擎


1. 引言

眾所周知,Oozie(1, 2)是基於時間條件與數據生成來做工作流調度的,但是Oozie的數據觸發條件只支持HDFS路徑,故而面臨着這樣的問題:

  • 無法判斷Hive partition是否已存在;
  • 無法判斷Elasticsearch index是否已寫入完成;
  • ...

因此,靈活可擴展的工作流引擎才是正確姿勢!下面,我將介紹如何用Crontab來打造一個類似於Oozie的簡易工作流引擎;對標Oozie,其應滿足功能:

  • 時間條件與數據生成觸發任務,如Oozie coordinator的datasetsinput-events
  • 支持觸發條件的輪詢;
  • 支持任務並行執行,如Oozie workflow的forkjoin
  • 捕獲錯誤及運行狀態日志。

2. 實現

觸發條件

判斷Hive partition是否已存在,思路比較簡單——show partitions <tb_name>后能否grep到該partition:

# check wheter $1's partition ($2) exists
hive_partition_exists() {
    table_name=$1
    partition=$2
    hive -e "show partitions ${table_name}" | grep ${partition}
    [ $? == 0 ]
}

獲取Hive 表的最后一個partition,grep命令配合正則表達式中的Lookahead匹配:

# get latest hive partition
latest_hive_partition() {
    table_name=$1
    partition_name=$2
    hive -e "show partitions ${table_name}" | tail -1 | grep -Po "(?<=${partition_name}=).*"
}

在檢查ES index是否寫入完成時,可用思路——定時flush index,然后判斷當前時刻的doc數較上一時刻是否發生變化;若變化,則說明正在寫入。Shell腳本處理json太蛋疼了,故不給出代碼啦。

條件輪詢

所謂“條件輪詢”,是指如果數據未生成,則會一直輪詢該條件是否滿足。我們采用while循環中sleep的方式來實現條件輪詢:

hive_partition_exists etl.ad_tb1 ${log_partition}
ad1_exists=$?
hive_partition_exists etl.ad_tb2 ${log_partition}
ad2_exists=$?
while (( ${ad1_exists} != 0 || ${ad2_exists} != 0))
do
    echo "`date -d "now"`: log partitions ${log_partition} not exist, and waiting" >> ${log_path}
    sleep 1m
    hive_partition_exists etl.ad_tb1 ${log_partition}
    ad1_exists=$?
    hive_partition_exists etl.ad_tb2 ${log_partition}
    ad2_exists=$?
done

實例

接下來,以Hive寫Elasticsearch的為例,說明如何用crontab做定時Hive任務。hiveql腳本如下:

add jar /path/to/jars/elasticsearch-hadoop-2.3.1.jar;
set mapred.job.name=ad_tag-${LOG_PARTITION}~~${TAG_PARTITION};
set hive.map.aggr = false;

insert overwrite table ad_tag
select media, a.dvc as dvc, case when c1_arr is null then array('empty') else c1_arr end as c1_arr, '${LOG_PARTITION}' as week_time
from (
select dvc, app_name as media
from ad_log
where is_exposure = '1' and day_time between date_sub('${LOG_PARTITION}', 6) and '${LOG_PARTITION}'
group by dvc, app_name
) a 
left outer join (
select dvc, collect_set(c1) as c1_arr
from tag
lateral view inline(tag) in_tb
where day_time = '${TAG_PARTITION}'
group by dvc
) b
on a.dvc = b.dvc;

為了實現任務的並行執行,我用到Linux命令中的&

log_partition=`date -d "5 day ago" "+%Y-%m-%d"`
tag_partition=$(latest_hive_partition tag.dmp_tag  day_time)
log_path="${log_partition}.log"

echo "`date -d "now"`: log partitions ${log_partition} exist" >> ${log_path}
echo "`date -d "now"`: latest tag partition ${tag_partition}" >> ${log_path}
hive -f ad_tag1.hql --hivevar LOG_PARTITION=${log_partition} --hivevar TAG_PARTITION=${tag_partition} & hive -f ad_tag2.hql --hivevar LOG_PARTITION=${log_partition} --hivevar TAG_PARTITION=${tag_partition}

exit 1

PS: 當手動執行腳本是OK的,但是crontab去執行時卻出錯,最可能的原因是crontab未能正確加載用戶的環境變量;故可以在運行腳本中加入:

source /etc/profile
source /path/to/.bashrc

但是,用crontab做工作流調度,會存在如下問題:

  • 無法很好地管理任務之間的依賴關系;
  • 無法更好地監控任務的運行狀況;
  • 因Shell腳本的編程處理能力的限制,無法更自由地做擴展。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM