本課將從二方面闡述:
一、解密SparkStreaming Job架構和運行機制
二、解密SparkStreaming容錯架構和運行機制
一切不能進行實時流處理的數據都將是無效的數據。在流處理時代,SparkStreaming有着強大吸引力,加上Spark的生態系統及各個子框架,SparkStreaming可以無縫的調用其兄弟框,如SQL,MLlib、Graphx等。掌握SparkStreaming架構及Job運行機制對精通SparkStreaming至關重要。通常的Spark應用程序是對RDD的Action操作觸發了應用程序的Job的運行。而對於SparkStreaming,Job是怎么樣運行的呢?在編寫SparkStreaming程序的時候,可設置BatchDuration,SparkStreaming框架會自動啟動Job並每隔BatchDuration時間會自動觸發Job的調用。
兩個Job的概念
-
每隔BatchInterval時間片就會產生的一個個Job,這里的Job並不是Spark Core中的Job,它只是基於DStreamGraph而生成的RDD的DAG而已;從Java角度講相當於Runnable接口的實現類,要想運行Job需要將Job提交給JobScheduler,在JobScheduler內部會通過線程池的方式創建運行Job的一個個線程,當找到一個空閑的線程后會將Job提交到集群運行(其實是在線程中基於RDD的Action觸發真正的作業的運行)。為什么使用線程池呢?
a.Job根據BatchInterval不斷生成,為了減少線程創建而帶來的效率提升我們需要使用線程池(這和在Executor中通過啟動線程池的方式來執行Task有異曲同工之妙);
b.如果Job的運行設置為FAIR公平調度的方式,這個時候也需要多線程的支持;
-
上面Job提交的Spark Job本身。單從這個時刻來看,此次的Job和Spark core中的Job沒有任何的區別。
下面通過運行代碼示例來分析整個運行機制
package com.dt.spark.sparkstreaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* 使用Scala開發集群運行的Spark 在線黑名單過濾程序
* @author DT大數據夢工廠
* 新浪微博:http://weibo.com/ilovepains/
*
* 背景描述:在廣告點擊計費系統中,我們在線過濾掉黑名單的點擊,進而保護廣告商的利益,只進行有效的廣告點擊計費
* 或者在防刷評分(或者流量)系統,過濾掉無效的投票或者評分或者流量;
* 實現技術:使用transform Api直接基於RDD編程,進行join操作
*/
object OnlineForeachRDD2DB {
def main(args: Array[String]){
/**
* 第1步:創建Spark的配置對象SparkConf,設置Spark程序的運行時的配置信息,
* 例如說通過setMaster來設置程序要鏈接的Spark集群的Master的URL,如果設置
* 為local,則代表Spark程序在本地運行,特別適合於機器配置條件非常差(例如
* 只有1G的內存)的初學者 *
*/
val conf = new SparkConf() //創建SparkConf對象
conf.setAppName("OnlineForeachRDD") //設置應用程序的名稱,在程序運行的監控界面可以看到名稱
// conf.setMaster("spark://Master:7077") //此時,程序在Spark集群
conf.setMaster("local[6]")
//設置batchDuration時間間隔來控制Job生成的頻率並且創建Spark Streaming執行的入口
val ssc = new StreamingContext(conf, Seconds(5))
val lines = ssc.socketTextStream("Master", 9999)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords => {
// ConnectionPool is a static, lazily initialized pool of connections
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => {
val sql = "insert into streaming_itemcount(item,count) values('" + record._1 + "'," + record._2 + ")"
val stmt = connection.createStatement();
stmt.executeUpdate(sql);
})
ConnectionPool.returnConnection(connection) // return to the pool for future reuse
}
}
}
/**
* 在StreamingContext調用start方法的內部其實是會啟動JobScheduler的Start方法,進行消息循環,在JobScheduler
* 的start內部會構造JobGenerator和ReceiverTacker,並且調用JobGenerator和ReceiverTacker的start方法:
* 1,JobGenerator啟動后會不斷的根據batchDuration生成一個個的Job
* 2,ReceiverTracker啟動后首先在Spark Cluster中啟動Receiver(其實是在Executor中先啟動ReceiverSupervisor),在Receiver收到
* 數據后會通過ReceiverSupervisor存儲到Executor並且把數據的Metadata信息發送給Driver中的ReceiverTracker,在ReceiverTracker
* 內部會通過ReceivedBlockTracker來管理接受到的元數據信息
* 每個BatchInterval會產生一個具體的Job,其實這里的Job不是Spark Core中所指的Job,它只是基於DStreamGraph而生成的RDD
* 的DAG而已,從Java角度講,相當於Runnable接口實例,此時要想運行Job需要提交給JobScheduler,在JobScheduler中通過線程池的方式找到一個
* 單獨的線程來提交Job到集群運行(其實是在線程中基於RDD的Action觸發真正的作業的運行),為什么使用線程池呢?
* 1.作業不斷生成,所以為了提升效率,我們需要線程池;這和在Executor中通過線程池執行Task有異曲同工之妙;
* 2.有可能設置了Job的FAIR公平調度的方式,這個時候也需要多線程的支持;
*/
ssc.start()
ssc.awaitTermination()
}
}
package com.dt.spark.sparkstreaming;
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.LinkedList;
public class ConnectionPool {
private static LinkedList<Connection> connectionQueue;
static {
try {
Class.forName("com.mysql.jdbc.Driver");
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
public synchronized static Connection getConnection() {
try {
if(connectionQueue == null) {
connectionQueue = new LinkedList<Connection>();
for(int i = 0; i < 5; i++) {
Connection conn = DriverManager.getConnection(
"jdbc:mysql://Master:3306/sparkstreaming",
"root",
"778899..");
connectionQueue.push(conn);
}
}
} catch (Exception e) {
e.printStackTrace();
}
return connectionQueue.poll();
}
public static void returnConnection(Connection conn) {
connectionQueue.push(conn);
}
}
將上述代碼打成JAR包:
啟動SparkStreaming應用程序:
具體執行細節是走的是SparkCore路線:
使用foreachRDD的話將結果內容直接插入數據庫中,不會進行打印結果輸出:
再看下整個運行過程:
總調度器啟動:Jobscheduler,主要是根據batchInterval或windows窗口移動進行作業划分,SparkStreaming不斷接收流進來數據,不斷生成Job,看下web控制台:
隨着時間的流失,不斷生成job本身,job怎么生成?
運行過程總結如下:
1、在StreamingContext調用start方法的內部其實是會啟動JobScheduler的Start方法,進行消息循環,在JobScheduler
的start內部會構造JobGenerator和ReceiverTacker,並且調用JobGenerator和ReceiverTacker的start方法:
JobGenerator啟動后會不斷的根據batchDuration生成一個個的Job
ReceiverTracker啟動后首先在Spark Cluster中啟動Receiver(其實是在Executor中先啟動ReceiverSupervisor),在Receiver收到
數據后會通過ReceiverSupervisor存儲到Executor並且把數據的Metadata信息發送給Driver中的ReceiverTracker,在ReceiverTracker
內部會通過ReceivedBlockTracker來管理接受到的元數據信息
每個BatchInterval會產生一個具體的Job,其實這里的Job不是Spark Core中所指的Job,它只是基於DStreamGraph而生成的RDD
的DAG而已,從Java角度講,相當於Runnable接口實例,此時要想運行Job需要提交給JobScheduler,在JobScheduler中通過線程池的方式找到一個
單獨的線程來提交Job到集群運行(其實是在線程中基於RDD的Action觸發真正的作業的運行)。
2、為什么使用線程池呢?
作業不斷生成,所以為了提升效率,我們需要線程池;這和在Executor中通過線程池執行Task有異曲同工之妙;
有可能設置了Job的FAIR公平調度的方式,這個時候也需要多線程的支持;
新浪微博:http://weibo.com/ilovepains
微信公眾號:DT_Spark
博客:http://blog.sina.com.cn/ilovepains
手機:18610086859
QQ:1740415547
郵箱:18610086859@vip.126.com
Spark發行版筆記3