數據本地性
數據計算盡可能在數據所在的節點上運行,這樣可以減少數據在網絡上的傳輸,畢竟移動計算比移動數據代價小很多。進一步看,數據如果在運行節點的內存中,就能夠進一步減少磁盤的I/O的傳輸。在spark中,數據本地性優先級從高到低為PROCESS_LOCAL>NODE_LOCAL>NO_PREF>RACK_LOACL>ANY即最好是運行在節點內存中的數據,次要是同一個NODE,再次是同機架,最后是任意位置。
PROCESS_LOCAL 進程本地化:task要計算的數據在同一個Executor中
NODE_LOCAL 節點本地化:速度比 PROCESS_LOCAL 稍慢,因為數據需要在不同進程之間傳遞或從文件中讀取
NODE_PREF 沒有最佳位置這一說,數據從哪里訪問都一樣快,不需要位置優先。比如說SparkSQL讀取MySql中的數據
RACK_LOCAL 機架本地化,數據在同一機架的不同節點上。需要通過網絡傳輸數據及文件 IO,比 NODE_LOCAL 慢
ANY 跨機架,數據在非同一機架的網絡上,速度最慢
延遲執行
在任務分配到運行節點時,先判斷任務最佳運行節點是否空閑,如果該節點沒有足夠的資源運行該任務,在這種情況下任務會等待一定的時間;如果在等待的時間內該節點釋放出足夠的資源,則任務在該節點運行,如果還是不足會找出次佳節點進行運行。通過這樣的方式進行能讓任務運行在更高級別的數據本地性節點,從而減少磁盤I/O和網絡傳輸。一般情況下只對PROCESS_LOCAL和NODE_LOCAL兩個數據本地性優先級進行等待,系統默認延遲時間為3S;
spark任務分配的原則是讓任務運行在數據本地性優先級別更高的節點上,甚至可以為此等待一定的時間。該任務分派過程是由TaskSchedulerImpI.resourceOffers方法實現,該方法先對應用程序獲取的資源進行混洗,以使任務能夠更加均衡的分散在集群中運行,然后對任務集對應的TaskSetManager根據設置的調度算法進行排序,最后對TaskSetManager中的任務按照數據本地性分配任務運行節點,分配時先根據任務集的本地性從優先級高到低分配任務,在分配過程中動態判斷集群中節點的運行情況,通過延遲執行等待數據本地性更高的節點運行。
High Available(HA)
Master異常
Master作為spark獨立運行模式的核心,如果Master出現異常,則整個集群的運行狀況和資源都無法進行管理,整個集群就處於群龍無首的狀況,spark在設計的時候就考慮到了這個情況,在集群運行的時候,Master將啟動一個或者多個Standy Master,當Master出現異常的時候,Standy Master將根據一定的規則確定其中一個接管Master。在獨立運行模式中,spark支持如下幾種策略,可以在配置文件spark-env.sh配置項spark.deploy.recoveryMode進行設置,默認為None.
①ZOOKEEPER:集群的元數據持久化到ZooKeeper中,當Master出現異常時,Zookeeper會通過選舉機制選舉出新的Master,新的Master接管時需要從Zookeeper獲取持久化信息並根據這些信息回復集群狀態
②FILESYSTMEM:集群的元數據持久化到本地文件系統中,當Master出現異常時,只要在該機器上重新啟動Master,啟動后新的Master獲取持久化信息並根據這些信息恢復集群狀態。
③CUSTOM:自定義恢復方式,對StandaloneRecoveryModeFactory抽象類進行實現並把該類配置到系統中,當Master出現異常時,會根據用戶自定義方式恢復集群狀態
④NONE:不持久化集群的元數據,當master出現異常時,啟動新的Master不進行恢復集群狀態,而是直接接管集群
如何配置spark master的HA
1.配置zookeeper,下載SPARK並解壓
2.配置spark-env.sh
export JAVA_HOME=/usr/java/jdk1.8.0_101
export HADOOP_HOME=/root/hadoop/hadoop-2.7.4
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export SCALA_HOME=/root/scala/scala-2.11.8
export HIVE_HOME=/root/hive/apache-hive-2.1.1
export LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
export HADOOP_OPTS="-Djava.library.path=$HADOOP_HOME/lib:$LIB_NATIVE_DIR"
export SPARK_CLASSPATH=$SPARK_HOME/mysql-connector-java-5.1.39.jar
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=spark1:2181,spark2:2181,spark3:2181,spark4:2181 -Dspark.deploy.zookeeper.dir=/spark"
說明: -Dspark.deploy.recoveryMode=ZOOKEEPER #說明整個集群狀態是通過zookeeper來維護的,整個集群狀態的恢復也是通過zookeeper來維護的。就是說用zookeeper做了spark的HA配置,Master(Active)掛掉的話,Master(standby)要想變成Master(Active)的話,Master(Standby)就要像zookeeper讀取整個集群狀態信息,然后進行恢復所有Worker和Driver的狀態信息,和所有的Application狀態信息; -Dspark.deploy.zookeeper.url=spark1:2181,spark2:2181,spark3:2181,spark4:2181#將所有配置了zookeeper,並且在這台機器上有可能做master(Active)的機器都配置進來;
-Dspark.deploy.zookeeper.dir=/spark -Dspark.deploy.zookeeper.dir是保存spark的元數據,保存了spark的作業運行狀態; zookeeper會保存spark集群的所有的狀態信息,包括所有的Workers信息,所有的Applactions信息,所有的Driver信息,如果集群
a.在Master切換的過程中,因為程序在運行之前,已經申請過資源了,driver和Executors通訊,不需要和master進行通訊的,所有的已經在運行的程序皆正常運行!因為Spark Application在運行前就已經通過Cluster Manager獲得了計算資源,所以在運行時Job本身的調度和處理和Master是沒有任何關系的!
b. 在Master的切換過程中唯一的影響是不能提交新的Job:一方面不能夠提交新的應用程序給集群,因為只有Active Master才能接受新的程序的提交請求;另外一方面,已經運行的程序中也不能夠因為Action操作觸發新的Job的提交請求;
3.復制slaves.template成slaves,配置如下:
spark1
spark2
spark3
spark4
4.將配置好安裝包分發給其他節點
5.各個節點配置環境變量,並使之生效
6.啟動zookeeper
所有節點均要執行zkServer.sh start
7.啟動hdfs集群
任意一個節點執行即可
8.啟動spark集群
在一個節點啟動start-all.sh,其他節點啟動start-master.sh
driver的功能
1.一個Spark作業運行時包括一個Driver進程,也是作業的主進程,具有main函數,並且有SparkContext的實例,是程序的人口點;
2.功能:負責向集群申請資源,向master注冊信息,負責了作業的調度,負責作業的解析、生成Stage並調度Task到Executor上。包括DAGScheduler,TaskScheduler。
spark的部署模式
本地模式 運行在一個機器上,可以多線程執行,默認啟動和CPU核數一樣的線程,此模式主要用於本地調試測試
偽分布模式 一台機器上模擬集群運行,master,worker,sparkcontext這些進程都在一台機器上
獨立運行模式 spark自身實現的資源調度框架,由客戶端,master節點,worker節點組成,sparkcontext可以運行在本地客戶端,也可以運行在master節點上,spark-shell的spark-shell在master節點上運行,使用spark-submit提交的或者IDEA等平台開發的,sparkcontext運行在本機客戶端。資源管理和任務監控是Spark自己監控,這個模式也是其他模式的基礎
Spark on yarn模式 分布式部署集群,資源和任務監控交給yarn管理,但是目前僅支持粗粒度資源分配方式,包含cluster和client運行模式,cluster適合生產,driver運行在集群子節點,具有容錯功能,client適合調試,dirver運行在客戶端
Spark On Mesos模式。官方推薦這種模式(當然,原因之一是血緣關系)。正是由於Spark開發之初就考慮到支持Mesos,因此,目前而言,Spark運行在Mesos上會比運行在YARN上更加靈活,更加自然。用戶可選擇兩種調度模式之一運行自己的應用程序:
1) 粗粒度模式(Coarse-grained Mode):每個應用程序的運行環境由一個Dirver和若干個Executor組成,其中,每個Executor占用若干資源,內部可運行多個Task(對應多少個“slot”)。應用程序的各個任務正式運行之前,需要將運行環境中的資源全部申請好,且運行過程中要一直占用這些資源,即使不用,最后程序運行結束后,回收這些資源。
2) 細粒度模式(Fine-grained Mode):鑒於粗粒度模式會造成大量資源浪費,Spark On Mesos還提供了另外一種調度模式:細粒度模式,這種模式類似於現在的雲計算,思想是按需分配。
spark的組件
Spark core:核心組件,是個分布式大數據處理架構,提供了多種資源調度管理,通過內存計算,DAG等機制保證分布式計算的快速,並引入了RDD的抽象保證數據的高容錯性。
SparkStreaming是一個對實時數據流進行高吞吐、高容錯的流式處理系統,可以對多種數據源(如Kdfka、Flume、Twitter、Zero和TCP 套接字)進行類似Map、Reduce和Join等復雜操作,並將結果保存到外部系統.數據庫或者應用到儀表盤。
spark sql:能夠統一處理關系數據表和RDD,是開發人員輕松使用SQL命令進行玩不查詢,同時進行復雜的數據分析
MLlib:機器學習
BlinkDB :是一個用於在海量數據上運行交互式 SQL 查詢的大規模並行查詢引擎,它允許用戶通過權衡數據精度來提升查詢響應時間,其數據的精度被控制在允許的誤差范圍內。
GraphX圖計算
worker的主要工作
主要功能:管理當前節點內存,CPU的使用狀況,接收master分配過來的資源指令,通過ExecutorRunner啟動程序分配任務,管理分配新進程,做計算的服務。需要注意的是:1)worker會不會匯報當前信息給master,worker心跳給master主要只有workid,它不會發送資源信息以心跳的方式給mater,master分配的時候就知道worker,只有出現故障的時候才會發送資源。2)worker不會運行代碼,具體運行的是Executor是可以運行具體appliaction寫的業務邏輯代碼,操作代碼的節點,它不會運行程序的代碼的。
Spark為什么比mapreduce快?
1)基於內存計算,減少低效的磁盤交互;2)高效的調度算法,基於DAG;3)容錯機制Linage(血統),精華部分就是DAG和Lingae
Spark的並行計算
spark用戶提交的任務成為application,一個application對應一個sparkcontext,app中存在多個job,每觸發一次action操作就會產生一個job。這些job可以並行或串行執行,每個job中有多個stage,stage是shuffle過程中DAGSchaduler通過RDD之間的依賴關系划分job而來的,每個stage里面有多個task,組成taskset有TaskSchaduler分發到各個executor中執行,executor的生命周期是和app一樣的,即使沒有job運行也是存在的,所以task可以快速啟動讀取內存進行計算。
RDD
RDD(Resilient Distributed Dataset)叫做彈性分布式數據集,是Spark中最基本的數據抽象,它代表一個不可變、可分區、里面的元素可並行計算的集合。RDD具有數據流模型的特點:自動容錯、位置感知性調度和可伸縮性。RDD允許用戶在執行多個查詢時顯式地將工作集緩存在內存中,后續的查詢能夠重用工作集,這極大地提升了查詢速度。
RDD的屬性
A list of partitions 分區,並行計算
A function for computing each split 一個函數應用於各個分區(並行計算)
A list of dependencies on other RDDs 依賴其他RDD 傳遞依賴 RDD1=>RDD2=>RDD3
Optionally(可選), a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
cache后面能接其他算子的結果
如下例:
scala> val bb=mapRDD.cache.count
bb: Long = 10
cache后面接其他算子沒有問題,但是本來打算cache的是一個RDD,由於后面的count重新觸發了cache,結果cache了8,沒達到緩存RDD的目的
數據本地性的確定環節
task運行在那他機器上,是在DAGscheduler進行stage的划分的時候,確定的
RDD的彈性表現在哪幾點?
1、自動的進行內存和磁盤數據存儲的切換;
2、基於Lineage的高效容錯(第n個節點出錯,會從第n-1個節點恢復,血統容錯);
3、Task如果失敗會自動進行特定次數的重試(默認4次);
4、Stage如果失敗會自動進行特定次數的重試(可以只運行計算失敗的階段);只計算失敗的數據分片;
5、checkpoint和persist
6、數據調度彈性:DAG TASK 和資源 管理無關
7、數據分片的高度彈性(人工自由設置分片函數),repartition
常規的容錯方式
1).數據檢查點,會發生拷貝,浪費資源
2).記錄數據的更新,每次更新都會記錄下來,比較復雜且比較消耗性能
Spark的持久化
把RDD持久化到內存極大的提高了迭代以及各計算模型之間的數據共享,一般情況下執行點60%內存用於緩存數據,剩下40%用於運行任務,如果某個計算步驟比較多,計算復雜,網絡傳輸shuffle,最后只有一個結果,如果在計算中途做保存一些臨時數據,失敗的的風險很高,重算成本更高。可以在shuffle之后做一個persist.
序列化
序列化的好處就是可以減少空間,高效存儲,傳輸更快,但使用數據的時候要反序列化,耗時耗CPU
HashPartitioner與RangePartitioner
HashPartitioner根據給定的KEY隨機生成分區ID,弊端是數據不均勻,容易導致數據傾斜
RangePartitioner分區則盡量保證每個分區中數據量的均勻,而且分區與分區之間是有序的,也就是說一個分區中的元素肯定都是比另一個分區內的元素小或者大;但是分區內的元素是不能保證順序的
如下例所示
scala> import org.apache.spark.HashPartitioner
import org.apache.spark.HashPartitioner
scala> import org.apache.spark.RangePartitioner
import org.apache.spark.RangePartitioner
scala> mapRDD.collect
res23: Array[(Int, Int)] = Array((0,1), (1,2), (1,3), (2,4), (2,5), (3,6), (4,7), (4,8), (5,9), (5,10))
scala> mapRDD.partitionBy(new RangePartitioner(5,mapRDD)).map(x=>(TaskContext.getPartitionId,x)).collect
res24: Array[(Int, (Int, Int))] = Array((0,(0,1)), (0,(1,2)), (0,(1,3)), (1,(2,4)), (1,(2,5)), (2,(3,6)), (3,(4,7)), (3,(4,8)), (4,(5,9)), (4,(5,10)))
scala> mapRDD.partitionBy(new HashPartitioner(5)).map(x=>(TaskContext.getPartitionId,x)).collect
res25: Array[(Int, (Int, Int))] = Array((0,(0,1)), (0,(5,9)), (0,(5,10)), (1,(1,2)), (1,(1,3)), (2,(2,4)), (2,(2,5)), (3,(3,6)), (4,(4,7)), (4,(4,8)))
Spark自定義partitioner分區器
derby數據庫是單實例,不能支持多個用戶同時操作
窄依賴父RDD的partition和子RDD的parition,不一定都是一對一的關系,比如join操作的每個partiion僅僅和已知的partition進行join,就是窄依賴,不是一對一的關系
shuffle
shuffle在中文的意思是洗牌,混洗的意思,shuffle階段會涉及到磁盤的讀寫以及網絡的傳輸,因此shuffle的性能高低直接影響整個程序的性能和吞吐量。就是將各個節點上的同一類數據匯集到某一個節點進行計算,把這些分布在不同節點的數據按照一定的規則聚集到一起的過程稱為shuffle。
spark的處理方式是一個迭代的過程。shuffle的寫操作分為基於哈希的shuffle寫操作和基於排序的shuffle的寫操作兩種類型,基於hash寫操作在map和reduce數量較大的情況下會導致寫數據文件數量大和緩存開銷過大,產生很多小文件,甚至導致數據傾斜,基於排序的shuffle寫操作,每個shuffle map task不會為后續的每個任務創建單獨的文件,而是會將所有結果寫到一個文件中,同時生成一個索引文件index進行索引,通過這種機制避免了大量文件的產生,減輕了文件管理系統額壓力,節省了內存避免了GC的風險和頻率。
HashBaseShuffle 缺點:小文件過多,數量為task*reduce的數量
優化:使用spark.shuffle.consolidateFiles機制,修改值為true,默認為false,沒有啟用。文件數量為:reduce*core,在一個core里面並行運行的task其中生成的文件數為reduce的個數。一個core里面並行運行的task,將數據都追加到一起。
spark支持兩種方式的Shuffle,即Hash Based Shuffle和Sort Based Shuffle。
Spark 1.2的默認Shuffle機制從Hash變成了Sort。從而使得spark在集群上處理更大規模的數據
如果需要Hash Based Shuffle,在spark-defaults.conf中,將spark.shuffle.manager設置成“hash”即可。
parition和block
hdfs中的block是分布式存儲的最小單元,等分,可設置冗余,這樣設計有一部分磁盤空間的浪費,但是整齊的block大小,便於快速找到、讀取對應的內容;
Spark中的partion是彈性分布式數據集RDD的最小單元,RDD是由分布在各個節點上的partion組成的。partion是指的spark在計算過程中,生成的數據在計算空間內最小單元,同一份數據(RDD)的partion大小不一,數量不定,是根據application里的算子和最初讀入的數據分塊數量決定;
block位於存儲空間、partion位於計算空間,block的大小是固定的、partion大小是不固定的
-----------------
二次排序就是首先按照第一字段排序,然后再對第一字段相同的行按照第二字段排序,注意不能破壞第一次排序的結果。
scala> val aa=sc.makeRDD(1 to 21,5)
aa: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[11] at makeRDD at <console>:25
scala> val mapRDD=aa.map(x=>(TaskContext.getPartitionId,x))
mapRDD: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[12] at map at <console>:27
scala> mapRDD.groupByKey().collect
res3: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(1, 2, 3, 4)), (1,CompactBuffer(5, 6, 7, 8)), (2,CompactBuffer(9, 10, 11, 12)), (3,CompactBuffer(13, 14, 15, 16)), (4,CompactBuffer(17, 18, 19, 20, 21)))
scala> mapRDD.groupByKey().map(x=>(x._1,x._2.toList.sorted.reverse)).sortByKey(false).collect //首次排序sorted.reverse//二次排序sortByKey(false).
res16: Array[(Int, List[Int])] = Array((4,List(21, 20, 19, 18, 17)), (3,List(16, 15, 14, 13)), (2,List(12, 11, 10, 9)), (1,List(8, 7, 6, 5)), (0,List(4, 3, 2, 1)))
----------------
hash shuffle與sorted shuffle
處理少量數據的時候,hash shuffle會快於sorted shuffle,但數據量大的時候,sorted shuffle回比hash shuffle快許多,因為hash shuffle 會產生很多小的文件,分布不均勻,導致數據傾斜,耗內存
Sort-basesd shuffle產生的臨時文件數量 為:每個Mapper任務產生2個文件,一個data,一個index索引文件。(M代表Mapper中並行partition的總數量,其實就是Mapper端SHuffleMapTask的總數量)
Sort-basesd shuffle的缺陷:
如果Mapper 中Task的數量過大,依舊會產生很多小文件。此時Shuffle在傳遞數據到Reducer端的過程中,傳數據的過程有序列化和反序列化,要內存消耗,對GC的壓力會比較大,造成系統緩慢甚至奔潰!塢絲計划就是很好的解決方案。
如果需要在分片內也進行排序的話,此時需要進行Mapper端和Reducer端的兩次排序!!!這對性能也是巨大的消耗。可以改造Mapper的實現來解決這個問題。
序列化
在分布式計算中,序列化和壓縮是提升性能的兩個重要手段,spark通過序列化將鏈式分布的數據轉化為連續分布分數據,這樣就能夠進行分布式的進程間數據通信或者在內存進行數據的壓縮等操作,通過壓縮能夠減少內存查勇以及IO和網絡數據傳輸開銷,提升spark整體引用性能。
spark shell啟動會啟動spark sql,spark sql默認使用derby保存元數據,但是盡量不要用derby,它是單實例,不利於開發。會在本地生成一個文件metastore_db,如果啟動報錯,就把那個文件給刪了 ,derby數據庫是單實例,不能支持多個用戶同時操作,盡量避免使用.
conf/spark-default.conf 中的相關配置
-
parallelism:'pærəlɛl'ɪzəm/ 並行度,並行
spark.default.parallelism:用於設置每個stage的默認task數量,合理設置可以提高執行效率,In general, we recommend 2-3 tasks per CPU core in your cluster.
//導入隱式轉換,如果不導入則無法將RDD轉換成為DataFrame import sqlContext.implicits._
二分法
scala> def sconderyfind(arr:Array[(String,String,String)],ip:Int):Long={var low=0;var high=arr.length-1;while(low<=high){val middle=(low+high)/2;if(ip>=arr(middle)._1.toInt&&ip<=arr(middle)._2.toInt) return middle;if(ip<arr(middle)._1.toInt) high=middle-1 else low=middle+1}; -1}
sconderyfind: (arr: Array[(String, String, String)], ip: Int)Long
scala> def sconderyfind(arr:Array[(String,String,String)],ip:Int):Long=
{
var low=0;
var high=arr.length-1;
while(low<=high){
val middle=(low+high)/2;
if(ip>=arr(middle)._1.toInt&&ip<=arr(middle)._2.toInt)
return middle;
if(ip<arr(middle)._1.toInt)
high=middle-1
else low=middle+1
};
-1
}
sconderyfind: (arr: Array[(String, String, String)], ip: Int)Long
scala> val arr=Array(("100","200","aaa"),("300","500","bb"),("800","900","bb"),("1000","1500","bb"))
arr: Array[(String, String, String)] = Array((100,200,aaa), (300,500,bb), (800,900,bb), (1000,1500,bb))
scala> sconderyfind(arr,850)
res2: Long = 2
RDD寫入mysql
查看MySQL中的數據
mysql> select * from t_name;
Empty set (0.00 sec)
scala> import java.sql.{Connection,DriverManager, PreparedStatement,Date}
import java.sql.{Connection, DriverManager, PreparedStatement, Date}
scala> def rddtodb(iter:Iterator[(Int,String)]){var con:Connection=null;var ps:PreparedStatement=null;val sql="insert into t_name(id,name,createtime)values(?,?,?)" ;try{con=DriverManager.getConnection("jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8", "root", "root");iter.foreach(line=>{ps=con.prepareStatement(sql);ps.setInt(1,line._1.toInt);ps.setString(2,line._2.toString);ps.setDate(3,new Date(System.currentTimeMillis));ps.executeUpdate()}) }catch{case e:Exception=>println("mysql exception")} finally{if(con!=null)con.close;if(ps!=null)ps.close}}
rddtodb: (iter: Iterator[(Int, String)])Unit
scala> def rddtodb(iter:Iterator[(Int,String)]){
var con:Connection=null;
var ps:PreparedStatement=null;
val sql="insert into t_name(id,name,createtime)values(?,?,?)" ;
try{
con=DriverManager.getConnection("jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8", "root", "root");
iter.foreach(line=>{
ps=con.prepareStatement(sql);
ps.setInt(1,line._1.toInt);
ps.setString(2,line._2.toString);
ps.setDate(3,new Date(System.currentTimeMillis));
ps.executeUpdate()})
}catch
{case e:Exception=>println("mysql exception")}
finally{if(con!=null)con.close;if(ps!=null)ps.close}}
rddtodb: (iter: Iterator[(Int, String)])Unit
scala> val nameRDD=sc.makeRDD(Array((1,"tian"),(2,"wang"),(3,"liu"),(4,"ma"),(5,"lilei"),(6,"hanmeimei")),3)
nameRDD: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[2] at makeRDD at <console>:25
scala> nameRDD.foreachPartition(rddtodb)
查看mysql中的數據
mysql> select * from t_name;
+------+-----------+---------------------+
| id | name | createtime |
+------+-----------+---------------------+
| 1 | tian | 2018-08-21 00:00:00 |
| 2 | wang | 2018-08-21 00:00:00 |
| 3 | liu | 2018-08-21 00:00:00 |
| 4 | ma | 2018-08-21 00:00:00 |
| 5 | lilei | 2018-08-21 00:00:00 |
| 6 | hanmeimei | 2018-08-21 00:00:00 |
+------+-----------+---------------------+
6 rows in set (0.00 sec)
-------------------------
開發過程中,盡可能避免使用reduceByKey、join、distinct、repartition等會進行shuffle的算子,盡量使用map類的非shuffle算子,沒有shuffle操作或者僅有較少shuffle操作的Spark作業,可以大大減少性能開銷。
---------------------------
Spark Shuffle后續優化方向
壓縮:對數據進行壓縮,減少寫讀數據量;
內存化:Spark歷史版本中是有這樣設計的:Map寫數據先把數據全部寫到內存中,寫完之后再把數據刷到磁盤上;考慮內存是緊缺資源,后來修改成把數據直接寫到磁盤了;對於具有較大內存的集群來講,還是盡量地往內存上寫吧,內存放不下了再放磁盤。
Spark中的數據傾斜問題
定位數據傾斜,是OOM(out off memory)了,還是任務執行緩慢,看日志,看WebUI
避免不必要的shuffle
改變並行度,並行度太小導致數據分布不均為
分區方式不合理,使用自定義分區函數進行分區
創建RDD的方式
基於程序中的集合,基於外部系統的文件(如HDFS),轉換獲得,數據庫中也可以獲得,基於數據流等
Spark並行度的設置
spark並行度,每個core承載2~4個partition,32個core,那么64~128之間的並行度,也就是設置64~128個partion,並行度和數據規模大下無關,只和內存使用量和cpu使用
時間有關
spark數據存儲位置由BlockManager 管理
BlockManager 是 spark 中至關重要的一個組件, 在 spark的的運行過程中到處都有 BlockManager 的身影, 只有搞清楚 BlockManager 的原理和機制,你才能更加深入的理解 spark。BlockManager 是一個嵌入在 spark 中的 key-value型分布式存儲系統,是為 spark 量身打造的, BlockManager 在一個 spark 應用中作為一個本地緩存運行在所有的節點上, 包括所有 driver 和 executor上。 BlockManager 對本地和遠程提供一致的 get 和set 數據塊接口, BlockManager 本身使用不同的存儲方式來存儲這些數據, 包括 memory, disk, off-heap。
spark存儲過程分為寫數據和讀數據兩個過程,度數據分為本地讀取和遠程節點讀取兩種方式。
spark將不能序列化的對象封裝成object
driver通過collect把集群中各個節點的內容收集過來匯總成結果,collect返回結果是Array類型的,collect把各個節點上的數據抓過來,抓過來數據是Array型,collect對Array抓過來的結果進行合並,合並后Array中只有一個元素,是tuple類型(KV類型的)的。
scala> val str="ni hao who are you what is your name"
str: String = ni hao who are you what is your name
scala> str.split(" ").takeWhile(_.length==2)
res21: Array[String] = Array(ni)
scala> str.split(" ").take(5)
res22: Array[String] = Array(ni, hao, who, are, you)
scala> str.split(" ").takeRight(5)
res23: Array[String] = Array(you, what, is, your, name)
spark 修改默認task執行個數
spark中有partition的概念,每個partition都會對應一個task,task越多,在處理大規模數據的時候,就會越有效率。不過task並不是越多越好,如果平時測試,或者數據量沒有那么大,則沒有必要task數量太多。
參數可以通過spark_home/conf/spark-default.conf配置文件設置:
spark.sql.shuffle.partitions 20 //針對spark sql的task數量
spark.default.parallelism 30//非spark sql程序設置生效
在Spark中實現map-side join和reduce-side join
Map-side Join:如果要join的表中一個是大表,一個是小表(小到可以加載到內存中),就可以采用該算法。該算法可以將join算子執行在Map端,無需經歷shuffle和reduce等階段,因此效率非常高。
scala> val smallrdd=sc.makeRDD(Array((1,"tian"),(2,"wang")))
smallrdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[27] at makeRDD at <console>:25
scala> val bigrdd=sc.makeRDD(Array((1,"tianyongtao"),(2,"wangbajiu"),(1,"hhee"),(2,"hhee")))
bigrdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[28] at makeRDD at <console>:25
scala> var bc=sc.broadcast(smallrdd.collectAsMap)
bc: org.apache.spark.broadcast.Broadcast[scala.collection.Map[Int,String]] = Broadcast(19)
scala> bigrdd.mapPartitions(iter=>{val m=bc.value;for((k,v)<-iter if m.contains(k))yield (k,(v,m.get(k).getOrElse("")))}).collect
res52: Array[(Int, (String, String))] = Array((1,(tianyongtao,tian)), (2,(wangbajiu,wang)), (1,(hhee,tian)), (2,(hhee,wang)))
Reduce-side Join:
當兩個文件非常大,難以將其中之一放到內存時,就可以采用Reduce-side Join。該算法需要通過Map和Reduce兩個階段完成,在Map階段,將key相同的記錄划分給同一個Reduce Task(需標記每條記錄的來源,便於在Reduce階段合並),在Reduce階段,對key相同的進行合並。
Spark提供了Join算子,可以直接通過該算子實現reduce-side join,但要求RDD中的記錄必須是key-value pairs
scala> val bigrdd1=sc.makeRDD(Array((1,"tianyongtao"),(2,"wangbajiu"),(1,"hhee"),(3,"hhee")))
bigrdd1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[33] at makeRDD at <console>:25
scala> val bigrdd2=sc.makeRDD(Array((1,"tianyongtao"),(2,"wangbajiu"),(1,"hhee"),(3,"hhee")))
bigrdd2: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[34] at makeRDD at <console>:25
scala> bigrdd1.join(bigrdd2)
res54: org.apache.spark.rdd.RDD[(Int, (String, String))] = MapPartitionsRDD[37] at join at <console>:30
scala> bigrdd1.join(bigrdd2).collect
res55: Array[(Int, (String, String))] = Array((1,(hhee,tianyongtao)), (1,(hhee,hhee)), (1,(tianyongtao,tianyongtao)), (1,(tianyongtao,hhee)), (2,(wangbajiu,wangbajiu)), (3,(hhee,hhee)))
檢查文件的情況,區塊,位置
hdfs fsck /tmp/person.txt -files -blocks -locations
Connecting to namenode via http://localhost:50070/fsck?ugi=root&files=1&blocks=1&locations=1&path=%2Ftmp%2Fperson.txt
FSCK started by root (auth:SIMPLE) from /127.0.0.1 for path /tmp/person.txt at Tue Aug 21 17:27:37 CST 2018
/tmp/person.txt 98 bytes, 1 block(s): OK
0. BP-1744121868-127.0.0.1-1504694797902:blk_1073751380_10581 len=98 repl=1 [DatanodeInfoWithStorage[127.0.0.1:50010,DS-344f1c41-a710-4e59-8fd0-c4eb3a00a998,DISK]]
Status: HEALTHY
Total size: 98 B
Total dirs: 0
Total files: 1
Total symlinks: 0
Total blocks (validated): 1 (avg. block size 98 B)
Minimally replicated blocks: 1 (100.0 %)
Over-replicated blocks: 0 (0.0 %)
Under-replicated blocks: 0 (0.0 %)
Mis-replicated blocks: 0 (0.0 %)
Default replication factor: 1
Average block replication: 1.0
Corrupt blocks: 0
Missing replicas: 0 (0.0 %)
Number of data-nodes: 1
Number of racks: 1
FSCK ended at Tue Aug 21 17:27:38 CST 2018 in 1549 milliseconds
The filesystem under path '/tmp/person.txt' is HEALTHY