搗鼓了一下,先來個手動擋吧。自動擋要設置ssh無密碼登陸啥的,后面開搞。
一、手動多台機鏈接master
手動鏈接master其實上篇已經用過。
這里有兩台機器:
10.60.215.41 啟動master、worker1、application(spark shell)
10.0.2.15 啟動worker2
具體步驟如下:
1.在10.60.215.41 上
$SPARK_HOME $ ./sbin/start-master.sh
$SPARK_HOME $./bin/spark-class org.apache.spark.deploy.worker.Worker spark://qpzhangdeMac-mini.local:7077
2.在10.0.2.15上
$SPARK_HOME $./bin/spark-class org.apache.spark.deploy.worker.Worker spark://qpzhangdeMac-mini.local:7077
這里需要注意的是,貌似spark用了akka的庫,spark的master URL里面必須要用hostname(嘗試從配置文件里面改成IP,也沒生效),否則會報錯:
15/03/20 17:14:05 ERROR EndpointWriter: dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://sparkMaster@10.60.215.41:7077/]] arriving at [akka.tcp://sparkMaster@10.60.215.41:7077] inbound addresses are [akka.tcp://sparkMaster@qpzhangdeMac-mini.local:7077]
要在10.0.2.15機器的hosts里面,設置qpzhangdeMac-mini.local對應的IP為master 10.60.215.41,否則無法轉換成IP進行鏈接。
開始以為把master kill之后,master會自動轉為worker1 或者 work2中的一個,但是並沒有。worker只是不斷嘗試重連。
15/03/20 17:41:05 INFO Worker: Retrying connection to master (attempt # 2) 15/03/20 17:41:05 WARN EndpointWriter: AssociationError [akka.tcp://sparkWorker@10.60.215.41:53899] -> [akka.tcp://sparkMaster@qpzhangdeMac-mini.local:7077]: Error [Invalid address: akka.tcp://sparkMaster@qpzhangdeMac-mini.local:7077] [ akka.remote.InvalidAssociation: Invalid address: akka.tcp://sparkMaster@qpzhangdeMac-mini.local:7077 Caused by: akka.remote.transport.Transport$InvalidAssociationException: Connection refused: qpzhangdeMac-mini.local/10.60.215.41:7077
重新啟動master之后, 重連成功。
15/03/20 18:27:41 INFO Worker: Retrying connection to master (attempt # 10) 15/03/20 18:27:41 INFO Worker: Successfully registered with master spark://qpzhangdeMac-mini.local:7077
這里暫且留下幾個疑問:
1)原來salve只是workers 么?worker是不會升級為master的,這里沒有選舉之說。
2)master掛了之后,重啟,任務會丟失么?
3)單個worker是否可以注冊到多個master上?
3.在10.60.215.41 上
啟動spark shell,下達任務。
scala> val textFile = sc.textFile("/var/spark/README.md") 15/03/20 17:55:41 INFO MemoryStore: ensureFreeSpace(73391) called with curMem=186365, maxMem=555755765 15/03/20 17:55:41 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 71.7 KB, free 529.8 MB) 15/03/20 17:55:41 INFO MemoryStore: ensureFreeSpace(31262) called with curMem=259756, maxMem=555755765 15/03/20 17:55:41 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 30.5 KB, free 529.7 MB) 15/03/20 17:55:41 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 10.60.215.41:53983 (size: 30.5 KB, free: 530.0 MB) 15/03/20 17:55:41 INFO BlockManagerMaster: Updated info of block broadcast_2_piece0 15/03/20 17:55:41 INFO SparkContext: Created broadcast 2 from textFile at <console>:21 textFile: org.apache.spark.rdd.RDD[String] = /var/spark/README.md MapPartitionsRDD[3] at textFile at <console>:21 scala> textFile.count() 15/03/20 17:55:45 INFO FileInputFormat: Total input paths to process : 1 15/03/20 17:55:45 INFO SparkContext: Starting job: count at <console>:24 15/03/20 17:55:45 INFO DAGScheduler: Got job 1 (count at <console>:24) with 2 output partitions (allowLocal=false) 15/03/20 17:55:45 INFO DAGScheduler: Final stage: Stage 1(count at <console>:24) 15/03/20 17:55:45 INFO DAGScheduler: Parents of final stage: List() 15/03/20 17:55:45 INFO DAGScheduler: Missing parents: List() 15/03/20 17:55:45 INFO DAGScheduler: Submitting Stage 1 (/var/spark/README.md MapPartitionsRDD[3] at textFile at <console>:21), which has no missing parents 15/03/20 17:55:45 INFO MemoryStore: ensureFreeSpace(2640) called with curMem=291018, maxMem=555755765 15/03/20 17:55:45 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 2.6 KB, free 529.7 MB) 15/03/20 17:55:45 INFO MemoryStore: ensureFreeSpace(1931) called with curMem=293658, maxMem=555755765 15/03/20 17:55:45 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 1931.0 B, free 529.7 MB) 15/03/20 17:55:45 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on 10.60.215.41:53983 (size: 1931.0 B, free: 530.0 MB) 15/03/20 17:55:45 INFO BlockManagerMaster: Updated info of block broadcast_3_piece0 15/03/20 17:55:45 INFO SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:839 15/03/20 17:55:45 INFO DAGScheduler: Submitting 2 missing tasks from Stage 1 (/var/spark/README.md MapPartitionsRDD[3] at textFile at <console>:21) 15/03/20 17:55:45 INFO TaskSchedulerImpl: Adding task set 1.0 with 2 tasks 15/03/20 17:55:45 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 3, 10.60.215.41, PROCESS_LOCAL, 1289 bytes) 15/03/20 17:55:45 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 4, 10.0.2.15, PROCESS_LOCAL, 1289 bytes) 15/03/20 17:55:45 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on 10.60.215.41:53990 (size: 1931.0 B, free: 265.1 MB) 15/03/20 17:55:45 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 10.60.215.41:53990 (size: 30.5 KB, free: 265.1 MB) 15/03/20 17:55:45 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on 10.0.2.15:53284 (size: 1931.0 B, free: 267.2 MB) 15/03/20 17:55:45 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 10.0.2.15:53284 (size: 30.5 KB, free: 267.2 MB) 15/03/20 17:55:45 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 3) in 127 ms on 10.60.215.41 (1/2) 15/03/20 17:55:46 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 4) in 470 ms on 10.0.2.15 (2/2) 15/03/20 17:55:46 INFO DAGScheduler: Stage 1 (count at <console>:24) finished in 0.471 s 15/03/20 17:55:46 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 15/03/20 17:55:46 INFO DAGScheduler: Job 1 finished: count at <console>:24, took 0.487544 s res2: Long = 98 scala> val linesWithSpark = textFile.filter(line => line.contains("Spark")) linesWithSpark: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[4] at filter at <console>:23 scala> linesWithSpark.count() 15/03/20 17:56:53 INFO SparkContext: Starting job: count at <console>:26 15/03/20 17:56:53 INFO DAGScheduler: Got job 2 (count at <console>:26) with 2 output partitions (allowLocal=false) 15/03/20 17:56:53 INFO DAGScheduler: Final stage: Stage 2(count at <console>:26) 15/03/20 17:56:53 INFO DAGScheduler: Parents of final stage: List() 15/03/20 17:56:53 INFO DAGScheduler: Missing parents: List() 15/03/20 17:56:53 INFO DAGScheduler: Submitting Stage 2 (MapPartitionsRDD[4] at filter at <console>:23), which has no missing parents 15/03/20 17:56:53 INFO MemoryStore: ensureFreeSpace(2848) called with curMem=295589, maxMem=555755765 15/03/20 17:56:53 INFO MemoryStore: Block broadcast_4 stored as values in memory (estimated size 2.8 KB, free 529.7 MB) 15/03/20 17:56:53 INFO MemoryStore: ensureFreeSpace(2034) called with curMem=298437, maxMem=555755765 15/03/20 17:56:53 INFO MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 2034.0 B, free 529.7 MB) 15/03/20 17:56:53 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on 10.60.215.41:53983 (size: 2034.0 B, free: 530.0 MB) 15/03/20 17:56:53 INFO BlockManagerMaster: Updated info of block broadcast_4_piece0 15/03/20 17:56:53 INFO SparkContext: Created broadcast 4 from broadcast at DAGScheduler.scala:839 15/03/20 17:56:53 INFO DAGScheduler: Submitting 2 missing tasks from Stage 2 (MapPartitionsRDD[4] at filter at <console>:23) 15/03/20 17:56:53 INFO TaskSchedulerImpl: Adding task set 2.0 with 2 tasks 15/03/20 17:56:53 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 5, 10.0.2.15, PROCESS_LOCAL, 1289 bytes) 15/03/20 17:56:53 INFO TaskSetManager: Starting task 1.0 in stage 2.0 (TID 6, 10.60.215.41, PROCESS_LOCAL, 1289 bytes) 15/03/20 17:56:53 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on 10.60.215.41:53990 (size: 2034.0 B, free: 265.1 MB) 15/03/20 17:56:53 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on 10.0.2.15:53284 (size: 2034.0 B, free: 267.2 MB) 15/03/20 17:56:53 INFO TaskSetManager: Finished task 1.0 in stage 2.0 (TID 6) in 113 ms on 10.60.215.41 (1/2) 15/03/20 17:56:53 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 5) in 122 ms on 10.0.2.15 (2/2) 15/03/20 17:56:53 INFO DAGScheduler: Stage 2 (count at <console>:26) finished in 0.122 s 15/03/20 17:56:53 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool 15/03/20 17:56:53 INFO DAGScheduler: Job 2 finished: count at <console>:26, took 0.137589 s res3: Long = 19
從日志里面看到,任務都是分解成2個,分別發送到2個worker上面執行。
這里不免想到以下問題:
1)master的任務是怎么分配的?local file 是傳遞path到不同的worker上去,還是把內容讀取了傳遞過去?
2)如果僅僅是傳遞path過去,那么每個work都要讀一遍文件?全部讀取,還是移位讀取的呢?
多執行幾次,然后看worker的日志,發現是傳path,加上文件分片的;不同的分片應該是隨機分到對應的worker的,因為幾次命令,每個worker收到的分片地址不一樣。
這里還有一個問題,如果是從HDFS上面讀取文件,一個地址是可以被不同機器的worker讀取到的。如果是讀本地local path的話,那么就呵呵了,你要自己把文件內容分派到不同的worker機器上去了。
可在 http://10.60.215.41:4040/executors/ 上面可以看到當前執行task的 workers list,以及task被執行的狀態。
二,自動擋部署
==========
其實原理也很簡單,就是shell腳本,根據配置的slavers機器,通過ssh登錄到slaver機器上面,切換到對應的目錄,啟動slave。
相比手動啟動slaver,這個一鍵啟動只需要在一台master機器上完成。
前提是,你必須配置好ssh的無密碼登錄,你可以參考這里。
配置好后,修改conf目錄下的slavers列表:
root@qp-zhang:/var/spark# cat conf/slaves
# A Spark Worker will be started on each of the machines listed below.
localhost
root@qpzhangdeMac-mini.local
采用對應的slavers腳本啟動即可:
root@qp-zhang:/var/spark# ./sbin/start-slaves.sh root@qpzhangdeMac-mini.local: starting org.apache.spark.deploy.worker.Worker, logging to /private/var/spark/sbin/../logs/spark-root-org.apache.spark.deploy.worker.Worker-1-qpzhangdeMac-mini.local.out localhost: starting org.apache.spark.deploy.worker.Worker, logging to /var/spark/sbin/../logs/spark-root-org.apache.spark.deploy.worker.Worker-1-qp-zhang.out
這時,可以通過
http://localhost:8080/ 查看當前master的slavers(也可以說是workers)。
===================================