spark streaming (二)


一、基礎核心概念

      1、StreamingContext詳解 (一)

          有兩種創建StreamingContext的方式:
             val conf = new SparkConf().setAppName(appName).setMaster(master);
             val ssc = new StreamingContext(conf, Seconds(1));

          StreamingContext, 還可以使用已有的SparkContext來創建
              val sc = new SparkContext(conf)
              val ssc = new StreamingContext(sc, Seconds(1));

          appName, 是用來在Spark UI上顯示的應用名稱。 master, 是一個SparkMesos或者Yarn集群的URL, 或者是local[*]。 

        2、StreamingContext詳解 (二)

            一個StreamingContext定義之后, 必須做以下幾件事情:
                  1、 通過創建輸入DStream來創建輸入數據源。
                  2、 通過對DStream定義transformationoutput算子操作, 來定義實時計算邏輯。
                  3 、 調用StreamingContextstart()方法, 來開始實時處理數據。
                  4、 調用StreamingContextawaitTermination()方法, 來等待應用程序的終止。 可以使用CTRL+C手動停止,或者就是讓它持續不斷的運行進行計算。
                  5、 也可以通過調用StreamingContextstop()方法, 來停止應用程序。

           需要注意的要點:
                  1、 只要一個StreamingContext啟動之后, 就不能再往其中添加任何計算邏輯了。 比如執行start()方法之后, 還給某個DStream執行一個算子。
                  2、 一個StreamingContext停止之后, 是肯定不能夠重啟的。 調用stop()之后, 不能再調用start()
                  3、 一個JVM同時只能有一個StreamingContext啟動。 在你的應用程序中, 不能創建兩個StreamingContext
                  4、 調用stop()方法時, 會同時停止內部的SparkContext, 如果不希望如此, 還希望后面繼續使用SparkContext創建其他類型的Context, 比如SQLContext, 那么就用stop(false)
                  5、 一個SparkContext可以創建多個StreamingContext, 只要上一個先用stop(false)停止, 再創建下一個即可。

           3、輸入DStream和Receiver詳解(一)

                  輸入DStream代表了來自數據源的輸入數據流。 在之前的wordcount例子中, lines就是一個輸入 DStream( JavaReceiverInputDStream) , 代表了從netcat( nc) 服務接收到的數據流。 除了 文件數據流之外, 所有的輸入DStream都會綁定一個Receiver對象,

該對象是一個關鍵的組件, 用來從數據源接收數據, 並將其存儲在Spark的內存中, 以供后續處理。 
                 Spark Streaming提供了三種內置的數據源支持;
                              1、 基礎數據源: StreamingContext API中直接提供了對這些數據源的支持, 比如文件、 socket、 Akka Actor等。
                              2、 高級數據源: 諸如KafkaFlumeKinesisTwitter等數據源, 通過第三方工具類提供支持。 這些數據源的使用, 需要引用其依賴。
                              3、 自定義數據源: 我們可以自己定義數據源, 來決定如何接受和存儲數據。

            4、輸入DStream和Receiver詳解(二)

                     如果你想要在實時計算應用中並行接收多條數據流, 可以創建多個輸入DStream。 這樣就會創建多個 Receiver, 從而並行地接收多個數據流。 但是要注意的是, 一個Spark Streaming Application的 Executor, 是一個長時間運行的任務, 因此,

它會獨占分配給Spark Streaming Applicationcpu core。從而只要Spark Streaming運行起來以后, 這個節點上的cpu core, 就沒法給其他應用使用了。 

                    使用本地模式, 運行程序時, 絕對不能用local或者local[1], 因為那樣的話, 只會給執行輸入DStream的 executor分配一個線程。 而Spark Streaming底層的原理是, 至少要有兩條線程, 一條線程用來分配給 Receiver接收數據, 一條線程用來處理接收到的數據。

因此必須使用local[n], n>=2的模式。     (n不能大於當前節點的CPU核數)
                    如果不設置Master, 也就是直接將Spark Streaming應用提交到集群上運行, 那么首先, 必須要求集群 節點上, 有>1cpu core, 其次, 給Spark Streaming的每個executor分配的core, 必須>1, 這樣, 才能保證分配到executor上運行的輸入DStream

兩條線程並行, 一條運行Receiver, 接收數據; 一條處 理數據。 否則的話, 只會接收數據, 不會處理數據。 

 

總結:Receiver接收器

Receiver接收器,可以接收外部數據源中的數據,並將其保存到
內存中,以供后續使用。
在Spark中支持三大類型的數據源:
1、基礎數據源:比如文件、Socket、Akka中的數據。
2、高級數據源,比如Flume、Kafka、推特中的數據。
3、自定義數據源。
補充:在Spark Streaming中,可以通過兩種方式操作Kafka的數據。
一種是通過Receiver的方式,另一種Direct直接讀取的方式。


           5、輸入DStream之基礎數據源
               (1)Socket: 之前的wordcount例子,
                    

object WordCoundStreaming {
  def main(args: Array[String]): Unit = {
    /**
      * 處理Spakr Streaming程序至少需要2個線程,其中,
      * 一個線程負責接收輸入的數據
      * 另一個線程負責處理接收的數據
      * local[N] N>=2
      */
    val conf=new SparkConf().setAppName("WordCoundStreaming")
              .setMaster("local[2]")
    /**
      * SparkContext是用戶與Spark集群交互的唯一接口,所以SparkContext是必需的。
      * 在創建StreamingContext的過程中,Spark會在源碼中自動創建一個SparkContext對象。
      * 注意第二個參數Seconds(*),表示實時流數據中每批數據的時間間隔,
      * 也就是說,在DStream離散流中的每個RDD包含相應時間間隔的數據。
      */
    val ssc=new StreamingContext(conf,Seconds(5))
    /**
      * 通過socketTextStream()獲取nc服務器中的數據
      * 需要指明獲取nc服務器的節點名稱和端口,這是的端口要和運行nc服務器的端口一致。
      * 此時產生的lines不是RDD,而是一個DStream離散流。
      */
    val lines=ssc.socketTextStream("tgmaster",9999)
    /**
      * 對離散流lines進行flatMap轉換操作,實際上是對lines離散流中的每個RDD都進行
      * flatMap操作,從而產生了新的RDD,多個新的RDD構成了新的離散流words。
      */
    val words=lines.flatMap(_.split(" "))
    val pairs=words.map((_,1))
    val result=pairs.reduceByKey(_+_)

    //在控制台輸出內容
    result.print()
    //啟動StreamingContext
    ssc.start()
    //等待程序停止
    ssc.awaitTermination()
  }
}

    (2)基於HDFS文件的實時計算, 其實就是, 監控一個HDFS目錄, 只要其中有新文件出現, 就實時處理。相當於處理實時的文件流。 
               streamingContext.fileStream<KeyClass, ValueClass, InputFormatClass>(dataDirectory)
               streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
               Spark Streaming會監視指定的HDFS目錄, 並且處理出現在目錄中的文件。
                要注意的是, 所有放入HDFS目錄中的文件, 都必須有相同的格式; 必須使用移動或者重命名的方式,將文件移入目錄; 一旦處理之后, 文件的內容即使改變, 也不會再處理了; 基於HDFS文件的數據源是沒有Receiver的, 因此不會占用一個cpu core
        

 def main(args: Array[String]): Unit = {
    val conf =new SparkConf().setAppName("word").setMaster("local[2]")
    val ssc=new StreamingContext(conf,Seconds(5))
    val lines=ssc.textFileStream("hdfs://liuwei1:9000/homework")
    val result=lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
    result.print()
    ssc.start()
    ssc.awaitTermination()
  }

二、DStream的transformation

 

 

總結:DStream中Transformation類型的算子有以下三個最重要
1)updateStateByKey
可以為每個Key保留一份狀態,並更新狀態的值。
案例:在全局范圍內統計每個單詞出現的次數。
2)transform
可以執行RDD到RDD的操作,相當於對DStream API的一個補充。
3)window
滑動窗口操作,需要指明兩個參數,一個是窗口的長度,另一個是
窗口滑動的時間間隔。

三、updateStateBykey  (全局范圍之內處理數據,而不是一批一批的)
      updateStateByKey
    updateStateByKey操作, 可以讓我們為每個key維護一份state, 並持續不斷的更新該state
          1、 首先, 要定義一個state, 可以是任意的數據類型;
          2、 其次, 要定義state更新函數——指定一個函數如何使用之前的state和新值來更新state
    對於每個batchSpark都會為每個之前已經存在的key去應用一次state更新函數, 無論這個keybatch中是否有新的數據。 如果state更新函數返回none, 那么key
對應的state就會被刪除。 當然, 對於每個新出現的key, 也會執行state更新函數。
注意, updateStateByKey操作, 要求必須開啟Checkpoint機制。
案例: 基於緩存的實時wordcount程序( 在實際業務場景中, 這個是非常有用的

             

object updateStateByKey {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setAppName("updateStateByKey")
            .setMaster("local[2]")
    val ssc=new StreamingContext(conf,Seconds(5))
    //updateStateByKey操作,要求必須開啟Checkpoint機制。
    ssc.checkpoint("hdfs://tgmaster:9000/in/ch")

    val lines=ssc.socketTextStream("tgmaster",9999)
    val pairs=lines.flatMap(_.split(" "))
              .map((_,1))
    /**
      * values:多個新值的集合
      * state:是一個Option類型的狀態,
      * 可以通過state.getOrElse(0)為newValue設置初始值
      */
    val result=pairs.updateStateByKey((values:Seq[Int],state:Option[Int])=>{
      //創建newValue並設初始值0
      var newValue=state.getOrElse(0)
      /**
        * 遍歷values集合的新值,用以改變原先的舊值
        */
      for(value <- values){
        newValue+=value
      }
      /**
        * updateStateByKey算子需要一個Option類型的返回值,
        * 所以將更新之后的新值作為Option返回。
        */
      Option(newValue)
    })

    result.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

四、transform

      transform操作, 應用在DStream上時, 可以用於執行任意的RDDRDD的轉換操作。 它可以用於實現, DStream API中所沒有提供的操作。 比如說, DStream API中, 並沒有提 供將一個DStream中的每個batch, 與一個特定的RDD進行join的操作。

但是我們自己就 可以使用transform操作來實現該功能。
     DStream.join(), 只能join其他DStream。 在DStream每個batchRDD計算出來之后, 會 去跟其他DStreamRDD進行join
案例: 廣告計費日志實時黑名單過濾

      

/**
  * Created by tg on 3/29/17.
  * 實時黑名單過濾
  */
object transformDemo {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setAppName("transformDemo")
                .setMaster("local[2]")
    val ssc=new StreamingContext(conf,Seconds(5))

    //模擬數據,創建黑名單RDD,(String,Boolean) (name,Boolean)
    val blackRDD=ssc.sparkContext.parallelize(Array(("tom",true)))
    //從nc服務器中獲取數據,nc服務器中的數據格式:time name,比如: 1101 jack
    val linesDStream=ssc.socketTextStream("tgmaster",9999)
    val mapDStream=linesDStream.map(line=>{
      val log=line.split(" ")
      (log(1),line) //(name,time name)
    })
    /**
      * tansform()算子可以執行RDD到RDD的轉換操作
      */
    mapDStream.transform(adsRDD=>{
      //讓adsRDD與blackRDD進行leftOuterJoin左外連接操作,操作之后的數據包含所有的用戶
     // (String,(String,Boolean))
      val joinRDD=adsRDD.leftOuterJoin(blackRDD)
      val filterRDD=joinRDD.filter(x=>{
        /**
          * 當if條件成立時,意味着是黑名單人員,返回false將其刪除。
          * 當if條件不成立時,意味着不是黑名單人員,返回true將其保留。
          */
        if(x._2._2.getOrElse(false)) false else true
      })
      filterRDD.map(x=>{
          x._2._1
      })
    }).print()

    ssc.start()
    ssc.awaitTermination()
  }
}

五、window滑動窗口

     Spark Streaming提供了滑動窗口操作的支持, 從而讓我們可以對一個滑動窗口內的數據執行計算 操作。
     每次掉落在窗口內的RDD的數據, 會被聚合起來執行計算操作, 然后生成的RDD, 會作為window DStream的一個RDD。 比如下圖中, 就是對每三秒鍾的數據執行一次滑動窗口計算, 這3秒內的3個 RDD會被聚合起來進行處理, 然后過了兩秒鍾, 又會對最近三秒內的數據執行滑動窗口計算。 所以 每個滑動窗口操作, 都必須指定兩個參數, 窗口長度以及滑動間隔, 而且這兩個參數值都必須是 batch間隔的整數倍。 ( Spark Streaming對滑動窗口的支持, 是比Storm更加完善和強大的) 


     window滑動窗口操作

案例:
      熱點搜索詞滑動統計, 每隔5秒鍾, 統計最近20秒鍾的搜索詞的搜索頻次, 並打印 出排名最靠前的3個搜索詞以及出現次數 

package SparkCore.day1

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Created by tg on 3/29/17.
  * 熱點搜索詞滑動統計,每隔5秒鍾,統計最近20秒鍾的搜索詞的搜索頻次,
  * 並打印出排名最靠前的3個搜索詞以及出現次數。
  */
object reduceByKeyAndWindowDemo {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setAppName("reduceByKeyAndWindowDemo")
              .setMaster("local[2]")
    val ssc=new StreamingContext(conf,Seconds(1))
    //從nc服務器中獲取數據,數據格式:name keyword,比如:張三 Spark
    val linesDStream=ssc.socketTextStream("tgmaster",9999)
    //取出每行的搜索關鍵詞
    val wordsDStream=linesDStream.map(x=>x.split(" ")(1))
    val pairsDStream=wordsDStream.map(x=>(x,1))
    /**
      * 通過滑動窗口進行統計,每隔5秒鍾,統計最近20秒鍾的搜索詞出現次數。
      * reduceByKeyAndWindow()算子中,
      * 第一部分v1+v2用於計算搜索詞出現次數
      * 第二部分Seconds(20)用於設置窗口的長度
      * 第三部分Seconds(5)用於設置窗口的時間間隔
      */
    val resultDStream=pairsDStream.reduceByKeyAndWindow((v1:Int,v2:Int)=>
    v1+v2,Seconds(20),Seconds(5))

    resultDStream.transform(itemRDD=>{
      val result=itemRDD.map(x=>(x._2,x._1))
        .sortByKey(false).take(3)
        .map(x=>(x._2,x._1))

      val resultRDD=ssc.sparkContext.parallelize(result)
      resultRDD
    }).print()

    ssc.start()
    ssc.awaitTermination()
  }
}

  

六、output操作及foreachRDD

     1、output 操作

          

       DStream中的所有計算, 都是由output操作觸發的, 比如print()。 如果沒有任何 output操作, 那么, 壓根兒就不會執行定義的計算邏輯。 

       此外, 即使你使用了foreachRDD output操作, 也必須在里面對RDD執行action操 作, 才能觸發對每一個batch的計算邏輯。 否則, 光有foreachRDD output操作,在里面沒有對RDD執行action操作, 也不會觸發任何邏輯。 

     2、foreachRDD詳解

        通常在foreachRDD中, 都會創建一個Connection, 比如JDBC Connection, 然后通過 Connection將數據寫入外部存儲 

       誤區一: 在RDDforeach操作外部, 創建Connection 這種方式是錯誤的, 因為它會導致Connection對象被序列化后傳輸到每個Task中。 而這 種Connection對象, 實際上一般是不支持序列化的, 也就無法被傳輸 
                  dstream.foreachRDD { rdd =>
                   val connection = createNewConnection()
                  rdd.foreach { record => connection.send(record)
                }
                }

         誤區二: 在RDDforeach操作內部, 創建Connection 這種方式是可以的, 但是效率低下。 因為它會導致對於RDD中的每一條數據, 都 創建一個Connection對象。 而通常來說, Connection的創建, 是很消耗性能的。 
                   dstream.foreachRDD { rdd =>
                    rdd.foreach { record =>
                   val connection = createNewConnection()
                   connection.send(record)
                   connection.close()
                   }
                  }


           合理方式一: 使用RDDforeachPartition操作, 並且在該操作內部, 創建Connection對象, 這樣就相當於是, 為RDD的每個partition創建一個 Connection對象, 節省資源的多了。 
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
val connection = createNewConnection()
partitionOfRecords.foreach(record => connection.send(record))
connection.close()
}
}
            合理方式二: 自己手動封裝一個靜態連接池, 使用RDDforeachPartition操作, 並且在 該操作內部, 從靜態連接池中, 通過靜態方法, 獲取到一個連接, 使用之后再還回去。 這樣的話, 甚至在多個RDDpartition之間, 也可以復用連接了。 而且可以讓連接池采取
懶創建的策略, 並且空閑一段時間后, 將其釋放掉。

             dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record))
ConnectionPool.returnConnection(connection)
}
}


案例:
     改寫UpdateStateByKeyWordCount, 將每次統計出來的全局的單詞計數, 寫入一份, 到MySQL數據庫中。 

 

補充:

一、mysql數據庫中varchar與char的區別

varchar 可變的 ;name varchar(100)   賦值時賦了10 ,不占用多余的空間,節省資源空間 

char 固定的,name varchar(100) 賦值為10,占用多余空間,浪費資源空間

char在查詢檢索數據時,char類型的都是固定長度,所以直接查詢檢索數據,但是對於varchar類型的存儲長度空間不一致,所以在查詢檢索數據之前,需要確定數據的存儲長度。然后再查詢數據,這樣就比char類型的多了一層確定數據存儲長度的操作,因此的那個數據量非常龐大時,會

影響查詢數據效率

建表:
create table wordcount (
id integer auto_increment primary key,
updated_time timestamp NOT NULL default CURRENT_TIMESTAMP on update CURRENT_TIMESTAMP,
word varchar(255),
count integer
);
添加數據:
insert into 表名(列1,列2,....) values (值1,值2,....)

 










 




免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM