SparkStreming中 `transform()`算子的 的使用


關聯 DStream 和 RDD

transform(func) Return a new DStream by applying a RDD-to-RDD function to every RDD of the source DStream.
This can be used to do arbitrary RDD operations on the DStream.

黑名單過濾

實現思路:
拿到訪問日志,直接是DStream, 在數據庫中建立需要過濾的字段,拿到后,需要轉成RDD,
設置為字段為true,即過濾,建立表的左鏈接,
字段以 : 進行分隔,分隔后取元組,再取元組的元組,
測試: 將數據在IDEA中打印出來
開發: 直接存到數據庫中

訪問日志  ==>  DStream(直接拿到,生成DStream)
20180808,zs
20180808,ls
20180808,ww
20180808,w2
20180808,w3
20180808,w4

    ==》 (zs: 20180808,zs)(ls: 20180808,ls)(ww: 20180808,ww)


黑名單列表 ==>  RDD(這塊模擬,不用數據庫,直接用list,轉成RDD)
zs
ls
    ==> (zs: true)(ls: true)    // 過濾的結果(為true過濾掉) true: 即為黑名單


==> 20180808,ww     //(最終得到的結果)


leftjoin    //(左連接) 左邊的訪問日志表不變
(zs:[<20180808,zs> ,<true>])  x
(ls:[<20180808,ls> ,<true>])  x
(ww:[<20180808,ww> ,<false>])  ==> tuple 1


BlackFilterApp.scala

package com.imooc.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}


object BlackFilterApp {

  def main(args: Array[String]): Unit = {

    val sparkconf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getSimpleName)
    val ssc = new StreamingContext(sparkconf,Seconds(5))

    /**
      * 構建黑名單(要過濾的數據)
      */
    val blacks = List("zs", "ls")  // 一般這條在數據庫中,用 read 讀進來即可
    val blacksRDD = ssc.sparkContext.parallelize(blacks)//轉成RDD
        .map(x => (x, true))
    //將這個元素 x 重新定位為一個新字段  (x,true)
    //(("zs","true"),("ls","true"))

    val lines = ssc.socketTextStream("localhost", 8888)

    //20180808,zs 原來的格式
    //zs,20180808,zs 處理后的格式
    //取index=1的元素,然后在跟上它自身
    val clicklogs = lines.map(x => (x.split(",")(1), x))
      .transform(rdd => {
        //blacksRDD進行map操作后它是RDD格式,此處的lines進行map操作后,它是DStream[U]格式,
        //所以此處,要將DStream和RDD進行聯合,就要使用transform算子,
        //通過將RDD-to-RDD函數應用於源DStream的每個RDD來返回新的DStream。
        //這可以用於在DStream上執行任意RDD操作。
            rdd.leftOuterJoin(blacksRDD)
              //進行表的左外連接 leftOuterJoin
              //
              // 端口傳進來的數據,經過處理后
              // zs,20180808,zs
              // ls,20180808,ls
              // ww,20180808,ww
              //
              // 黑名單中的數據
              // (("zs","true"),("ls","true"))
              //
              //進行關聯后的數據
              // (zs:[<20180808,zs>,<true>])  x
              // (ls:[<20180808,ls>,<true>])  x
              // (ww:[<20180808,ww>,<false>])  ==> tuple 1
            .filter(x => x._2._2.getOrElse(false) != true)
              // 過濾(zs: [<20180808,zs> ,<true>])中,第二個元素的中的第二個元素,判斷是否等於true,如果不為true,則返回false,
              // 此處運行后,就只剩下為false的元素了 (ww:[<20180808,ww>,<false>]) ,只有這一條了
            .map(x => x._2._1)
              //取(zs,[<20180808,zs>,<true>])中第二個元素的第一個元素  (tuple的使用)
        })

    clicklogs.print()  //在控制台打印信息   (這塊結果應該是有問題的,它只顯示最后一個), 應該在 filter或map這塊,它是有問題的, 感覺問題在 map, 對元組的掌握,還有待提高

    ssc.start()
    ssc.awaitTermination()
  }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM