dolphin 1.6已經增加了企業微信的功能。
公司使用的是釘釘,因此增加向釘釘指定群組發送告警的功能,可以針對該群組提示執行失敗或被殺死的任務。
alert模塊,增加配置:
# Switch for DingTalk alerting (read as a boolean by EnterpriseDingTalkUtils.isEnable()).
enterprise.dingtalk.enable=true
# Secret used to sign every request sent to the DingTalk robot (placeholder value).
enterprise.dingtalk.secret=釘釘秘鑰
# Robot webhook URL including its access_token query parameter (placeholder value).
enterprise.dingtalk.url=釘釘地址
# enterprise.dingtalk.url=https://oapi.dingtalk.com/robot/send?access_token=token
在 Constants 類中增加對應的配置常量:
1 public static final String ENTERPRISE_DINGTALK_ENABLE="enterprise.dingtalk.enable"; 2 3 public static final String ENTERPRISE_DINGTALK_SECRET="enterprise.dingtalk.secret"; 4 5 public static final String ENTERPRISE_DINGTALK_URL="enterprise.dingtalk.url";
增加工具類
package org.apache.dolphinscheduler.alert.utils;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.dolphinscheduler.common.utils.HttpUtils;
import org.apache.http.HttpEntity;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.util.Base64;

/**
 * Sends plain-text alert messages to a DingTalk robot webhook.
 *
 * <p>The webhook URL and the signing secret are read once, at class-load time, from the
 * properties named by {@code Constants.ENTERPRISE_DINGTALK_URL} and
 * {@code Constants.ENTERPRISE_DINGTALK_SECRET}. Every request is signed with
 * HMAC-SHA256 over {@code timestamp + "\n" + secret}.
 */
public class EnterpriseDingTalkUtils {

    public static final Logger logger = LoggerFactory.getLogger(EnterpriseDingTalkUtils.class);

    private static final String ENTERPRISE_DINGTALK_URL = PropertyUtils.getString(Constants.ENTERPRISE_DINGTALK_URL);

    private static final String ENTERPRISE_DINGTALK_SECRET = PropertyUtils.getString(Constants.ENTERPRISE_DINGTALK_SECRET);

    private static final String HMAC_SHA256 = "HmacSHA256";

    // Query parameter names are kept as separate constants so the ampersand is never
    // written directly in front of "timestamp" (that sequence is easily corrupted into
    // a multiplication sign when the source is rendered as HTML).
    private static final String TIMESTAMP_PARAM = "timestamp";

    private static final String SIGN_PARAM = "sign";

    /**
     * Tells whether DingTalk alerting is switched on.
     *
     * @return the boolean value of the {@code Constants.ENTERPRISE_DINGTALK_ENABLE} property
     */
    public static boolean isEnable() {
        return PropertyUtils.getBoolean(Constants.ENTERPRISE_DINGTALK_ENABLE);
    }

    /**
     * Builds the signature query-string suffix that is appended to the webhook URL.
     *
     * <p>The signature is the URL-encoded Base64 form of the HMAC-SHA256 digest of
     * {@code timestamp + "\n" + secret}, keyed with the secret itself.
     *
     * @return a string of the form {@code &timestamp=<millis>&sign=<signature>}, or
     *         {@code null} when the secret is not configured or signing fails
     */
    public static String encodeKey() {
        if (ENTERPRISE_DINGTALK_SECRET == null || ENTERPRISE_DINGTALK_SECRET.isEmpty()) {
            logger.error("Enterprise DingTalk secret is not configured, request cannot be signed");
            return null;
        }
        try {
            long timestamp = System.currentTimeMillis();
            String stringToSign = timestamp + "\n" + ENTERPRISE_DINGTALK_SECRET;

            Mac mac = Mac.getInstance(HMAC_SHA256);
            mac.init(new SecretKeySpec(ENTERPRISE_DINGTALK_SECRET.getBytes(StandardCharsets.UTF_8), HMAC_SHA256));
            byte[] signData = mac.doFinal(stringToSign.getBytes(StandardCharsets.UTF_8));

            // Base64 output may contain '+', '/' and '=', so it must be URL-encoded.
            String sign = URLEncoder.encode(
                    Base64.getEncoder().encodeToString(signData), StandardCharsets.UTF_8.name());

            return "&" + TIMESTAMP_PARAM + "=" + timestamp + "&" + SIGN_PARAM + "=" + sign;
        } catch (GeneralSecurityException | UnsupportedEncodingException e) {
            logger.error("Failed to sign Enterprise DingTalk request", e);
            return null;
        }
    }

