【原】Spark Standalone如何通過start-all.sh啟動集群


1.start-all.sh腳本分析


圖1 start-all.sh部分內容

我們可以從start-all.sh腳本源文件中看到它其實是start-master.sh和start-slaves.sh兩個腳本的組合。

 


圖2 start-master.sh部分內容

由圖2可見,start-master.sh最終是通過類org.apache.spark.deploy.master.Master來完成的,待會兒我們分析.

 


圖3 start-slaves.sh部分內容

由圖3可見,start-slaves.sh是由slaves.sh和start-slave.sh來組成的。

 


圖4 slaves.sh和start-slave.sh部分內容

由圖4可見,可以看到slave節點是由org.apache.spark.deploy.worker.Worker類來完成的,master和slave的start都是由spark-daemon.sh腳本來運行的

2.具體執行類分析

腳本最后的執行者其實是類。我們具體看一下Master、Worker的執行過程。

2.1 Master節點啟動分析

Master.scala文件由一個Master類和其伴生對象組成。

從main函數開始,主要啟動Rpc環境,目前Spark中提供了兩種Rpc環境:Akka和Netty

def main(argStrings: Array[String]) {

SignalLogger.register(log)

val conf = new SparkConf

//命令轉換器,將通過腳本傳遞過來的參數轉化為類Master的變量

val args = new MasterArguments(argStrings, conf)

//啟動master並返回一個三元組:(1)Master Rpc環境(2)web UI綁定的端口號(3)REST server綁定的端口號

val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)

//等待直到RpcEnv退出

rpcEnv.awaitTermination()

}

(1)master參數主要是通過MasterArguments類來完成的,如下所示,由代碼可見master默認的端口是7070,web端口是8080

 


圖5 Master轉換類

(2)通過startRpcEnvAndEndpoint方法實現啟動Master並返回三元組,由Master RpcEnv、綁定的web UI端口號和REST server綁定的端口號

def startRpcEnvAndEndpoint(

host: String,

port: Int,

webUiPort: Int,

conf: SparkConf): (RpcEnv, Int, Option[Int]) = {

val securityMgr = new SecurityManager(conf)

//通過RpcEnvFactory生成RpcEnv,這里默認使用的是NettyRpcEnvFactory

val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)

//返回一個Master的遠程調用masterEndpoint

val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,

new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))

//綁定端口的請求

val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)

(rpcEnv, portsResponse.webUIPort, portsResponse.restPort)

}

2.2 Worker節點啟動分析

Worker節點的啟動和Master的很類似,如下所示:

def main(argStrings: Array[String]) {

SignalLogger.register(log)

val conf = new SparkConf

//命令轉換器,將通過腳本傳遞過來的參數轉化為類Worker的變量

val args = new WorkerArguments(argStrings, conf)

//啟動Worker Rpc環境

val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, args.cores,

args.memory, args.masters, args.workDir)

//等待直到RpcEnv退出

rpcEnv.awaitTermination()

}

(1)啟動Worker Rpc環境如下所示

def startRpcEnvAndEndpoint(

host: String,

port: Int,

webUiPort: Int,

cores: Int,

memory: Int,

masterUrls: Array[String],

workDir: String,

workerNumber: Option[Int] = None,

conf: SparkConf = new SparkConf): RpcEnv = {

// The LocalSparkCluster runs multiple local sparkWorkerX RPC Environments

//LocalSparkCluster啟動多個本地的sparkWorker RPC環境,系統名為sparkWorker1,sparkWorker2.。。

val systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("")

val securityMgr = new SecurityManager(conf)

//通過RpcEnvFactory生成RpcEnv,這里默認使用的是NettyRpcEnvFactory

val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)

//從RpcAddress得到master的地址,即從spark://host:port解析得到host和port封裝到RpcAddress

val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL(_))

//返回一個Worker的遠程調用

rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory,

masterAddresses, systemName, ENDPOINT_NAME, workDir, conf, securityMgr))

rpcEnv

}

下一篇我們繼續了解Spark Rpc,了解Master、Worker和Client是如何通信的。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM