修改后的源碼倉庫地址:GitHub. :
改造原因
- 原有的xxl-job使用自己實現的http協議進行注冊以及調度等,與目前框架中本身的注冊中心格格不入,會影響健康檢查、日志處理、問題排查。
- 技術棧統一。避免執行器內包含兩套注冊邏輯。
- 提高分布式健壯性,原有的服務注冊以及發現等功能較弱,且與實際應用可用與否完全無關,經常存在xxl-job線程出問題,但主服務正常,或主服務出問題,但xxl-job線程正常。
- 灰度擴展,目前系統灰度使用eureka定制實現,為執行器支持灰度,必須進行改造。
主要改造思路
調度中心
調度中心側獲取服務時,將原有的基於數據庫的地址list,修改為動態從eureka中心獲取服務的地址列表,兩者通過xxl-job-admin配置的執行器app-name,與執行器的spring.application.name(即注冊到eureka的服務標識)關聯。
//com.xxl.job.admin.core.trigger.XxlJobTrigger
XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());
//@edit 如果是自動獲取地址的話,則使用
if (group.getAddressType() == 0) {
group.setAddressList(SpringAdminContext.getEurekaAddressList(group.getAppname()));
}
//這種靜態和spring容器不分的寫法,還是有點別扭的
@Component("springAdminContext")
public class SpringAdminContext {
@Autowired
DiscoveryClient discoveryClient;
private static SpringAdminContext springAdminContext;
@PostConstruct
public void initialize() {
springAdminContext = this;
springAdminContext.discoveryClient=this.discoveryClient;
}
public static String getEurekaAddressList(String appName){
//may be springContext not init
if(springAdminContext !=null){
DiscoveryClient discoveryClient = SpringAdminContext.springAdminContext.discoveryClient;
List<ServiceInstance> instances = discoveryClient.getInstances(appName);
StringBuilder addressBuilder = new StringBuilder();
for (int i = 0; i < instances.size(); i++) {
addressBuilder.append(instances.get(i).getUri().toString());
if(i!=instances.size()) {
addressBuilder.append(",");
}
}
return addressBuilder.toString();
}else {
return "";
}
}
}
調度中心側調度服務時,增加灰度策略,在獲取到eureka的instanceList后,從instance的meta原數據中取出灰度標識,進行灰度調度。代碼結合1中的列表獲取,具體灰度實現與百度到的eureka灰度相同,略。
調度中心 執行器側
通過修改core包,將原有的注冊線程刪除,並刪除embedServer的實現,修改為springMVC。
//修改后的代替embedServer的處理類
@Controller
public class XxlJobHandlerController {
private static final Logger logger = LoggerFactory.getLogger(XxlJobHandlerController.class);
private ExecutorBiz executorBiz;
@Value("${xxl.job.accessToken:}")
private String accessToken;
@Autowired
ThreadPoolExecutor bizThreadPool;
@PostConstruct
public void start() {
executorBiz = new ExecutorBizImpl();
}
@PostMapping("/job/{method}")
@ResponseBody
public ReturnT jobHandle(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, @PathVariable("method") String methodName) {
return doHandlerReq(httpServletRequest,httpServletResponse,"/"+methodName);
}
// ---------------------- registry ----------------------
protected ReturnT doHandlerReq(HttpServletRequest httpServletRequest,HttpServletResponse httpServletResponse,String method) {
try {
//read request
//@edit 這里請求都模擬的原有處理方式,包括header這些
int contentLength = httpServletRequest.getContentLength();
byte[] reqBody=new byte[contentLength];
httpServletRequest.getInputStream().read(reqBody,0,contentLength);
String requestData=new String(reqBody, StandardCharsets.UTF_8);
String uri = method;
String accessTokenReq = httpServletRequest.getHeader(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN);
//@edit 這里原有是netty純異步,但是到這邊http就不太合適了,這么寫雖然看起來吞吐會下降,但是一般web容器現在底層也支持nio了,應該關系不大。
FutureTask<ReturnT> stringFutureTask=new FutureTask<ReturnT>(() -> process(uri, requestData, accessTokenReq));
// invoke
bizThreadPool.execute(stringFutureTask);
ReturnT returnT = stringFutureTask.get();
httpServletResponse.setHeader(HttpHeaders.CONTENT_TYPE, "text/html;charset=UTF-8");
return returnT;
} catch (Exception e) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");
}
}
private ReturnT process(String uri, String requestData, String accessTokenReq) {
if (uri == null || uri.trim().length() == 0) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");
}
if (accessToken != null
&& accessToken.trim().length() > 0
&& !accessToken.equals(accessTokenReq)) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
}
// services mapping
try {
if ("/beat".equals(uri)) {
return executorBiz.beat();
} else if ("/idleBeat".equals(uri)) {
IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
return executorBiz.idleBeat(idleBeatParam);
} else if ("/run".equals(uri)) {
TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
return executorBiz.run(triggerParam);
} else if ("/kill".equals(uri)) {
KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
return executorBiz.kill(killParam);
} else if ("/log".equals(uri)) {
LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
return executorBiz.log(logParam);
} else {
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping(" + uri + ") not found.");
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e));
}
}
}
修改處理callback的邏輯,使得任務執行結果可以回調到admin
//com.xxl.job.core.executor
//@edit 這個方法主要用於執行器在獲取調度中心列表時調用,目的是執行器獲取調度中心列表進行callback回調通知執行結果。
//如果callback失敗或者這里出問題,那么會導致在管理台看到的執行結果永遠沒有。
//管理台的調度結果和執行結果是分開的,調度結果依賴單次http請求,執行結果依賴callback
// @see TriggerCallbackThread
public static List<AdminBiz> getAdminBizList(){
List<AdminBiz> adminBizs=new ArrayList<>();
String adminAppName="xxl-job-admin-cloud";
List<String> addressList = Arrays.asList(SpringContext.getEurekaAddressList(adminAppName).split(","));
addressList.forEach(e ->{
adminBizs.add(new AdminBizClient(e.concat("/").concat("xxl-job-admin"),accessToken));
});
return adminBizs;
}
總結
其實還有一些細節的修改包括eureka的配置,原有注冊代碼等的調整,就不好一一列出來了,具體可以拉下來項目搜索@edit,主要修改的地方我都加了這個。