sparkLauncher 代碼記錄
1.概述
sparkLauncher 是一個在代碼里提交spark任務的類
這個類底層使用的依然是spark-submit腳本進行提交,通過ProcessBuilder 來設置相關環境參數調用
主要的方法有下面幾個
launch 提交一個任務,任務的提交輸出結果如何由用戶自己處理
createBuilder launch和startApplication方法中調用生成ProcessBuilder執行shell腳本對象的方法
startApplication 提交一個任務,並根據監聽任務狀態的改變來執行用戶指定的listener
2.launch方法
//這個方法是在設置完sparkLauncher的其它參數方法之后調用這個方法執行任務代碼
//通過createBuilder 創建一個Process 的對象然后執行
public Process launch() throws IOException {
Process childProc = createBuilder().start(); //在這里調用
if (redirectToLog) {
String loggerName = builder.getEffectiveConfig().get(CHILD_PROCESS_LOGGER_NAME);
new OutputRedirector(childProc.getInputStream(), loggerName, REDIRECTOR_FACTORY);
}
return childProc;
}
3.createBuilder方法
//這個方法是構造執行命令,生成ProcessBuilder的函數
//通過對象設置的javahome,sparkhome 找到對應的spark-submit腳本然后構建參數,生成cmd 命令,最后放到processBuilder中生成執行子進程
private ProcessBuilder createBuilder() {
List<String> cmd = new ArrayList<>();
String script = isWindows() ? "spark-submit.cmd" : "spark-submit"; //找到對應的腳本
cmd.add(join(File.separator, builder.getSparkHome(), "bin", script)); //設置腳本絕對路徑
cmd.addAll(builder.buildSparkSubmitArgs()); //腳本的設置參數
//下面的就是做一些目錄檢查,系統檢查,標准輸出錯誤輸出等設置之類的。
// Since the child process is a batch script, let's quote things so that special characters are
// preserved, otherwise the batch interpreter will mess up the arguments. Batch scripts are
// weird.
if (isWindows()) {
List<String> winCmd = new ArrayList<>();
for (String arg : cmd) {
winCmd.add(quoteForBatchScript(arg));
}
cmd = winCmd;
}
//這里創建的ProcessBuilder對象,並設置環境變量(sparkConf通過環境變量獲取spark相關配置,所以把參數設置到環境變量中可以讓sparkConf獲取到)
ProcessBuilder pb = new ProcessBuilder(cmd.toArray(new String[cmd.size()]));
for (Map.Entry<String, String> e : builder.childEnv.entrySet()) {
pb.environment().put(e.getKey(), e.getValue());
}
if (workingDir != null) {
pb.directory(workingDir);
}
// Only one of redirectError and redirectError(...) can be specified.
// Similarly, if redirectToLog is specified, no other redirections should be specified.
checkState(!redirectErrorStream || errorStream == null,
"Cannot specify both redirectError() and redirectError(...) ");
checkState(!redirectToLog ||
(!redirectErrorStream && errorStream == null && outputStream == null),
"Cannot used redirectToLog() in conjunction with other redirection methods.");
if (redirectErrorStream || redirectToLog) {
pb.redirectErrorStream(true);
}
if (errorStream != null) {
pb.redirectError(errorStream);
}
if (outputStream != null) {
pb.redirectOutput(outputStream);
}
return pb;
}
3.startApplication方法
//這個方法也是用來執行任務的。只是比launch 多了一個狀態監控的功能,用戶可以通過提供sparkAppHandle.Listener來實現任務狀態變化時做一些動作
public SparkAppHandle startApplication(SparkAppHandle.Listener... listeners) throws IOException {
ChildProcAppHandle handle = LauncherServer.newAppHandle();//這里啟動一個LauncherServer的服務來接收app狀態變化的數據,更詳細的將會在其它部分記錄
for (SparkAppHandle.Listener l : listeners) {
handle.addListener(l); //這里把用戶提供的監聽放到執行函數里執行,具體如何實現將在其它部分記錄
}
//這里就是獲取spark-submit標准輸出數據的log文件名稱,如果沒有設置,就通過下面代碼進行生成和獲取,根據類名和包名自動生成
String loggerName = builder.getEffectiveConfig().get(CHILD_PROCESS_LOGGER_NAME);
ProcessBuilder pb = createBuilder();
// Only setup stderr + stdout to logger redirection if user has not otherwise configured output
// redirection.
if (loggerName == null) {
String appName = builder.getEffectiveConfig().get(CHILD_PROCESS_LOGGER_NAME);
if (appName == null) {
if (builder.appName != null) {
appName = builder.appName;
} else if (builder.mainClass != null) {
int dot = builder.mainClass.lastIndexOf(".");
if (dot >= 0 && dot < builder.mainClass.length() - 1) {
appName = builder.mainClass.substring(dot + 1, builder.mainClass.length());
} else {
appName = builder.mainClass;
}
} else if (builder.appResource != null) {
appName = new File(builder.appResource).getName();
} else {
appName = String.valueOf(COUNTER.incrementAndGet());
}
}
String loggerPrefix = getClass().getPackage().getName(); //獲取包名
loggerName = String.format("%s.app.%s", loggerPrefix, appName); //根據包名和appname生成log文件名
pb.redirectErrorStream(true);
}
//這里把LauncherServer的端口號通告到環境變量里,LauncherBackend就是通過環境變量獲取LauncherServer的端口號進行通信的。
//LauncherBackend 的具體詳情將會在其他部分進行記錄
pb.environment().put(LauncherProtocol.ENV_LAUNCHER_PORT,
String.valueOf(LauncherServer.getServerInstance().getPort()));
pb.environment().put(LauncherProtocol.ENV_LAUNCHER_SECRET, handle.getSecret());
try {
handle.setChildProc(pb.start(), loggerName); //啟動spark-submit 提交任務,並把標准log輸出到之前設置的loggername里。
} catch (IOException ioe) {
handle.kill();
throw ioe;
}
return handle;
}