SparkContext 和 SparkConf
任何
Spark程序都是SparkContext開始的,SparkContext的初始化需要一個SparkConf對象,SparkConf包含了Spark集群配置的各種參數。
初始化后,就可以使用SparkContext對象所包含的各種方法來創建和操作RDD和共享變量。
val conf = new SparkConf().setMaster("master").setAppName("appName") val sc = new SparkContext(conf) 或者 val sc = new SparkContext("master","appName")
Note:
Once a SparkConf object is passed to Spark, it is cloned and can no longer be modified by the user.
也就是說一旦設置完成SparkConf,就不可被使用者修改
對於單元測試,您也可以調用SparkConf(false)來跳過加載外部設置,並獲得相同的配置,無論系統屬性如何。
咱們再看看setMaster()和setAppName()源碼:
根據上面的解釋,setMaster主要是連接主節點,如果參數是"local",則在本地用單線程運行spark,如果是 local[4],則在本地用4核運行,如果設置為spark://master:7077,就是作為單節點運行,而setAppName就是在web端顯示應用名而已,它們說到底都調用了set()函數,讓我們看看set()是何方神聖
logDeprecation(key)是日志輸出函數,防止輸入參數名無效, 看看settings,是個HashMap結構,追溯一下:
果然,是個ConcurrentHashMap對象,ConcurrentHashMap主要作用是解決多線程並發下數據段訪問效率,該類相對於hashMap而言具有同步map中的數據,對於hashTable而言,該同步數據對於並發程序提高了極高的效率,所以在使用緩存機制的時候如果對map中的值具有高並發的情況的話,那么我們就需要使用ConcurrentHashMap,ConcurrentHashMap中主要實體類就是三個:ConcurrentHashMap(整個Hash表),Segment(桶),HashEntry(節點)
,CurrentHashMap的初始化一共有三個參數,一個initialCapacity,表示初始的容量,一個loadFactor,表示負載參數,最后一個是concurrentLevel,代表ConcurrentHashMap內部的Segment的數量,ConcurrentLevel一經指定,不可改變,這也是為什么SparkConf配置好了就無法更改的原因。
ConcurrentHashMap應用了鎖分段技術,
HashTable容器在競爭激烈的並發環境下表現出效率低下的原因,是因為所有訪問HashTable的線程都必須競爭同一把鎖,那假如容器里有多把鎖,每一把鎖用於鎖容器其中一部分數據,那么當多線程訪問容器里不同數據段的數據時,線程間就不會存在鎖競爭,從而可以有效的提高並發訪問效率,這就是ConcurrentHashMap所使用的鎖分段技術,首先將數據分成一段一段的存儲,然后給每一段數據配一把鎖,當一個線程占用鎖訪問其中一個段數據的時候,其他段的數據也能被其他線程訪問。
另外,如果ConcurrentHashMap的元素數量增加導致ConrruentHashMap需要擴容,ConcurrentHashMap是不會增加Segment的數量的,而只會增加Segment中鏈表數組的容量大小,這樣的好處是擴容過程不需要對整個ConcurrentHashMap做rehash,而只需要對Segment里面的元素做一次rehash就可以了。
SparkSession: SparkSession實質上是SQLContext和HiveContext的組合(未來可能還會加上StreamingContext),所以在SQLContext和HiveContext上可用的API在SparkSession上同樣是可以使用的。SparkSession內部封裝了sparkContext,所以計算實際上是由sparkContext完成的。
val sparkSession = SparkSession.builder .master("master") .appName("appName") .getOrCreate() 或者 SparkSession.builder.config(conf=SparkConf())
上面代碼類似於創建一個SparkContext,master設置為"xiaojukeji",然后創建了一個SQLContext封裝它。如果你想創建hiveContext,可以使用下面的方法來創建SparkSession,以使得它支持Hive(HiveContext):
val sparkSession = SparkSession.builder .master("master") .appName("appName") .enableHiveSupport() .getOrCreate() //sparkSession 從csv讀取數據: val dq = sparkSession.read.option("header", "true").csv("src/main/resources/scala.csv")
getOrCreate():有就拿過來,沒有就創建,類似於單例模式:
s1 = SparkSession().builder.config("k1", "v1").getORCreat()
s2 = SparkSession().builder.config("k2", "v2").getORCreat()
return s1.conf.get("k1") == s2.conf.get("k2")
True
