一、spark的命令行模式
1.第一種進入方式:執行 pyspark進入,執行exit()退出
注意報錯信息:java.lang.IllegalArgumentException: Required executor memory (1024+384 MB) is above the (最大閾值)max threshold (1024 MB) of this cluster!
表示 執行器的內存(1024+384 MB) 大於最大閾值(1024 MB)
Please check the values of 'yarn.scheduler.maximum-allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb'
2.初始化RDD的方法
本地內存中已經有一份序列數據(比如python的list),可以通過sc.parallelize去初始化一個RDD。
當執行這個操作以后,list中的元素將被自動分塊(partitioned),並且把每一塊送到集群上的不同機器上。
import pyspark
from pyspark import SparkContext as sc
from pyspark import SparkConf
conf=SparkConf().setAppName("miniProject").setMaster("local[*]")
#任何Spark程序都是SparkContext開始的,SparkContext的初始化需要一個SparkConf對象,SparkConf包含了Spark集群配置的各種參數(比如主節點的URL)。
#初始化后,就可以使用SparkContext對象所包含的各種方法來創建和操作RDD和共享變量。
#Spark shell會自動初始化一個SparkContext(在Scala和Python下可以,但不支持Java)。
#getOrCreate表明可以視情況新建session或利用已有的session
sc=SparkContext.getOrCreate(conf)
# 利用list創建一個RDD;使用sc.parallelize可以把Python list,NumPy array或者Pandas Series,Pandas DataFrame轉成Spark RDD。
rdd = sc.parallelize([1,2,3,4,5])
rdd 打印 ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195
# getNumPartitions() 方法查看list被分成了幾部分
rdd.getNumPartitions() 打印結果:2
# glom().collect()查看分區狀況
rdd.glom().collect() 打印結果: [[1, 2], [3, 4, 5]]
二、可直接執行 spark-shell,也可以執行 spark-shell --master local[2]
多線程方式:運行 spark-shell --master local[N] 讀取 linux本地文件數據 。通過本地 N 個線程跑任務,只運行一個 SparkSubmit 進程,利用 spark-shell --master local[N] 讀取本地數據文件實現單詞計數master local[N]:采用本地單機版的來進行任務的計算,N是一個正整數,它表示本地采用N個線程來進行任務的計算,會生成一個SparkSubmit進程