前面講了xxl-job的搭建,現在來粗略地解析一下該分布式調度系統的源碼,先來看客戶端代碼
客戶端源碼
- 客戶端啟動的時候會向服務中心進行註冊,其實現用的是jetty連接,並且每隔半分鐘會發送一次心跳,來告訴服務中心該執行器是否正常
- 查看源碼可以從配置文件入手
/**
 * Creates the {@link XxlJobExecutor} bean and copies the injected configuration
 * values onto it. The {@code @Bean} attributes name {@code start} as the init
 * method and {@code destroy} as the destroy method.
 *
 * @return the configured executor instance
 */
@Bean(initMethod = "start", destroyMethod = "destroy")
public XxlJobExecutor xxlJobExecutor() {
    logger.info(">>>>>>>>>>> xxl-jobhandler config init.");

    final XxlJobExecutor executor = new XxlJobExecutor();

    // Admin endpoint(s) and the identity/address this executor announces.
    executor.setAdminAddresses(adminAddresses);
    executor.setAppName(appName);
    executor.setIp(ip);
    executor.setPort(port);
    executor.setAccessToken(accessToken);

    // Log file location and retention.
    executor.setLogPath(logPath);
    executor.setLogRetentionDays(logRetentionDays);

    return executor;
}
很明顯,在把配置信息注入以後,該Bean會執行start方法(由initMethod = "start"指定),進入其中
3. 可以看到以下代碼,英文註釋我斗膽翻譯一下~
// ---------------------- start + stop ----------------------
/**
 * Starts the executor by running its initialisation steps in a fixed order:
 * admin clients, job-handler repository, log path, executor server, and
 * finally the log-file cleanup thread.
 *
 * @throws Exception propagated from any of the initialisation steps
 */
public void start() throws Exception {
// init admin-client: set up the admin client list from the configured addresses and access token
initAdminBizList(adminAddresses, accessToken);
// init executor-jobHandlerRepository: populate the job-handler repository from applicationContext
// NOTE(review): presumably this collects the job-handler instances so they can be looked up later — confirm in initJobHandlerRepository
initJobHandlerRepository(applicationContext);
// init logpath: hand the configured log path to XxlJobFileAppender
XxlJobFileAppender.initLogPath(logPath);
// init executor-server: start the executor's server with the configured port, ip, app name and token
initExecutorServer(port, ip, appName, accessToken);
// init JobLogFileCleanThread: start the log-file cleanup thread with the configured retention days
JobLogFileCleanThread.getInstance().start(logRetentionDays);
}
國人編寫的代碼就是有國人自己的風格,至少比國外的開源代碼好看懂點
4. 這裡主要深入執行器服務的部分,其他部分還是比較容易看懂的,繼續深入
/**
 * Registers the executor RPC service on the server factory and starts the server.
 *
 * @param port        port to listen on; when it is not positive, the port returned by
 *                    {@code NetUtil.findAvailablePort(9999)} is used instead
 * @param ip          address passed through to the server factory
 * @param appName     application name passed through to the server factory
 * @param accessToken token set on {@code NetComServerFactory}
 * @throws Exception propagated from port lookup or server start
 */
private void initExecutorServer(int port, String ip, String appName, String accessToken) throws Exception {
    // valid param: keep a configured (positive) port, otherwise ask NetUtil for one
    final int listenPort;
    if (port > 0) {
        listenPort = port;
    } else {
        listenPort = NetUtil.findAvailablePort(9999);
    }

    // start server: expose ExecutorBiz and set the access token before starting
    NetComServerFactory.putService(ExecutorBiz.class, new ExecutorBizImpl()); // rpc-service, base on jetty
    NetComServerFactory.setAccessToken(accessToken);

    // this call is the main step of the method
    serverFactory.start(listenPort, ip, appName); // jetty + registry
}
繼續深入
// ---------------------- server start ----------------------
/** The Jetty-based server that {@link #start(int, String, String)} delegates to. */
JettyServer server = new JettyServer();

/**
 * Starts the underlying {@code JettyServer} with the given settings.
 *
 * @param port    port forwarded to the server
 * @param ip      address forwarded to the server
 * @param appName application name forwarded to the server
 * @throws Exception propagated from the server's start call
 */
public void start(int port, String ip, String appName) throws Exception {
    this.server.start(port, ip, appName);
}
繼續深入
/**
 * Launches the Jetty server on a daemon thread.
 *
 * <p>The thread builds the server, connector and handler, starts the server,
 * then starts the registry thread and the callback thread, and finally waits
 * in {@code server.join()}. Any exception raised inside the thread is logged
 * and not rethrown.
 *
 * @param port    port set on the connector
 * @param ip      host set on the connector when it is non-null and non-blank
 * @param appName application name passed to the registry thread
 * @throws Exception declared by the signature
 */
public void start(final int port, final String ip, final String appName) throws Exception {
    final Runnable serverTask = new Runnable() {
        @Override
        public void run() {
            // The Server, created with an ExecutorThreadPool
            server = new Server(new ExecutorThreadPool());

            // HTTP connector; the host is only set when an ip was supplied,
            // otherwise the connector keeps its default binding
            final ServerConnector httpConnector = new ServerConnector(server);
            final boolean hasIp = ip != null && ip.trim().length() > 0;
            if (hasIp) {
                httpConnector.setHost(ip);
            }
            httpConnector.setPort(port);
            server.setConnectors(new Connector[]{httpConnector});

            // Request handler: a single JettyServerHandler inside a HandlerCollection
            final HandlerCollection handlers = new HandlerCollection();
            handlers.setHandlers(new Handler[]{new JettyServerHandler()});
            server.setHandler(handlers);

            try {
                // Start server
                server.start();
                logger.info(">>>>>>>>>>> xxl-job jetty server start success at port:{}.", port);

                // Start Registry-Server: registration with the admin side (runs on its own thread)
                ExecutorRegistryThread.getInstance().start(port, ip, appName);

                // Start Callback-Server: job callback handling (runs on its own thread)
                TriggerCallbackThread.getInstance().start();

                // block until the server thread stops
                server.join();
                logger.info(">>>>>>>>>>> xxl-rpc server join success, netcon={}, port={}", JettyServer.class.getName(), port);
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
            // No cleanup is performed here; a destroy() call was left disabled in the original.
        }
    };

    thread = new Thread(serverTask);
    // daemon thread: it does not keep the JVM alive once user threads have finished
    thread.setDaemon(true);
    thread.start();
}
可以看出這裡主要需要關注
ExecutorRegistryThread.getInstance().start(port, ip, appName);
TriggerCallbackThread.getInstance().start();
這兩個方法
繼續深入
/**
 * Starts the daemon thread that keeps this executor registered with the admin side.
 *
 * <p>Nothing is started when {@code appName} is null/blank or when
 * {@code XxlJobExecutor.getAdminBizList()} is null; a warning is logged instead.
 * Otherwise the thread loops until {@code toStop} is set: on each pass it calls
 * {@code registry} on the admin clients in turn, stopping at the first one that
 * reports success, and then sleeps {@code RegistryConfig.BEAT_TIMEOUT} seconds.
 * After the loop ends it calls {@code registryRemove} in the same way.
 *
 * @param port    executor port, used to build the registered address
 * @param ip      executor ip; when null or blank the address comes from {@code IpUtil.getIpPort(port)}
 * @param appName application name sent in the registry parameter
 */
public void start(final int port, final String ip, final String appName){
    // valid
    final boolean appNameMissing = appName == null || appName.trim().length() == 0;
    if (appNameMissing) {
        logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, appName is null.");
        return;
    }
    if (XxlJobExecutor.getAdminBizList() == null) {
        logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, adminAddresses is null.");
        return;
    }

    // executor address: "ip:port" when an ip was given, otherwise resolved by IpUtil
    final boolean ipGiven = ip != null && ip.trim().length() > 0;
    final String executorAddress = ipGiven
            ? ip.trim().concat(":").concat(String.valueOf(port))
            : IpUtil.getIpPort(port);

    final Runnable registryTask = new Runnable() {
        @Override
        public void run() {
            // registry: repeat until asked to stop
            while (!toStop) {
                try {
                    final RegistryParam beat = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appName, executorAddress);
                    for (AdminBiz admin : XxlJobExecutor.getAdminBizList()) {
                        try {
                            ReturnT<String> result = admin.registry(beat);
                            if (result == null || ReturnT.SUCCESS_CODE != result.getCode()) {
                                // this admin did not accept the registration; try the next one
                                logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{beat, result});
                                continue;
                            }
                            result = ReturnT.SUCCESS;
                            logger.info(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{beat, result});
                            break;
                        } catch (Exception e) {
                            logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", beat, e);
                        }
                    }
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }

                // wait RegistryConfig.BEAT_TIMEOUT seconds before the next pass
                try {
                    TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
                } catch (InterruptedException e) {
                    logger.error(e.getMessage(), e);
                }
            }

            // registry remove: ask the admin side to drop this executor's registration
            try {
                final RegistryParam removal = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appName, executorAddress);
                for (AdminBiz admin : XxlJobExecutor.getAdminBizList()) {
                    try {
                        ReturnT<String> result = admin.registryRemove(removal);
                        if (result == null || ReturnT.SUCCESS_CODE != result.getCode()) {
                            logger.info(">>>>>>>>>>> xxl-job registry-remove fail, registryParam:{}, registryResult:{}", new Object[]{removal, result});
                            continue;
                        }
                        result = ReturnT.SUCCESS;
                        logger.info(">>>>>>>>>>> xxl-job registry-remove success, registryParam:{}, registryResult:{}", new Object[]{removal, result});
                        break;
                    } catch (Exception e) {
                        logger.info(">>>>>>>>>>> xxl-job registry-remove error, registryParam:{}", removal, e);
                    }
                }
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
            logger.info(">>>>>>>>>>> xxl-job, executor registry thread destory.");
        }
    };

    registryThread = new Thread(registryTask);
    registryThread.setDaemon(true);
    registryThread.start();
}
繼續
/**
 * Starts the daemon thread that sends queued callbacks via {@code doCallback}.
 *
 * <p>Nothing is started when {@code XxlJobExecutor.getAdminBizList()} is null;
 * a warning is logged instead. Otherwise the thread loops until {@code toStop}
 * is set: it takes one callback from {@code callBackQueue}, drains whatever else
 * is queued into the same batch, and passes the batch to {@code doCallback}.
 * After the loop ends, anything still queued is drained and sent once more.
 */
public void start() {
    // valid
    if (XxlJobExecutor.getAdminBizList() == null) {
        logger.warn(">>>>>>>>>>> xxl-job, executor callback config fail, adminAddresses is null.");
        return;
    }

    final Runnable callbackTask = new Runnable() {
        @Override
        public void run() {
            // normal callback
            while (!toStop) {
                try {
                    // take() waits for the next queued callback (a blocking queue, per the original note)
                    final HandleCallbackParam head = getInstance().callBackQueue.take();
                    if (head == null) {
                        continue;
                    }

                    // callback list param: everything else currently queued, then the taken element last
                    final List<HandleCallbackParam> batch = new ArrayList<HandleCallbackParam>();
                    getInstance().callBackQueue.drainTo(batch);
                    batch.add(head);

                    // callback, will retry if error
                    if (!batch.isEmpty()) {
                        doCallback(batch);
                    }
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }

            // last callback: send whatever is still queued after the stop flag was set
            try {
                final List<HandleCallbackParam> remaining = new ArrayList<HandleCallbackParam>();
                getInstance().callBackQueue.drainTo(remaining);
                if (!remaining.isEmpty()) {
                    doCallback(remaining);
                }
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
            logger.info(">>>>>>>>>>> xxl-job, executor callback thread destory.");
        }
    };

    triggerCallbackThread = new Thread(callbackTask);
    triggerCallbackThread.setDaemon(true);
    triggerCallbackThread.start();
}
可以看到,回調參數是通過下面這個方法放入上述隊列的
/**
 * Adds a callback parameter to the {@code callBackQueue} of the instance returned
 * by {@code getInstance()}, then logs the callback's log id at debug level.
 *
 * @param callback the callback parameter to enqueue; its {@code getLogId()} is read for the log line
 */
public static void pushCallBack(HandleCallbackParam callback){
getInstance().callBackQueue.add(callback);
logger.debug(">>>>>>>>>>> xxl-job, push callback request, logId:{}", callback.getLogId());
}
後面可以向上追溯好多層級,最頂層可以發現
/**
 * Reads an RPC request from the HTTP request body, invokes the target service
 * and returns the response.
 *
 * <p>An empty or missing body yields a response whose error is
 * {@code "RpcRequest byte[] is null"}. Any exception is logged and turned into
 * a response whose error starts with {@code "Server-error:"}.
 *
 * @param request the incoming HTTP request carrying the serialized {@code RpcRequest}
 * @return the service's response, or an error response as described above
 */
private RpcResponse doInvoke(HttpServletRequest request) {
    try {
        // deserialize request
        final byte[] payload = HttpClientUtil.readBytes(request);
        final boolean emptyPayload = payload == null || payload.length == 0;
        if (emptyPayload) {
            final RpcResponse emptyReply = new RpcResponse();
            emptyReply.setError("RpcRequest byte[] is null");
            return emptyReply;
        }

        // NOTE(review): the bytes come straight from the HTTP request and are deserialized
        // before any check visible in this method — confirm the caller or invokeService
        // validates the access token and that this endpoint is not reachable by untrusted clients.
        final RpcRequest rpcRequest = (RpcRequest) HessianSerializer.deserialize(payload, RpcRequest.class);

        // invoke: this is the call that dispatches the request to the registered service
        return NetComServerFactory.invokeService(rpcRequest, null);
    } catch (Exception e) {
        logger.error(e.getMessage(), e);

        final RpcResponse failure = new RpcResponse();
        failure.setError("Server-error:" + e.getMessage());
        return failure;
    }
}
而這個處理器對象(JettyServerHandler)實際上在JettyServer服務類中已經加入
// Set a handler: wrap a new JettyServerHandler in a HandlerCollection and install it on the server
HandlerCollection handlerc =new HandlerCollection();
handlerc.setHandlers(new Handler[]{new JettyServerHandler()});
server.setHandler(handlerc);