    /**
     * Wraps a message into the JSON body of a DingTalk text message.
     *
     * @param msg the text to send
     * @return a JSON string with {@code msgtype} set to {@code text} and the message
     *         stored under {@code text.content}
     */
    public static String getJsonBodyString(String msg) {
        JSONObject text = new JSONObject();
        text.put("content", msg);

        JSONObject result = new JSONObject();
        result.put("text", text);
        result.put("msgtype", "text");
        return JSON.toJSONString(result);
    }

    /**
     * Posts a text message to the configured DingTalk webhook.
     *
     * <p>Sending is best-effort: when the URL is missing, the request cannot be signed,
     * or the HTTP call fails, the problem is logged and the method returns normally.
     *
     * @param msg the text to send
     */
    public static void sendMessageToDingTalk(String msg) {
        if (ENTERPRISE_DINGTALK_URL == null || ENTERPRISE_DINGTALK_URL.isEmpty()) {
            logger.error("Enterprise DingTalk url is not configured, message is not sent");
            return;
        }
        String signedQuery = encodeKey();
        if (signedQuery == null) {
            // Without this guard the literal text "null" would be appended to the URL.
            logger.error("Enterprise DingTalk request could not be signed, message is not sent");
            return;
        }

        String jsonBodyStr = getJsonBodyString(msg);
        try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
            HttpPost httpPost = new HttpPost(ENTERPRISE_DINGTALK_URL + signedQuery);
            httpPost.setHeader("Content-Type", "application/json;charset=utf-8");
            httpPost.setEntity(new StringEntity(jsonBodyStr, Constants.UTF_8));

            String resp;
            try (CloseableHttpResponse response = httpClient.execute(httpPost)) {
                HttpEntity entity = response.getEntity();
                resp = EntityUtils.toString(entity, Constants.UTF_8);
                EntityUtils.consume(entity);
            }
            // The webhook URL is deliberately not logged: it carries the access token.
            logger.info("Enterprise DingTalk send param:{}, resp:{}", msg, resp);
        } catch (IOException | IllegalArgumentException e) {
            // IllegalArgumentException covers a malformed webhook URL.
            logger.error("Enterprise DingTalk send failed, param:{}", msg, e);
        }
    }

}
在server模塊的worker運行邏輯(TaskExecuteThread)加入:
@Override public void run() { String result = ""; TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand(taskExecutionContext.getTaskInstanceId()); try { logger.info("script path : {}", taskExecutionContext.getExecutePath()); // task node TaskNode taskNode = JSONObject.parseObject(taskExecutionContext.getTaskJson(), TaskNode.class); // copy hdfs/minio file to local downloadResource(taskExecutionContext.getExecutePath(), taskExecutionContext.getResources(), logger); taskExecutionContext.setTaskParams(taskNode.getParams()); taskExecutionContext.setEnvFile(CommonUtils.getSystemEnvPath()); taskExecutionContext.setDefinedParams(getGlobalParamsMap()); // set task timeout setTaskTimeout(taskExecutionContext, taskNode); taskExecutionContext.setTaskAppId(String.format("%s_%s_%s", taskExecutionContext.getProcessDefineId(), taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId())); task = TaskManager.newTask(taskExecutionContext, taskLogger); // task init task.init(); result = String.format("task name=%s,task-id=%s,type=%s error",taskNode.getName(),taskNode.getId(),taskNode.getType()); // task handle task.handle(); // task result process task.after(); responseCommand.setStatus(task.getExitStatus().getCode()); responseCommand.setEndTime(new Date()); responseCommand.setProcessId(task.getProcessId()); responseCommand.setAppIds(task.getAppIds()); logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(), task.getExitStatus()); } catch (Exception e) { logger.error("task scheduler failure", e); kill(); responseCommand.setStatus(ExecutionStatus.FAILURE.getCode()); responseCommand.setEndTime(new Date()); responseCommand.setProcessId(task.getProcessId()); responseCommand.setAppIds(task.getAppIds()); } finally { taskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId()); ResponceCache.get().cache(taskExecutionContext.getTaskInstanceId(), 
responseCommand.convert2Command(), Event.RESULT); taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command()); clearTaskExecPath(); // if(EnterpriseDingTalkUtils.isEnable()) { if (responseCommand.getStatus() == ExecutionStatus.FAILURE.getCode() || responseCommand.getStatus() == ExecutionStatus.KILL.getCode()) { EnterpriseDingTalkUtils.sendMessageToDingTalk(result); } } } }
參考:https://segmentfault.com/a/1190000022077236