Scala Advanced Series: A Small Case Study of Spark's Underlying Communication

Author: Yin Zhengjie

Copyright notice: this is original work; reposting is not permitted, and violations will be pursued legally.

I. Overview of the communication between the Spark Master and Workers

  1>. A worker registers itself with the master;

  2>. On receiving the worker's registration, the master replies that registration succeeded and sends the worker a message to launch executors;

  3>. After receiving the master's acknowledgement, the worker periodically reports its state to the master;

  4>. On each heartbeat the master updates that worker's state. Because the worker includes the send time in its heartbeat, the master compares the time of the last heartbeat received from the worker with the current time; if the gap exceeds five minutes, the worker is considered dead, and the master will no longer assign tasks to it.

  The original post illustrated the communication mechanism between the Master and Workers with a diagram (not reproduced here).
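  As a rough stand-in for that figure, the message flow can be sketched as follows (the message names are the ones defined in the code below):

    Worker                                         Master
      | --- RegisterWorkerInfo(id, core, ram) ------>|   (1) register
      |<--- RegisteredWorkerInfo --------------------|   (2) ack; worker starts heartbeating
      | --- HearBeat(id) --------------------------->|   (3) heartbeat every second
      |                                              |   (4) master sweeps timed-out workers every 5 s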

II. Writing the source code

1>. Maven dependencies

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.org.yinzhengjie</groupId>
    <artifactId>MyActor</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- Define some constants -->
    <properties>
        <encoding>UTF-8</encoding>
        <scala.version>2.11.8</scala.version>
        <scala.compat.version>2.11</scala.compat.version>
        <akka.version>2.4.17</akka.version>
    </properties>

    <dependencies>
        <!-- Scala standard library -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <!-- Akka actor dependency -->
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-actor_${scala.compat.version}</artifactId>
            <version>${akka.version}</version>
        </dependency>

        <!-- Actor communication across processes (remoting) -->
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-remote_${scala.compat.version}</artifactId>
            <version>${akka.version}</version>
        </dependency>
    </dependencies>

    <!-- Build settings -->
    <build>
        <!-- Locations of the main and test sources; the build plugins needed
             to compile Scala and package from the command line are added in
             section IV -->
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
    </build>
</project>

2>. MessageProtocol source code

/*
@author :yinzhengjie
Blog:http://www.cnblogs.com/yinzhengjie/tag/Scala%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
EMAIL:y1053419035@qq.com
*/
package cn.org.yinzhengjie.spark

/**
  * worker -> master: messages the worker sends to the master
  */
case class RegisterWorkerInfo(id: String, core: Int, ram: Int)          // worker registers itself (its info) with the master
case class HearBeat(id: String)                                         // worker sends a heartbeat to the master

/**
  * master -> worker: messages the master sends to the worker
  */
case object RegisteredWorkerInfo                    // master tells the worker that registration succeeded
case object SendHeartBeat                           // worker sends this to itself, telling itself to start periodically heartbeating the master
case object CheckTimeOutWorker                      // master sends this to itself to start a scheduler that periodically removes timed-out workers
case object RemoveTimeOutWorker                     // master sends this to itself to remove timed-out workers

/**
  * Class that stores a worker's information
  * @param id          :    each worker's id is immutable and unique!
  * @param core        :    number of cores on the machine
  * @param ram         :    amount of memory
  */
case class WorkerInfo(val id: String, core: Int, ram: Int) {
    // Time of the last heartbeat; `_` initializes a Long to its default, 0L (not null).
    var lastHeartBeatTime: Long = _
}
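Since `_` is sometimes read as null, note that for a value type such as Long it yields the type's zero value. A quick sketch using the WorkerInfo class above (the id "demo-id" is just an illustration):

    val info = WorkerInfo("demo-id", 4, 1024)   // case class, so no `new` needed
    println(info.lastHeartBeatTime)             // prints 0, not null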

3>. SparkWorker source code

/*
@author :yinzhengjie
Blog:http://www.cnblogs.com/yinzhengjie/tag/Scala%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
EMAIL:y1053419035@qq.com
*/
package cn.org.yinzhengjie.spark

import java.util.UUID

import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._          // import the duration DSL (millis, seconds, ...)

/**
  * The primary constructor takes the master's URL
  */
class SparkWorker(masterUrl: String) extends Actor{
    var masterProxy: ActorSelection = _                 // reference to the master (an ActorSelection)
    val workId: String = UUID.randomUUID().toString     // this worker's uuid; each worker's id is immutable and unique!
    /**
      * preStart runs once, before receive starts processing messages, so it is the
      * right place to look up the master reference; it only needs to be fetched once.
      */
    override def preStart(): Unit = {
        masterProxy = context.actorSelection(masterUrl)
    }
    override def receive: Receive = {
        case "started" => { // this worker is ready
            // register with the master: id, core, ram
            masterProxy ! RegisterWorkerInfo(workId, 4, 32 * 1024) // the master will receive this message
        }

        /**
          * Handle the registration-success message sent back by the master
          */
        case RegisteredWorkerInfo => {
            import context.dispatcher // the scheduler needs an implicit ExecutionContext, hence this import
            /**
              * The worker starts a timer via context.system.scheduler.schedule, which
              * periodically sends heartbeats to the master. Four arguments are required:
              *     1. the initial delay (0 milliseconds here);
              *     2. the interval between ticks (1 second here);
              *     3. the recipient (self: we send the message to ourselves);
              *     4. the message to send.
              * Note: since we send the message to ourselves, we must also handle it
              * ourselves, which is what the SendHeartBeat case below does.
              */
            context.system.scheduler.schedule(0 millis, 1000 millis, self, SendHeartBeat)
        }
        case SendHeartBeat => {
            // time to send the master a heartbeat
            println(s"------- $workId sending heartbeat -------")
            masterProxy ! HearBeat(workId) // the master will receive this heartbeat
        }
    }
}


object SparkWorker {
    def main(args: Array[String]): Unit = {
        // validate the arguments
        if(args.length != 4) {
            println(
                """
                  |Usage: <host> <port> <workName> <masterURL>
                """.stripMargin)
            sys.exit() // quit the program
        }
        /**
          * Arguments: host, port, worker name, and the master's URL.
          */
        val host = args(0)
        val port = args(1)
        val workName = args(2)
        val masterURL = args(3)
        /**
          * Use ConfigFactory.parseString to build the config object from these arguments
          */
        val config = ConfigFactory.parseString(
            s"""
              |akka.actor.provider="akka.remote.RemoteActorRefProvider"
              |akka.remote.netty.tcp.hostname=$host
              |akka.remote.netty.tcp.port=$port
            """.stripMargin)
        val actorSystem = ActorSystem("sparkWorker", config)
        /**
          * Create the worker's actorRef
          */
        val workerActorRef = actorSystem.actorOf(Props(new SparkWorker(masterURL)), workName)
        workerActorRef ! "started"      // send ourselves a "started" message to signal that the worker is ready
    }
}
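A small design note on the code above: the worker triggers its own registration with the raw string "started". A hypothetical variant (not in the original) uses a typed message instead, so that typos cannot slip past the compiler:

    // Hypothetical addition to MessageProtocol.scala
    case object Started   // worker sends this to itself once it is ready

    // Then in SparkWorker.receive:
    //     case Started => masterProxy ! RegisterWorkerInfo(workId, 4, 32 * 1024)
    // and in SparkWorker.main:
    //     workerActorRef ! Started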

4>. SparkMaster source code

/*
@author :yinzhengjie
Blog:http://www.cnblogs.com/yinzhengjie/tag/Scala%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
EMAIL:y1053419035@qq.com
*/
package cn.org.yinzhengjie.spark

import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._

class SparkMaster extends Actor{

    // saveWorkerInfo stores the registered workers' information, keyed by worker id
    val saveWorkerInfo = collection.mutable.HashMap[String, WorkerInfo]()

//    override def preStart(): Unit = {
//        context.system.scheduler.schedule(0 millis, 6000 millis, self, RemoveTimeOutWorker)
//    }

    override def receive: Receive = {
        /**
          * Handle a worker's registration message
          */
        case RegisterWorkerInfo(wkId, core, ram) => {
            /**
              * Before storing, check whether this worker is already registered; if not,
              * store its information in the HashMap keyed by wkId.
              */
            if (!saveWorkerInfo.contains(wkId)) {
                val workerInfo = new WorkerInfo(wkId, core, ram)
                saveWorkerInfo += ((wkId, workerInfo))
                /**
                  * Once the master has stored the worker's registration data,
                  * it must tell the worker that registration succeeded
                  */
                sender() ! RegisteredWorkerInfo // the worker will receive this success message
            }
        }
        /**
          * On receiving a worker's heartbeat, update that worker's last heartbeat time
          */
        case HearBeat(wkId) => {
            val workerInfo = saveWorkerInfo(wkId)
            val currentTime = System.currentTimeMillis()
            workerInfo.lastHeartBeatTime = currentTime          // update the heartbeat time
        }
        case CheckTimeOutWorker => {
            import context.dispatcher // the scheduler needs an implicit ExecutionContext, hence this import
            context.system.scheduler.schedule(0 millis, 5000 millis, self, RemoveTimeOutWorker)
        }
        case RemoveTimeOutWorker => {
            /**
              * Take all values out of the HashMap and check whether the gap between now
              * and the last heartbeat exceeds three heartbeat intervals; three missed
              * heartbeats mean a timeout. Each interval is 1000 milliseconds by default,
              * so the threshold is 3000 milliseconds.
              */
            val workerInfos = saveWorkerInfo.values
            val currentTime = System.currentTimeMillis()

            workerInfos
              .filter(workerInfo => currentTime - workerInfo.lastHeartBeatTime > 3000)        // select the timed-out workers
              .foreach(workerInfo => saveWorkerInfo.remove(workerInfo.id))                    // remove the timed-out workers
            println(s"====== ${saveWorkerInfo.size} workers still alive ======")
        }
    }
}

object SparkMaster {
    def main(args: Array[String]): Unit = {
        // validate the arguments
        if(args.length != 3) {
            println(
                """
                  |Usage: <host> <port> <masterName>
                """.stripMargin)
            sys.exit() // quit the program
        }
        /**
          * Arguments: host, port, and the master's name
          */
        val host = args(0)
        val port = args(1)
        val masterName = args(2)
        /**
          * Use ConfigFactory.parseString to build the config object from these arguments
          */
        val config = ConfigFactory.parseString(
            s"""
               |akka.actor.provider="akka.remote.RemoteActorRefProvider"
               |akka.remote.netty.tcp.hostname=$host
               |akka.remote.netty.tcp.port=$port
            """.stripMargin)

        val actorSystem = ActorSystem("sparkMaster", config)
        val masterActorRef = actorSystem.actorOf(Props[SparkMaster], masterName)
        /**
          * Send ourselves a message that starts a scheduler which periodically
          * sweeps timed-out workers from the HashMap
          */
        masterActorRef ! CheckTimeOutWorker
    }
}
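One caveat in the master above: a worker that has registered but not yet sent its first heartbeat still has lastHeartBeatTime == 0, so a timeout sweep in that window would remove it at once. The window is tiny here, since the worker starts heartbeating immediately after the ack, but a minimal hardening, sketched as an assumption rather than part of the original code, is to stamp the registration time:

    // Hypothetical variant of the RegisterWorkerInfo handler: stamp the
    // registration time so a fresh worker survives until its first heartbeat.
    case RegisterWorkerInfo(wkId, core, ram) => {
        if (!saveWorkerInfo.contains(wkId)) {
            val workerInfo = new WorkerInfo(wkId, core, ram)
            workerInfo.lastHeartBeatTime = System.currentTimeMillis()
            saveWorkerInfo += ((wkId, workerInfo))
            sender() ! RegisteredWorkerInfo
        }
    }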

 

III. Testing locally

1>. Start the master

Program arguments:
127.0.0.1 8888 master

2>. Start the workers

Program arguments for the two workers (the last argument is the master's actor path, of the form akka.tcp://<actorSystemName>@<host>:<port>/user/<actorName>):
127.0.0.1 6665 worker akka.tcp://sparkMaster@127.0.0.1:8888//user/master
127.0.0.1 6666 worker akka.tcp://sparkMaster@127.0.0.1:8888//user/master
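If you would rather not juggle multiple IDE run configurations, a hypothetical single-JVM smoke test (not part of the original post) can start the master and one worker in the same process; each main call creates its own ActorSystem on its own port, and the non-daemon actor threads keep the JVM alive:

    package cn.org.yinzhengjie.spark

    // Hypothetical helper: run a master and one worker inside a single JVM.
    object LocalSmokeTest {
        def main(args: Array[String]): Unit = {
            SparkMaster.main(Array("127.0.0.1", "8888", "master"))
            SparkWorker.main(Array("127.0.0.1", "6665", "worker",
                "akka.tcp://sparkMaster@127.0.0.1:8888/user/master"))
        }
    }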

  The consoles then show the workers sending heartbeats and the master's count of live workers (screenshot omitted).

 

IV. Packaging master and worker and deploying to multiple Linux servers for testing

1>. Package SparkMaster

  Step 1: modify the Maven configuration as follows:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>cn.org.yinzhengjie</groupId>
    <artifactId>MyActor</artifactId>
    <version>1.0-SNAPSHOT</version>
    <!-- Define some constants -->
    <properties>
        <encoding>UTF-8</encoding>
        <scala.version>2.11.8</scala.version>
        <scala.compat.version>2.11</scala.compat.version>
        <akka.version>2.4.17</akka.version>
    </properties>
    <dependencies>
        <!-- Scala standard library -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!-- Akka actor dependency -->
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-actor_${scala.compat.version}</artifactId>
            <version>${akka.version}</version>
        </dependency>
        <!-- Actor communication across processes (remoting) -->
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-remote_${scala.compat.version}</artifactId>
            <version>${akka.version}</version>
        </dependency>
    </dependencies>
    <!-- Build plugins -->
    <build>
        <!-- Locations of the main and test sources -->
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <!-- Plugin that compiles the Scala sources -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <!-- Maven packaging (fat-jar) plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <!-- Append the reference.conf files from all jars into one;
                                     otherwise Akka's default configuration gets clobbered
                                     and the fat jar fails to start -->
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                                <!-- Set the main class: cn.org.yinzhengjie.spark.SparkMaster -->
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>cn.org.yinzhengjie.spark.SparkMaster</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
The main class is set to cn.org.yinzhengjie.spark.SparkMaster.

  Step 2: run the package goal to build the jar (screenshot omitted).

  Step 3: inspect the internal structure of the shaded jar, for example with jar tf, to confirm the classes and the merged reference.conf are present (screenshot omitted).

 

2>. Package SparkWorker

  Step 1: modify the Maven configuration. The POM is identical to the SparkMaster one above except for the ManifestResourceTransformer, whose main class now points to the worker:

                                <!-- Set the main class: cn.org.yinzhengjie.spark.SparkWorker -->
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>cn.org.yinzhengjie.spark.SparkWorker</mainClass>
                                </transformer>

  The next two steps are the same as above: build the package, rename the resulting jar, and check its main-class information (screenshots omitted).

3>. Start three virtual machines, upload master.jar to the master node, and run it

[yinzhengjie@s101 download]$ ll
total 67320
-rw-r--r--. 1 yinzhengjie yinzhengjie 20124547 Jul 31 20:42 master.jar
-rw-r--r--. 1 yinzhengjie yinzhengjie 28678231 Jul 20 21:18 scala-2.11.8.tgz
-rw-r--r--. 1 yinzhengjie yinzhengjie 20124541 Jul 31 21:52 worker.jar
[yinzhengjie@s101 download]$ 
[yinzhengjie@s101 download]$ 
[yinzhengjie@s101 download]$ java -jar master.jar 172.16.30.101 8888 master

 

4>. Upload worker.jar to the other two nodes and run it, as follows:

  On node 172.16.30.102:
[yinzhengjie@s102 download]$ ll
total 19656
-rw-r--r--. 1 yinzhengjie yinzhengjie 20124541 Jul 31 22:01 worker.jar
[yinzhengjie@s102 download]$ 
[yinzhengjie@s102 download]$ java -jar worker.jar 172.16.30.102 6665 worker akka.tcp://sparkMaster@172.16.30.101:8888//user/master
[yinzhengjie@s102 download]$ 

  On node 172.16.30.103:
[yinzhengjie@s103 download]$ ll
total 19656
-rw-r--r--. 1 yinzhengjie yinzhengjie 20124541 Jul 31 22:00 worker.jar
[yinzhengjie@s103 download]$ 
[yinzhengjie@s103 download]$ 
[yinzhengjie@s103 download]$ java -jar worker.jar 172.16.30.103 6665 worker akka.tcp://sparkMaster@172.16.30.101:8888//user/master

  The consoles on 172.16.30.102 and 172.16.30.103 show the workers sending heartbeats, and 172.16.30.101 shows the master's count of live workers (screenshots omitted).

 

