目錄
· 概況
· 手工搭建集群
· 引言
· 安裝Scala
· 配置文件
· 啟動與測試
· 應用部署
· 部署架構
· 應用程序部署
· 核心原理
· RDD概念
· RDD核心組成
· RDD依賴關系
· DAG圖
· 應用程序資源構建
· API
· RDD構建
· RDD分區數
· 共享變量
· DoubleRDDFunctions常用Operation
· PairRDDFunctions間操作Operation
· OrderedRDDFunctions常用Operation
· 數據准備
· 加載&預處理
· 統計DAU
· 統計MAU
概況
1. Spark相對MapReduce的優勢:
a) 支持迭代計算;
b) 中間結果存儲在內存而不是硬盤,降低延遲。
2. Spark已成為輕量級大數據快速處理統一平台,“One stack to rule them all”,一個平台完成:即席查詢(ad-hoc queries)、批處理(batch processing)、流式處理(stream processing)。
3. Spark集群搭建方式:
a) 集成部署工具,如Cloudera Manager;
b) 手工搭建。
4. Spark源碼編譯方式:
a) SBT編譯;
b) Maven編譯。
手工搭建集群
引言
1. 環境:
Role |
Host name |
Master |
centos1 |
Slave |
centos2 |
centos3 |
2. Standalone模式需在Master和Slave節點部署,YARN模式僅需在命令提交機器部署。
3. 假設已成功安裝JDK、Hadoop集群。
安裝Scala
1. [Master(Standalone模式)或命令提交機器(YARN模式)]安裝Scala到/opt/app目錄下。
tar zxvf scala-2.10.6.tgz -C /opt/app
2. [Master(Standalone模式)或命令提交機器(YARN模式)]配置環境變量。
vi /etc/profile
export SCALA_HOME=/opt/app/scala-2.10.6
export PATH=$SCALA_HOME/bin:$PATH
source /etc/profile # 生效 env | grep SCALA_HOME # 驗證
配置文件
3. [Master(Standalone模式)或命令提交機器(YARN模式)]
tar zxvf spark-1.6.3-bin-hadoop2.6.tgz -C /opt/app cd /opt/app/spark-1.6.3-bin-hadoop2.6/conf cp spark-env.sh.template spark-env.sh vi spark-env.sh
export JAVA_HOME=/opt/app/jdk1.8.0_121 export SCALA_HOME=/opt/app/scala-2.10.6 export HADOOP_HOME=/opt/app/hadoop-2.6.5 export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop export YARN_CONF_DIR=${HADOOP_HOME}/etc/hadoop # For standalone mode export SPARK_WORKER_CORES=1 export SPARK_DAEMON_MEMORY=512m
cp spark-defaults.conf.template spark-defaults.conf hadoop fs -mkdir /spark.eventLog.dir vi spark-defaults.conf
spark.driver.extraClassPath /opt/app/apache-hive-1.2.2-bin/lib/mysql-connector-java-5.1.22-bin.jar spark.eventLog.enabled true spark.eventLog.dir hdfs://centos1:9000/spark.eventLog.dir
cp slaves.template slaves vi slaves
centos2
centos3
ln -s /opt/app/apache-hive-1.2.2-bin/conf/hive-site.xml .
4. [Master(Standalone模式)]從Master復制Spark目錄到各Slave。注意:僅Standalone集群需要執行本步驟。
scp -r /opt/app/spark-1.6.3-bin-hadoop2.6 hadoop@centos2:/opt/app scp -r /opt/app/spark-1.6.3-bin-hadoop2.6 hadoop@centos3:/opt/app
啟動與測試
5. [Master(Standalone模式)或命令提交機器(YARN模式)]配置Spark環境變量。
export SPARK_HOME=/opt/app/spark-1.6.3-bin-hadoop2.6 export PATH=$PATH:$SPARK_HOME/bin
export PYSPARK_PYTHON=/usr/bin/python3 # 可選
export PYSPARK_DRIVER_PYTHON=/usr/bin/python3 # 可選
6. [Master(Standalone模式)]啟動Spark,測試。
sbin/start-all.sh jps
Master # Master機器進程
Worker # Slave機器進程
7. [Master(Standalone模式)或命令提交機器(YARN模式)]測試。
bin/spark-submit --master spark://centos1:7077 --deploy-mode client --class org.apache.spark.examples.SparkPi --driver-memory 512m --executor-memory 512m --num-executors 1 --executor-cores 1 lib/spark-examples-1.6.3-hadoop2.6.0.jar # Standalone Client模式運行 bin/spark-submit --master spark://centos1:7077 --deploy-mode cluster --class org.apache.spark.examples.SparkPi --driver-memory 512m --executor-memory 512m --num-executors 1 --executor-cores 1 lib/spark-examples-1.6.3-hadoop2.6.0.jar # Standalone Cluster模式運行 bin/spark-submit --master yarn-client --class org.apache.spark.examples.SparkPi --driver-memory 512m --executor-memory 512m --num-executors 1 --executor-cores 1 lib/spark-examples-1.6.3-hadoop2.6.0.jar # Yarn Client模式運行 bin/spark-submit --master yarn-cluster --class org.apache.spark.examples.SparkPi --driver-memory 512m --executor-memory 512m --num-executors 1 --executor-cores 1 lib/spark-examples-1.6.3-hadoop2.6.0.jar # Yarn Custer模式運行
bin/spark-submit --master yarn-client --driver-memory 512m --executor-memory 512m --num-executors 1 --executor-cores 1 examples/src/main/python/pi.py # Yarn Client模式運行(Python)
bin/yarn application -list # 查看YARN運行的應用 bin/yarn application -kill ApplicationID # 殺死YARN運行的應用
bin/spark-shell --master spark://centos1:7077 --deploy-mode client --driver-memory 512m --executor-memory 512m --num-executors 1 --executor-cores 1 # Standalone Client模式運行 bin/spark-shell --master yarn --deploy-mode client --driver-memory 512m --executor-memory 512m --num-executors 1 --executor-cores 1 # Yarn Client模式運行
8. 監控頁面。
http://centos1:8080 |
Spark監控 |
http://centos1:8088 |
YARN監控 |
應用部署
部署架構
1. Application:Spark應用程序,包括一個Driver Program和集群中多個WorkNode中的Executor,其中每個WorkNode為每個Application僅提供一個Executor。
2. Driver Program:運行Application的main函數。通常也用SparkContext表示。負責DAG構建、Stage划分、Task管理及調度、生成SchedulerBackend用於Akka通信,主要組件有DAGScheduler、TaskScheduler、SchedulerBackend。
3. Cluster Manager:集群管理器,可封裝如Spark Standalone、YARN等不同集群管理器。Driver Program通過Cluster Manager分配資源,並將任務發送到多個Work Node執行。
4. WorkNode:集群節點。應用程序在運行時的Task在WorkNode的Executor中執行。
5. Executor:WorkNode為Application啟動的一個進程,負責執行Task。
6. Stage:一個Applicatoin一般包含一到多個Stage。
7. Task:被Driver Program發送到Executor的計算單元,通常一個Task處理一個split(即一個分區),每個split一般是一個Block大小。一個Stage包含一到多個Task,通過多個Task實現並行計算。
8. DAGScheduler:將Application分解成一到多個Stage,每個Stage根據RDD分區數決定Task個數,然后生成相應TaskSet放到TaskScheduler中。
9. DeployMode:Driver進程部署模式,有cluster和client兩種。
10. 注意:
a) Driver Program必須與Spark集群處於同一網絡環境。因為SparkContext要發送任務給不同WorkNode的Executor並接受Executor的執行結果。
b) 生產環境中,Driver Program所在機器性能配置,尤其CPU較好。
應用程序部署
1. 分類:
a) spark-shell:交互式,用於開發調試。已創建好“val sc: SparkContext”和“val sqlContext: SQLContext”實例。
b) spark-submit:應用提交式,用於生產部署。
2. spark-shell參數:
bin/spark-shell --help
Usage: ./bin/spark-shell [options] Options: --master MASTER_URL spark://host:port, mesos://host:port, yarn, or local. --deploy-mode DEPLOY_MODE Whether to launch the driver program locally ("client") or on one of the worker machines inside the cluster ("cluster") (Default: client). --class CLASS_NAME Your application's main class (for Java / Scala apps). --name NAME A name of your application. --jars JARS Comma-separated list of local jars to include on the driver and executor classpaths. --packages Comma-separated list of maven coordinates of jars to include on the driver and executor classpaths. Will search the local maven repo, then maven central and any additional remote repositories given by --repositories. The format for the coordinates should be groupId:artifactId:version. --exclude-packages Comma-separated list of groupId:artifactId, to exclude while resolving the dependencies provided in --packages to avoid dependency conflicts. --repositories Comma-separated list of additional remote repositories to search for the maven coordinates given with --packages. --py-files PY_FILES Comma-separated list of .zip, .egg, or .py files to place on the PYTHONPATH for Python apps. --files FILES Comma-separated list of files to be placed in the working directory of each executor. --conf PROP=VALUE Arbitrary Spark configuration property. --properties-file FILE Path to a file from which to load extra properties. If not specified, this will look for conf/spark-defaults.conf. --driver-memory MEM Memory for driver (e.g. 1000M, 2G) (Default: 1024M). --driver-java-options Extra Java options to pass to the driver. --driver-library-path Extra library path entries to pass to the driver. --driver-class-path Extra class path entries to pass to the driver. Note that jars added with --jars are automatically included in the classpath. --executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G). --proxy-user NAME User to impersonate when submitting the application. --help, -h Show this help message and exit --verbose, -v Print additional debug output --version, Print the version of current Spark Spark standalone with cluster deploy mode only: --driver-cores NUM Cores for driver (Default: 1). Spark standalone or Mesos with cluster deploy mode only: --supervise If given, restarts the driver on failure. --kill SUBMISSION_ID If given, kills the driver specified. --status SUBMISSION_ID If given, requests the status of the driver specified. Spark standalone and Mesos only: --total-executor-cores NUM Total cores for all executors. Spark standalone and YARN only: --executor-cores NUM Number of cores per executor. (Default: 1 in YARN mode, or all available cores on the worker in standalone mode) YARN-only: --driver-cores NUM Number of cores used by the driver, only in cluster mode (Default: 1). --queue QUEUE_NAME The YARN queue to submit to (Default: "default"). --num-executors NUM Number of executors to launch (Default: 2). --archives ARCHIVES Comma separated list of archives to be extracted into the working directory of each executor. --principal PRINCIPAL Principal to be used to login to KDC, while running on secure HDFS. --keytab KEYTAB The full path to the file that contains the keytab for the principal specified above. This keytab will be copied to the node running the Application Master via the Secure Distributed Cache, for renewing the login tickets and the delegation tokens periodically.
3. spark-submit參數(除Usage外,其他參數與spark-shell一樣):
bin/spark-submit --help
Usage: spark-submit [options] <app jar | python file> [app arguments] Usage: spark-submit --kill [submission ID] --master [spark://...] Usage: spark-submit --status [submission ID] --master [spark://...] Options: --master MASTER_URL spark://host:port, mesos://host:port, yarn, or local. --deploy-mode DEPLOY_MODE Whether to launch the driver program locally ("client") or on one of the worker machines inside the cluster ("cluster") (Default: client). --class CLASS_NAME Your application's main class (for Java / Scala apps). --name NAME A name of your application. --jars JARS Comma-separated list of local jars to include on the driver and executor classpaths. --packages Comma-separated list of maven coordinates of jars to include on the driver and executor classpaths. Will search the local maven repo, then maven central and any additional remote repositories given by --repositories. The format for the coordinates should be groupId:artifactId:version. --exclude-packages Comma-separated list of groupId:artifactId, to exclude while resolving the dependencies provided in --packages to avoid dependency conflicts. --repositories Comma-separated list of additional remote repositories to search for the maven coordinates given with --packages. --py-files PY_FILES Comma-separated list of .zip, .egg, or .py files to place on the PYTHONPATH for Python apps. --files FILES Comma-separated list of files to be placed in the working directory of each executor. --conf PROP=VALUE Arbitrary Spark configuration property. --properties-file FILE Path to a file from which to load extra properties. If not specified, this will look for conf/spark-defaults.conf. --driver-memory MEM Memory for driver (e.g. 1000M, 2G) (Default: 1024M). --driver-java-options Extra Java options to pass to the driver. --driver-library-path Extra library path entries to pass to the driver. --driver-class-path Extra class path entries to pass to the driver. Note that jars added with --jars are automatically included in the classpath. --executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G). --proxy-user NAME User to impersonate when submitting the application. --help, -h Show this help message and exit --verbose, -v Print additional debug output --version, Print the version of current Spark Spark standalone with cluster deploy mode only: --driver-cores NUM Cores for driver (Default: 1). Spark standalone or Mesos with cluster deploy mode only: --supervise If given, restarts the driver on failure. --kill SUBMISSION_ID If given, kills the driver specified. --status SUBMISSION_ID If given, requests the status of the driver specified. Spark standalone and Mesos only: --total-executor-cores NUM Total cores for all executors. Spark standalone and YARN only: --executor-cores NUM Number of cores per executor. (Default: 1 in YARN mode, or all available cores on the worker in standalone mode) YARN-only: --driver-cores NUM Number of cores used by the driver, only in cluster mode (Default: 1). --queue QUEUE_NAME The YARN queue to submit to (Default: "default"). --num-executors NUM Number of executors to launch (Default: 2). --archives ARCHIVES Comma separated list of archives to be extracted into the working directory of each executor. --principal PRINCIPAL Principal to be used to login to KDC, while running on secure HDFS. --keytab KEYTAB The full path to the file that contains the keytab for the principal specified above. This keytab will be copied to the node running the Application Master via the Secure Distributed Cache, for renewing the login tickets and the delegation tokens periodically.
4. 默認參數:
a) 默認應用程序參數配置文件:conf/spark-defaults.conf
b) 默認JVM參數配置文件:conf/spark-env.sh
c) 常用的jar文件可通過“--jar”參數配置。
5. 參數優先級(由高到低):
a) SparkConf顯示配置參數;
b) spark-submit指定參數;
c) conf/spark-defaults.conf配置文件參數。
6. MASTER_URL格式
MASTER_URL |
說明 |
local |
以單線程在本地運行(完全無並行) |
local[K] |
在本地以K個Worker線程運行,K設置為CPU核數較理想 |
local[*] |
K=CPU核數 |
spark://HOST:PORT |
連接Standalone集群的Master,即Spark監控頁面的URL,端口默認為7077(不支持省略) |
yarn-client |
以client模式連接到YARN集群,通過HADOOP_CONF_DIR環境變量查找集群 |
yarn-cluster |
以cluster模式連接到YARN集群,通過HADOOP_CONF_DIR環境變量查找集群 |
7. 注意:
a) spark-shell默認使用4040端口,當4040端口被占用時,程序打印日志警告WARN並嘗試遞增端口(4041、4042……)直到找到可用端口為止。
b) Executor節點上每個Driver Program的jar包和文件會被復制到工作目錄下,可能占用大量空間。YARN集群會自動清除,Standalone集群需配置“spark.worker.cleanup.appDataTtl”開啟自動清除。
8. 應用程序模板
import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.sql.SQLContext import org.apache.spark.sql.hive.HiveContext object Test { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("Test") val sc = new SparkContext(conf) // ... } }
9. 提交示例:
bin/spark-submit --master spark://ubuntu1:7077 --class org.apache.spark.examples.SparkPi lib/spark-examples-1.6.3-hadoop2.6.0.jar
核心原理
RDD概念
1. RDD:Resilient Distributed Dataset,彈性分布式數據集。
2. 意義:Spark最核心的抽象概念;具有容錯性基於內存的集群計算方法。
RDD核心組成
1. 5個核心方法。
a) getPartitions:分區列表(數據塊列表)
b) compute:計算各分區數據的函數。
c) getDependencies:對父RDD的依賴列表。
d) partitioner:key-value RDD的分區器。
e) getPreferredLocations:每個分區的預定義地址列表(如HDFS上的數據塊地址)。
2. 按用途分類以上5個方法:
a) 前3個:描述RDD間的血統關系(Lineage),必須有的方法;
b) 后2個:用於優化執行。
3. RDD的實例:RDD[T],T為泛型,即實例。
4. 分區:
a) 分區概念:將大數據量T實例集合split成多個小數據量的T實例子集合。
b) 分區源碼:實際上是Iterator[T]。
c) 分區存儲:例如以Block方式存在HDFS。
5. 依賴:
a) 依賴列表:一個RDD可有多個父依賴,所以是父RDD依賴列表。
b) 與分區關系:依賴是通過RDD分區間的依賴體現的,通過依賴列表和getPartitions方法可知RDD各分區是如何依賴一組父RDD分區的。
6. compute方法:
a) 延時(lazy)特性,當觸發Action時才真正執行compute方法;
b) 計算粒度是分區,而不是T元素。
7. partitioner方法:T實例為key-value對類型的RDD。
8. RDD抽象類源碼(節選自v1.6.3):
1 package org.apache.spark.rdd 2 3 // … 4 5 /** 6 * A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable, 7 * partitioned collection of elements that can be operated on in parallel. This class contains the 8 * basic operations available on all RDDs, such as `map`, `filter`, and `persist`. In addition, 9 * [[org.apache.spark.rdd.PairRDDFunctions]] contains operations available only on RDDs of key-value 10 * pairs, such as `groupByKey` and `join`; 11 * [[org.apache.spark.rdd.DoubleRDDFunctions]] contains operations available only on RDDs of 12 * Doubles; and 13 * [[org.apache.spark.rdd.SequenceFileRDDFunctions]] contains operations available on RDDs that 14 * can be saved as SequenceFiles. 15 * All operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)] 16 * through implicit. 17 * 18 * Internally, each RDD is characterized by five main properties: 19 * 20 * - A list of partitions 21 * - A function for computing each split 22 * - A list of dependencies on other RDDs 23 * - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned) 24 * - Optionally, a list of preferred locations to compute each split on (e.g. block locations for 25 * an HDFS file) 26 * 27 * All of the scheduling and execution in Spark is done based on these methods, allowing each RDD 28 * to implement its own way of computing itself. Indeed, users can implement custom RDDs (e.g. for 29 * reading data from a new storage system) by overriding these functions. Please refer to the 30 * [[http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf Spark paper]] for more details 31 * on RDD internals. 32 */ 33 abstract class RDD[T: ClassTag]( 34 @transient private var _sc: SparkContext, 35 @transient private var deps: Seq[Dependency[_]] 36 ) extends Serializable with Logging { 37 // ... 38 39 // ======================================================================= 40 // Methods that should be implemented by subclasses of RDD 41 // ======================================================================= 42 43 /** 44 * :: DeveloperApi :: 45 * Implemented by subclasses to compute a given partition. 46 */ 47 @DeveloperApi 48 def compute(split: Partition, context: TaskContext): Iterator[T] 49 50 /** 51 * Implemented by subclasses to return the set of partitions in this RDD. This method will only 52 * be called once, so it is safe to implement a time-consuming computation in it. 53 */ 54 protected def getPartitions: Array[Partition] 55 56 /** 57 * Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only 58 * be called once, so it is safe to implement a time-consuming computation in it. 59 */ 60 protected def getDependencies: Seq[Dependency[_]] = deps 61 62 /** 63 * Optionally overridden by subclasses to specify placement preferences. 64 */ 65 protected def getPreferredLocations(split: Partition): Seq[String] = Nil 66 67 /** Optionally overridden by subclasses to specify how they are partitioned. */ 68 @transient val partitioner: Option[Partitioner] = None 69 70 // ... 71 }
RDD依賴關系
1. 窄依賴與寬依賴(外框表示RDD,內框表示分區):
2. 窄依賴:父RDD每個分區最多被一個子RDD分區所用。
3. 寬依賴:子RDD每個分區都依賴所有分區或多個分區。
4. 特性:
a) pipeline操作:窄依賴可pipeline操作,即允許在單個集群節點上流水線式執行,該節點可計算所有父分區。
b) RDD故障恢復:窄依賴只需在故障集群節點上重新計算丟失的父分區,並且可在不同節點上並行重新計算;對於寬依賴,失敗節點可能導致一個RDD所有父RDD分區丟失,都需重新計算。
5. WordCount依賴圖:
a) ShuffledRDD為寬依賴,將DAG划分成兩個Stage:第1個Stage從HadoopRDD到MapPartitionsRDD,生成ShuffleMapTask;第2個Stage從ShuffledRDD到MapPartitionsRDD,生成ResultTask。
b) 第一個Stage由3個ShuffleMapTask通過pipeline方式並行執行,直至3個Task均執行結束至MapPartitionsRDD處。
DAG圖
1. DAG:在圖論中,如果一個有向圖無法從任一頂點出發經過若干條邊回到該點,則這個圖是一個有向無環圖(Directed Acyclic Graph)。
2. Spark DAG:Spark將數據在分布式環境下分區,再將作業(Job)轉化為DAG,並分階段進行DAG調度和任務分布式並行處理。
3. Stage:DAG調度時,會根據Shuffle將Job划分Stage。如圖,RDD A到RDD B間、RDD F到RDD G間都需要Shuffle,所以有3個Stage:RDD A、RDD C到RDD F、RDD B和RDD F到RDD G。
4. 流水線(pipeline):
a) Spark采用貪心算法划分Stage,即如果RDD的分區到父RDD分區是窄依賴,則實施經典的Fusion(融合)優化,把對應的Operation划分到一個Stage。
b) 如果連續RDD序列都是窄依賴,則把多個Operation並到一個Stage,直到遇到寬依賴。
c) pipeline好處:減少大量的全局屏障(barrier),並無須物化很多中間結果RDD,極大地提升性能。
RDD故障恢復機制
1. 假設一個RDD故障,根據依賴關系和分區,僅需要再執行一遍父RDD的相應分區。
2. 跨寬依賴的再執行涉及多個父RDD,為避免故障RDD的大量父RDD再執行,Spark保持Map階段中間數據輸出的持久,再執行可獲取相應分區的中間數據。
3. Spark提供數據checkpoint和記錄日志持久化中間RDD。checkpoint直接將RDD持久化到磁盤或HDFS等存儲,與cache/persist方法不同,checkpoint的RDD不會因作業結束而被消除,一直存在並被后續作業直接讀取加載。
Standalone模式的Spark架構
1. Standalone模式兩種運行方式(--deploy-mode參數控制)
a) cluster方式:Driver運行在Worker節點。
b) client方式:Driver運行在客戶端。
2. 作業執行流程(cluster方式):
a) 客戶端提交Application給Master,Master讓一個Worker啟動Driver,即SchedulerBackend(Worker創建一個DriverRunner線程,DriverRunner啟動SchedulerBackend進程)。
b) Master會讓其余Worker啟動Executor,即ExecutorBackend(Worker創建一個ExecutorRunner線程,ExecutorRunner會啟動ExecutorBackend進程)。
c) ExecutorBackend啟動后向Driver的SchedulerBackend注冊。
d) SchedulerBackend進程中包含DAGScheduler,它根據用戶程序生成執行計划,並調度執行。對於每個Stage的Task,都被存放到TaskScheduler中,ExecutorBackend向SchedulerBackend匯報時把TaskScheduler中的Task調度到ExecutorBackend執行。
e) 所有Stage都完成后Application結束。
3. 故障恢復
a) 如果Worker發生故障:Worker退出前,將該Worker上的Executor殺掉;Worker通過定時心跳讓Master感知Worker故障,而后匯報給Driver,並將該Worker移除;Driver可知該Worker上的Executor已被殺死。
b) 如果Executor發生故障:ExecutorRunner匯報給Master,由於Executor所在Worker正常,Master則發送LaunchExecutor指令給該Worker,讓其再次啟動一個Executor。
c) 如果Master發生故障:通過ZooKeeper搭建的Master HA(一個Active,其他Standby)切換Master。
YARN模式的Spark架構
1. YARN模式兩種運行方式(--deploy-mode參數控制)
a) cluster方式:Driver運行在NodeManager節點。
b) client方式:Driver運行在客戶端。
i. SparkAppMaster:相當於Standalone模式的SchedulerBackend。SparkAppMaster包括DAGScheduler和YARNClusterScheduler。
ii. Executor:相當於Standalone模式的ExecutorBackend。
2. 作業執行流程(cluster方式):
a) 客戶端提交Application給ResourceManager,ResourceManager在某一NodeManager匯報時把SparkAppMaster分配給NodeManager,NodeManager啟動SparkAppMaster。
b) SparkAppMaster啟動后初始化Application,然后向ResourceManager申請資源,申請后通過RPC讓相應SparkAppMaster:相當於Standalone模式的SchedulerBackend。SparkAppMaster包括DAGScheduler和YARNClusterScheduler。
c) Executor:相當於Standalone模式的ExecutorBackend。的NodeManager啟動SparkExecutor。
d) SparkExecutor向SparkAppMaster匯報並完成Task。
e) 此外,SparkClient通過SparkAppMaster獲取Application運行狀態。
應用程序資源構建
1. 兩種資源構建方式
a) 粗粒度:應用程序提交后,運行前,根據應用程序資源需求一次性湊齊資源,整個運行時不再申請資源。
b) 細粒度:應用程序提交后,動態向Cluster Manager申請資源,只要等到資源滿足一個Task的運行,便開始運行該Task,而不必等到所有資源全部到位。
2. Spark on YARN僅支持粗粒度構建方式。
API
WordCount示例
1. 准備數據。
hadoop fs -mkdir -p /test/wordcount hadoop fs -put README.md /test/wordcount
2. 執行程序。
spark-shell --master spark://centos1:7077
1 import org.apache.log4j.{Logger,Level} 2 Logger.getLogger("org.apache.spark").setLevel(Level.WARN) 3 Logger.getLogger("org.apache.spark.sql").setLevel(Level.WARN) 4 val textFile = sc.textFile("/test/wordcount/README.md") 5 val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((count1, count2) => count1 + count2) 6 wordCounts.saveAsTextFile("/test/wordcount/result") 7 wordCounts.collect
a) “flatMap(line => line.split(" "))”:將文本每一行按空格拆分成單詞RDD。
b) “map(word => (word, 1))”:將每個單詞轉換為單詞+單詞數的二元組RDD。
c) “reduceByKey((count1, count2) => count1 + count2)”:按key分組(即按單詞分組)后,每組內單詞數求和。
d) “collect”:Action操作,將RDD全部元素轉換為Scala Array返回給Driver Program。如果數據量過大,會導致Driver Program內存不足。
3. 查看結果。
hadoop fs -cat /test/wordcount/WordCounts
RDD構建
1. 加載外部存儲系統的文件構建RDD
a) 方法定義
def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]
b) “sc.textFile("/test/directory")”:加載指定目錄下所有文件。
c) “sc.textFile("/test/directory/*.txt")”:加載指定目錄下所有txt格式的文件。
d) “sc.textFile("/test/directory/*.gz")”:加載指定目錄下所有gz格式的文件,Hadoop內置支持.gz格式,但不支持split。其他壓縮格式參考文檔。
e) “sc.textFile("/test/directory/**/*")”:加載指定目錄下所有文件(包含子目錄)。
f) “sc.sequenceFile("/test/directory")”:以序列文件方式加載指定目錄下所有文件。
g) textFile方法和sequenceFile方法:底層均調用hadoopFile方法,只是參數不同;均使用HadoopInputFormat的子類,TextInputFormat和SequenceFileInputFormat。
2. 從Scala數據集構建RDD
a) 方法定義
def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]
b) 示例
1 val testRdd = sc.parallelize(List("A", "B", "C", "D"), 2) 2 testRdd.partitions.size // 分區數2 3 testRdd.toDebugString // 查看Lineage
RDD緩存與持久化
1. 主動持久化的目的:RDD重用,將計算結果RDD存儲以供后續Operation使用。
2. persist方法:將任意RDD緩存到內存、磁盤和Tachyon文件系統。
def persist(newLevel: StorageLevel): this.type
3. cache方法:persist方法使用MEMORY_ONLY存儲級別的快捷方式。
def cache()
4. 存儲級別。
存儲級別(Storage Level) |
含義 |
MEMORY_ONLY |
將RDD以反序列化(deserialized)Java對象存儲到JVM。如果RDD不能被內存裝下,一些分區將不會被緩存,並且在需要的時候被重新計算。這是默認級別。 |
MEMORY_AND_DISK |
將RDD以反序列化Java對象存儲到JVM。如果RDD不能被內存裝下,超出的分區被保存到硬盤上,並且在需要是被讀取。 |
MEMORY_ONLY_SER |
將RDD以序列化(serialized)Java對象存儲(每一分區占用一個字節數組)。通常比對象反序列化的空間利用率更高,尤其當使用快速序列化器(fast serializer),但在讀取時比較消耗CPU。 |
MEMORY_AND_DISK_SER |
類似於MEMORY_ONLY_SER,但把超出內存的分區存儲在硬盤上而不是在每次需要時重新計算。 |
DISK_ONLY |
只將RDD分區存儲在硬盤上。 |
MEMORY_ONLY_2 MEMORY_AND_DISK_2 |
與上述存儲級別一樣,但將每個分區都復制到兩個集群節點上。 |
OFF_HEAP (experimental) |
以序列化的格式將RDD存儲到Tachyon…… |
5. 數據移除
a) 自動:集群內存不足時,Spark根據LRU(Least Recently Uesed,最近最少使用算法)刪除數據分區。
b) 手動:unpersit方法,立即生效。
6. 演示效果。
a) 可在Spark監控頁面Storage查看緩存生效情況;
b) 內存不足時,打印警告日志“Not enough space to cache rdd_ ... in memory ...”。
1 val file = sc.textFile("/test/wordcount/README.md") // 可分別嘗試小文件和超大文件(視內存) 2 file.cache // 緩存到內存,lazy操作 3 file.count // Action操作觸發lazy 4 file.unpersit // 釋放緩存,eager操作
RDD分區數
1. 加載文件創建RDD的分區數
a) “sc.defaultParallelism”默認並行數,是加載文件創建RDD的分區數最小值參考,實際的分區數由加載文件時的split數決定,即文件的HDFS block數,也可以由加載時的API參數制定分區數。
b) “sc.defaultParallelism”取配置項“spark.default.parallelism”的值,集群模式缺省為8、本地模式缺省為總內核數。
c) 示例。
1 val textFile = sc.textFile("/test/README.md") 2 textFile.toDebugString // 查看Lineage 3 textFile.partitions.size // 分區數4 4 sc.defaultParallelism // 默認並行數4
2. key-value RDD的分區數
a) partitioner方法是針對key-value RDD的分區器,默認使用HashPartitioner。
b) 通過源碼可知,沒有設置分區數時,會使用“spark.default.parallelism”配置項的值作為默認分區數。
1 // … 2 package org.apache.spark 3 // … 4 object Partitioner { 5 /** 6 * Choose a partitioner to use for a cogroup-like operation between a number of RDDs. 7 * 8 * If any of the RDDs already has a partitioner, choose that one. 9 * 10 * Otherwise, we use a default HashPartitioner. For the number of partitions, if 11 * spark.default.parallelism is set, then we'll use the value from SparkContext 12 * defaultParallelism, otherwise we'll use the max number of upstream partitions. 13 * 14 * Unless spark.default.parallelism is set, the number of partitions will be the 15 * same as the number of partitions in the largest upstream RDD, as this should 16 * be least likely to cause out-of-memory errors. 17 * 18 * We use two method parameters (rdd, others) to enforce callers passing at least 1 RDD. 19 */ 20 def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = { 21 val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse 22 for (r <- bySize if r.partitioner.isDefined && r.partitioner.get.numPartitions > 0) { 23 return r.partitioner.get 24 } 25 if (rdd.context.conf.contains("spark.default.parallelism")) { 26 new HashPartitioner(rdd.context.defaultParallelism) 27 } else { 28 new HashPartitioner(bySize.head.partitions.size) 29 } 30 } 31 } 32 // …
共享變量
1. 普通變量的問題:遠程機器上對變量的修改無法傳回Driver程序。當Spark以多個Task在不同Worker上並執行一個函數時,它傳遞每一個變量的副本並緩存在Worker上。
2. 分類:廣播變量、累加器。
3. 廣播變量:將只讀變量緩存在每台Worker節點的cache,而不是每個Task發送一份副本。
1 val broadcastVar = sc.broadcast(Array(1, 2, 3)) // 初始化 2 broadcastVar.value // 獲取
4. 累加器:能夠進行“加”操作的變量,Spark原生支持Int和Double類型,可自定義新類型支持。
1 val accumulator = sc.accumulator(0) // 初始化 2 accumulator += 100 // 累加 3 accumulator.value // 獲取
RDD Operation
1. RDD Operation分為Transformation和Action:
a) Transformation:從已存在的RDD上創建一個新的RDD,是RDD的邏輯操作,並無真正計算(lazy),而是形成DAG。
b) Action:提交一個與前一個Action之間的所有Transformation組成的Job進行計算。
2. 常用Operation羅列(后面會詳細展開各Operation的用法)
a) 通用RDD Transformation
名稱 |
說明 |
map(func) |
數據集中的每條元素經過func函數轉換后形成一個新的分布式數據集 |
filter(func) |
過濾作用,選取數據集中讓函數func返回值為true的元素,形成一個新的數據集 |
flatMap(func) |
類似map,但每個輸入項可以被映射到0個或更多的輸出項(所以func應該返回一個Seq,而不是一個單獨項) |
mapPartitions(func) |
類似map,但單獨運行在RDD每個分區(塊),因此運行類型為Type TRDD上時,func類型必須是Iterator<T> => Iterator<U> |
mapPartitionsWithIndex(func) |
與mapPartitions相似,但也要提供func與一個代表分區的的索引整數項,因此所運行的RDD為Type T時,func類型必須是(Int, Iterator<T>) => Iterator<U> |
sample(withReplacement, fraction) |
根據給定的隨機種子seed,隨機抽樣出數量為fraction的數據(可以選擇有無替代replacement) |
union(otherDataset) |
返回一個由原數據集和參數聯合而成的新的數據集 |
intersection(otherDataset) |
返回一個包含數據集交集元素的新的RDD和參數 |
distinct([numTasks]) |
返回一個數據集去重過后的新的數據集 |
cartesian(otherDataset) |
當在數據類型為T和U的數據集上調用時,返回由(T, U)對組成的一個數據集 |
pipe(command, [envVars]) |
通過一個shell命令,如Perl或bash腳本,流水化各個分區的RDD。RDD元素被寫入到進程的stdin,輸出到stdout的行將會以一個RDD字符串的形式返回 |
coalesce(numPartitions) |
將RDD分區的數目合並為numPartitions |
repartition(numPartitions) |
在RDD上隨機重洗數據,從而創造出更多或更少的分區以及它們之間的平衡。這個操作將重洗網絡上所有的數據 |
b) key-value RDD Transformation
名稱 |
說明 |
groupByKey([numTasks]) |
當在一個由鍵值對(K, V)組成的數據集上調用時,按照key進行分組,返回一個(K, Iterable<V>)對的數據集。 注意: 1) 如果是為了按照key聚合數據(如求和、平均值)而進行分組,使用reduceByKey或combineByKey方法性能更好 2) 默認情況下,輸出的並行程度取決於父RDD的分區數。可以通過傳遞一個可選的的numTasks參數設置不同的並行任務數 |
reduceByKey(func, [numTasks]) |
當在一個鍵值對(K, V)組成的數據集上調用時,按照key進行分組,使用給定func聚合values值,返回一個鍵值對(K, V)數據集,其中func函數的類型必須是(V, V) => V。類似於groupByKey,並行任務數可通過可選的第二個參數配置 |
sortByKey([ascending], [numTasks]) |
返回一個以key排序(升序或降序)的(K, V)鍵值對組成的數據集,其中布爾型參數ascending決定升序還是降序,而numTasks為並行任務數 |
join(otherDataset, [numTasks]) |
根據key連接兩個數據集,將類型為(K, V)和(K, W)的數據集合並成一個(K, (V, W))類型的數據集。外連接通過leftouterjoin和rightouterjoin,其中numTasks為並行任務數 |
cogroup(otherDataset, [numTasks]) |
當在兩個形如(K, V)和(K, W)的鍵值對數據集上調用時,返回一個(K, Iterable<V>, Iterable<W>)形式的數據集 |
c) 通用RDD Action
名稱 |
說明 |
reduce(func) |
通過函數func聚集數據集中的所有元素,func函數接收兩個參數,返回一個值,這個函數必須滿足交換律和結合律,以確保可以被正確地並發執行 |
collect() |
在Driver程序中,以數組形式返回數據集的所有元素到Driver程序,為防止Driver程序內存溢出,一般要控制返回的數據集大小 |
count() |
返回數據集的元素個數 |
first() |
返回數據集的第一個元素 |
take(n) |
以數組形式,返回數據集上前n個元素 |
takeSample(withReplacement, num, seed) |
返回一個數組,由數據集中隨機采樣的num個元素組成,可以選擇是否用隨機數替換不足的部分,seed用於指定的隨機數生成器種子 |
takeOrdered(n, [ordering]) |
使用自然順序或自定義比較器,返回前n個RDD元素 |
foreach(func) |
在數據集的每個元素上運行func。具有副作用,如會更新累加器變量或與外部存儲系統相互作用 |
d) key-value RDD Action
名稱 |
說明 |
countByKey |
返回形如(K, int)的hashmap,對每個key的個數計數 |
RDD Operation隱式轉換
1. 隱式轉換函數為裝載不同類型的RDD提供了相應的額外方法。
2. 隱式轉換后的類包括以下幾種:
a) PairRDDFunctions:輸入的數據單元是2元元組,分別為key和value。
b) DoubleRDDFunctions:輸入的數據單元可隱式轉換為Scala的Double類型。
c) OrderedRDDFunctions:輸入的數據單元是2元元組,並且key可排序。
d) SequenceFileRDDFunctions:輸入的數據單元是2元元組。
RDD[T]分區Operation
1. coalesce
a) 定義
def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null): RDD[T]
參數:
numPartitions |
新分區數 |
shuffle |
重新分區時是否shuffle |
b) 功能
返回分區數為numPartitions的新RDD。如果增加分區數,shuffle必須為true,否則重新分區無效。
c) 示例
1 val rdd = sc.parallelize(List("A", "B", "C", "D", "E"), 2) 2 rdd.partitions.size // 分區數2 3 rdd.coalesce(1).partitions.size // 分區數2→1 4 rdd.coalesce(4).partitions.size // 分區數2→4無效,仍然是2 5 rdd.coalesce(4, true).partitions.size // 分區數2→4
d) 應用場景
i. 大數據RDD過濾后,各分區數據量非常小,可重新分區減小分區數,把小數據量分區合並成一個分區。
ii. 小數據量RDD保存到HDFS前,可重新分區減小分區數(比如1),保存成1個文件,從而減少小文件個數,也方便查看。
iii. 分區數過少,導致CPU使用率過低時,可重新分區增加分區數,從而提高CPU使用率,提升性能。
2. repartition
a) 定義
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
參數:
numPartitions |
新分區數 |
b) 功能
從源碼可看出,repartition是coalesce shuffle為true的版本,故不在贅述。
1 /** 2 * Return a new RDD that has exactly numPartitions partitions. 3 * 4 * Can increase or decrease the level of parallelism in this RDD. Internally, this uses 5 * a shuffle to redistribute data. 6 * 7 * If you are decreasing the number of partitions in this RDD, consider using `coalesce`, 8 * which can avoid performing a shuffle. 9 */ 10 def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope { 11 coalesce(numPartitions, shuffle = true) 12 }
c) 示例
1 val rdd = sc.parallelize(List("A", "B", "C", "D", "E"), 2) 2 rdd.partitions.size // 分區數2 3 rdd.repartition(1).partitions.size // 分區數2→1 4 rdd.repartition(4).partitions.size // 分區數2→4
d) 應用場景
與coalesce一致。
RDD[T]常用聚合Operation
1. aggregate
a) 定義
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
參數:
zeroValue |
執行seqOp和combOp的初始值。源碼注釋:the initial value for the accumulated result of each partition for the `seqOp` operator, and also the initial value for the combine results from different partitions for the `combOp` operator - this will typically be the neutral element (e.g. `Nil` for list concatenation or `0` for summation) |
seqOp |
聚合分區的函數。源碼注釋:an operator used to accumulate results within a partition |
combOp |
聚合seqOp結果的函數。源碼注釋:an associative operator used to combine results from different partitions |
b) 功能
先使用seqOp將RDD中每個分區中的T類型元素聚合成U類型,再使用combOp將之前每個分區聚合后的U類型聚合成U類型,注意seqOp和combOp都會使用zeroValue作為初始值,zeroValue的類型為U。
c) 示例
1 var rdd = sc.makeRDD(1 to 10, 2) 2 rdd.aggregate(2)({(x: Int, y: Int) => x + y}, {(a: Int, b: Int) => a * b}) 3 // 分區1:2 + 1 + 2 + 3 + 4 + 5 = 17, 4 // 分區2:2 + 6 + 7 + 8 + 9 + 10 = 42 5 // 最后:2 * 17 * 42 = 1428
2. reduce
a) 定義
def reduce(f: (T, T) => T): T
參數:
f |
合並函數,二變一 |
b) 功能
將RDD中元素兩兩傳遞給輸入f函數,同時產生一個新的值,新產生的值與RDD中下一個元素再被傳遞給輸入函數直到最后只有一個值為止。
c) 示例
1 var rdd = sc.makeRDD(1 to 10, 2) 2 rdd.reduce((x, y) => if (x > y) x else y) // 求最大值,10 3 import java.lang.Math 4 rdd.reduce((x, y) => Math.max(x, y)) // 求最大值,與上面邏輯一致 5 rdd.reduce((x, y) => x + y) // 求和,55
3. fold
a) 定義
def fold(zeroValue: T)(op: (T, T) => T): T
參數:
zeroValue |
op函數的初始值 |
op |
合並函數,二變一 |
b) 功能
以zeroValue為初始值,將RDD中元素兩兩傳遞給輸入f函數,同時產生一個新的值,新產生的值與RDD中下一個元素再被傳遞給輸入函數直到最后只有一個值為止。
c) 示例
1 var rdd = sc.makeRDD(1 to 10, 2) 2 rdd.fold(100)((x, y) => if (x > y) x else y) // 求最大值,100 3 import java.lang.Math 4 rdd.reduce((x, y) => Math.max(x, y)) // 求最大值,與上面邏輯一致 5 rdd.reduce((x, y) => x + y) // 求和,155
4. 三者關系
fold比reduce多一個初始值;fold是aggregate seqOp和combOp函數相同時的簡化版。
RDD間操作Operation
1. cartesian
a) 定義
def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)]
參數:
other |
另一個RDD |
b) 功能
笛卡爾積。
c) 示例
1 val rdd1 = sc.parallelize(List[String]("A", "B", "C", "D", "E"), 2) 2 val rdd2 = sc.parallelize(List[Int](1, 2, 3, 4, 5, 6), 2) 3 val rdd3 = rdd1 cartesian rdd2 4 rdd3.collect 5 val rdd4 = rdd2 cartesian rdd1 6 rdd4.collect
2. unioin
a) 定義
def union(other: RDD[T]): RDD[T]
參數:
other |
另一個RDD |
b) 功能
聯合兩個RDD,注意不會去重。
union實際是將父依賴RDD所有分區合並成各自分區,最終的分區與父依賴RDD分區一一對應。
c) 示例
1 val rdd1 = sc.parallelize(List[String]("A", "B", "C"), 2) 2 val rdd2 = sc.parallelize(List[String]("D", "E"), 1) 3 rdd1.partitions.size // 分區數2 4 rdd2.partitions.size // 分區數1 5 val rdd3 = rdd1 union rdd2 6 rdd3.partitions.size // 分區數3 7 rdd3.collect // Array(A, B, C, C, D)
3. zip
a) 定義
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
參數:
other |
另一個RDD |
b) 功能
拉鏈操作,將兩個RDD中第i個元素組成一個元祖,形成key-value的二元組PairRDD。
注意:兩個RDD分區數必須一致,否則報錯“Can’t zip RDDs with unequal numbers of partitions”;兩個RDD元素個數必須一致,否則報錯“Can only zip RDDs with same number of elements”。
c) 示例
1 val rdd1 = sc.parallelize(1 to 4, 2) 2 val rdd2 = sc.parallelize("a b c d".split(" "), 2) 3 val rdd3 = rdd1 zip rdd2 4 rdd3.collect // Array((1, a), (2, b), (3, c), (4, d))
4. zipPartitions
a) 定義
def zipPartitions[B: ClassTag, V: ClassTag](rdd2: RDD[B], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] def zipPartitions[B: ClassTag, V: ClassTag](rdd2: RDD[B])(f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag](rdd2: RDD[B], rdd3: RDD[C], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag](rdd2: RDD[B], rdd3: RDD[C])(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag](rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag](rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D])(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V]
參數:
f |
拉鏈操作函數 |
b) 功能
N(N=2, 3, 4)個RDD的拉鏈操作,具體操作由f函數指定。
c) 示例
val rdd1 = sc.parallelize(1 to 4, 2) val rdd2 = sc.parallelize("a b c d".split(" "), 2) def zipFunc(aIter: Iterator[Int], bIter: Iterator[String]): Iterator[String] = { var list = List[String]() while (aIter.hasNext && bIter.hasNext) { val str = aIter.next + "+" + bIter.next list ::= str } list.iterator } val rdd3 = rdd1.zipPartitions(rdd1, rdd2)(zipFunc) rdd3.collect // Array(1+a, 2+b, 3+c, 4+d)
5. zipWithUniqueId
a) 定義
def zipWithUniqueId(): RDD[(T, Long)]
b) 功能
將當前RDD元素與索引i進行拉鏈操作。
c) 示例
1 val rdd = sc.parallelize("a b c d".split(" "), 2) 2 rdd.zipWithUniqueId.collect // Array((a, 0), (b, 1), (c, 2), (d, 3))
DoubleRDDFunctions常用Operation
1. histogram
a) 定義
def histogram(buckets: Array[Double], evenBuckets: Boolean = false): Array[Long] def histogram(bucketCount: Int): Pair[Array[Double], Array[Long]]
參數:
buckets |
分桶區間,左閉右開區間“[)” |
evenBuckets |
是否采用常亮時間內快速分桶方法 |
bucketCount |
平均分桶,每桶區間 |
b) 功能
生成柱狀圖的分桶。
c) 示例
1 val rdd = sc.parallelize(List(1.1, 1.2, 2.1, 2.2, 2, 3, 4.1, 4.3, 7.1, 8.3, 9.3), 2) 2 rdd.histogram(Array(0.0, 4.1, 9.0)) 3 rdd.histogram(Array(0.0, 4.1, 9.0), true) 4 rdd.histogram(3)
2. mean/meanApprox
a) 定義
def mean(): Double
def meanApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble]
b) 功能
Mean計算平均值,meanApprox計算近似平均值。
c) 示例
1 val rdd = sc.parallelize(List(1.1, 1.2, 2.1, 2.2, 2, 3, 4.1, 4.3, 7.1, 8.3, 9.3), 2) 2 rdd.mean
3. sampleStdev
a) 定義
def sampleStdev(): Double
b) 功能
計算樣本標准偏差(sample standard deviation)。
c) 示例
1 val rdd = sc.parallelize(List(1.1, 1.2, 2.1, 2.2, 2, 3, 4.1, 4.3, 7.1, 8.3, 9.3), 2) 2 rdd.sampleStdev
4. sampleVariance
a) 定義
def sampleVariance(): Double
b) 功能
計算樣本偏差(sample variance)。
c) 示例
val rdd = sc.parallelize(List(1.1, 1.2, 2.1, 2.2, 2, 3, 4.1, 4.3, 7.1, 8.3, 9.3), 2)
rdd.sampleVariance
5. stats
a) 定義
def stats(): StatCounter
b) 功能
返回org.apache.spark.util.StatCounter對象,包括平均值、標准偏差、最大值、最小值等。
c) 示例
val rdd = sc.parallelize(List(1.1, 1.2, 2.1, 2.2, 2, 3, 4.1, 4.3, 7.1, 8.3, 9.3), 2)
rdd.stats
6. stdev
a) 定義
def stdev(): Double
b) 功能
計算標准偏差(standard deviation)。
c) 示例
1 val rdd = sc.parallelize(List(1.1, 1.2, 2.1, 2.2, 2, 3, 4.1, 4.3, 7.1, 8.3, 9.3), 2) 2 rdd.stdev
7. sum/sumApprox
a) 定義
def sum(): Double
def sumApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble]
b) 功能
sum計算總和,sumApprox計算近似總和。
c) 示例
1 val rdd = sc.parallelize(List(1.1, 1.2, 2.1, 2.2, 2, 3, 4.1, 4.3, 7.1, 8.3, 9.3), 2) 2 rdd.sum 3 rdd.sumApprox
8. variance
a) 定義
def variance(): Double
b) 功能
計算方差(variance)。
c) 示例
1 val rdd = sc.parallelize(List(1.1, 1.2, 2.1, 2.2, 2, 3, 4.1, 4.3, 7.1, 8.3, 9.3), 2) 2 rdd.variance
PairRDDFunctions聚合Operation
1. aggregateByKey
a) 定義
def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)] def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)] def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
參數:
zeroValue |
參考aggregate |
seqOp |
參考aggregate |
combOp |
參考aggregate |
numPartitions |
分區數,使用new HashPartitioner(numPartitions)分區器 |
partitioner |
指定自定義分區器 |
b) 功能
aggregateByKey與aggregate功能類似,區別在於前者僅對key相同的聚合。
c) 示例
1 val rdd = sc.parallelize(List((1, 3), (1, 2), (1, 4), (2, 3))) // Array((1,9), (2,3)) 2 import java.lang.Math 3 rdd.aggregateByKey(1)({(x: Int, y: Int) => Math.max(x, y)}, {(a: Int, b: Int) => a + b}).collect
2. combineByKey
a) 定義
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)] def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)] def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]
參數:
createCombiner |
組合器函數,用於將V類型轉換成C類型,輸入參數為RDD[K,V]中的V,輸出為C |
mergeValue |
合並值函數,將一個C類型和一個V類型值合並成一個C類型,輸入參數為(C,V),輸出為C |
mergeCombiners |
合並組合器函數,用於將兩個C類型值合並成一個C類型,輸入參數為(C,C),輸出為C |
mapSideCombine |
是否需要在Map端進行combine操作,類似於MapReduce中的combine,默認為true |
b) 功能
將RDD[(K,V)]combine為RDD[(K,C)]。非常重要,aggregateByKey、foldByKey、reduceByKey等函數都基於它實現。
c) 示例
1 val rdd = sc.parallelize(List((1, "www"), (1, "iteblog"), (1, "com"), (2, "bbs"), (2, "iteblog"), (2, "com"), (3, "good"))) 2 rdd.combineByKey(List(_), (x: List[String], y: String) => y :: x, (x: List[String], y: List[String]) => x ::: y).collect // Array((1, List(www, iteblog, com)), (2, List(bbs, iteblog, com)), (3, List(good)))
1 val rdd = sc.parallelize(List(("iteblog", 1), ("bbs", 1), ("iteblog", 3))) 2 rdd.combineByKey(x => x, (x: Int, y: Int) => x + y, (x: Int, y: Int) => x + y).collect // Array((iteblog, 4), (bbs, 1))
d) 應用場景
combineByKey將大數據的處理轉為對小數據量的分區級別處理,然后合並各分區處理后再次進 行聚合,提升了對大數據量的處理性能。
3. foldByKey
a) 定義
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)] def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]
參數:
zeroValue |
func函數的初始值 |
func |
合並函數,二變一 |
b) 功能
foldByKey與fold功能類似,區別在於前者僅對key相同的聚合。
c) 示例
1 var rdd = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1))) 2 rdd.foldByKey(0)(_+_).collect // Array((A,2), (B,3), (C,1)) 3 rdd.foldByKey(2)(_+_).collect // Array((A,6), (B,7), (C,3))
4. reduceByKey
a) 定義
def reduceByKey(func: (V, V) => V): RDD[(K, V)] def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
參數:
func |
合並函數,二變一 |
b) 功能
reduceByKey與reduce功能類似,區別在於前者僅對key相同的聚合。
c) 示例
1 var rdd = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1))) 2 rdd.reduceByKey(_+_).collect // Array((A,2), (B,3), (C,1))
PairRDDFunctions間操作Operation
1. join族
a) 定義
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))]
def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))]
def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]
def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))]
def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], W))]
def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))]
def fullOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], Option[W]))]
def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Option[V], Option[W]))]
參數:
other |
另一個RDD |
b) 功能
兩個RDD連接操作。
c) 示例
1 val rdd1 = sc.parallelize(List(("Tom", 21), ("Jerry", 31), ("Mary", 23))) 2 val rdd2 = sc.parallelize(List(("Tom", 'm'), ("Mary", 'f'), ("Henry", 'm'))) 3 val rdd3 = rdd1 join rdd2 4 rdd3.collect // Array((Mary, (23, f)), (Tom, (21, m)))
OrderedRDDFunctions常用Operation
1. sortByKey
a) 定義
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)]
參數:
ascending |
是否正序 |
numPartitions |
新分區數,默認為原分區數 |
b) 功能
返回按key排序的新RDD。
c) 示例
1 val rdd1 = sc.parallelize(List((3, "a"), (7, "b"), (5, "c"), (3, "b"), (6, "c"), (9, "d")), 3) 2 val rdd2 = rdd1.sortByKey 3 rdd2.collect // Array((3, a), (3, b), (5, c), (6, c), (7, b), (9, d)) 4 rdd2.partitions.size // 分區數3 5 val rdd3 = rdd1.sortByKey(true, 2) 6 rdd3.collect 7 rdd3.partitions.size // 分區數2
2. repartitionAndSortWithinPartitions
a) 定義
def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)]
參數:
partitioner |
分區器 |
b) 功能
返回使用分區器partitioner重新分區並且對各分區按key排序的新RDD。
c) 示例
1 val rdd1 = sc.parallelize(List((3, "a"), (7, "b"), (5, "c"), (3, "b"), (6, "c"), (9, "d")), 3) 2 import org.apache.spark.HashPartitioner 3 val rdd2 = rdd1.repartitionAndSortWithinPartitions(new HashPartitioner(2)) // HashPartitioner(2)以key對分區數取模分區,所以奇數、偶數分到兩個分區。 4 rdd2.collect // Array((6, c), (3, a), (3, b), (5, c), (7, b), (9, d)) 5 rdd1.partitions.size // 分區數3 6 rdd2.partitions.size // 分區數2
案例:移動終端上網數據分析
數據准備
1. 數據結構:移動終端上網訪問記錄
字段 |
說明 |
nodeid |
基站ID |
ci |
小區標識(cell identity) |
imei |
國際移動設備標識碼(IMEI) |
app |
應用名稱 |
time |
訪問時間 |
uplinkbytes |
上行字節數 |
downlinkbytes |
下行字節數 |
2. 測試數據
1,1,460028714280218,360,2015-05-01,7,1116 1,2,460028714280219,qq,2015-05-02,8,121 1,3,460028714280220,yy,2015-05-03,9,122 1,4,460028714280221,360,2015-05-04,10,119 2,1,460028714280222,yy,2015-05-05,5,1119 2,2,460028714280223,360,2015-05-01,12,121 2,3,460028714280224,qq,2015-05-02,13,122 3,1,460028714280225,qq,2015-05-03,1,1117 3,2,460028714280226,qq,2015-05-04,9,1118 3,3,460028714280227,qq,2015-05-05,10,120 1,1,460028714280218,360,2015-06-01,7,1118 1,2,460028714280219,qq,2015-06-02,8,1119 1,3,460028714280220,yy,2015-06-03,9,1120 1,4,460028714280221,360,2015-06-04,10,119 2,1,460028714280222,yy,2015-06-05,11,1118 2,2,460028714280223,360,2015-06-02,4,121 2,3,460028714280224,qq,2015-06-03,17,1119 3,1,460028714280225,qq,2015-06-04,18,119 3,2,460028714280226,qq,2015-06-05,19,1119 3,3,460028714280227,qq,2015-06-10,120,121
3. 上傳數據文件至HDFS
hadoop fs -put mobile.csv /test/
加載&預處理
預處理,如無效數據過濾等。
1 val fields = sc.broadcast(List("nodeid", "ci", "imei", "app", "time", "uplinkbytes", "downlinkbytes")) 2 val mobile = sc.textFile("/test/mobile.csv").map(_.split(",")).filter(line => line.length != fields.value.length)
統計App訪問次數
1 mobile.map(line => (line(fields.value.indexOf("app")), 1)).reduceByKey(_+_).map(appCount => (appCount._2, appCount._1)).sortByKey(false).map(appCount => (appCount._1, appCount._2)).repartition(1).saveAsTextFile("/text/result.csv") // Array((qq, 10), (360, 6), (yy, 4))
統計DAU
1 mobile.map(line => line(fields.value.indexOf("imei")) + ":" + line(fields.value.indexOf("time"))).distinct().map(imeiTime => (imeiTime.split(":")(1), 1)).reduceByKey(_+_).sortByKey().collect // Array((2015-05-01, 2), (2015-05-02, 2), (2015-05-03, 2), (2015-05-04, 2), (2015-05-05, 2), (2015-06-01, 2), (2015-06-03, 2), (2015-06-04, 2), (2015-06-05, 2), (2015-06-10, 1))
統計MAU
1 mobile.map { line => 2 val time = line(fields.value.indexOf("time")) 3 val month = time.substring(0, time.lastIndexOf("-")) 4 line(fields.value.indexOf("imei")) + ":" + month 5 }.distinct.map { imeiMonth => (imeiMonth.split(":")(1), 1) }.reduceByKey(_+_).sortByKey().collect // Array((2015-05, 10), (2015-06, 10))
統計App上下流量
1 mobile.map { line => 2 val uplinkbytes = line(fields.value.indexOf("uplinkbytes")) 3 val downlinkbytes = line(fields.value.indexOf("downlinkbytes")) 4 (line(fields.value.indexOf("app")), (uplinkbytes, downlinkbytes)) 5 }.reduceByKey((updownlinkbytes1, updownlinkbytes2) => (updownlinkbytes1._1 + updownlinkbytes2._1, updownlinkbytes1._2 + updownlinkbytes2._2)).collect // Array((yy, (34.0, 3479.0)), (qq, (117.0, 6195.0)), (360, (54.0, 2714.0)))
作者:netoxi
出處:http://www.cnblogs.com/netoxi
本文版權歸作者和博客園共有,歡迎轉載,未經同意須保留此段聲明,且在文章頁面明顯位置給出原文連接。歡迎指正與交流。