spark機器學習


第一章 mesos spark shell

  1. SPARK-shell
    (1)修改spark/conf/spark-env.sh ,增加以下內容
    export MESOS_NATIVE_JAVA_LIBRARY=/usr/local/mesos/lib/libmesos.so
    export SPARK_EXECUTOR_URI=<上傳spark-1.6.0.tar.gz對應的hdfs URL, 如果已經把spark的jar包放在了mesos agent機器上,不用這個配置>
    

(2)運行命令:
shell ./bin/spark-shell --master mesos://host:5050
(3)代碼
```
scala> val lines=sc.textFile("/root/README.md")
lines: org.apache.spark.rdd.RDD[String] = /root/README.md MapPartitionsRDD[3] at textFile at :27

scala> lines.count
res1: Long = 3 
```

(4)web頁面:4040端口,可以看到上面執行的count操作

  1. SPARK的核心概念
    (1)spark-shell其實是一個driver programme(驅動器程序)
    (2)driver programme包含應用的main函數,定義了集群上的分布式數據集,用來發起集群上的各種並行操作
    (3)由於spark-shell在開啟時,制定了master,因此driver programe提交這些任務到集群上操作

  2. spark文件操作

第二章 RDD編程

  1. 操作過程
    (1)RDD包含兩類操作:transaction和action,只有action會對RDD計算出一個結果
    (2)RDD會在每次action操作時重新計算。可以通過presist對RDD持久化,在第一次對持久化的RDD計算后,spark會把RDD內容保存到內存中(以分區方式存儲在集群的每台機器上),這樣之后的action操作就可以重用這些RDD數據。默認不會把RDD緩存在內存中是因為,大量數據不應該僅僅以不同的形式多份存在內存中。

    val lines=sc.textFile("/root/README.md")
    val rdd2 = lines.filter(line=>line.contains("aa")).count
    rdd2.presist()
    rdd2.count
    
  2. 創建RDD的兩種途徑
    (1)把已有集合傳給SparkContext的parallelize()方法
    (2)從外部數據創建rdd

    sc.parallelize(List("aaa","bbb"))
    sc.textFile("/root/README.md")
    
  3. RDD操作
    (1)transaction:
        a. 轉化操作只能產生新的RDD,而不能改版原先RDD的數據

    errorsRDD = inputRDD.filter(lambda x: "error" in x)
    warningsRDD = inputRDD.filter(lambda x: "warning" in x)
    badLinesRDD = errorsRDD.union(warningsRDD)
    

    b. main函數中,所有的RDD會用譜系圖記錄產生依賴關系
![Screenshot from 2017-05-04 22-45-27.png-15kB][1]
(2)action :
    a. 獲取操作:take(int n) 獲取RDD的前n行數據, collect()獲取RDD中的所有數據
    b. 當調用一個新action操作時,整個RDD都會重新計算,導致行為低效,用戶可以將中間結果持久化

  1. 常見算子
    (1)transaction
    val rdd1 = sc.parallelize(List(1,2,3,3))    // (1,2,3,3)
    
    scala> rdd1.map(x=>x+1).collect             // Array(2, 3, 4, 4)
    scala> rdd1.flatMap(x=>x.to(3)).collect     // Array(1, 2, 3, 2, 3, 3, 3)
    scala> rdd1.filter(x => x!=1).collect       // Array(2, 3, 3)
    scala> rdd1.distinct.collect                // Array(2, 1, 3)   
    scala> rdd1.sample(false,0.5).collect       // 隨機取樣,個數和數值每次都不一樣(是否替換)
    
    val rdd1 = sc.parallelize(List(1,2,3))      // (1,2,3)
    val rdd2 = sc.parallelize(List(3,4,5))      // (3,4,5)
    
    scala> rdd1.union(rdd2).collect             // Array(1, 2, 3, 3, 4, 5)
    scala> rdd1.intersection(rdd2).collect      // Array(3)  求兩個RDD共同的元素
    scala> rdd1.subtract(rdd2).collect          // Array(2, 1) 求兩個RDD不同的部分
    scala> rdd1.cartesian(rdd2).collect         // 兩個RDD求笛卡爾積 Array((1,3), (1,4), (1,5), (2,3), (3,3), (2,4), (2,5), (3,4), (3,5))
    

(2)action
```scala
val rdd1 = sc.parallelize(List(1,2,3,3)) /home/lj/Documents // (1,2,3,3)

scala> rdd1.collect                        // Array(1, 2, 3, 3) 返回RDD所有元素
scala> rdd1.count                          // 4   返回RDD的元素個數
scala> rdd1.countByValue                   // Map(2 -> 1, 1 -> 1, 3 -> 2) 返回鍵值對(元素值 -> 元素個數)
scala> rdd1.take(2)                        // Array(1, 2)   返回RDD中的n個元素
scala> rdd1.top(2)                         // Array(3, 3)   返回RDD中的前2個元素


```

![action.png-118kB][2]

  1. 持久化
    (1)RDD持久化時,計算出RDD的節點會分別保存他們所求出的分區數據
    (2)如果一個有持久化數據的節點發生故障,spark會在用到緩存數據時重算丟失的數據分區。如果想在節點故障時不拖累執行速度,也可以把數據備份到多個節點上。
    (3)persist()默認把數據以序列化的形式緩存在jvm堆(內存)中,同時,也可以通過調整持久化級別把數據緩存到磁盤或堆外緩存上。
    scala> rdd2.persist(StorageLevel.DISK_ONLY)         // persist不會立刻觸發緩存,而是等到第一次action操作后,自動緩存這個RDD結果
    res4: rdd2.type = MapPartitionsRDD[1] at filter at <console>:30
    
    scala> rdd2.count                                  
    res5: Long = 1
    

(4)如果緩存的RDD數據在節點上的內存放不下了,spark會通過LRU(最近最少被使用)原則吧老數據移除內存,存放新數據。因此,不論只緩存到內存還是同時緩存到內存和硬盤, 都不會因為緩存而使得作業停止,但是緩存過多不必要的數據,會帶來更多分區重算時間

第三章 鍵值對RDD

一. 普通RDD轉換成pair RDD

  1. 初始化

    scala> val rdd1 = sc.parallelize(List(1->2,3->4,3->9))
    rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24
    

二. pair RDD轉化操作

  1. 聚合操作:組合RDD中相同key的value
    (1)reduceByKey:接受一個函數,為數據集中每個鍵進行規約操作,每個規約操作會將鍵相同的值合並起來
    (2)foldByKey:將key相同的分在一組,再對組內的value進行fold操作.使用一個零值初始進行折疊(零值與另一個元素合並結果仍為該元素)
    // mapValues與reduceByKey計算每個鍵對應的均值
    scala> val rdd1 = sc.parallelize(Array(("panda",3),("pink",1),("panda",6),("pink",3)))
    scala> val rdd2 = rdd1.mapValues(x=>(x,1)).reduceByKey((v1,v2)=>(v1._1+v2._1,v1._2+v2._2))   // Array((panda,(9,2)), (pink,(4,2))) 
    scala> val rdd3 = rdd2.mapValues(v=>v._1*1.0/v._2)  // Array((panda,4.5), (pink,2.0))
     
    

(3)combineByKey(createCombiner,mergeValue,mergeCombiners):combineByKey方法的三個參數分別對應聚合的幾個階段。在遍歷所有元素時,每個元素的key,要么沒有遇到過,要么與之前的某個元素的key相同。第一個參數createCombiner:將每個元素的value映射成新的value,相當於mapvalue方法。第二個參數mergeValue是說,當發現該元素的key與之前已經映射成新value的元素的key相同時,這個新形勢的value與新遍歷到的元素的舊形式的value如何組合。第三個參數mergeCombiners:當每個分區的元素都已經形成了新形勢的k,v,此時如何對相同k的value進行組合
```scala
// combineValues計算每個key的平均值
scala> val rdd1 = sc.parallelize(Array(("panda",3),("pink",1),("panda",6)))
scala> val rdd2 = rdd1.combineByKey(
| (v)=>(v,1),
| (nValue:(Int,Int),oValue)=>(nValue._1+oValue,nValue._2+1),
| (nValue1:(Int,Int),nValue2:(Int,Int))=>(nValue1._1+nValue2._1,nValue1._2+nValue2._2)
| )
scala> val rdd3 = rdd2.mapValues(v=>v._1*1.0/v._2) // Array((panda,4.5), (pink,1.0))

```

(4)並行度優化:在執行分組和聚合時,可以指定spark的分區數
scala sc.parallelize(data).reduceByKey((x,y)=>x+y,10) // 指定10個分區

  1. 數據分組
    (1)groupByKey:把相同鍵值的RDD[K,V]經過聚合變成RDD[K,Iterator(V)]. 因此,

    rdd.reduceByKey(func)  =  rdd.groupByKey().mapValues(v=>v.reduce(func))
    
  2. 連接
    (1)cogroup:將兩個pair rdd合並成一個rdd,形式為RDD[k,Iterator[v],Iterator[w]]
    (2)leftOuterJoin和rightOuterJoin:分別表示左右連接

    scala>  val rdd1 = sc.parallelize(Array((1, 30), (2, 29), (4, 21)))  
    scala> val rdd2 = sc.parallelize(Array((1, "zhangsan"), (2, "lisi"), (3, "wangwu")))
    
    scala> rdd1.cogroup(rdd2).collect                 // cogroup
    res0: Array[(Int, (Iterable[Int], Iterable[String]))] =
    Array((4,(CompactBuffer(21),CompactBuffer())), 
          (2,(CompactBuffer(29),CompactBuffer(lisi))), 
          (1,(CompactBuffer(30),CompactBuffer(zhangsan))), 
          (3,(CompactBuffer(),CompactBuffer(wangwu))))
          
    
    scala> rdd1.leftOuterJoin(rdd2).collect           // leftOuterJoin
    res1: Array[(Int, (Int, Option[String]))] = Array((4,(21,None)), (2,(29,Some(lisi))), (1,(30,Some(zhangsan))))
    
    scala> rdd1.rightOuterJoin(rdd2).collect          // rightOuterJoin
    res2: Array[(Int, (Option[Int], String))] = Array((2,(Some(29),lisi)), (1,(Some(30),zhangsan)), (3,(None,wangwu)))
    
  3. 數據排序
    (1)sortByKey:默認按照升序排列相同key的value,讓rdd有順序的save到磁盤或展示出來

    scala> val rdd1 = sc.parallelize(Array(("panda",3),("pink",1),("panda",6)))
    
    scala> def sortInt = new Ordering[Int]{
         |   override def compare(a:Int,b:Int) = a.toString.compare(b.toString)
         | }
    
    scala> rdd1.sortByKey().collect()  // Array((panda,3), (panda,6), (pink,1))
    
  4. Pair RDD的action操作
    (1)countByKey:統計每個key出現的個數
    (2)collectAsMap:把RDD輸出Map
    (3)lookup(key):返回key對應的所有value

    scala> val rdd1 = sc.parallelize(List((1,2),(3,4),(3,6)))
    
    scala> rdd1.countByKey()
    res5: scala.collection.Map[Int,Long] = Map(1 -> 1, 3 -> 2)
    
    scala> rdd1.collectAsMap
    res6: scala.collection.Map[Int,Int] = Map(1 -> 2, 3 -> 6)
    
    scala> rdd1.lookup(3)
    res7: Seq[Int] = WrappedArray(4, 6)
    
  5. 數據分區
    (1)分布式程序中,通信的代價很大,因此控制數據分布來減少網絡傳輸可以極大提升整體性能。但分區並非對所有的應用都是好的,比如如果RDD只需被掃描一次就完全不必預先對其分區。只有當數據集多次在諸如連接這樣的基於key的操作時,分區才會有用。

    (2)所有的Pair RDD都能進行分區,系統會根據一個針對key的函數對元素進行分組。spark沒有給出顯示控制每個key具體落在哪個工作節點上的方法(其中一個原因是節點失敗時讓然可以在其他節點進行工作)。但是spark可以確保同一組key出現在同一個節點上。eg:通過哈希分區將一個RDD分成100個分區,此時key的哈希值對100取模結果相同的記錄都會被放在同一個節點上。

    (3)應用舉例:內存中保存着一份由(UserID,UserInfo)對組成的RDD表,其中UserInfo包含用戶訂閱的url。這張表會周期性的與一個小文件進行組合,小文件存儲着過去五分鍾用戶所訪問的url。因此現在要每五分鍾對用戶訪問其未訂閱的url做統計。

    // userid和userinfo這個表一般不變
    val userData = sc.sequenceFile[UserId,UserInfo]("hdfs://...").persist()
    
    // 周期性的調用該方法,處理過去5分鍾產生的事件日志
    def processNewLogs(logFileName:String){
        val events = sc.sequenceFile[UserId,UserInfo](logFileName) // 用戶點擊事件日志
        val joined = userData.join(events)                         // Pair RDD of (UserId,(UserInfo,LinkInfo))
        val offTopicVisits = joined.filter({
            case (userid,(userinfo,linkinfo)) => !userinfo.topics.contains(linkinfo.topic)
        }).count()
    }
    
    

      上面的代碼可以運行,但是效率不高。原因在於。默認情況下,join操作會把兩個pair RDD中的所有key的哈希值都求出來,再將key哈希值相同的記錄通過網絡傳到一個機器上,然后在那台機器上進行連接操作。因為userData這張用戶訂閱uel表,遠遠比沒五分鍾出現的小表大,所以每五分鍾都要對userData表進行哈希取值,然后跨節點混洗。
      因此,我們的改進方法就是先對userData表進行哈希分區,之后持久化到內存中,讓每五分鍾出現的操作只對小表進行hash取值
scala val userData = sc.sequenceFile[UserId,UserInfo]("hdfs://...") .partitionBy(new HashPartitioner(100)) //分成100個分區,分區個數至少和集群中機器的個數相同 .persist() // partitionBy只是轉化操作,需要持久化才能避免每次引用該rdd重新分區
(4)scala可以通過RDD的partitioner屬性獲取分區信息
```scala
scala> val pairs = sc.parallelize(List((1,1),(2,2),(3,3)))
scala> pairs.partitioner // 無分區
res8: Option[org.apache.spark.Partitioner] = None

scala> val partitioned = pairs.partitionBy(new org.apache.spark.HashPartitioner(2))
scala> partitioned.partitioner   // 有分區
res9: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@2)
```

(5)合適手動設置分區有效益:
用partitioner方法對RDD分區會對很多操作產生益處:reduceBy , cogroup , groupwith , join , leftOuterJoin , rightOuterJoin , groupByKey , combineByKey , lookup。但是對於像reduceByKey這樣操作單個RDD的方法,他們只會把每個key對應的value在本地機器上進行計算,最終把所有機器上的結果進行規約,這種操作本身就不會產生數據跨節點混洗。但是想cogroup和join這樣操作兩個RDD的方法,如果對這兩個RDD采用相同的辦法手動分區,那么相同key的項都在同一台機器上,這樣就避免了產生數據的跨界點混洗

(6)自定義分區方式
    繼承org.apache.spark.Partitioner類,實現下列三個方法:
    (a) numPartitions:創建的總分區數
    (b) getPartition(key:Any):根據key,返回分區編號 ( 編號從0到numPartitions-1 )
    (c) equals():該方法來判斷你的分區器對象是否和其他分區器實例相同
scala /** 創建一個基於域名的分區器,這個分區器只對url中的域名部分求哈希 */ class DomainNumPartitioner(numParts:Int) extends Partitioner{ override def numPartitions:Int = numParts override def getPartition(key:Any):Int = { val domain = new java.net.Url(key.toString).getHost() val code = (domain.hashCode % numPartitions) if(code <0) code + numPartitions else code } override def equals(other:Any):Boolean = other match{ case dnp:DomainNumPartitioner => dnp.numPartitions == numPartitions case _ => false } }

第四章 spark的共享變量

  1. 累加器
    (1)由於spark的任務再多個節點上跑,驅動節點上的普通變量不能再多個節點上共享,因此為了解決共享變量的問題,提出了累加器( 結果聚合 )和廣播變量 ( 廣播 )
    // 計算文件中空行的行數
    scala> val file = sc.textFile("/root/README.md")
    scala> val count = sc.accumulator(0)
    count: org.apache.spark.Accumulator[Int] = 0
    scala> val call = file.map(line => { if(line=="") count += 1})
    scala> println(count.value)
    5
    

這個例子中,使用了累加器在數據讀取時進行錯誤統計,而沒有對rdd進行filter和reduce實現
(2)累加器是一種只寫變量,操作節點不能訪問累加器的值,必須要對每次更新操作進行復雜的通信
(3)通過value變量獲取可累加器的值
(4)累加器操作應該寫在action的動作中:比如寫在forEach算子。因為在轉化算子中,比如如果有一個分區執行map操作失敗了,spark會在另一個節點重新運行該任務,即使該節點沒有崩潰,只是處理速度比別的節點慢很多。spark也可以搶占式的再另一個節點上啟動一個任務副本,誰先結束任務就取誰的副本。因此,這種情況會導致累加器操作重復執行多次

  1. 廣播變量
    (1)廣播變量是一種只讀變量
    (2)雖然spark會把閉包中的變量發送到每個工作結點,但這種方法比廣播變量低效得多。原因有二:
            a ) 廣播變量再變量的發送上對大對象有網絡優化
            b ) 如果這個變量來自於讀取文件,不適用廣播變量會導致這個文件會被不同工作節點讀取多次。
    (3)使用value獲取廣播變量的值
    scala> val words = sc.broadcast(List("fuck","shit"))
    scala> words.value
    res1: List[String] = List(fuck, shit)
    

(4)當廣播變量的數據很大時,應當選擇一種合適的序列化機制

  1. 分區共享連接池等資源
    (1)當map等轉換操作中包含訪問數據庫等操作時,就需要通過數據庫連接池的方式重用連接。而分布式的代碼在不同分區中運行,簡單的復用連接池對象無法正常工作。
    (2)scala提供了mapPartitions(function)算子,這個function中的變量會在分區之間a共享(這個function輸入為每個分區的元素迭代器,返回一個執行結果的序列迭代器)

    object Test1 extends App{
      def sumofeveryPartition(in:Iterator[Int]):Int = {
        var sum = 0
        in.reduce(_+_)
      }
      val conf = new SparkConf().setAppName("test111").setMaster("mesos://base1:5050")
      val sc = new SparkContext(conf)
      val input = sc.parallelize(List(1,2,3,4))
      val result = input.mapPartitions(   // partVal : Iterator[Int],RDD中的元素是Int的
        partVal => Iterator(sumofeveryPartition(partVal))
      )
      result.collect().foreach(print(_))   // 2個結果:3,7
      sc.stop
    }
    
    
  2. 數值操作
    (1)spark對包含數值數據的RDD提供了統計學方法

    方法 含義
    count(long value) RDD中元素個數
    mean() 元素平均值
    vaiance 元素方差
    samoleVariance() 從采樣中計算出方差
    stdev() 標准差
    sampleStdev() 采樣的標准差

(2)通過RDD的stats()方法,返回org.apache.spark.util.StatCounter對象,該對象包含mean()平均值,stdev()標准差等數值方法
scala scala> val rdd1 = sc.parallelize(List(1,2,3,4)) scala> val stats = rdd1.stats() scala> stats.mean

第五章 submit提交集群

  1. 驅動器節點2個職責
    (1)把用戶程序轉換成分布式任務:
    所有的spark程序遵從同一個流程:把輸入數據創建一系列RDD,通過轉化操作派生出新的RDD,最后使用行動操作收集或存儲結果RDD中的數據
    spark程序會隱式地創建出一個由操作組成的有向無環圖,當驅動器程序執行時會把這個邏輯圖轉換為物理執行計划
    (2)為執行器節點調度任務
    驅動器程序必須在各執行器進程間協調任務調度。執行器進程啟動后,會向驅動器進程注冊自己

  2. 執行器節點2個作用:
    spark應用啟動時,執行器節點就被同時啟動
    (1)執行組成spark應用的任務,並將結果返回給驅動器程序
    (1)通過BlockManager為用戶程序中要求緩存的RDD提供內存式存儲

  3. 集群管理器
    spark依賴集群管理器啟動驅動器節點。集群管理器是spark的可插拔式組件。集群管理器用於啟動執行器節點。而驅動器可以被集群管理器也可以不被集群管理器啟動

  4. spark-submit
    (1)基本格式:

    bin/spark-submit [options] <app jar | python file> [app options]
    
  5. 構建程序包
    (1)build.sbt:

    import AssemblyKeys._
    name := "Simple Project"
    version := "1.0"
    organization := "com.databricks"
    scalaVersion := "2.11.8"
    libraryDependencies ++= Seq(
    	 // Spark依賴
    	 "org.apache.spark" % "spark-core_2.10" % "1.2.0" % "provided",
    	 // 第三方庫
    	 "net.sf.jopt-simple" % "jopt-simple" % "4.3",
    	 "joda-time" % "joda-time" % "2.0"
    )
    // 這條語句打開了assembly插件的功能
    assemblySettings
    // 配置assembly插件所使用的JAR
    jarName in assembly := "my-project-assembly.jar"
    // 一個用來把Scala本身排除在組合JAR包之外的特殊選項,因為Spark
    // 已經包含了Scala
    assemblyOption in assembly :=
     (assemblyOption in assembly).value.copy(includeScala = false)
    
    

(2)project/assembly.sbt
```shell
# 顯示project/assembly.sbt的內容
$ cat project/assembly.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")

$ sbt assembly
```
  1. submit的部署模式
    (1)客戶端模式
    客戶端模式下,驅動器程序會在執行spark-submit的機器上,此時終端可以看到驅動器程序的輸出,但要保持終端始終連接。且該機器與執行節點需要有很快速的網絡交換
    (2)集群模式:--deploy-mode cluster
    該模式下,驅動器程序本身也會在集群中申請資源運行自己的進程。這樣,可以在程序運行時關閉電腦。
    (3)yarn管理的spark集群既有客戶端模式,又有集群模式。但是mesos管理的spark集群,只有客戶端模式,但是mesos管理下的任務,可以動態分配CPU(即執行器進程占用的cpu個數會在他們執行的過程中動態變化)。這種默認的方式成為細粒度模式。mesos也支持粗粒度模式,一開始分配固定的cpu,內存(Spark 應用的 spark.mesos.coarse 設置為 true)
    (4)yarn集群和mesos集群的選擇:
    Mesos 相對於 YARN 和獨立模式的一大優點在於其細粒度共享的選項,該選項可以將類似 Spark shell這樣的交互式應用中的不同命令分配到不同的 CPU 上。因此這對於多用戶同時運行交互式 shell 的用例更有用處。除此之外,選擇使用yarn模式更為合適

第六章 Spark-SQL

  1. DataSet與DataFrame
    (1)DataSet是Spark1.6以后新加的分布式數據集,比RDD有諸多好處,比如強類型和提供更有力的表達式方法,適應sql執行引擎。
    (2)DataFrame是包含列名的DataSet

    // 1. 構建對象的分布式數據集
    scala> val spark = SparkSession.builder().appName("test-sql").config("p1", "v1").getOrCreate()
    scala> case class People(name:String,age:Int)
    scala> val ds1 = Seq(People("Andy",32)).toDS
    scala> ds1.show()
    +----+---+
    |name|age|
    +----+---+
    |Andy| 32|
    +----+---+
    scala> ds1.collect
    res2: Array[People] = Array(People(Andy,32))
    
    // 2. 構建一般數據類型的分布式數據集
    scala> val primitiveDS = Seq(1, 2, 3,5,7,9).toDS()
    primitiveDS: org.apache.spark.sql.Dataset[Int] = [value: int]
    
    scala> primitiveDS.map(_+1).collect
    res3: Array[Int] = Array(2, 3, 4, 6, 8, 10)
    
    //3. 把文件讀成對象的分布式數據集
    scala> case class Person(name: String, age: BigInt)
    defined class Person
    
    scala> val peopleDS = spark.read.json("examples/src/main/resources/people.json").as[Person]   
    // 不加as[Person],只會讀成res6: Array[org.apache.spark.sql.Row] = Array([null,Michael], [30,Andy], [19,Justin])
    peopleDS: org.apache.spark.sql.Dataset[Person] = [age: bigint, name: string]
    
    scala> peopleDS.collect
    res5: Array[Person] = Array(Person(Michael,null), Person(Andy,30), Person(Justin,19))
    
    
    
  2. 解析json文件
    (1)文件格式:文件的每一行都是一個json串,每一行會被轉化為一個Row對象
    (2)Spark-sql讀取文件后,把整個形成一個DataFrame,帶有列名的表

    scala> import org.apache.spark.sql.SparkSession
    scala> import spark.implicits._
    
    scala> val spark = SparkSession.builder().appName("test-sql").config("p1", "v1").getOrCreate()
    【WARN SparkSession$Builder: Using an existing SparkSession; some configuration may not take effect.】
    【spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@399ef33f】
    
    scala> val df = spark.read.json("examples/src/main/resources/people.json")
    df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
    
    scala> df.select("name").show()
    +-------+
    |   name|
    +-------+
    |Michael|
    |   Andy|
    | Justin|
    +-------+
    
    scala> df.select($"name", $"age" + 1).show
    +-------+---------+
    |   name|(age + 1)|
    +-------+---------+
    |Michael|     null|
    |   Andy|       31|
    | Justin|       20|
    +-------+---------+
    
    
  3. 用sql語句查詢session中的視圖
    session中的視圖只能存在於這個session中,一旦session結束,視圖消失。如果想在所有session中共享,就要使用全局視圖

    scala> df.createOrReplaceTempView("people")
    
    scala> val sqlDF = spark.sql("select * from people")
    sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
    
    scala> sqlDF.collect
    res8: Array[org.apache.spark.sql.Row] = Array([null,Michael], [30,Andy], [19,Justin])
    ```
    
4. **全局視圖**
全局視圖保存在系統數據庫`global_temp`中,使用全局視圖時,必須加上數據庫的名字
    ```scala
    scala> df.createGlobalTempView("people")

    scala> spark.sql("SELECT * FROM global_temp.people").collect
    res11: Array[org.apache.spark.sql.Row] = Array([null,Michael], [30,Andy], [19,Justin])
    
    scala> spark.newSession.sql("SELECT * FROM global_temp.people").collect
    res12: Array[org.apache.spark.sql.Row] = Array([null,Michael], [30,Andy], [19,Justin])

    ```
5. **spark-sql支持的文件類型**
spark-sql支持的文件類型:`json, parquet, jdbc, orc, libsvm, csv, text`
    ```scala
    scala> val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
    peopleDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
    
    scala> peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    SLF4J: Defaulting to no-operation (NOP) logger implementation
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

    ```
    
## **第七章  spark streaming**
#### **一. 入門例子**
1. 要求:
(1)從一台服務器的7777端口上接受一個以換行符分割的多行文本,從中篩選出包含error的行並打印出來
(2)使用命令模擬向端口7777發送消息
    ```shell
    $ nc -lk localhost 7777
    <在此輸入文本>
    ```
    
2. streaming代碼:
    ```scala
    import org.apache.spark._
    import org.apache.spark.streaming._
    
    object TestMain {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        val ssc = new StreamingContext(conf,Seconds(1))
        val lines = ssc.socketTextStream("localhost",9999)
        val words = lines.flatMap(_.split(""))
        val pairs = words.map(x=>(x,1))
        val wordcounts = pairs.reduceByKey(_+_)
        wordcounts.print()
        ssc.start()
        ssc.awaitTermination()
      }
    }

    ```
    ```scala
    libraryDependencies ++= Seq(
      "org.apache.spark" % "spark-streaming_2.11" % "2.1.0" ,
      "org.apache.spark" % "spark-core_2.11" % "2.1.0"
    )
    ```
3. 提交代碼
(1)submit提交命令
    ```shell
    spark-submit --class "Test" --master local[4] sparkdemo_2.11-1.0.jar
    ```
(2)ide中配置的提交jvm參數

    ```java
    -Dspark.master=local[4] -Dspark.app.name=mystreamingtest
    ```

#### **二. 架構與抽象**
1. 離散化流的概念
(1)spark-streaming使用微批次架構,把流式數據當做一系列小規模批處理對待。新批次按照均勻時間間隔創建出來
(2)streaming的編程模型是離散化流DStream,他是一個RDD序列,每個RDD代表數據流中一個時間片內的數據

2. straming在驅動器和執行節點的執行過程
(1)spark streamng為每個輸入源啟動接收器,接收器以任務的形式運行在執行器中。
(2)接收器從輸入源收集數據並保存為RDD。他們在收到輸入數據后會把數據復制到另一個執行器進程來保障容錯性。
(3)數據被保存在執行器進程的內存中,和緩存RDD的方式一樣。
(4)streamingcontext周期性的運行spark任務來處理這些數據,把數據和之前區間的RDD整合。

3. spark-streaming的容錯性
(1)streaming對DStream提供的容錯性,和spark為RDD提供的容錯性一致。只要數據還在,就能根據RDD譜系圖重算出任意狀態的數據集。
(2)默認情況下,數據分別存在於兩個節點上,這樣可以保證數據容錯性,但是只根據譜系圖重算所有從程序啟動就接收到的數據可能會花很長時間。因此streaming提供檢查點來保存數據到hdfs中。一般情況下,每處理5-10次就保存一次
    ```scala
    ssc.checkpoint("hdfs:// ... ")    // 本地開發時,可以使用本地路徑
    ```

### **三. streaming的轉化操作**
1. DStream無狀態轉化
(1)無狀態轉化操作是應用到每個時間片的RDD上的
&nbsp;&nbsp;&nbsp;&nbsp;eg:`map,flatMap,filter,repartition,reduceByKey,groupByKey`
(2)無狀態轉化操作也可用於把兩個同時間片內的DStream連接起來

2. 有狀態轉化操作 - 滑動窗口(有狀態轉化操作需要打開檢查點機制來確保容錯性)
(1)基於窗口的操作在一個比streamingcontext批次更長的時間范圍內,通過整合更多個批次的結果,計算整個窗口的結果。所以通過window產生的DStream中每個RDD會包含多個批次的數據,可以對這些數據進行count() , transform()操作。
    ```scala
    object TestMain {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf()
        val ssc = new StreamingContext(sparkConf, Seconds(1))
        ssc.checkpoint("./checkpoints")   // 設置檢查點
    
        // 初始消息RDD
        val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))
    
        // 創建sparkstreaming環境
        val lines = ssc.socketTextStream("localhost",9999)
        val words = lines.flatMap(_.split(" "))
        val pairs = words.map(x => (x, 1))  // pairRDD
    
        /**
          * @param reduceFunc : reduce function
          * @param windowDuration:窗口寬度,一次批處理的時間長短
          * @param slideDuration:兩次窗口滑動間隔
          */
        pairs.reduceByKeyAndWindow((a:Int,b:Int)=>a+b,Seconds(15),Seconds(1))
        pairs.print()
        ssc.start()
        ssc.awaitTermination()
      }
    }
    ```
3. 有狀態轉化操作 - updateStatesByKey()與mapWithState
(1)這兩個方法都是操作PairRdd的,他們要求新消息以只讀的形式到來,key是新消息,value是新消息對應的狀態
(2)mapWithState需要傳入mappingfunc來計算消息的新狀態: `(KeyType, Option[ValueType], State[StateType]) => MappedType`
(3)updateStateByKey需要傳入updateFunc來更新消息狀態,輸入參數:` (Seq[V], Option[S]) => Option[S]`
(4)用這兩個方法實現持續統計單詞技術:
    ```scala
    import org.apache.spark._
    import org.apache.spark.streaming._
    
    object TestMain {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf()
        val ssc = new StreamingContext(sparkConf, Seconds(5))
        ssc.checkpoint(".")   // 設置檢查點
    
        // 初始消息RDD
        val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))
    
        // 創建sparkstreaming環境
        val lines = ssc.socketTextStream("localhost",9999)
        val words = lines.flatMap(_.split(" "))
        val wordDstream = words.map(x => (x, 1))  // pairRDD
    
        // 定義狀態更新函數:輸入key,新到的pairRDD中value值,已經保存的key的狀態值,返回一個鍵值對(key,State)
        val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
          val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
          val output = (word, sum)
          state.update(sum)
          output
        }
    
        // values是新消息pairRDD的value值,state是以保存的狀態值
        def updateFunc (values:Seq[Int],state:Option[Int]): Option[Int] ={
          val newcount = state.getOrElse(0)+values.size
          Some(newcount)
        }
    
        val stateDstream = wordDstream.mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD))  // 通過mapping方法累積所有消息狀態
        val stateDstream = wordDstream.updateStateByKey[Int](updateFunc _)   // 通過update方法累積所有消息狀態
        stateDstream.print()
        ssc.start()
        ssc.awaitTermination()
      }
    }
    ```

### **四. 輸出操作**
1. print()
(1)Dstream如果沒有被執行輸出操作,則這些DStream不會被求值。若StreamingContext中沒有定義輸出操作,整個context就不會啟動

2. 保存文件
(1)saveAsTextFile
    ```scala
    // output-1497685765000.txt文件,根據streamingcontext設置的時間間隔執行一次
    wordcounts.saveAsTextFiles("output","txt"); 
    ```
(2)saveAsHadoopFile
該函數接受一種Hadoop輸出格式作為參數,可以用這個函數將DStream保存成SequenceFile
    ```scala
    val pairs = words.map(x=>(new Text(x),new LongWritable(1)))
    val wordcounts = pairs.reduceByKey((x:LongWritable,y:LongWritable)=>new LongWritable(x.get()+y.get()))
    wordcounts.saveAsHadoopFiles[SequenceFileOutputFormat[Text,LongWritable]]("outdir","txt");
    ```

3. 存入外部存儲系統,如mysql中
    ```scala
    wordcounts.foreachRDD({ // DStream中的每個RDD
      rdd => rdd.foreachPartition({  // 每台機器上的RDD都能公用一個分區
        item => pool.getConn.save(item) // 保存每一條數據
      })
    })
    ```

### **五. 輸入源**
每個DStream與一個Receiver對象相關聯,該對象從數據源接收數據並將其存儲到spark集群的內存中。

1. 核心數據源
(1)文件流:監聽一個hdfs下的文件夾,一旦有新文件進入,就將其作為輸入源處理成DStream。這種方式,文件一旦進入該文件夾,就不能再修改。

    ```c
    ssc.fileStream[KeyClass, ValueClass, InputFormatClass](dir)
    ```
(2)自定義一個接收器acceptor,接收akka數據源。[Custom Receiver Guide][6]
(3)RDD隊列模擬輸入源:可以把一系列的RDD作為DStream的一批數據
    ```c
    ssc.queueStream(queueOfRDDs)
    ```
2. 附加數據源
(1)kafka數據源
(2)flume數據源

3. 多數據源與集群規模
(1)當使用類似union()將多個DStream合並時,使用多個接收器用來提高聚合操作中的數據獲取吞吐量(一個接收器會成為系統的性能瓶頸)。此外,有時需要用不同接收器從不同數據源接受各種數據。此時應用分配的CPU個數至少為數據源個數+1(最后一個用來計算這些數據)

### **六  24/7不間斷運行配置**
1. 檢查點機制
檢查點是streaming中容錯性的主要機制。streaming可通過轉化圖的譜系圖來重算狀態,檢查點機制則可以控制要在轉化圖中回溯多遠。其次,如果是驅動器程序崩潰,用戶在重啟驅動器程序並讓驅動器程序從檢查點回復,則streaming可以讀取之前運行的程序處理數據進度,並從這里繼續。
2. 驅動器程序容錯
讓驅動器程序重啟后,先從檢查點恢復sparkstreamingcontext,再重新創建streamingcontext,保證錯誤恢復

    ```scala
    def createStreamingContext() = {
      val sparkConf = new SparkConf()
      val ssc = new StreamingContext(sparkConf, Seconds(1))
      ssc.checkpoint("./checkpoints") // 設置檢查點
      ssc
    }
    val ssc = StreamingContext.getOrCreate("./checkpoints",createStreamingContext _)
    ```

3. 工作節點容錯
(1)streaming使用與spark相同的容錯機制,所有從外部數據源中收到的數據都會在多個工作節點上備份,所偶有RDD操作,都能容忍一個工作節點的失敗,根據RDD譜系圖,系統就能把丟失的數據從輸入數據備份中計算出來。
(2)工作節點上的接收器容錯:接受其提供如下保障:
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;a. 所有從hdfs中讀取的數據都是可靠的,因為底層文件系統有備份,strreaming會記住那些數據放到了檢查點中,並在應用崩潰后,從檢查點處繼續執行。
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;b. 對於像kafka這種不可靠數據源,spark會把數據放到hdfs中,仍然確保不丟失數據。

###**七. 性能**
1. 批次和窗口大小
(1)streaming可使用的最小批次間隔一般為500毫秒
(2)這個結果是從一個較大的時間窗口(10s)逐步縮小實驗而來。當減小時間窗口后,如果streaming用戶界面現實的處理時間保持不變,就可以進一步減小批次大小,如果處理時間增大,則認為達到了應用極限。此外,滑動步長也對性能有着巨大影響,當計算代價巨大並成為系瓶頸,就應該考慮增加滑動步長。

2. 提高並行度
(1)增加接收器數目:
&nbsp;&nbsp;&nbsp;&nbsp;如果記錄太多,導致單台機器來不及讀入並分發數據,接收器就會成為系統瓶頸。此時可以通過創建多個輸入DStream來增加接收器的數目,然后使用union合並為一個打的數據源。
(2)將收到的數據顯式的重新分區
&nbsp;&nbsp;&nbsp;&nbsp;如果接收器數目無法增加,可以通過使用DStream。repartition來重新分區輸入流,從而重新分配收到的數據源。
(3)提高聚合計算的並行度
對於像reduceByKeyy這樣的操作,可以再第二個參數制定並行度。

3. 垃圾回收和內存使用
可以通過修改gc策略,使用CMS策略
    ```scala
    spark-submit --conf spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC
    ```

## **第八章 spark 機器學習**





## **第九章 spark調試與調優**
### **一. 使用SparkConf配置Spark**
1. spark每個配置項都是基於字符串形式的鍵值對。eg:通過setAppName()設置spark.app.name
2. spark允許通過spark-submit腳本動態配置配置項,腳本會把這些配置項這知道運行時環境中。當一個新的SparkConf被創建出來時,這些環境變量會被檢測出來並自動配置好。因此,用戶只需要創建一個空的SparkConf,並直接傳給SparkContext即可。

    ```shell
    spark-submit --class com.example.MyApp --name "My app"
    --conf spark.ui.port=36000
    myApp.jar
    ```
3. spark-submit腳本會查找conf/spark-defaults.conf文件,然后嘗試讀取該文件中以空格隔開的鍵值對數據。也可通過--properties-File自定義文件路徑

    ```shell
    # 提交腳本
    spark-subnmit --class com.example.MyApp --properties-File myconfig.conf MyApp.jar
    # myconfig.conf內容
    spark.master local[4]
    spark.app.name "My App"
    spark.ui.port 36000
    ```
        
4.  **sparkconf的優先級選擇**
(1)最高:用戶顯示調用的sparkconfig的set()方法設置的選項
(2)其次:spark-submit傳遞的參數
(3)寫在配置文件中的值

### **二. RDD依賴關系**
1. **RDD依賴**
(1) 窄依賴:父RDD的每個Partition,最多被子RDD的1個分區所使用
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;a ) 窄依賴分為兩種:一對一依賴OneToOneDependcy,一對一范圍依賴 RangeDependency
(2) 寬依賴:指計算中會產生shuffle操作的RDD依賴。表示一個父RDD的Partition會被多個子RDD的Partition使用
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;a ) groupByKey就是常見的寬依賴算子

2. **DAG生成機制**
(1)DAG生成過程,就是對計算中stage的划分。
(2)對於窄依賴,RDD之間的數據不需要進行shuffle,這些處理操作可以在同一台機器的內存中完成,所以窄依賴在划分中被分成一個stage
(3)對於寬依賴,由於數據之間存在shuffle,必須等到父RDD所有數據shuffle完成之后才能進行后續操作,所以在此處進行stage划分

3. **RDD檢查點** 
(1)checkpoint也是存儲RDD結果的一種方式,它不同於persist將數據存儲在本地磁盤,而是把結果存儲在HDFS中
    ```shell
    scala> val wordcount = sc.textFile("/root/README.txt").flatMap(_.split(" ")).map(word=>(word,1)).reduceByKey(_+_)
    scala> wordcount.checkpoint
    ```
    
4. **RDD容錯**
(1)RDD容錯分為3個層面:調度層,RDD血統層,Checkpoint層
(2)調度層容錯:分別在Stage輸出時出錯與計算時出錯。stage輸出出錯,上層調度器DAGScheduler會進行重試。stage計算出錯時,該task會自動被重新計算4次
(3)RDD LINEAGE血統層容錯:
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;a ) 基於各RDD各項Transaction構成了compute chain,在部分結果丟失的時候可以根據Lineage重新計算
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;b ) 窄依賴中,數據進行的流水線處理,子RDD的分區數據同樣在父RDD的分區中,並不存在冗余計算
(4)CheckPoint層容錯
在寬依賴上做檢查點可以避免Lineage很長重新計算而帶來的冗余計算


### **三. Spark執行步驟:作業,任務,步驟**
1. **關於作業**
(1)rdd.toDebugString方法查看RDD譜系圖
(2)行動操作會觸發生成一個作業,這個作業包含了transaction動作產生的多個步驟

2. **查找作業信息**
(1)4040端口展示了作業列表,里面包含stage執行的詳情。該頁面包含了一個作業的性能表現,若果有些步驟特別慢,還可以點擊進去查看是哪段用戶代碼
(2)數據傾斜是導致性能問題的常見原因,當有少量任務對於其他任務需要花費大量時間時,一般就是發生了數據傾斜

### **四. 驅動器日志和執行器日志**
1. spark獨立模式下:所有日志再主節點的網頁中直接顯示,存儲於spark目錄下的work目錄中
2. Mesos模式下:日志存儲在Mesos從節點的work目錄中,可通過主節點用戶界面訪問
3. YARN模式下:當作業運行完畢,可以通過yarn logs -applicationId <app ID>來打包一個應用日志。如果要查看運行再YARN上的應用日志,可以從資源管理器的用戶界面進入從節點頁面,瀏覽特定節點容器的日志
4. log4j配置文件的示例在conf/log4j.properties.template,也可通過spark-submit --Files添加log4j.properties文件

### **五. 關鍵性能考量**
1. 並行度
2. 序列化格式
3. 內存管理
4. 硬件供給


  [1]: http://static.zybuluo.com/lj72808up/vixh17ux3ny0536r12re7jkj/Screenshot%20from%202017-05-04%2022-45-27.png
  [2]: http://static.zybuluo.com/lj72808up/2349rmqrpwn144x1kstddwv0/action.png
  [3]: http://static.zybuluo.com/lj72808up/d2w90ls7600pjgypnbz4yfop/pairddtrans.png
  [4]: http://static.zybuluo.com/lj72808up/d6dszvv4cye5zdox3ewywjh0/2pairRddtrans.png
  [5]: http://static.zybuluo.com/lj72808up/j3kn7lb44hz7zci9dvcl9ta7/2pairRddtrans2.png
  [6]: http://spark.apache.org/docs/1.6.0/streaming-custom-receivers.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM