分布式調度平台XXL-JOB源碼分析-執行器端


上一篇文章已經說到調度中心端如何進行任務管理及調度,本文將分析執行器端是如何接收到任務調度請求,然后執行業務代碼的。
XxlJobExecutorApplication為我們執行器的啟動項,其中有個XxlJobConfig的配置項,發現其中有個屬性為adminAddresses,這個就是我們調度中心的地址。

XxlJobSpringExecutor

聲明了init方法為start,點進來,
它又實現了ApplicationContextAware接口,用來保存spring的上下文信息。
它還有個父類XxlJobExecutor,暫時未找到其他子類。
程序開始執行start方法,
第一步,調用了本類的私有方法,這個方法就是把JobHandler的實現類取出來,再調用registJobHandler(name, handler)進行注冊。
registJobHandler的實現方法在父類中,很簡單,維護了一個Map,估計后面進行任務的執行時會來這個map里面進行查詢。
第二步,估計是初始化了Glue的執行器。
第三步,調用父類的start方法,大部分的業務邏輯都在這里。
1.日志處理器初始化
2.向adminBizList字段中放入XxlRpcReferenceBean返回的代理類,作用之后會單獨開一篇注冊心跳的文章說明。
3.任務日志清除
4.任務結果回調處理線程
5.啟動另一個執行器的執行線程XxlRpcProviderFactory這個類是XXl其他的開源項目,自研RPC。

XxlRpcProviderFactory

看名字就知道這個類是可以返回Rpc調用服務提供端的工廠類,接上文,看他的initRpcProvider方法。

由上面的代碼跟進來,發現這就是啟動了一個以netty作為通訊模型、Hessian作為序列化方式的、ExecutorServiceRegistry作為注冊邏輯實現類的服務提供端。
接着我們向其添加一個服務,名稱為ExecutorBiz,版本為null,處理請求的實現類為ExecutorBizImpl,最后我們調用start方法完成執行器端的服務暴露。

此時的ServiceRegistry為ExecutorServiceRegistry,調用其start,以30秒的間隔和調度中心進行心跳通知,然后調用server的start方法,此時server為NettyHttpServer。

 

 整個代碼結構就是用netty啟動了個服務,來看最后一個ChannelHandler,NettyHttpServerHandler。

 調用私有方法process。

 

 這里調用了xxlRpcProviderFactory的invokeService方法完成了服務實現的反射調用。

 從serviceData中拿到我們之前調用addService方法添加的服務實現類,這里是ExecutorBizImpl,這里反射調用的方法是run。 

ExecutorBizImpl

這個類實現了ExecutorBiz接口,看接口定義的方法,主要是作為執行器,提供給調度中心幾個接口方法。

 重點來看run方法。

 首先XxlJobExecutor內部會有個以jobId為key,執行這個任務的線程為value的字段jobThreadRepository,我們首先去嘗試的獲取當前正在執行這個任務的線程,如果有,那就根據任務設置的運行模式進行處理,如下圖。

如果沒有正在執行此任務的線程,那就調用XxlJobExecutor.registJobThread()啟動一個線程,最后將任務數據推送給這個可能是從jobThreadRepository獲取到的也可能是新創建的線程,如下圖。

JobThread的run方法會從triggerQueue里poll出任務,然后用之前設置的 handler進行execute的方法調用,並利用idleTimes字段進行線程無任務空轉的次數控制,如下圖。

 

 至此,執行器完成了啟動,暴露ExecutorBiz服務,接收任務調度數據TriggerParam,並在JobThread線程中完成任務配置的業務handler的執行。

時序圖


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM