spark streaming 統計wordcount


spark streaming 學習:

spark streaming 與strom 的區別:

Spark Streaming strom 的區別:

1、Strom是純實時的流式處理框架,SparkStreaming 是准實時處理框架(微批處理),因為微批處理,SparkStreaming 的吞吐量比strom的要高

2、Strom的事物機制要比spark streamming 完善

3、Strom 支持動態資源調度,(Spark1.2 開始以后也支持)

4、SparkSteaming擅長復雜的業務處理,Strom不擅長復雜的業務處理,擅長簡單的匯總型計算

spark streaming 進行統計wordcount簡單源碼:

 

 

 public  static  void main(String[] args){

        SparkConf conf = new SparkConf();
        conf.setAppName("SparkStreaming").setMaster("local[2]");
     //   JavaSparkContext sc = new JavaSparkContext(conf);

        JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));
        JavaReceiverInputDStream<String> sts = jsc.socketTextStream("mynode2", 9999);

        JavaPairDStream<String, Integer> result= sts.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String str) throws Exception {
                String[] s = str.split(" ");
                List<String> strings = Arrays.asList(s);
                return strings.iterator();
            }
        }).mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String str) throws Exception {
                return new Tuple2<String, Integer>(str, 1);
            }
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        result.print();
        jsc.start();
        try {
            jsc.awaitTermination();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        jsc.stop();
    }

 

注: conf.setmaster("local")和conf.setmaster("local[2]")的區別:

local是指進行單核處理,只能對數據進行接受 

local[2]是指對數據進行雙核處理,不僅能夠接受數據,還能都輸出數據

在運行代碼的時候,需要打開代碼中設置的9999端口(端口可以隨意設置),打開端口的命令:

nc -lk 9999

端口的作用: 進行tcp通信 

 JavaReceiverInputDStream<String> sts = jsc.socketTextStream("mynode2", 9999);

此行代碼的作用就是創建一個tcp連接的流

當代碼運行之后,查看代碼運行結果:
localhost:4040/streaming

sparkstreaming 處理任務流程圖:

 在進行socket tcp 輸出測試時,如果本地環境沒有對輸入數據進行輸出,可以換一個端口號進行嘗試 

* 1.SparkStreaming batchInterval 設置每批次的間隔時間【多久生成一批次】
* 2.setMaster("local[2]")至少設置2個線程模擬運行,一個task接收數據,一個task處理數據
* 3.SparkStreaming 啟動之后不能添加新的邏輯
* 4.SparkStreaming.stop() 默認停掉SparkStreaming的同時,將SparkContext也會回收,stop(false)在停掉SparkStreaming之后,不會將SparkContext回收掉。
* 5.創建StreamingContext的兩種方式new StreamingContext(SparkConf|SparkContext)
* 6.代碼邏輯中需要一個outputOperator類算子觸發執行
* 7.StreamingContext停掉之后,不能重新調用start方法啟動。
/**
  * 1.SparkStreaming batchInterval 設置每批次的間隔時間【多久生成一批次】
  * 2.setMaster("local[2]")至少設置2個線程模擬運行,一個task接收數據,一個task處理數據
  * 3.SparkStreaming 啟動之后不能添加新的邏輯
  * 4.SparkStreaming.stop() 默認停掉SparkStreaming的同時,將SparkContext也會回收,stop(false)在停掉SparkStreaming之后,不會將SparkContext回收掉。
  * 5.創建StreamingContext的兩種方式new StreamingContext(SparkC 
  * 7.StreamingContext停掉之后,不能重新調用start方法啟動。
  */

object SparkStreamingTest1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("streamingTest")
    val sc = new SparkContext(conf)
   // sc.setLogLevel("Error")
    val ssc = new StreamingContext(sc,Durations.seconds(5))
    /**
      * hello spark
      * hello java
      */
    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("mynode2",9998)
    val words: DStream[String] = lines.flatMap(line => {
      line.split(" ")
    })
    val pairWords: DStream[(String, Int)] = words.map(word=>{
      //      println("++++++++++++++++++++ DStream Map  +++++++++++++++")
      new Tuple2(word,1)
    })
    val result: DStream[(String, Int)] = pairWords.reduceByKey((v1, v2)=>{v1+v2})


    /**
      * foreachRDD
      * 可以遍歷DStream中的RDD,可以對DStream中的RDD進行rdd的Transformation類算子操作
      * 注意:1.對獲取的RDD一定要使用rdd的action算子觸發執行
      *       2.DStream中RDD的transforamtion類算子外部,foreachRDD內部的代碼是在Driver端執行的,可以通過這個特點做到動態的改變廣播變量
      */
    //    result.foreachRDD(rdd=>{
    //      val bc: Broadcast[List[String]] = rdd.context.broadcast(List[String]("a","b"))
    ////      println("========= rdd1 Transformation 外部 ===============")
    //      val rdd1: RDD[String] = rdd.map(one => {
    ////        println("************** rdd1 *************")
    //        val value: List[String] = bc.value
    //        one._1 + "_" + one._2
    //      })
    //      rdd1.foreach(println)
    //
    //    })


    /**
      * print() DStream的outPutOperator類算子
      */
        result.print()

    ssc.start()
    ssc.awaitTermination()

    //    ssc.stop()


  }

 

注:對foreachRdd方法的使用,以及該方法的作用 

sparkStreaming transform算子的使用:
/**
    * 
    * transform算子可以獲取Dstream中的RDD,需要返回RDD類型的數據 
    * 注意:1、transform算子必須返回一個RDD,這個RDD被封裝到一個Dstream中進行返回 
    *       2、transform算子內,獲取的RDD的算子外部的代碼是在Driver端執行的,可以根據這個特點做到動態的改變廣播變量的值
    *       3、
    * 
    * @param args
    */
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local[2]").setAppName("transForm")

    val sc = new SparkContext(conf)

    sc.setLogLevel("Error")

    val ssc = new StreamingContext(sc,Durations.seconds(5))

    val lines = ssc.socketTextStream("mynode2",9998)

    val result = lines.transform(rdd => {

      val myrdd = rdd.map(line => {
        line + "#"
      })

      myrdd
    })

    result.print()
    ssc.start()
    ssc.awaitTermination()

 

SparkStreaming updateStateByKey算子

object UpdateStateByKey {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("streamingTest")
    val sc = new SparkContext(conf)
    sc.setLogLevel("Error")
    val ssc = new StreamingContext(sc,Durations.seconds(5))

    /**
      * updateStateByKey 算子狀態默認在內存中,多久將內存中的狀態更新到Checkpoint一次?【必須開啟checkpoint】
      * 1.如果batchInterval 小於10s,那么就10s更新一次
      * 2.如果batchInterval 大於10s,那么就batchInterval 更新一次,這樣做為了防止頻繁訪問磁盤
      *
      */
    ssc.checkpoint("./data/ck")
    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("node5",9999)
    val words: DStream[String] = lines.flatMap(line => {
      line.split(" ")
    })
    val pairWords: DStream[(String, Int)] = words.map(word=>{
      new Tuple2(word,1)
    })
    val result: DStream[(String, Int)] = pairWords.updateStateByKey((seq: Seq[Int], option: Option[Int]) => {
      val i: Int = option.getOrElse(0)
      var oldValue = i
      for (elem <- seq) {
        oldValue += elem
      }
      Option(oldValue)
    })
    result.print()


    ssc.start()
    ssc.awaitTermination()

checkpoint知識點總結:

checkpoint的意思就是建立檢查點,類似於快照,例如在spark計算里面 計算流程DAG特別長,服務器需要將整個DAG計算完成得出結果,但是如果在這很長的計算流程中突然中間算出的數據丟失了,spark又會根據RDD的依賴關系從頭到尾計算一遍,這樣子就很費性能,當然我們可以將中間的計算結果通過cache或者persist放到內存或者磁盤中,但是這樣也不能保證數據完全不會丟失,存儲的這個內存出問題了或者磁盤壞了,也會導致spark從頭再根據RDD計算一遍,所以就有了checkpoint,其中checkpoint的作用就是將DAG中比較重要的中間數據做一個檢查點將結果存儲到一個高可用的地方(通常這個地方就是HDFS里面)

    說道checkpoint就得說說RDD的依賴

比如我們計算wordcount的時候:

sc.textFile("hdfspath").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("hdfspath")

1.在textFile讀取hdfs的時候就會先創建一個HadoopRDD,其中這個RDD是去讀取hdfs的數據key為偏移量value為一行數據,因為通常來講偏移量沒有太大的作用所以然后會將HadoopRDD轉化為MapPartitionsRDD,這個RDD只保留了hdfs的數據

2.flatMap 產生一個RDD MapPartitionsRDD

3.map 產生一個RDD MapPartitionsRDD

4.reduceByKey 產生一個RDD ShuffledRDD

5.saveAsTextFile 產生一個RDD MapPartitionsRDD

可以根據查看RDD的依賴:

scala> val rdd = sc.textFile("hdfs://lijie:9000/checkpoint0727/c1a51ee9-1daf-4169-991e-b290f88bac20/rdd-0/part-00000").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_) rdd: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[29] at reduceByKey at <console>:27 scala> rdd.toDebugString res3: String = (2) ShuffledRDD[29] at reduceByKey at <console>:27 [] +-(2) MapPartitionsRDD[28] at map at <console>:27 [] | MapPartitionsRDD[27] at flatMap at <console>:27 [] | hdfs://lijie:9000/checkpoint0727/c1a51ee9-1daf-4169-991e-b290f88bac20/rdd-0/part-00000 MapPartitionsRDD[26] at textFile at <console>:27 [] | hdfs://lijie:9000/checkpoint0727/c1a51ee9-1daf-4169-991e-b290f88bac20/rdd-0/part-00000 HadoopRDD[25] at textFile at <console>:27 [] 

 

    怎么建立checkpoint

首先需要用sparkContext設置hdfs的checkpoint的目錄(如果不設置使用checkpoint會拋出異常:throw new SparkException(“Checkpoint directory has not been set in the SparkContext”):
scala> sc.setCheckpointDir("hdfs://lijie:9000/checkpoint0727")

執行了上面的代碼,hdfs里面會創建一個目錄:

/checkpoint0727/c1a51ee9-1daf-4169-991e-b290f88bac20

然后執行checkpoint

scala> val rdd1 = sc.parallelize(1 to 10000) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:27 scala> rdd1.checkpoint

發現hdfs中還是沒有數據,通過collect然后hdfs就有數據了,說明checkpoint也是個transformation的算子

scala> rdd1.sum res2: Double = 5.0005E7 
#其中hdfs [root@lijie hadoop]
# hadoop dfs -ls /checkpoint0727/c1a51ee9-1daf-4169-991e-b290f88bac20/rdd-0
DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it. Found 2 items -rw-r--r-- 3 root supergroup

53404 2017-07-24 14:26 /checkpoint0727/c1a51ee9-1daf-4169-991e-b290f88bac20/rdd-0/part-00000 -rw-r--r-- 3 root supergroup

53404 2017-07-24 14:26 /checkpoint0727/c1a51ee9-1daf-4169-991e-b290f88bac20/rdd-0/part-00001

 

但是執行的時候相當於走了兩次流程,sum的時候前面計算了一遍,然后checkpoint又會計算一次,所以一般我們先進行cache然后做checkpoint就會只走一次流程,checkpoint的時候就會從剛cache到內存中取數據寫入hdfs中,如下:

rdd.cache()
rdd.checkpoint()
rdd.collect

其中作者也說明了,在checkpoint的時候強烈建議先進行cache,並且當你checkpoint執行成功了,那么前面所有的RDD依賴都會被銷毀,如下:

/** 
* Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint
* directory set with `SparkContext#setCheckpointDir` and all references to its parent
* RDDs will be removed. This function must be called before any job has been
* executed on this RDD. It is strongly recommended that this RDD is persisted in
* memory, otherwise saving it on a file will require recomputation.
*/

 

updateStateByKey 錯誤解決:

19/06/26 09:56:55 ERROR StreamingContext: Error starting the context, marking it as stopped
java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
    at scala.Predef$.require(Predef.scala:219)
    at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:168)
    at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:513)
    at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:573)
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
    at com.bjsxt.myscala.UpdateStateByKey$.main(UpdateStateByKey.scala:58)
    at com.bjsxt.myscala.UpdateStateByKey.main(UpdateStateByKey.scala)
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
    at scala.Predef$.require(Predef.scala:219)
    at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:168)
    at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:513)
    at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:573)
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
    at com.bjsxt.myscala.UpdateStateByKey$.main(UpdateStateByKey.scala:58)
    at com.bjsxt.myscala.UpdateStateByKey.main(UpdateStateByKey.scala)

代碼執行的時候,需要出發print()方法,否則會報錯

 

窗口操作:

 
        
def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local[2]").setAppName("windowsoption")
    val sc = new SparkContext(conf)
    sc.setLogLevel("Error")

    val ssc = new StreamingContext(sc,Durations.seconds(5))

    val lines = ssc.socketTextStream("mynode2",9996)

    val words = lines.flatMap(line => (line.split(" ")))

    val pairWords = words.map(word=>{new Tuple2(word,1)})

    /**
      * 窗口操作的普通機制
      *
      * 滑動間隔和窗口長度必須是 batchInterval 的整數倍
      * 代碼中參數的作用:每隔5秒計算過去15秒批次的數據
      */

    /*val windowsResult = pairWords.reduceByKeyAndWindow((v1: Int, v2: Int) => {
      v1 + v2
    }, Durations.seconds(15), Durations.seconds(5))
    */

    /**
      * 窗口操作優化機制
      *
      *
      */

    ssc.checkpoint("./data/streamingCheckpoint")
    val windowResult = pairWords.reduceByKeyAndWindow(
      (v1: Int, v2: Int) => {
        v1 + v2
      },
      (v1: Int, v2: Int) => {
        v1 - v2
      },
      Durations.seconds(15),
      Durations.seconds(5)
    )




    windowResult.print()
    ssc.start()
    ssc.awaitTermination()

 

Spark Streaming 監控一個目錄:

 val conf = new SparkConf().setMaster("local").setAppName("monitorFile")
    val ssc = new StreamingContext(conf,Durations.seconds(10))
    ssc.sparkContext.setLogLevel("Error")
    val lines = ssc.textFileStream("./data/streamingCopyFile")
    val words = lines.flatMap(line=>{line.split(" ")})
    val pairWords = words.map(word=>{(word,1)})
    val result = pairWords.reduceByKey((v1:Int,v2:Int)=>{v1+v2})

    result.saveAsTextFiles("./data/streamingSavePath/prefix","suffix")
    result.print()
    ssc.start()
    ssc.awaitTermination()
    ssc.stop()

生成文件的代碼:

 def main(args: Array[String]): Unit = {

    /**
      * 此復制文件的程序是模擬在data目錄下動態生成相同格式的txt文件,用於給sparkstreaming 中 textFileStream提供輸入流。
      * @author root
      *
      */

    while(true){
      import java.util.UUID
      Thread.sleep(5000)
      val uuid = UUID.randomUUID.toString
      System.out.println(uuid)
      copyFile(new File("./data/words"), new File(".\\data\\streamingCopyFile\\" + uuid + "----words.txt"))
    }
  }
  def copyFile(fromFile: File, toFile: File): Unit = {
    val ins = new FileInputStream(fromFile)
    val out = new FileOutputStream(toFile)
    val buffer = new Array[Byte](1024 * 1024)
    var size = 0
    while(size!= -1){
      out.write(buffer,0,buffer.length)
      size = ins.read(buffer)

    }
    ins.close()
    out.close()
  }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM