The Road to Advanced Scala: A Small Demo of Spark's Underlying Communication
Author: 尹正傑 (Yinzhengjie)
Copyright notice: This is an original work. Reproduction is prohibited; violations will be pursued legally.
I. Overview of the communication process between the Spark Master and Worker
1>. The Worker registers itself with the Master;
2>. After receiving the worker's registration info, the Master tells the worker that registration succeeded, and sends the worker a message to launch executors;
3>. After receiving the Master's registration-success message, the Worker periodically reports its status to the Master;
4>. When the Master receives a worker's heartbeat, it updates that worker's state. Because each heartbeat carries the time at which it was sent, the Master compares that heartbeat time with the current time; if the difference exceeds 5 minutes, the worker is considered dead, and the Master will no longer assign tasks to that worker when scheduling.
The communication mechanism between the Master and the Worker is illustrated by the following diagram:
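Since the original diagram is an image, a minimal text sketch of the exchange is given here instead, using the message names defined in MessageProtocol below (the periods come from the demo code that follows):

Worker                                           Master
  |--- RegisterWorkerInfo(id, core, ram) ------->|   (1. register)
  |<-- RegisteredWorkerInfo ---------------------|   (2. registration succeeded)
  |--- HearBeat(id), every 1 s ----------------->|   (3. periodic heartbeat)
  |                                              |   (4. every 5 s, RemoveTimeOutWorker:
  |                                              |       drop workers silent for > 3 s)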
II. Writing the source code
1>. Maven dependencies
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.org.yinzhengjie</groupId>
    <artifactId>MyActor</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- Define some constants -->
    <properties>
        <encoding>UTF-8</encoding>
        <scala.version>2.11.8</scala.version>
        <scala.compat.version>2.11</scala.compat.version>
        <akka.version>2.4.17</akka.version>
    </properties>

    <dependencies>
        <!-- Scala standard library -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <!-- Akka actor dependency -->
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-actor_${scala.compat.version}</artifactId>
            <version>${akka.version}</version>
        </dependency>

        <!-- Actor communication between separate processes -->
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-remote_${scala.compat.version}</artifactId>
            <version>${akka.version}</version>
        </dependency>
    </dependencies>

    <!-- Build configuration -->
    <build>
        <!-- Locations of the source and test source directories -->
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
    </build>
</project>
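Note that the Akka artifact ids carry the _${scala.compat.version} suffix: Akka publishes a separate binary per Scala minor version, so a project built against Scala 2.11.x must use the _2.11 Akka artifacts.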
2>. MessageProtocol source code
/*
@author : yinzhengjie
Blog: http://www.cnblogs.com/yinzhengjie/tag/Scala%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
EMAIL: y1053419035@qq.com
*/
package cn.org.yinzhengjie.spark

/**
 * worker -> master: messages the worker sends to the master
 */
case class RegisterWorkerInfo(id: String, core: Int, ram: Int) // the worker registers itself (its info) with the master
case class HearBeat(id: String)                                // the worker sends a heartbeat to the master

/**
 * master -> worker: messages the master sends to the worker
 */
case object RegisteredWorkerInfo // the master tells the worker that registration succeeded
case object SendHeartBeat        // the worker sends this to itself to start periodically sending heartbeats to the master
case object CheckTimeOutWorker   // the master sends this to itself to start a scheduler that periodically removes timed-out workers
case object RemoveTimeOutWorker  // the master sends this to itself to remove the timed-out workers

/**
 * Class that stores a worker's information
 * @param id   : each worker's id is immutable and unique!
 * @param core : number of cores on the machine
 * @param ram  : amount of memory
 */
case class WorkerInfo(id: String, core: Int, ram: Int) {
  // Time of the last heartbeat; the "_" initializer gives the Long default value 0.
  var lastHeartBeatTime: Long = _
}
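One detail worth calling out: every message above crosses a process boundary via akka-remote, so it must be serializable. Scala case classes and case objects are Serializable by default, which is why plain case classes and case objects suffice as the wire protocol here.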
3>. SparkWorker source code
/*
@author : yinzhengjie
Blog: http://www.cnblogs.com/yinzhengjie/tag/Scala%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
EMAIL: y1053419035@qq.com
*/
package cn.org.yinzhengjie.spark

import java.util.UUID

import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._ // import the time units

/**
 * The primary constructor takes the master's address.
 */
class SparkWorker(masterUrl: String) extends Actor {
  var masterProxy: ActorSelection = _             // reference to the master (an ActorSelection)
  val workId: String = UUID.randomUUID().toString // the worker's UUID; each worker's id is immutable and unique!

  /**
   * preStart runs before receive starts handling messages, so we look up the
   * master reference here, exactly once.
   */
  override def preStart(): Unit = {
    masterProxy = context.actorSelection(masterUrl)
  }

  override def receive: Receive = {
    case "started" => // this worker is ready
      // Register with the master: id, core, ram. The master receives this message.
      masterProxy ! RegisterWorkerInfo(workId, 4, 32 * 1024)

    /**
     * Handle the registration-success message sent back by the master.
     */
    case RegisteredWorkerInfo =>
      import context.dispatcher // the scheduler needs an implicit ExecutionContext, provided by the dispatcher
      /**
       * Start a timer via "context.system.scheduler.schedule" that periodically sends
       * heartbeats to the master. It takes four parameters:
       *   1. the initial delay, here 0 milliseconds;
       *   2. the interval, i.e. the timer's period, here 1 second;
       *   3. the recipient; here we send to ourselves, via the self variable;
       *   4. the message to send.
       * Since we send the message to ourselves, we also have to receive and handle it,
       * which is what the SendHeartBeat case below does.
       */
      context.system.scheduler.schedule(0 millis, 1000 millis, self, SendHeartBeat)

    case SendHeartBeat =>
      // Send a heartbeat to the master.
      println(s"------- $workId sending heartbeat -------")
      masterProxy ! HearBeat(workId) // the master receives the heartbeat
  }
}

object SparkWorker {
  def main(args: Array[String]): Unit = {
    // Validate the arguments.
    if (args.length != 4) {
      println(
        """
          |Usage: <host> <port> <workName> <masterURL>
        """.stripMargin)
      sys.exit() // exit the program
    }
    /**
     * Arguments: host, port, worker name, and the master's URL.
     */
    val host = args(0)
    val port = args(1)
    val workName = args(2)
    val masterURL = args(3)
    /**
     * Use ConfigFactory.parseString to build the config object from these arguments.
     */
    val config = ConfigFactory.parseString(
      s"""
         |akka.actor.provider="akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname=$host
         |akka.remote.netty.tcp.port=$port
       """.stripMargin)
    val actorSystem = ActorSystem("sparkWorker", config)
    /**
     * Create the worker's actorRef.
     */
    val workerActorRef = actorSystem.actorOf(Props(new SparkWorker(masterURL)), workName)
    workerActorRef ! "started" // send ourselves a "started" message to signal that we are ready
  }
}
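The <masterURL> argument handed to actorSelection is an Akka actor path: sparkMaster is the name of the master's ActorSystem, host:port is the address akka-remote listens on, user is the guardian of all user-created actors, and the final segment is the actor name passed to actorOf. A hypothetical helper (not part of the project, for illustration only) makes the shape explicit:

// Hypothetical helper: builds the actor path that SparkWorker.main receives as <masterURL>.
def masterUrl(host: String, port: Int, masterName: String): String =
  s"akka.tcp://sparkMaster@$host:$port/user/$masterName"

// masterUrl("127.0.0.1", 8888, "master") == "akka.tcp://sparkMaster@127.0.0.1:8888/user/master"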
4>. SparkMaster source code
/*
@author : yinzhengjie
Blog: http://www.cnblogs.com/yinzhengjie/tag/Scala%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
EMAIL: y1053419035@qq.com
*/
package cn.org.yinzhengjie.spark

import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._

class SparkMaster extends Actor {

  // HashMap that stores each registered worker's info, keyed by worker id.
  val saveWorkerInfo = collection.mutable.HashMap[String, WorkerInfo]()

  // Alternatively, the cleanup scheduler could be started in preStart:
  // override def preStart(): Unit = {
  //   context.system.scheduler.schedule(0 millis, 6000 millis, self, RemoveTimeOutWorker)
  // }

  override def receive: Receive = {
    /**
     * Handle a worker's registration message.
     */
    case RegisterWorkerInfo(wkId, core, ram) =>
      /**
       * Before storing, check whether this worker is already registered; if not,
       * store its info in the HashMap, keyed by wkId.
       */
      if (!saveWorkerInfo.contains(wkId)) {
        val workerInfo = WorkerInfo(wkId, core, ram)
        saveWorkerInfo += ((wkId, workerInfo))
        /**
         * After storing the registration data, tell the worker that it has
         * registered successfully.
         */
        sender() ! RegisteredWorkerInfo // the worker receives the success message
      }

    /**
     * When the master receives a worker's heartbeat, update that worker's last
     * heartbeat time. (Look the worker up with get: a heartbeat may still arrive
     * from a worker that was already removed for timing out, and indexing the
     * map directly would then throw.)
     */
    case HearBeat(wkId) =>
      saveWorkerInfo.get(wkId).foreach { workerInfo =>
        workerInfo.lastHeartBeatTime = System.currentTimeMillis() // update the heartbeat time
      }

    case CheckTimeOutWorker =>
      import context.dispatcher // the scheduler needs an implicit ExecutionContext, provided by the dispatcher
      context.system.scheduler.schedule(0 millis, 5000 millis, self, RemoveTimeOutWorker)

    case RemoveTimeOutWorker =>
      /**
       * Take all the values out of the HashMap and check whether the gap between now
       * and the last heartbeat exceeds three heartbeat periods; three missed heartbeats
       * count as a timeout. Each heartbeat period is 1000 ms, so three are 3000 ms.
       */
      val workerInfos = saveWorkerInfo.values
      val currentTime = System.currentTimeMillis()

      workerInfos
        .filter(workerInfo => currentTime - workerInfo.lastHeartBeatTime > 3000) // select the timed-out workers
        .foreach(workerInfo => saveWorkerInfo.remove(workerInfo.id))             // remove them
      println(s"====== ${saveWorkerInfo.size} workers still alive ======")
  }
}

object SparkMaster {
  def main(args: Array[String]): Unit = {
    // Validate the arguments.
    if (args.length != 3) {
      println(
        """
          |Usage: <host> <port> <masterName>
        """.stripMargin)
      sys.exit() // exit the program
    }
    /**
     * Arguments: host, port, and the master's name.
     */
    val host = args(0)
    val port = args(1)
    val masterName = args(2)
    /**
     * Use ConfigFactory.parseString to build the config object from these arguments.
     */
    val config = ConfigFactory.parseString(
      s"""
         |akka.actor.provider="akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname=$host
         |akka.remote.netty.tcp.port=$port
       """.stripMargin)

    val actorSystem = ActorSystem("sparkMaster", config)
    val masterActorRef = actorSystem.actorOf(Props[SparkMaster], masterName)
    /**
     * Send ourselves a message that starts the scheduler which periodically removes
     * timed-out workers from the HashMap.
     */
    masterActorRef ! CheckTimeOutWorker
  }
}
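To make the timeout rule concrete, here is a minimal self-contained sketch (not part of the project; the worker names and timestamps are made up) of the same filter applied to plain data:

// Demonstrates the rule used by RemoveTimeOutWorker: a worker is dropped once
// (now - lastHeartBeatTime) exceeds three 1000 ms heartbeat periods.
object TimeoutRuleDemo extends App {
  val heartbeatMillis = 1000L               // heartbeat period, as in SparkWorker
  val timeoutMillis   = 3 * heartbeatMillis // three missed heartbeats => timed out
  val now      = System.currentTimeMillis()
  val lastSeen = Map("worker-a" -> (now - 1500L), "worker-b" -> (now - 4000L))
  val alive    = lastSeen.filter { case (_, t) => now - t <= timeoutMillis }
  println(alive.keys.mkString(", "))        // prints: worker-a
}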
III. Local testing
1>. Start the master
Configure the run arguments as follows:
127.0.0.1 8888 master
2>. Start the workers
The run arguments for the two workers are as follows, each line supplying the <host> <port> <workName> <masterURL> arguments that SparkWorker.main expects:

127.0.0.1 6665 worker akka.tcp://sparkMaster@127.0.0.1:8888//user/master
127.0.0.1 6666 worker akka.tcp://sparkMaster@127.0.0.1:8888//user/master
The master's console output is as follows:
IV. Packaging the master and worker and deploying them to multiple Linux servers for testing
1>. Packaging SparkMaster
Step 1: Modify the Maven configuration as follows:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>cn.org.yinzhengjie</groupId>
    <artifactId>MyActor</artifactId>
    <version>1.0-SNAPSHOT</version>
    <!-- Define some constants -->
    <properties>
        <encoding>UTF-8</encoding>
        <scala.version>2.11.8</scala.version>
        <scala.compat.version>2.11</scala.compat.version>
        <akka.version>2.4.17</akka.version>
    </properties>
    <dependencies>
        <!-- Scala standard library -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!-- Akka actor dependency -->
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-actor_${scala.compat.version}</artifactId>
            <version>${akka.version}</version>
        </dependency>
        <!-- Actor communication between separate processes -->
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-remote_${scala.compat.version}</artifactId>
            <version>${akka.version}</version>
        </dependency>
    </dependencies>
    <!-- Plugin configuration -->
    <build>
        <!-- Locations of the source and test source directories -->
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <!-- Plugin that compiles the Scala sources -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <!-- Maven packaging plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                                <!-- Specify the main class: cn.org.yinzhengjie.spark.SparkMaster -->
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>cn.org.yinzhengjie.spark.SparkMaster</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
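The AppendingTransformer for reference.conf is the easy part to overlook: each Akka module ships its default settings in its own reference.conf, and without this transformer the shade plugin keeps only one of them, which typically makes remoting fail at startup. Appending merges all the defaults into a single file inside the fat jar.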
Step 2: Click package to start the build:
Step 3: Inspect the internal structure of the packaged jar:
2>. Packaging SparkWorker
Step 1: Modify the Maven configuration as follows:

This POM is identical to the one used to package SparkMaster except for the main class handed to the ManifestResourceTransformer, which now points at the worker:

<!-- Specify the main class: cn.org.yinzhengjie.spark.SparkWorker -->
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
    <mainClass>cn.org.yinzhengjie.spark.SparkWorker</mainClass>
</transformer>
The next two steps are the same as the corresponding steps above: rename the packaged file and inspect its main-class information:
3>. Start three virtual machines, upload master.jar to the master node, and run it
[yinzhengjie@s101 download]$ ll
total 67320
-rw-r--r--. 1 yinzhengjie yinzhengjie 20124547 Jul 31 20:42 master.jar
-rw-r--r--. 1 yinzhengjie yinzhengjie 28678231 Jul 20 21:18 scala-2.11.8.tgz
-rw-r--r--. 1 yinzhengjie yinzhengjie 20124541 Jul 31 21:52 worker.jar
[yinzhengjie@s101 download]$
[yinzhengjie@s101 download]$ java -jar master.jar 172.16.30.101 8888 master
4>. Upload worker.jar to the other two nodes and run it, as follows:
On node 172.16.30.102:

[yinzhengjie@s102 download]$ ll
total 19656
-rw-r--r--. 1 yinzhengjie yinzhengjie 20124541 Jul 31 22:01 worker.jar
[yinzhengjie@s102 download]$
[yinzhengjie@s102 download]$ java -jar worker.jar 172.16.30.102 6665 worker akka.tcp://sparkMaster@172.16.30.101:8888//user/master

On node 172.16.30.103:

[yinzhengjie@s103 download]$ ll
total 19656
-rw-r--r--. 1 yinzhengjie yinzhengjie 20124541 Jul 31 22:00 worker.jar
[yinzhengjie@s103 download]$
[yinzhengjie@s103 download]$ java -jar worker.jar 172.16.30.103 6665 worker akka.tcp://sparkMaster@172.16.30.101:8888//user/master
The output on 172.16.30.101 is as follows:
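Judging from the println statements in the code, each worker console should print its heartbeat line (------- <workId> sending heartbeat -------) once per second, while the master console should print the number of surviving workers (====== N workers still alive ======) every five seconds; killing a worker process should make the master's count drop within a few seconds.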