關聯 DStream 和 RDD
| transform(func) | Return a new DStream by applying a RDD-to-RDD function to every RDD of the source DStream. This can be used to do arbitrary RDD operations on the DStream. |
|---|
黑名單過濾
實現思路:
拿到訪問日志,直接是DStream, 在數據庫中建立需要過濾的字段,拿到后,需要轉成RDD,
設置為字段為true,即過濾,建立表的左鏈接,
字段以 : 進行分隔,分隔后取元組,再取元組的元組,
測試: 將數據在IDEA中打印出來
開發: 直接存到數據庫中
訪問日志 ==> DStream(直接拿到,生成DStream)
20180808,zs
20180808,ls
20180808,ww
20180808,w2
20180808,w3
20180808,w4
==》 (zs: 20180808,zs)(ls: 20180808,ls)(ww: 20180808,ww)
黑名單列表 ==> RDD(這塊模擬,不用數據庫,直接用list,轉成RDD)
zs
ls
==> (zs: true)(ls: true) // 過濾的結果(為true過濾掉) true: 即為黑名單
==> 20180808,ww //(最終得到的結果)
leftjoin //(左連接) 左邊的訪問日志表不變
(zs:[<20180808,zs> ,<true>]) x
(ls:[<20180808,ls> ,<true>]) x
(ww:[<20180808,ww> ,<false>]) ==> tuple 1
BlackFilterApp.scala
package com.imooc.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object BlackFilterApp {
def main(args: Array[String]): Unit = {
val sparkconf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getSimpleName)
val ssc = new StreamingContext(sparkconf,Seconds(5))
/**
* 構建黑名單(要過濾的數據)
*/
val blacks = List("zs", "ls") // 一般這條在數據庫中,用 read 讀進來即可
val blacksRDD = ssc.sparkContext.parallelize(blacks)//轉成RDD
.map(x => (x, true))
//將這個元素 x 重新定位為一個新字段 (x,true)
//(("zs","true"),("ls","true"))
val lines = ssc.socketTextStream("localhost", 8888)
//20180808,zs 原來的格式
//zs,20180808,zs 處理后的格式
//取index=1的元素,然后在跟上它自身
val clicklogs = lines.map(x => (x.split(",")(1), x))
.transform(rdd => {
//blacksRDD進行map操作后它是RDD格式,此處的lines進行map操作后,它是DStream[U]格式,
//所以此處,要將DStream和RDD進行聯合,就要使用transform算子,
//通過將RDD-to-RDD函數應用於源DStream的每個RDD來返回新的DStream。
//這可以用於在DStream上執行任意RDD操作。
rdd.leftOuterJoin(blacksRDD)
//進行表的左外連接 leftOuterJoin
//
// 端口傳進來的數據,經過處理后
// zs,20180808,zs
// ls,20180808,ls
// ww,20180808,ww
//
// 黑名單中的數據
// (("zs","true"),("ls","true"))
//
//進行關聯后的數據
// (zs:[<20180808,zs>,<true>]) x
// (ls:[<20180808,ls>,<true>]) x
// (ww:[<20180808,ww>,<false>]) ==> tuple 1
.filter(x => x._2._2.getOrElse(false) != true)
// 過濾(zs: [<20180808,zs> ,<true>])中,第二個元素的中的第二個元素,判斷是否等於true,如果不為true,則返回false,
// 此處運行后,就只剩下為false的元素了 (ww:[<20180808,ww>,<false>]) ,只有這一條了
.map(x => x._2._1)
//取(zs,[<20180808,zs>,<true>])中第二個元素的第一個元素 (tuple的使用)
})
clicklogs.print() //在控制台打印信息 (這塊結果應該是有問題的,它只顯示最后一個), 應該在 filter或map這塊,它是有問題的, 感覺問題在 map, 對元組的掌握,還有待提高
ssc.start()
ssc.awaitTermination()
}
}
