spark core (二)


一、Spark-Shell交互式工具

     1、Spark-Shell交互式工具

        Spark-Shell提供了一種學習API的簡單方式, 以及一個能夠交互式分析數據的強大工具。

        Scala語言環境下或Python語言環境下均可使用。

        啟動Spark-Shell
                ./bin/spark-shell      本地模式,線程數為1(1個CPU)
                ./bin/spark-shell --master
                ./bin/spark-shell --master local[2] //使用2CPU核運行
                ./bin/spark-shell --master local[2] --jar testcode.jar //指定Jar包路徑

                   ./bin/spark-shell --master  spark://liuwei3:7077 

 

        Spark-Shell中已經創建了一個名為scSparkContext對象

        --master用來設置context將要連接並使用的資源主節點, master的值可以是Standalone模式 的Spark集群地址、 MesosYarn集群的URL, 或者是一個local地址。 

       使用--jar可以添加Jar包的路徑, 使用逗號分隔可以添加多個包

       Spark-Shell的本質是在后台調用了spark-submit腳本來啟動應用程序。

     2、Spark-Shell操作案例

         收集到數據后, 需要對數據進行各種數據處理,  包括數據的抽取-轉換-裝載( Extract-Transform-Load, ETL) 、 數據統計、 數據挖掘, 以 及為后續的數據呈現和為決策而提供的數據持久化。 

         本案例數據處理過程, 包含了對外部文件的加載, 對文件數據的轉換、 過濾、 各種數據統計, 以及處理結果的存儲。 

         加載文件 :

                  Spark創建sc之后, 可以加載本地文件創建RDD, 返回一個MapPartitionsRDD
                            val textFile=sc.textFile(“file:///*****/README.md”)
                   加載HDFS文件和本地文件都是使用textFile( ), 區別是添加前綴進行標識, hdfs://file:///
                   從本地讀取的文件返回MapPartitionsRDD
                   HDFS讀取的文件先轉成HadoopRDD, 然后隱式轉換成MapPartitonsRDD
                   MapPartitionsRDDHadoopRDD都是基於Spark的彈性分布式數據集RDD

       執行Transformation操作返回新RDD:
                   texfFile.first() //獲取RDD文件的第一行
                   textFile.count() //獲取RDD文件的行數
                   val textRDD=textFile.filter(line=>line.contains(“Spark”)) //過濾出包含Spark的行, 並返回新RDD
                   textFile.filter(line=>line.contains(“Spark”)).count() //鏈接多個TransformationAction操作

       找出文本中每行最多的單詞數
                   textFile.map(line=>line.split(“ ”).size).reduce((a,b)=>if(a>b) a else b)
                   首先將textFile的每一行文本使用split(“ ”)進行分詞, 並統計分詞后的單詞數, 然后執行reduce操作, 使 用(a,b)=>if(a>b) a else b 進行比較, 返回最大值 

       詞頻統計
                   val wordCount=textFile.flatMap(line=>line.split(“ ”)).map(word=>(word,1)).reduceByKey((x,y)=>x+y)
                   wordCount.collect()
                   結合flatMapMapreduceByKey來計算文件中每個單詞的詞頻, 並返回(String,Int)類型的 鍵值對ShuffleRDD, 最后使用collect()聚合單詞計數結果。

                  如果想讓代碼更簡潔, 可以使用占位符”_”
                   當每個參數在函數文本中最多出現一次的時候, 可以使用下划線_+_擴展成帶兩個參數的函數文本。
                   多個下划線指代多個參數, 而不是單個參數的重復使用。 第一個下划線代表第一個參數, 第二個下划線 代表第二個參數。
                   val wordCount=textFile.flatMap(_.split(“ ”)).map((_,1)).reduceByKey(_+_)
                   Spark默認是不進行排序的, 可以使用sortByKey按照Key進行排序, false為降序, true為升序。
                   val wordCount=textFile.flatMap(_.split(“ ”)).map((_,1)).reduceByKey(_+_).map(m=>(m._2,m._1)).sortByKey(false).map(m=>(m._2,m._1))
                   其中, m=>(m._2,m._1)實現keyvalue互換。
                   wordCount.saveAsTextFile(“hdfs://tgmaster:9000/out/wordcount”) //保存文件

      RDD緩存(內存持久化)
                   Spark支持將數據集存進內存緩存中, 當數據被反復訪問時, 是非常有用的。
                   textFile.cache() //通過cache緩存數據可用於非常大的數據集, 支持跨越幾十或幾百個節點。    或者textFile.persist()     清除緩存:textFile.unpersist()
                   textFile.count()

 

    .collect()作用:將計算結果從集群中獲取到本地內存來顯示,容易發生OOM(內存溢出)所以collect只適合結果數據量較小的情況,如果計算結果數據量很大,此時要用foreach()輸出

2、操作案例
//加載文件
val lines=sc.textFile("file:///home/tg/datas/ws")
val lines2=sc.textFile("hdfs://tgmaster:9000/in/ws")
//統計文件的行數
lines.count
//查詢文件第一行的數據
lines.first
//過濾出包含"spark"的行
val result1=lines.filter(line=>line.contains("spark"))
//包含"spark"關鍵詞的行數
result1.count
//統計出每行最多的單詞數
lines.map(line=>line.split(" ").size)
.reduce((x,y)=> if (x>y) x else y)

//單詞統計計數(WordCount),根據次數進行降序排列
vla result=lines.flatMap(x=>x.split(" "))  //對單詞進行分隔
.map(x=>(x,1)) //將分隔后的單詞進行統計,形成鍵值對(word,1)
.reduceByKey((x,y)=>x+y) //對key進行分組,然后統計Value值
====方法二=======
.sortBy(_._2,false)

====方法一=====
.map(x=>(x._2,x._1)) //(次數,單詞)
.sortByKey(false)  //默認true升序,false降序
.map(x=>(x._2,x._1))  //(單詞,次數)

//將計算結果從集群中獲取到本地內存來顯示,
//容易發生OOM(內存溢出),所以collect只適合結果數據量較小的情況,
//如果計算結果數據量很大,此時要用foreach()輸出
.collect  
//將最終的單詞統計結果保存到HDFS上
result.saveAsTextFile("hdfs://tgmaster:9000/out/wsresult")

3、RDD緩存(內存持久化)
result.cache() //緩存到內存
result.persist()//同上
result.unpersist()//清除緩存

二、SparkContext與部署模式

        1、 SparkContext
           SparkContextSpark上下文
           獨立應用程序需要初始化一個SparkContext作為程序的一部分, 然后將一個包含應 用程序信息的SparkConf對象傳遞給SparkContext構造函數。
           創建SparkContext對象:
   

scala方式:                

def main(args: Array[String]): Unit = {
    //第一步:創建SparkConf對象
    val conf=new SparkConf().setAppName("WordCount")
                .setMaster("local")
    //第二步:創建SparkContext對象
    val sc=new SparkContext(conf)
    //第三步:可以從linux或者HDFS中獲取數據
    val lines=sc.textFile("hdfs://tgmaster:9000/in/resws")
    //第四大步:進行單詞統計計數
    val result=lines.flatMap(_.split(" "))
                  .map((_,1)).reduceByKey(_+_)
                  .sortBy(_._2,false)
    //第五步:將計算結果保存到HDFS中
//    result.saveAsTextFile("hdfs://liuwei1:9000/out/res2")

//    result.collect()
    //通過foreach算子輸出單詞與次數
    result.foreach(x=>println(x._1+" "+x._2))
  }

JAVA方式:

setMaster()用於設置運行的master上的URL,如果程序在本地運行,需要設置為local或local[N](N>2),如果程序需要打包運行在集群中,那么在代碼中就不需要設置setMaster()

SparkContext,即為Spark上下文,它包含在Driver驅動程序中,Spark程序的運行離不開SparkContext

Scala開發spark對應的是sparkcontext;Java開發spark對應的是Javasparkcontext

創建sparkcontext對象,需要sparkConf對象作為參數 

package day1;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;
/**
 * 用Java開發Spark的WordCount
 * @author Administrator
 *
 */
public class WordCount {
	public static void main(String[] args) {
		//第一步:創建SparkConf對象
		SparkConf conf=new SparkConf().setAppName("WordCount")
				.setMaster("local");
		//第二步:創建JavaSparkContext對象
		JavaSparkContext sc = new JavaSparkContext(conf);
		//第三步:從HDFS中加載數據,生成JavaRDD<String>    
從HDFS等外部數據源創建程序中第一個RDD,即為初始RDD。
創建初始RDD有兩種方式:通過HDFS等外部數據源中創建;通過並行集合的方式創建(例:val ) JavaRDD<String> lines = sc.textFile("hdfs://tgmaster:9000/in/resws"); //第四步:分隔單詞
flatmap:先map,后flatten
flatMap算子中,通常有一個String類型的參數,同時具有集合Iterable<T>類型的返回值 JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() { /** * */ private static final long serialVersionUID = 1L; public Iterable<String> call(String line) throws Exception { // TODO Auto-generated method stub return Arrays.asList(line.split(" ")); } }); //第五步:將分隔后的每個單詞出現次數記錄為1,形成鍵值對<單詞,1>
用JAVA開發Spark,如果需要通過映射的方式產生鍵值對,此時要用到mapToPair算子,這一點與scala開發spark不一樣,在scala中只有map算子,沒有maoToPair算子。
mapToPair 算子中有一個String類型 JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() { /** * */ private static final long serialVersionUID = 1L; public Tuple2<String, Integer> call(String word) throws Exception { // TODO Auto-generated method stub return new Tuple2<String, Integer>(word, 1); } }); //第六步:通過reduceByKey()算子統計每個單詞出現的次數
reduceBykey 這個算子很重要,它有兩個Integer類型的參數,返回值是Integer
它的運行過程是,先在本地按照key值進行聚合,然后再全局范圍再按照key值進行聚合,他的性能比groupBykey強很多 JavaPairRDD<String, Integer> result = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() { /** * */ private static final long serialVersionUID = 1L; public Integer call(Integer num1, Integer num2) throws Exception { // TODO Auto-generated method stub return num1+num2; } }); //第七步:通過foreach()算子輸出最終結果 result.foreach(new VoidFunction<Tuple2<String,Integer>>() { /** * */ private static final long serialVersionUID = 1L; public void call(Tuple2<String, Integer> info) throws Exception { // TODO Auto-generated method stub System.out.println(info._1+"出現了"+info._2); } }); } }

   2、Spark部署模式

           查看本地電腦cpu是幾核,通過任務管理器,性能,CPU使用記錄有幾個框就是幾核

          Local本地模式: 運行於本地
                             spark-shell --master local[2]
                             local[2]是說, 執行Application需要用到CPU2個核

          Standalone獨立模式: Spark自帶的一種集群模式
                             Spark自己管理集群資源, 此時只需要將HadoopHDFS啟動,(不需要啟動yarn)
                             Master節點有master,Slave節點上有worker
                             啟動 ./bin/spark-shell --master spark://master:7077

          YARN模式
                             Spark自己不管理資源, 向YARN申請資源, 所以必須保證YARN是啟動起來的
                               操作步驟:
                                 啟動Hadoop:Hadoop目錄執行start-all.sh
                                 Spark目錄, ./bin/spark-shell --master yarn-clientyarn-cluster

                                                       yarn-client運行在測試環境   yarn-cluster運行在真實環境

         以上三種都是集群模式

         IDEA中生成Jar包, 使用IDEA編譯class文件, 同時將class打包成Jar文件。
                  File——Project Structure, 彈性“Project Structure”的設置對話框
                  選擇左邊的Artifacts, 點擊上方的“+”按鈕
                  在彈出的對話框中選擇“Jar”——“from moduls with dependencies”
                  選擇要啟動的類, 然后確定   (刪除SparkApps.jar下的所有jar包)
                  應用之后選擇菜單“Build”——“Build Artifacts”, 選擇“Build”“Rebuild”即可生成

                       如果重新打包的話,刪除src下的META-INF
         Spark提交Job:

                   ./spark-submit \

                   --class newScala.Test \

                  --master yarn-cluster \ 

                  /home/hadoop/Test.jar      (剩下的可以寫,也可以不寫)

 

      3、Spark部署模式—Master URL格式及說明  

           

            yarn-client運行在客戶端本地,yarn-cluster運行在ApplicationMaster所在的集群節點中

三、RDD彈性分布式數據集

        1、RDD彈性分布式數據集    (RDD既可以放在內存中,也可以放在磁盤中,優先放在內存中,內存放不下,再考慮放在磁盤中;分區分布在很多節點上;partition真正存放數據的地方,partition在內存中)(默認情況下,文件在HDFS上有幾個block塊,初始RDD就有幾個分區)

            RDDSpark提供的核心抽象, 全稱為Resillient Distributed Dataset, 即彈性分布式數 據集。

            RDD在抽象上來說是一種元素集合, 包含了數據。 它是被分區的, 分為多個分區。 每個分區分布在集群中的不同節點上, 從而讓RDD中的數據可以被並行操作。 ( 分布 式數據集) 

            RDD通常通過Hadoop上的文件, 即HDFS文件或者Hive表, 來進行創建, 也可以通過 應用程序中的集合來創建。 

            RDD最重要的特性就是, 提供了容錯性, 可以自動從節點失敗中恢復過來。 如果某個 節點上的RDD partition, 因為節點故障, 導致數據丟了, 那么RDD會自動通過 數據來源重新計算該partition

                       Spark的容錯機制:1、lineage(血統) 2、checkpoint(檢查點)
            RDD的數據默認情況下存放在內存中的, 但是在內存資源不足時, Spark會自動將 RDD數據寫入磁盤。 ( 彈性) 

         2、RDD的定義
                一個RDD對象, 包含如下5個核心屬性:
                  一個分區列表, 每個分區里是RDD的一部分數據( 或稱數據塊) ;(每個分區保持相對均衡,不是絕地均衡)
                  一個依賴列表, 存儲依賴的其他RDD;(RDDA經過算子產生RDDB,RDDB依賴RDDA)
                  一個名為computer的計算函數, 用於計算RDD各分區的值;
                  分區器( 可選) , 用於鍵/值類型的RDD, 比如某個RDD是按散列來分區;
                  計算各分區時優先的位置列表( 可選) , 比如從HDFS上的文件生成RDD時, RDD分區的位置優先選擇數據所在的節點, 這樣可以避免數據移動帶來的開銷。 
               
 前三個是必須 ,后兩個是可選的

 四、創建RDD

        1、創建RDD

             進行Spark核心編程時, 首先要做的第一件事, 就是創建一個初始的RDD
                   該RDD中, 通常包含了Spark應用程序的輸入源數據。
                   在創建了初始的RDD之后, 才可以通過Spark Core提供的transformation算子, 對該RDD進 行轉換獲取其他的RDD。 

              Spark Core提供了三種創建RDD的方式, 包括:
                       使用程序中的集合創建RDD
                       使用本地文件創建RDD或使用HDFS文件創建RDD
                       RDD通過算子形成新的RDD

 

                       (前兩種創建初始RDD)

              經驗總結:
                     使用程序中的集合創建RDD, 主要用於進行測試, 可以在實際部署到集群運行之前, 自 己使用集合構造測試數據, 來測試后面的Spark應用的流程。
                     使用本地文件創建RDD, 主要用於臨時性地處理一些存儲了大量數據的文件。
                     使用HDFS文件創建RDD, 應該是最常用的生產環境處理方式, 主要可以針對HDFS上 存儲的大數據, 進行離線批處理操作。 

         2、並行化集合創建RDD

             如果要通過並行化集合來創建RDD, 需要針對程序中的集合, 調用SparkContextparallelize()方法。 Spark會將集合中的數據拷貝到集群上去, 形成一個分布式的數據集合, 也就是一個RDD
             相當於集合中的一部分數據會到一個節點上, 而另一部分數據會到其他節點上, 然后就可以用並行的方 式來操作這個分布式數據集合, 即RDD。 

                    案例: 110累加求和
                           val arr = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
                           val rdd = sc.parallelize(arr)                
                           val sum = rdd.reduce(_ + _)

             調用parallelize()時, 有一個重要的參數可以指定, 就是要將集合切分成多少個partition。 Spark會為每一個partition運行一個task來進行處理。
             Spark默認會根據集群的情況來設置partition的數量, 但是也可以在調用parallelize()方法時, 傳入第二個 參數, 來設置RDDpartition數量。 比如parallelize(arr, 10) 

          3、使用本地文件和HDFS創建RDD

              Spark是支持使用任何Hadoop支持的存儲系統上的文件創建RDD的, 比如說 HDFS、 Cassandra、 HBase以及本地文件。 

              通過調用SparkContexttextFile()方法, 可以針對本地文件或HDFS文件創建 RDD。 
              注意:
                        SparktextFile()方法支持針對目錄、 壓縮文件以及通配符進行RDD創建。
                        Spark默認會為hdfs文件的每一個block創建一個partition, 但是也可以通過textFile() 的第二個參數手動設置分區數量, 只能比block數量多, 不能比block數量少。 

              案例: 文件字數統計
                         val rdd = sc.textFile("data.txt")
                         val wordCount = rdd.map(line => line.length).reduce(_ + _)

              SparktextFile()除了可以針對上述幾種普通的文件創建RDD之外, 還有一些特列 的方法來創建RDD: 

                     SparkContext.wholeTextFiles()方法, 可以針對一個目錄中的大量小文件, 返 回<filename, fileContent>組成的pair, 作為一個PairRDD, 而不是普通的RDD
                          普通的textFile()返回的RDD中, 每個元素就是文件中的一行文本。
                      SparkContext.sequenceFile[K, V]()方法, 可以針對SequenceFile創建RDD, KV泛型類型就是SequenceFilekeyvalue的類型。 KV要求必須是
                         Hadoop的序列化類型, 比如IntWritableText等。
                      SparkContext.hadoopRDD()方法, 對於Hadoop的自定義輸入類型, 可以創建 RDD。 該方法接收JobConf、 InputFormatClass、 KeyValueClass
                      SparkContext.objectFile()方法, 可以針對之前調用RDD.saveAsObjectFile() 創建的對象序列化的文件, 反序列化文件中的數據, 並創建一個RDD。 
             

1、並行化集合創建RDD
val array=Array(1,2,3,4,5,6)
//沒有指定rdd的分區,此時默認為程序所分配的資源的CPU核數
val rdd=sc.parallelize(array)
//指定rdd2中有3個分區
val rdd2=sc.parallelize(array,3)
2、從外部數據源創建RDD
//此時沒有指定分區,
//那么分區數就等於文件在HDFS上存儲的Block數。
val lines=sc.textFile("hdfs路徑")
//指定了lines中有2個分區
//設置的分區數,只能比Block多,不能比它少。
val lines=sc.textFile("hdfs路徑",2)
3、通過Transformation(轉換)類型的算子產生新的RDD
val result=lines.flatMap(_.split(" "))
					.map((_,1))
					.reduceByKey(_+_)
但是下面兩個算子是Action(行動)類型。
result.collect
result.foreach(println) 

五、RDD操作

          RDD操作:創建操作、 轉換操作(transformation)、 行動操作 (Action)、 控制操作(緩存到內存,也稱內存持久化)

          在spark中transformation類型的算子,系統只是記錄下了這個操作行為,但這個行為並沒有被執行。當程序遇到一個action類型的算子時,會觸發Job的提交運行, 此時Action算子之前所有

transformation類型的算子就會被執行。  (saveAsTextFile、reduce是Action,reduceByKey是transformation,之前的textfile map sortBy等等都是transformation)

           1、transformation和action介紹

                 Spark支持兩種RDD操作: transformationaction
                 transformation操作會針對已有的RDD創建一個新的RDD

                 action則主要是對RDD進行最后的操作, 比如遍歷、 reduce、 保存到文件等, 並可以返回結果給Driver程序。

                 例如:
                      map就是一種transformation操作, 它用於將已有RDD的每個元素傳入一個自定義的函數, 並獲取一個新的元素, 然后將所有的新元素組成一個新的RDD
                      reduce就是一種action操作, 它用於對RDD中的所有元素進行聚合操作, 並獲取一個最終的結果, 然后返回給Driver 程序。 

                 transformation的特點就是lazy特性。

                 lazy特性指的是, 如果一個spark應用中只定義了transformation操作, transformation是不會觸發Spark程序的執行的,它們只是記錄了對RDD所做的操作, 但是不會自發的執行。 

                 只有當執行了一個action操作之后, 所有的transformation才會執行。
                 Spark通過這種lazy特性來進行底層的Spark應用執行的優化, 避免產生過多中間結果。
                 action操作執行, 會觸發一個spark job的運行, 從而觸發這個action之前所有的transformation的執行。
                 這是action的特性。

           2、案例:統計文件字數

                    // 這里通過textFile()方法, 針對外部文件創建了一個RDD lines, 但是實際上, 程序執行到這里為止, spark.txt文件的數據是不會加載到內存中的。 lines, 只是代表了一個指向

spark.txt文件的引用。   val lines = sc.textFile("spark.txt") 

                     // 這里對lines RDD進行了map算子, 獲取了一個轉換后的lineLengths RDD, 但是這里連數據都沒有, 當然
也不會做任何操作。 lineLengths RDD也只是一個概念上的東西而已。

                        val lineLengths = lines.map(line => line.length)

                     // 之后, 執行了一個action操作, reduce。 此時就會觸發之前所有transformation操作的執行, Spark會將操 作拆分成多個task到多個機器上並行執行, 每個task會在本地執行map操作,

並且進行本地的reduce聚合。 最后會進行一個全局的reduce聚合, 然后將結果返回給Driver程序。 val totalLength = lineLengths.reduce(_ + _) 

            3、常用transformation介紹

                    

                   reduceByKey:現在本地進行聚合,再全局聚合,性能比 groupBykey好

                   groupByKey:全局聚合           

              4、常用action介紹

                  

Spark運行原理(初級版):
DAG有向無環圖——DAGScheduler,Stage階段(stage划分算法)
——TaskSet——TaskScheduler,
把task任務(task分配算法)發送到Work節點中的Executor去執行
---Executor接收到task之后,會從線程池中取出相應的線程去執行接收到的task任務

  

補充:

         1、hadoop 1.x:

                HDFS:namenode(1個) 、datanode(多個)

                MapReduce:

                          JobTracker(1個):與用戶通信,接收用戶提交的Application;將Application划分為很多的任務(Task),分配多個TaskTracker去執行;管理調度集群中的資源

                          TaskTracker(多個):執行JobTracker分配的任務

               hadoop1.x缺陷:單點故障、

               hadoop 2.x:

                     HDFS:提出HA高可靠概念 (兩個namenode,一個負責讀,一個負責寫)

                     MapReduce

                     Yarn:管理調度集群中的資源


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM