一、基礎核心概念
1、StreamingContext詳解 (一)
有兩種創建StreamingContext的方式:
val conf = new SparkConf().setAppName(appName).setMaster(master);
val ssc = new StreamingContext(conf, Seconds(1));
StreamingContext, 還可以使用已有的SparkContext來創建
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(1));
appName, 是用來在Spark UI上顯示的應用名稱。 master, 是一個Spark、 Mesos或者Yarn集群的URL, 或者是local[*]。
2、StreamingContext詳解 (二)
一個StreamingContext定義之后, 必須做以下幾件事情:
1、 通過創建輸入DStream來創建輸入數據源。
2、 通過對DStream定義transformation和output算子操作, 來定義實時計算邏輯。
3 、 調用StreamingContext的start()方法, 來開始實時處理數據。
4、 調用StreamingContext的awaitTermination()方法, 來等待應用程序的終止。 可以使用CTRL+C手動停止,或者就是讓它持續不斷的運行進行計算。
5、 也可以通過調用StreamingContext的stop()方法, 來停止應用程序。
需要注意的要點:
1、 只要一個StreamingContext啟動之后, 就不能再往其中添加任何計算邏輯了。 比如執行start()方法之后, 還給某個DStream執行一個算子。
2、 一個StreamingContext停止之后, 是肯定不能夠重啟的。 調用stop()之后, 不能再調用start()
3、 一個JVM同時只能有一個StreamingContext啟動。 在你的應用程序中, 不能創建兩個StreamingContext。
4、 調用stop()方法時, 會同時停止內部的SparkContext, 如果不希望如此, 還希望后面繼續使用SparkContext創建其他類型的Context, 比如SQLContext, 那么就用stop(false)。
5、 一個SparkContext可以創建多個StreamingContext, 只要上一個先用stop(false)停止, 再創建下一個即可。
3、輸入DStream和Receiver詳解(一)
輸入DStream代表了來自數據源的輸入數據流。 在之前的wordcount例子中, lines就是一個輸入 DStream( JavaReceiverInputDStream) , 代表了從netcat( nc) 服務接收到的數據流。 除了 文件數據流之外, 所有的輸入DStream都會綁定一個Receiver對象,
該對象是一個關鍵的組件, 用來從數據源接收數據, 並將其存儲在Spark的內存中, 以供后續處理。
Spark Streaming提供了三種內置的數據源支持;
1、 基礎數據源: StreamingContext API中直接提供了對這些數據源的支持, 比如文件、 socket、 Akka Actor等。
2、 高級數據源: 諸如Kafka、 Flume、 Kinesis、 Twitter等數據源, 通過第三方工具類提供支持。 這些數據源的使用, 需要引用其依賴。
3、 自定義數據源: 我們可以自己定義數據源, 來決定如何接受和存儲數據。
4、輸入DStream和Receiver詳解(二)
如果你想要在實時計算應用中並行接收多條數據流, 可以創建多個輸入DStream。 這樣就會創建多個 Receiver, 從而並行地接收多個數據流。 但是要注意的是, 一個Spark Streaming Application的 Executor, 是一個長時間運行的任務, 因此,
它會獨占分配給Spark Streaming Application的cpu core。從而只要Spark Streaming運行起來以后, 這個節點上的cpu core, 就沒法給其他應用使用了。
使用本地模式, 運行程序時, 絕對不能用local或者local[1], 因為那樣的話, 只會給執行輸入DStream的 executor分配一個線程。 而Spark Streaming底層的原理是, 至少要有兩條線程, 一條線程用來分配給 Receiver接收數據, 一條線程用來處理接收到的數據。
因此必須使用local[n], n>=2的模式。 (n不能大於當前節點的CPU核數)
如果不設置Master, 也就是直接將Spark Streaming應用提交到集群上運行, 那么首先, 必須要求集群 節點上, 有>1個cpu core, 其次, 給Spark Streaming的每個executor分配的core, 必須>1, 這樣, 才能保證分配到executor上運行的輸入DStream,
兩條線程並行, 一條運行Receiver, 接收數據; 一條處 理數據。 否則的話, 只會接收數據, 不會處理數據。
總結:Receiver接收器
Receiver接收器,可以接收外部數據源中的數據,並將其保存到
內存中,以供后續使用。
在Spark中支持三大類型的數據源:
1、基礎數據源:比如文件、Socket、Akka中的數據。
2、高級數據源,比如Flume、Kafka、推特中的數據。
3、自定義數據源。
補充:在Spark Streaming中,可以通過兩種方式操作Kafka的數據。
一種是通過Receiver的方式,另一種Direct直接讀取的方式。
5、輸入DStream之基礎數據源
(1)Socket: 之前的wordcount例子,
object WordCoundStreaming {
def main(args: Array[String]): Unit = {
/**
* 處理Spakr Streaming程序至少需要2個線程,其中,
* 一個線程負責接收輸入的數據
* 另一個線程負責處理接收的數據
* local[N] N>=2
*/
val conf=new SparkConf().setAppName("WordCoundStreaming")
.setMaster("local[2]")
/**
* SparkContext是用戶與Spark集群交互的唯一接口,所以SparkContext是必需的。
* 在創建StreamingContext的過程中,Spark會在源碼中自動創建一個SparkContext對象。
* 注意第二個參數Seconds(*),表示實時流數據中每批數據的時間間隔,
* 也就是說,在DStream離散流中的每個RDD包含相應時間間隔的數據。
*/
val ssc=new StreamingContext(conf,Seconds(5))
/**
* 通過socketTextStream()獲取nc服務器中的數據
* 需要指明獲取nc服務器的節點名稱和端口,這是的端口要和運行nc服務器的端口一致。
* 此時產生的lines不是RDD,而是一個DStream離散流。
*/
val lines=ssc.socketTextStream("tgmaster",9999)
/**
* 對離散流lines進行flatMap轉換操作,實際上是對lines離散流中的每個RDD都進行
* flatMap操作,從而產生了新的RDD,多個新的RDD構成了新的離散流words。
*/
val words=lines.flatMap(_.split(" "))
val pairs=words.map((_,1))
val result=pairs.reduceByKey(_+_)
//在控制台輸出內容
result.print()
//啟動StreamingContext
ssc.start()
//等待程序停止
ssc.awaitTermination()
}
}
(2)基於HDFS文件的實時計算, 其實就是, 監控一個HDFS目錄, 只要其中有新文件出現, 就實時處理。相當於處理實時的文件流。
streamingContext.fileStream<KeyClass, ValueClass, InputFormatClass>(dataDirectory)
streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
Spark Streaming會監視指定的HDFS目錄, 並且處理出現在目錄中的文件。
要注意的是, 所有放入HDFS目錄中的文件, 都必須有相同的格式; 必須使用移動或者重命名的方式,將文件移入目錄; 一旦處理之后, 文件的內容即使改變, 也不會再處理了; 基於HDFS文件的數據源是沒有Receiver的, 因此不會占用一個cpu core。
def main(args: Array[String]): Unit = {
val conf =new SparkConf().setAppName("word").setMaster("local[2]")
val ssc=new StreamingContext(conf,Seconds(5))
val lines=ssc.textFileStream("hdfs://liuwei1:9000/homework")
val result=lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
result.print()
ssc.start()
ssc.awaitTermination()
}
二、DStream的transformation


總結:DStream中Transformation類型的算子有以下三個最重要
1)updateStateByKey
可以為每個Key保留一份狀態,並更新狀態的值。
案例:在全局范圍內統計每個單詞出現的次數。
2)transform
可以執行RDD到RDD的操作,相當於對DStream API的一個補充。
3)window
滑動窗口操作,需要指明兩個參數,一個是窗口的長度,另一個是
窗口滑動的時間間隔。
三、updateStateBykey (全局范圍之內處理數據,而不是一批一批的)
updateStateByKey
updateStateByKey操作, 可以讓我們為每個key維護一份state, 並持續不斷的更新該state。
1、 首先, 要定義一個state, 可以是任意的數據類型;
2、 其次, 要定義state更新函數——指定一個函數如何使用之前的state和新值來更新state。
對於每個batch, Spark都會為每個之前已經存在的key去應用一次state更新函數, 無論這個key在batch中是否有新的數據。 如果state更新函數返回none, 那么key
對應的state就會被刪除。 當然, 對於每個新出現的key, 也會執行state更新函數。
注意, updateStateByKey操作, 要求必須開啟Checkpoint機制。
案例: 基於緩存的實時wordcount程序( 在實際業務場景中, 這個是非常有用的)
object updateStateByKey {
def main(args: Array[String]): Unit = {
val conf=new SparkConf().setAppName("updateStateByKey")
.setMaster("local[2]")
val ssc=new StreamingContext(conf,Seconds(5))
//updateStateByKey操作,要求必須開啟Checkpoint機制。
ssc.checkpoint("hdfs://tgmaster:9000/in/ch")
val lines=ssc.socketTextStream("tgmaster",9999)
val pairs=lines.flatMap(_.split(" "))
.map((_,1))
/**
* values:多個新值的集合
* state:是一個Option類型的狀態,
* 可以通過state.getOrElse(0)為newValue設置初始值
*/
val result=pairs.updateStateByKey((values:Seq[Int],state:Option[Int])=>{
//創建newValue並設初始值0
var newValue=state.getOrElse(0)
/**
* 遍歷values集合的新值,用以改變原先的舊值
*/
for(value <- values){
newValue+=value
}
/**
* updateStateByKey算子需要一個Option類型的返回值,
* 所以將更新之后的新值作為Option返回。
*/
Option(newValue)
})
result.print()
ssc.start()
ssc.awaitTermination()
}
}
四、transform
transform操作, 應用在DStream上時, 可以用於執行任意的RDD到RDD的轉換操作。 它可以用於實現, DStream API中所沒有提供的操作。 比如說, DStream API中, 並沒有提 供將一個DStream中的每個batch, 與一個特定的RDD進行join的操作。
但是我們自己就 可以使用transform操作來實現該功能。
DStream.join(), 只能join其他DStream。 在DStream每個batch的RDD計算出來之后, 會 去跟其他DStream的RDD進行join。
案例: 廣告計費日志實時黑名單過濾
/**
* Created by tg on 3/29/17.
* 實時黑名單過濾
*/
object transformDemo {
def main(args: Array[String]): Unit = {
val conf=new SparkConf().setAppName("transformDemo")
.setMaster("local[2]")
val ssc=new StreamingContext(conf,Seconds(5))
//模擬數據,創建黑名單RDD,(String,Boolean) (name,Boolean)
val blackRDD=ssc.sparkContext.parallelize(Array(("tom",true)))
//從nc服務器中獲取數據,nc服務器中的數據格式:time name,比如: 1101 jack
val linesDStream=ssc.socketTextStream("tgmaster",9999)
val mapDStream=linesDStream.map(line=>{
val log=line.split(" ")
(log(1),line) //(name,time name)
})
/**
* tansform()算子可以執行RDD到RDD的轉換操作
*/
mapDStream.transform(adsRDD=>{
//讓adsRDD與blackRDD進行leftOuterJoin左外連接操作,操作之后的數據包含所有的用戶
// (String,(String,Boolean))
val joinRDD=adsRDD.leftOuterJoin(blackRDD)
val filterRDD=joinRDD.filter(x=>{
/**
* 當if條件成立時,意味着是黑名單人員,返回false將其刪除。
* 當if條件不成立時,意味着不是黑名單人員,返回true將其保留。
*/
if(x._2._2.getOrElse(false)) false else true
})
filterRDD.map(x=>{
x._2._1
})
}).print()
ssc.start()
ssc.awaitTermination()
}
}
五、window滑動窗口
Spark Streaming提供了滑動窗口操作的支持, 從而讓我們可以對一個滑動窗口內的數據執行計算 操作。
每次掉落在窗口內的RDD的數據, 會被聚合起來執行計算操作, 然后生成的RDD, 會作為window DStream的一個RDD。 比如下圖中, 就是對每三秒鍾的數據執行一次滑動窗口計算, 這3秒內的3個 RDD會被聚合起來進行處理, 然后過了兩秒鍾, 又會對最近三秒內的數據執行滑動窗口計算。 所以 每個滑動窗口操作, 都必須指定兩個參數, 窗口長度以及滑動間隔, 而且這兩個參數值都必須是 batch間隔的整數倍。 ( Spark Streaming對滑動窗口的支持, 是比Storm更加完善和強大的) 
window滑動窗口操作 
案例:
熱點搜索詞滑動統計, 每隔5秒鍾, 統計最近20秒鍾的搜索詞的搜索頻次, 並打印 出排名最靠前的3個搜索詞以及出現次數
package SparkCore.day1
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* Created by tg on 3/29/17.
* 熱點搜索詞滑動統計,每隔5秒鍾,統計最近20秒鍾的搜索詞的搜索頻次,
* 並打印出排名最靠前的3個搜索詞以及出現次數。
*/
object reduceByKeyAndWindowDemo {
def main(args: Array[String]): Unit = {
val conf=new SparkConf().setAppName("reduceByKeyAndWindowDemo")
.setMaster("local[2]")
val ssc=new StreamingContext(conf,Seconds(1))
//從nc服務器中獲取數據,數據格式:name keyword,比如:張三 Spark
val linesDStream=ssc.socketTextStream("tgmaster",9999)
//取出每行的搜索關鍵詞
val wordsDStream=linesDStream.map(x=>x.split(" ")(1))
val pairsDStream=wordsDStream.map(x=>(x,1))
/**
* 通過滑動窗口進行統計,每隔5秒鍾,統計最近20秒鍾的搜索詞出現次數。
* reduceByKeyAndWindow()算子中,
* 第一部分v1+v2用於計算搜索詞出現次數
* 第二部分Seconds(20)用於設置窗口的長度
* 第三部分Seconds(5)用於設置窗口的時間間隔
*/
val resultDStream=pairsDStream.reduceByKeyAndWindow((v1:Int,v2:Int)=>
v1+v2,Seconds(20),Seconds(5))
resultDStream.transform(itemRDD=>{
val result=itemRDD.map(x=>(x._2,x._1))
.sortByKey(false).take(3)
.map(x=>(x._2,x._1))
val resultRDD=ssc.sparkContext.parallelize(result)
resultRDD
}).print()
ssc.start()
ssc.awaitTermination()
}
}
六、output操作及foreachRDD
1、output 操作

DStream中的所有計算, 都是由output操作觸發的, 比如print()。 如果沒有任何 output操作, 那么, 壓根兒就不會執行定義的計算邏輯。
此外, 即使你使用了foreachRDD output操作, 也必須在里面對RDD執行action操 作, 才能觸發對每一個batch的計算邏輯。 否則, 光有foreachRDD output操作,在里面沒有對RDD執行action操作, 也不會觸發任何邏輯。
2、foreachRDD詳解
通常在foreachRDD中, 都會創建一個Connection, 比如JDBC Connection, 然后通過 Connection將數據寫入外部存儲
誤區一: 在RDD的foreach操作外部, 創建Connection 這種方式是錯誤的, 因為它會導致Connection對象被序列化后傳輸到每個Task中。 而這 種Connection對象, 實際上一般是不支持序列化的, 也就無法被傳輸
dstream.foreachRDD { rdd =>
val connection = createNewConnection()
rdd.foreach { record => connection.send(record)
}
}
誤區二: 在RDD的foreach操作內部, 創建Connection 這種方式是可以的, 但是效率低下。 因為它會導致對於RDD中的每一條數據, 都 創建一個Connection對象。 而通常來說, Connection的創建, 是很消耗性能的。
dstream.foreachRDD { rdd =>
rdd.foreach { record =>
val connection = createNewConnection()
connection.send(record)
connection.close()
}
}
合理方式一: 使用RDD的foreachPartition操作, 並且在該操作內部, 創建Connection對象, 這樣就相當於是, 為RDD的每個partition創建一個 Connection對象, 節省資源的多了。
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
val connection = createNewConnection()
partitionOfRecords.foreach(record => connection.send(record))
connection.close()
}
}
合理方式二: 自己手動封裝一個靜態連接池, 使用RDD的foreachPartition操作, 並且在 該操作內部, 從靜態連接池中, 通過靜態方法, 獲取到一個連接, 使用之后再還回去。 這樣的話, 甚至在多個RDD的partition之間, 也可以復用連接了。 而且可以讓連接池采取
懶創建的策略, 並且空閑一段時間后, 將其釋放掉。
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record))
ConnectionPool.returnConnection(connection)
}
}
案例:
改寫UpdateStateByKeyWordCount, 將每次統計出來的全局的單詞計數, 寫入一份, 到MySQL數據庫中。
補充:
一、mysql數據庫中varchar與char的區別
varchar 可變的 ;name varchar(100) 賦值時賦了10 ,不占用多余的空間,節省資源空間
char 固定的,name varchar(100) 賦值為10,占用多余空間,浪費資源空間
char在查詢檢索數據時,char類型的都是固定長度,所以直接查詢檢索數據,但是對於varchar類型的存儲長度空間不一致,所以在查詢檢索數據之前,需要確定數據的存儲長度。然后再查詢數據,這樣就比char類型的多了一層確定數據存儲長度的操作,因此的那個數據量非常龐大時,會
影響查詢數據效率
建表:
create table wordcount (
id integer auto_increment primary key,
updated_time timestamp NOT NULL default CURRENT_TIMESTAMP on update CURRENT_TIMESTAMP,
word varchar(255),
count integer
);
添加數據:
insert into 表名(列1,列2,....) values (值1,值2,....)
