http://blog.csdn.net/bdchome/article/details/52438540
每個公司都有自己的一套或者多套調度系統,從簡單到復雜,滿足各種定制化的需求。
Crontab任務調度
在沒有工作流調度系統之前,公司里面的任務都是通過crontab來定義的,時間長了后會發現很多問題:
- 大量的crontab任務需要管理
- 任務沒有按時執行,各種原因失敗,需要重試
- 多服務器環境下,crontab分散在很多集群上,光是查看log就很花時間
於是,出現了一些管理crontab任務的調度系統。
CronHub
CronHub是暴風影音的馬晨開源的時間調度系統。CronHub通過web界面來管理分散在不同機器上的crontab任務。詳見:Cronhub 開源的時間調度系統。

CronWeb
CronWeb也是類似的系統
CronWeb的UI:
工作流調度系統
在很多公司,隨着處理的數據量增長,需要管理的工作流也變得越來越大,里面充斥着大量的ETL任務:有些依賴某個時間點去執行,有些依賴數據或者外部的事件來執行。工作流中的有些任務會失敗,需要有重試和報警功能,而不是等到整個工作流失敗才發現問題。
總的來說,一個workflow scheduler system需要的主要功能有:
- DAG定義 (DAG的定義方式:表達式,腳本定義,通過WEB UI定義;還需要支持子DAG)
- 執行節點 (節點類型:unix cmd,shell,python,mapreduce, logging etc…)
- 節點控制(ignore, retry, suspend, run now, test mode)
- Metrics (需要對比一段時間內任務的運行時間)
- Monitor (失敗策略,報警通知功能)
- CLI & Web UI (查詢workflow執行情況,以及簡單的控制)
Hadoop工作流引擎
OOZIE
Oozie是一個管理hadoop任務的工作流/協調系統。Oozie工作流中擁有多個Action,如Hadoop Map/Reuce job,Hadoop Pig job等,所有的Action以有向無環圖(DAG Direct Acyclic Graph)的模式部署運行。詳見:Hadoop工作流調度系統Oozie
Oozie的優點是與Hadoop生態圈結合緊密,比如:
- 有MapReduce的Action,定義一個mapreduce任務很方便,而且可以直接通過job id關聯到hadoop history頁面
- Oozie任務的資源文件都必須存放在HDFS上
- Action也方便擴展,比如添加自定義的任務類型或者報警方式
缺點是通過XML文件來定義DAG依賴,雖然支持的功能很多,比如分支,ok,failed節點,但是總感覺定義過於復雜,維護成本高。
Oozie的UI:
Oozie目前是Hadoop生態圈工作流調度事實上的標准,很多公司都在使用。其實,很難用,UI只能查看,不能進行任何操作。好在社區出現了HUE,可以彌補oozie的不足。
HUE是一個開源的Apache Hadoop UI系統,最早是由Cloudera Desktop演化而來,由Cloudera貢獻給開源社區,它是基於Python Web框架Django實現的。通過使用Hue我們可以在瀏覽器端的Web控制台上與Hadoop集群進行交互來分析處理數據,例如操作HDFS上的數據,運行MapReduce Job等等。
HUE提供了一個Oozie編輯器,可以通過儀表板提交和監控Workflow、Coordinator和Bundle。
HUE的UI:
AZKABAN
Azkaban是由Linkedin開源的一個批量工作流任務調度器。用於在一個工作流內以一個特定的順序運行一組工作和流程。Azkaban定義了一種KV文件格式來建立任務之間的依賴關系,並提供一個易於使用的web用戶界面維護和跟蹤你的工作流。詳見:工作流調度器Azkaban
Azkaban和Oozie一樣,也屬於Hadoop生態圈。它的DAG定義方式比Oozie簡單很多,在properties文件里面可以通過dependencies指定任務的上游依賴。Azkaban支持可插拔的擴展插件,方便擴展,比如支持pid,hive等。
Azkaban的特點是所有的任務資源文件都需要打成一個zip包上傳。這個在資源文件較大的時候不是太方便,當前也可以進行擴展,比如存放在HDFS上,任務實際運行的時候才拉到本地。
Azkaban的UI:
ZEUS
Zeus是Alibaba開源的一個完整的Hadoop的作業平台,用於從Hadoop任務的調試運行到生產任務的周期調度。
宙斯支持任務的整個生命周期。從功能上來說,支持:
* Hadoop MapReduce任務的調試運行
* Hive任務的調試運行
* Shell任務的運行
* Hive元數據的可視化查詢與數據預覽
* Hadoop任務的自動調度
* 完整的文檔管理
Zeus是針對Hadoop集群任務定制的,通用性不強。
基於Python的工作流引擎
基於Python的工作流引擎優點是:
The DAG definition is code
因此可維護性,版本管理,可測性和協作性更好。
dagobah
dagobah是一個Python寫的基於DAG的任務調度系統。Dagobah可以使用Cron語法調度周期性任務,任務之間可以定義依賴關系。Dagobash可以支持重試某個失敗的任務,並在任務結束或失敗的后發送匯總郵件,跟蹤任務的輸出log,持久化到不同的后端。Dagobash是一個相當輕量級的調度系統。
dagobah可以通過Web UI通過拖拉的方式操作dag,也支持直接通過代碼定義:
from dagobah import Dagobah from dagobah.backend.base import BaseBackend my_dagobah = Dagobah(BaseBackend()) my_dagobah.add_job('My Job') my_job = my_dagobah.get_job('My Job') my_job.add_task('python required_task.py', 'Required Task') my_job.add_task('python dependent_task.py', 'Dependent Task') my_job.add_dependency('Required Task', 'Dependent Task') my_job.schedule('0 10 * * *')
dogobah的UI:
Luigi
Luigi 是Spotify開源的,關注流程復雜的長時間運行的批次任務,例如Hadoop任務,推送數據,或者從數據庫拉數據的任務,機器學習算法任務等。它能處理依賴關系,工作流關系,可視化等。它內置對Hadoop的支持。
一個hadoop上運行的wordcount任務定義如下:
import luigi import luigi.contrib.hadoop import luigi.contrib.hdfs class InputText(luigi.ExternalTask): """ This task is a :py:class:`luigi.task.ExternalTask` which means it doesn't generate the :py:meth:`~.InputText.output` target on its own instead relying on the execution something outside of Luigi to produce it. """ date = luigi.DateParameter() def output(self): """ Returns the target output for this task. In this case, it expects a file to be present in HDFS. :return: the target output for this task. :rtype: object (:py:class:`luigi.target.Target`) """ return luigi.contrib.hdfs.HdfsTarget(self.date.strftime('/tmp/text/%Y-%m-%d.txt')) class WordCount(luigi.contrib.hadoop.JobTask): """ This task runs a :py:class:`luigi.contrib.hadoop.JobTask` over the target data returned by :py:meth:`~/.InputText.output` and writes the result into its :py:meth:`~.WordCount.output` target. This class uses :py:meth:`luigi.contrib.hadoop.JobTask.run`. """ date_interval = luigi.DateIntervalParameter() def requires(self): """ This task's dependencies: * :py:class:`~.InputText` :return: list of object (:py:class:`luigi.task.Task`) """ return [InputText(date) for date in self.date_interval.dates()] def output(self): """ Returns the target output for this task. In this case, a successful execution of this task will create a file in HDFS. :return: the target output for this task. :rtype: object (:py:class:`luigi.target.Target`) """ return luigi.contrib.hdfs.HdfsTarget('/tmp/text-count/%s' % self.date_interval) def mapper(self, line): for word in line.strip().split(): yield word, 1 def reducer(self, key, values): yield key, sum(values) if __name__ == '__main__': luigi.run()
Luigi的UI:
Pinball
Pinball 是Pinterest開源的擴展性較好的工作流系統。Pinball的架構是一種master-worker模式,master節點負責管理任務調度,worker節點是無狀態的。Pinball無縫集成了Hadoop/Hive/Spark等。
Pinball提供了自動重試,單任務郵件報警,運行時替換,任務優先級,過載策略等功能。
Pinball的UI:
Airflow
Airflow是Airbnb開源的DAG任務調度系統,用於管理,調度和監控工作流。它與Luigi,Pinball很像。后端是基於Flask,Celery,RabbitMQ/Redis。
Airflow工作流的定義:
""" Code that goes along with the Airflow located [here](http://pythonhosted.org/airflow/tutorial.html) """ from airflow import DAG from airflow.operators import BashOperator from datetime import datetime, timedelta seven_days_ago = datetime.combine(datetime.today() - timedelta(7), datetime.min.time()) default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': seven_days_ago, 'email': ['airflow@airflow.com'], 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), \# 'queue': 'bash_queue', \# 'pool': 'backfill', \# 'priority_weight': 10, \# 'schedule_interval': timedelta(1), \# 'end_date': datetime(2016, 1, 1), } dag = DAG('tutorial', default_args=default_args) \# t1, t2 and t3 are examples of tasks created by instatiating operators t1 = BashOperator( task_id='print_date', bash_command='date', dag=dag) t1.doc_md = """\ \#### Task Documentation You can document your task using the attributes `doc_md` (markdown), `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets rendered in the UI's Task Details page.  """ dag.doc_md = __doc__ t2 = BashOperator( task_id='sleep', depends_on_past=False, bash_command='sleep 5', dag=dag) templated_command = """ {% for i in range(5) %} echo "{{ ds }}" echo "{{ macros.ds_add(ds, 7)}}" echo "{{ params.my_param }}" {% endfor %} """ t3 = BashOperator( task_id='templated', depends_on_past=False, bash_command=templated_command, params={'my_param': 'Parameter I passed in'}, dag=dag) t2.set_upstream(t1) t3.set_upstream(t1)
Airflow的功能很多,對於工作流可以支持Cron語法調度,失敗重試策略,各種任務依賴調度策略等。
Airflow的一大亮點是backfill功能,對於數據倉庫這種應用很有幫助,可以指定開始時間,將一個時間范圍的任務重跑。Airflow還提供了豐富的命令行CLI和UI供操作。
Airflow通過Celery來實現分布式調度,架構的設計上很清晰,利於擴展。
Airflow的UI:
其他
工作流調度系統很多,有人做了統計: