Oozie 4.3
I Introduction

1 Official site
Apache Oozie Workflow Scheduler for Hadoop
The workflow scheduler of the Hadoop ecosystem
Overview
Oozie is a workflow scheduler system to manage Apache Hadoop jobs.
Oozie Workflow jobs are Directed Acyclical Graphs (DAGs) of actions.
Oozie Coordinator jobs are recurrent Oozie Workflow jobs triggered by time (frequency) and data availability.
Oozie is integrated with the rest of the Hadoop stack supporting several types of Hadoop jobs out of the box (such as Java map-reduce, Streaming map-reduce, Pig, Hive, Sqoop and Distcp) as well as system specific jobs (such as Java programs and shell scripts).
Oozie is a scalable, reliable and extensible system.

2 Deployment

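3 Database table structure

wf_jobs: workflow instances
wf_actions: action (task) instances
coord_jobs: coordinator instances
coord_actions: coordinator action instances
4 Concepts
- Control Node: a node that marks the start or end of a workflow or decides its execution path (start, end, kill, decision, fork/join)
- Action Node: a computation task executed by the workflow, i.e. a task; supported types include HDFS, MapReduce, Java, Shell, SSH, Pig, Hive, E-Mail, Sub-Workflow, Sqoop and Distcp
- Workflow: a set of Control Nodes plus a series of Action Nodes, i.e. the workflow itself
- Coordinator: triggers a workflow according to the configured Cron information, i.e. scheduling
- Bundle: manages Coordinator jobs in batches as a group, so that they can be started and stopped together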
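To make the Control Node / Action Node split concrete, here is a minimal sketch of a workflow as a graph that is walked from start to end (or to kill on error). Everything in it (WorkflowSketch, Node, the "import" and "report" action names) is hypothetical and invented for illustration; it is not Oozie's real object model, and decision and fork/join nodes are left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of a workflow graph; not Oozie's real classes.
final class WorkflowSketch {
    enum Kind { START, ACTION, KILL, END }

    // A node knows its kind and where to go on success ("ok") and on failure ("error").
    static final class Node {
        final Kind kind;
        final String ok;
        final String error;

        Node(Kind kind, String ok, String error) {
            this.kind = kind;
            this.ok = ok;
            this.error = error;
        }
    }

    private final Map<String, Node> nodes = new LinkedHashMap<>();

    WorkflowSketch add(String name, Kind kind, String ok, String error) {
        nodes.put(name, new Node(kind, ok, error));
        return this;
    }

    // Walks the graph from "start"; 'failing' lists the action names that should fail.
    List<String> run(List<String> failing) {
        List<String> visited = new ArrayList<>();
        String current = "start";
        while (current != null) {
            Node node = nodes.get(current);
            visited.add(current);
            if (node.kind == Kind.END || node.kind == Kind.KILL) {
                break; // terminal control nodes stop the walk
            }
            boolean failed = node.kind == Kind.ACTION && failing.contains(current);
            current = failed ? node.error : node.ok;
        }
        return visited;
    }

    // start -> import -> report -> end, with any failure routed to the kill node "fail".
    static WorkflowSketch example() {
        return new WorkflowSketch()
            .add("start", Kind.START, "import", null)
            .add("import", Kind.ACTION, "report", "fail")
            .add("report", Kind.ACTION, "end", "fail")
            .add("fail", Kind.KILL, null, null)
            .add("end", Kind.END, null, null);
    }
}
```

Calling WorkflowSketch.example().run(...) with an empty failure list visits start, import, report, end; marking "import" as failing ends the walk at the kill node instead.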
II Code walkthrough
1 Startup process
Load all configured services:
ServicesLoader.contextInitialized
Services.init
Services.loadServices (oozie.services, oozie.services.ext)
Service structure:
Service
org.apache.oozie.service.SchedulerService,
org.apache.oozie.service.InstrumentationService,
org.apache.oozie.service.MemoryLocksService,
org.apache.oozie.service.UUIDService,
org.apache.oozie.service.ELService,
org.apache.oozie.service.AuthorizationService,
org.apache.oozie.service.UserGroupInformationService,
org.apache.oozie.service.HadoopAccessorService,
org.apache.oozie.service.JobsConcurrencyService,
org.apache.oozie.service.URIHandlerService,
org.apache.oozie.service.DagXLogInfoService,
org.apache.oozie.service.SchemaService,
org.apache.oozie.service.LiteWorkflowAppService,
org.apache.oozie.service.JPAService,
org.apache.oozie.service.StoreService,
org.apache.oozie.service.SLAStoreService,
org.apache.oozie.service.DBLiteWorkflowStoreService,
org.apache.oozie.service.CallbackService,
org.apache.oozie.service.ActionService,
org.apache.oozie.service.ShareLibService,
org.apache.oozie.service.CallableQueueService,
org.apache.oozie.service.ActionCheckerService,
org.apache.oozie.service.RecoveryService,
org.apache.oozie.service.PurgeService,
org.apache.oozie.service.CoordinatorEngineService,
org.apache.oozie.service.BundleEngineService,
org.apache.oozie.service.DagEngineService,
org.apache.oozie.service.CoordMaterializeTriggerService,
org.apache.oozie.service.StatusTransitService,
org.apache.oozie.service.PauseTransitService,
org.apache.oozie.service.GroupsService,
org.apache.oozie.service.ProxyUserService,
org.apache.oozie.service.XLogStreamingService,
org.apache.oozie.service.JvmPauseMonitorService,
org.apache.oozie.service.SparkConfigurationService
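The loading step can be pictured as: read the comma-separated class names from oozie.services and oozie.services.ext, instantiate each class by reflection, and initialize it. The sketch below illustrates that idea only. SketchService, UuidLikeService and LocksLikeService are hypothetical stand-ins, the configuration is a plain Map, and the real Services.loadServices may differ in details such as how services are keyed or overridden.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of reflection-based service loading; not Oozie's actual code.
final class ServiceLoadingSketch {
    interface SketchService {
        void init();
    }

    public static class UuidLikeService implements SketchService {
        boolean started;
        public void init() { started = true; }
    }

    public static class LocksLikeService implements SketchService {
        boolean started;
        public void init() { started = true; }
    }

    // Collects class names from both properties, ignoring whitespace and empty entries.
    static List<String> classNames(Map<String, String> conf) {
        List<String> names = new ArrayList<>();
        for (String key : new String[] {"oozie.services", "oozie.services.ext"}) {
            String value = conf.containsKey(key) ? conf.get(key) : "";
            for (String name : value.split(",")) {
                String trimmed = name.trim();
                if (!trimmed.isEmpty()) {
                    names.add(trimmed);
                }
            }
        }
        return names;
    }

    // Instantiates every configured class and calls init() in configuration order.
    static Map<String, SketchService> load(Map<String, String> conf) {
        Map<String, SketchService> services = new LinkedHashMap<>();
        for (String name : classNames(conf)) {
            try {
                Object instance = Class.forName(name).getDeclaredConstructor().newInstance();
                SketchService service = (SketchService) instance;
                service.init();
                services.put(name, service);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Cannot load service " + name, e);
            }
        }
        return services;
    }
}
```

Keeping the list in configuration makes it possible to swap an implementation (for example a lock service) without touching code, which is what the ZooKeeper-based services in section 5 rely on.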
2 Core engines
BaseEngine
DagEngine (runs workflows)
CoordinatorEngine (runs coordinators)
BundleEngine (runs bundles)
3 Workflow submission and execution
DagEngine.submitJob | submitJobFromCoordinator (submit the workflow)
SubmitXCommand.call
execute
LiteWorkflowAppService.parseDef (parse the definition into a WorkflowApp)
LiteWorkflowLib.parseDef
LiteWorkflowAppParser.validateAndParse
parse
WorkflowLib.createInstance (create the WorkflowInstance)
BatchQueryExecutor.executeBatchInsertUpdateDelete (save the WorkflowJobBean to wf_jobs)
StartXCommand.call
SignalXCommand.call
execute
WorkflowInstance.start
LiteWorkflowInstance.start
signal
NodeHandler.enter
ActionNodeHandler.enter
start
LiteWorkflowStoreService.liteExecute (add the WorkflowActionBean to ACTIONS_TO_START)
WorkflowStoreService.getActionsToStart (take the actions from ACTIONS_TO_START)
ActionStartXCommand.call
ActionExecutor.start
WorkflowNotificationXCommand.call
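BatchQueryExecutor.executeBatchInsertUpdateDelete (save the WorkflowActionBean to wf_actions)
ActionExecutor.start is asynchronous, so the action's execution status still has to be checked in order to advance the workflow. There are two cases:
Case 1, the Oozie server is running normally: JobEndNotification is used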
CallbackServlet.doGet
DagEngine.processCallback
CompletedActionXCommand.call
ActionCheckXCommand.call
ActionExecutor.check
ActionEndXCommand.call
SignalXCommand.call
Case 2, the Oozie server has been restarted: ActionCheckerService is used
ActionCheckerService.init
ActionCheckRunnable.run
runWFActionCheck (GET_RUNNING_ACTIONS, oozie.service.ActionCheckerService.action.check.delay=600)
ActionCheckXCommand.call
ActionExecutor.check
ActionEndXCommand.call
SignalXCommand.call
runCoordActionCheck
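The two paths above amount to "push" (the callback) and "pull" (the periodic check). A minimal sketch of the pull side follows: among the running actions, pick those whose last check is at least the configured delay old, while a callback simply takes an action out of the running set. ActionCheckSketch and its methods are hypothetical names; the 600-second value is the action.check.delay from the trace, and the exact selection rule of GET_RUNNING_ACTIONS is assumed rather than copied from Oozie.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of callback-plus-polling status tracking; not Oozie's actual code.
final class ActionCheckSketch {
    // Mirrors oozie.service.ActionCheckerService.action.check.delay=600 (seconds).
    static final long CHECK_DELAY_SECONDS = 600;

    // Returns the ids of running actions whose last check is at least the delay old,
    // in sorted order so the result is deterministic.
    static List<String> dueForCheck(Map<String, Long> lastCheckSeconds, long nowSeconds) {
        Map<String, Long> sorted = new TreeMap<>(lastCheckSeconds);
        List<String> due = new ArrayList<>();
        for (Map.Entry<String, Long> entry : sorted.entrySet()) {
            if (nowSeconds - entry.getValue() >= CHECK_DELAY_SECONDS) {
                due.add(entry.getKey());
            }
        }
        return due;
    }

    // Push path: a completion callback arrived, so the action no longer needs polling.
    static void onCallback(Map<String, Long> lastCheckSeconds, String actionId) {
        lastCheckSeconds.remove(actionId);
    }
}
```

The poller acts as a safety net here: an action whose callback never arrives is still picked up once its last check is old enough.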
4 Coordinator submission and execution
CoordinatorEngine.submitJob (submit the coordinator)
CoordSubmitXCommand.call
submit
submitJob
storeToDB
CoordJobQueryExecutor.insert (save the CoordinatorJobBean to coord_jobs)
queueMaterializeTransitionXCommand
CoordMaterializeTransitionXCommand.call
execute
materialize
materializeActions
CoordCommandUtils.materializeOneInstance (create the CoordinatorActionBean)
storeToDB
performWrites
BatchQueryExecutor.executeBatchInsertUpdateDelete (save the CoordinatorActionBean to coord_actions)
CoordActionInputCheckXCommand.call
CoordActionReadyXCommand.call
CoordActionStartXCommand.call
DagEngine.submitJobFromCoordinator
A scheduled task triggers materialization:
CoordMaterializeTriggerService.init
CoordMaterializeTriggerRunnable.run
CoordMaterializeTriggerService.runCoordJobMatLookup
materializeCoordJobs (GET_COORD_JOBS_OLDER_FOR_MATERIALIZATION)
CoordMaterializeTransitionXCommand.call
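Materialization turns a coordinator's schedule into concrete action instances. As a rough sketch of the idea, assuming a fixed frequency, the nominal times can be generated by stepping from the start time to the end time, with a cap on how many instances are created per pass. MaterializeSketch, nominalTimes and maxActions are hypothetical names; the real materializeActions also has to deal with cron expressions, time zones and other details that are ignored here.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of fixed-frequency materialization; not Oozie's actual code.
final class MaterializeSketch {
    // Returns the nominal times in [start, end), one per frequency step,
    // creating at most maxActions instances in a single pass.
    static List<Instant> nominalTimes(Instant start, Instant end,
                                      Duration frequency, int maxActions) {
        if (frequency.isZero() || frequency.isNegative()) {
            throw new IllegalArgumentException("frequency must be positive");
        }
        List<Instant> times = new ArrayList<>();
        Instant next = start;
        while (next.isBefore(end) && times.size() < maxActions) {
            times.add(next);
            next = next.plus(frequency);
        }
        return times;
    }
}
```

Each returned instant would correspond to one row in coord_actions; a later pass can continue from the last materialized time.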
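5 Distributed mode
Some internal tasks must run as a single instance only. In a single-server Oozie environment this is guaranteed by MemoryLocksService; in a multi-server environment it is guaranteed by ZKLocksService. To enable ZK, the following services must be enabled: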
org.apache.oozie.service.ZKLocksService,
org.apache.oozie.service.ZKXLogStreamingService,
org.apache.oozie.service.ZKJobsConcurrencyService,
org.apache.oozie.service.ZKUUIDService
oozie.zookeeper.connection.string must be configured as well
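Why an in-memory lock is enough for one server but not for several can be seen from a small sketch: a named lock is granted to the first requester and refused to everyone else until it is released, but the table of owners lives inside one JVM. SingleRunnerSketch is a hypothetical illustration of that idea only; it is not how MemoryLocksService or ZKLocksService are implemented.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of a named, single-owner lock table; not Oozie's actual code.
final class SingleRunnerSketch {
    // lock name -> id of the current owner; visible only inside this JVM.
    private final ConcurrentHashMap<String, String> owners = new ConcurrentHashMap<>();

    // Grants the lock only if nobody holds it yet.
    boolean tryAcquire(String lockName, String ownerId) {
        return owners.putIfAbsent(lockName, ownerId) == null;
    }

    // Releases the lock only if the caller is the current owner.
    boolean release(String lockName, String ownerId) {
        return owners.remove(lockName, ownerId);
    }
}
```

Two servers would each have their own copy of this table and could both "win" the same lock, which is why a multi-server setup needs the lock state in a shared coordinator such as ZooKeeper.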
6 Action execution
ActionExecutor is the core abstract base class for action execution; every concrete action type is a subclass of it
ActionExecutor
JavaActionExecutor
SshActionExecutor
FsActionExecutor
SubWorkflowActionExecutor
JavaActionExecutor is the most important subclass; many other action executors are subclasses of it (for example HiveActionExecutor and SparkActionExecutor)
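The shape of this hierarchy can be sketched as an abstract base, a launcher-based executor, and specializations that only change which main class the launcher runs. The classes below (SketchExecutor, JavaLikeExecutor, HiveLikeExecutor, FsLikeExecutor) and their methods are hypothetical stand-ins that return descriptive strings; they do not reproduce the signatures of the real ActionExecutor classes.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of the executor hierarchy; not Oozie's actual classes.
final class ExecutorHierarchySketch {
    // Base type: every action kind has a type name and knows how to start an action.
    abstract static class SketchExecutor {
        abstract String type();
        abstract String start(String actionName);
    }

    // Launcher-based executor: starts a launcher that runs a main class.
    static class JavaLikeExecutor extends SketchExecutor {
        String type() { return "java"; }
        String mainClass() { return "JavaMain"; }
        String start(String actionName) {
            return "launcher(" + mainClass() + ") for " + actionName;
        }
    }

    // Specialization: reuses the launcher logic and only swaps the main class.
    static class HiveLikeExecutor extends JavaLikeExecutor {
        @Override String type() { return "hive"; }
        @Override String mainClass() { return "HiveMain"; }
    }

    // Executor that does not go through a launcher in this sketch.
    static class FsLikeExecutor extends SketchExecutor {
        String type() { return "fs"; }
        String start(String actionName) { return "direct fs operation for " + actionName; }
    }

    // Maps an action type name to the executor that handles it.
    static Map<String, SketchExecutor> registry() {
        Map<String, SketchExecutor> byType = new LinkedHashMap<>();
        for (SketchExecutor executor : new SketchExecutor[] {
                new JavaLikeExecutor(), new HiveLikeExecutor(), new FsLikeExecutor()}) {
            byType.put(executor.type(), executor);
        }
        return byType;
    }
}
```

Looking up "hive" in the registry and calling start("q1") goes through the inherited launcher path with HiveMain as the main class, which is the reuse the subclassing is meant to provide.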
JavaActionExecutor.start
prepareActionDir
submitLauncher
JobClient.getJob
injectLauncherCallback
ActionExecutor.Context.getCallbackUrl
job.end.notification.url
createLauncherConf
LauncherMapperHelper.setupLauncherInfo
JobClient.submitJob
check
When JavaActionExecutor runs, it submits a map task, LauncherMapper, to YARN:
LauncherMapper.map
LauncherMain.main
LauncherMain is the class that actually runs the concrete task
LauncherMain
JavaMain
HiveMain
Hive2Main
SparkMain
ShellMain
SqoopMain
