轉載請標明出處http://www.cnblogs.com/haozhengfei/p/e353daff460b01a5be13688fe1f8c952.html
Spark_總結五
1.Storm 和 SparkStreaming區別
Storm | 純實時的流式處理,來一條數據就立即進行處理 |
SparkStreaming | 微批處理,每次處理的都是一批非常小的數據 |
Storm支持動態調整並行度(動態的資源分配),SparkStreaming(粗粒度, 比較消耗資源) |
Storm 優點 || 缺點
Storm 流式計算(扶梯)
優點:
數據延遲度很低,
Storm的事務機制要比SparkStreaming的事務機制要完善(
什么是事務機制?對於一條數據,不多處理也不少處理,對於一條數據恰好處理一次,比如金融,股票等要求實時性比較高,那么就需要選Storm
)
缺點:
一直持有着資源,
每一條數據都要在集群中某一台節點處理,要計算的數據會進行網絡傳輸,吞吐量小,另外Storm不適合做復雜的業務邏輯(適合匯總)
SparkStreaming 優點 || 缺點
SparkStreaming 微批處理(類似於電梯),
它並不是純的批處理
優點:
吞吐量大,可以
做復雜的業務邏輯(保證每個job的處理小於batch interval)
缺點:
數據延遲度較高
公司中為什么選用SparkStreaming要多一些?
1.秒級別延遲,通常應用程序是可以接受的,
2.可以應用機器學習,SparkSQL...可擴展性比較好,數據吞吐量較高
2.SparkStreaming
2.1什么是SparkStreaming?
SparkStreaming是一個流式處理框架,處理的模式是微批處理(微批有多大?通過時間來設置這個批有多大
[For example:Batch Interval 5s])
SparkStreaming
基於DStream(Discretized Streams:離散的數據流
)來進行編程,處理的是一個流,這個流什么時候切成一個rdd-->根據batchinterval來決定何時切割成一個RDD。
SparkStreaming 架構圖

job的個數是由output operator決定的,
StreamContext底層封裝了SparkContext
2.2圖解SparkStreaming || SparkStreaming執行流程

從圖上可以看到,Batch Interval的間隔是5s,也就是說每經過5s,SparkStreaming會將這5s內的信息封裝成一個DStream,然后提交到Spark集群進行計算
執行流程
第一個 DStream 里面是 0-5s 的數據,在第6s的時候會觸發 DStream 的job執行,這時會另啟動一個線程執行這個job
(假設這個job只需要3s)
,同時在6-10s期間繼續接受數據,在第11s的時候會觸發 DStream 的job的執行,
這時會另啟動一個線程執行這個job
(假設這個job只需要3s)
,
同時在11-15s期間繼續接受數據...
注意!
如果這個job執行的時間大於5s會有什么問題?
數據在5s內處理不完,又啟動一個job,導致數據越積越多,
從而導致
SparkStreaming down
2.3SparkStreaming代碼 TransformOperator
案例:過濾黑名單
這里模擬了一份黑名單,SparkStreaming監控服務器中指定端口,時間設定為每5秒處理一次數據。每當job觸發時,用戶輸入的數據與黑名單中的數據進行左外連接,然后過濾
node1 創建一個Socket Server
nc -lk 8888 (頁面停下了,開始輸入數據進入8888端口,此時SparkStreaming監聽這個端口)
hello world
hello jack
hello tom(
過濾tom)
result:

注意事項!
1.為什么會沒有數據?
因為只開啟了一條線程(這里只有接收數據的線程),所以local的模擬SparkStreaming必須至少設置兩個線程,
new SparkConf().setMaster("local[2]").setAppName("TransformBlacklist");
2.Durations時間的設置
--接收數據的延遲時間,多久觸發一次job
final JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
3.創建JavaStreamingContext有兩種方式(sparkconf、sparkcontext)
4.業務邏輯完成后,需要有一個output operator,將SparkStreaming處理后的數據輸出(HDFS,DBMS)
5.關於
JavaStreamingContext 的
start() || stop()
JavaStreamingContext.start() //straming框架啟動之后是不能再添加業務邏輯
JavaStreamingContext.stop() //無
參的stop方法會將sparkContext一同關閉,解決辦法:stop(false)
JavaStreamingContext.stop() //
停止之后是不能在調用start()
6.
DStreams(Discretized Streams--離散的流),
應用在每個DStream的算子操作
,應用在RDD,應用在Partition,應用在Partition中的一條條數據,
所以最終應用到每一條記錄上
2.4Window窗口操作
window operation
普通的
每隔
多長時間切割RDD
基於窗口的操作:
每隔
多長時間切割RDD,
每隔
多長時間計算一次,每次計算的量是多少
為什么需要有窗口操作?
比如
別人要求能夠實時看到此刻之前一段時間的數據情況,如果使用批處理的話,那么我們只能固定一個整段時間然后對這個整段時間進行spark core的計算,但是別人的要求是每一個時刻都需要有結果,那么就需要窗口操作?但是窗口操作肯定會有很多的
重復計算,這里有一個優化的地方(
這個優化也不是必須的,
視具體情況而定,比如說我們要查看最近30分鍾最熱門的頭條,我們在設計的時候不可能每隔30分鍾計算一次,這里定義了滑動窗口時間是1分鍾,然而計算量是30分鍾內的數據,那么肯定會有29分鍾重復的數據計算);但是
優化的話就會有一個前提,必須要checkpoint

每次計算都是最近15s的數據,基於這個特性(微博熱點:最近30分鍾內最熱門的頭條)
問題一:batch interval 5s,窗口大小可以是8s么?
不行,有的batch就不能被窗口所包含,必須是batch interval的整數倍
問題二:滑動窗口時間 8s 可以么?
必須是batch interval的整數倍
優化:如何避免time3被重復計算(圖中time3在兩個window中都被計算了),可以沒有,但是有的話,就需要這種優化

Batch Interval 1s
||
窗口大小 5s
||
滑動窗口 1s
思考:計算一個趨勢的時候,需要基於滑動窗口的操作,是否必須要優化,避免重復計算?(未必)
For example:
1.查看微博中每小時的熱門微博,每隔1分鍾計算一次,相當於重復計算了59分鍾的內容
2.商家想看前5分鍾的銷售額,每隔30秒看一次,也需要基於窗口的操作
2.5UpdateStateByKey
updateStateByKey
的使用需要
checkpoint
,隔幾次記錄一次到磁盤中
UpdateStateByKey的主要功能
1、Spark Streaming中為
每一個Key維護一份state狀態,這個
state類型可以是任意類型的的, 可以是一個自定義的對象,那么更新函數也可以是任意類型的。
2、通過更新函數對該key的狀態不斷更新,
對於每個新的batch而言,Spark Streaming會在使用updateStateByKey的時候為已經存在的key進行state的狀態更新
(對於每個新出現的key,會同樣的執行state的更新函數操作)
3、
如果要不斷的更新每個key的state,就涉及到了狀態的保存和容錯,這個時候就需要開啟checkpoint機制和功能 (
one.checkpoint(Durations.seconds(10)) //容錯更好就需要犧牲性能,容錯不需要太高,時間可以設置的長一些,(比如這里將checkpoint設置為10s,也就是每隔10s會將標記有checkpoint的RDD計算的結果持久化到磁盤,如果我們設置的Batch Interval = 5s, 在第15s時觸發job進行計算, 但是在第17s時down, 這時候只能恢復到10s的數據,那么10s至17s的數據就丟失了,具體設置多少,視具體情況而定)
)
UpdateStateByKey用處:
統計廣告點擊流量,統計這一天的車流量...
案例:全面的廣告點擊分析
UpdateStateByKeyOperator
這里做了
checkpoint操作,
jsc.checkpoint("hdfs://ndoe1:8020/user/sscheckpoint");
node1創建一個Socket Server,指定8888端口,SparkStreaming與服務器這個端口建立通信,那么用戶的數據從這里流向SparkStreaming進行計算。
在這個案例中,
用以空格分割的單詞來模擬用戶點擊的廣告,每個單詞代表一個廣告,
統計每一個廣告(單詞)出現的次數(就是WordCount)
最后的
conunts.print() //output operator類型的算子

result: 利用SparkStreaming做到了微批處理,近似實時計算


查看hdfs,發現設置checkpoint會將SparkStreaming的處理結果進行了持久化


2.6reduceByKeyAndWindow
基於滑動窗口的熱點搜索詞實時統計--
WindowOperator
1.未優化的
2.優化的必須要設置checkpoint的目錄
以下是優化的過的reduceByKeyAndWindow

補充:
1.Spark master 8080端口 監控資源
Drive
4040端口 監控任務,可以看到有一個Streaming job(它里面有一個線程,是一直運行的,負責接收我們的數據)
2.transform 和
foreachRDD 的區別?
transform Transformation類型算子,
transform非常厲害,可以拿到每一個DStream的rdd;這樣就可以利用每一個rdd的所有算子來轉換;甚至在里面應用spark core,或者將rdd轉換成DataFrame來使用SparkSQL操作
foreachRDD Action類型算子,對於每個RDD進行操作,
什么時候會用?最后存結果的時候會用
3.
transform取出DStream中的RDD

使用transform將DStream中的RDD抽取出來,調用了RDD的Action類的算子(是可以執行的)
是在Driver端執行的,如果不在Driver端執行,調用Action類的算子就不會觸發一個job了
對RDD的操作才會發送到Executor端執行,
transform是對
DStream進行操作(transform中抽取RDD,對這個RDD執行collect類型的數據,在job Generator時執行的,生成了多個job,以
jobSet的形式發送給
jobSecheduler),這樣的話就可以
預警:對數據的預警,與標准進行比較,如果超過了這個標准就進行報警(一旦發現某個黑名單就立即進行報警,),整體的代碼是在Driver端執行的,但是部分代碼對RDD的操作是在Executor段執行的
SparkContext sc = userClickRDD.context();
Object obj = "可以來源於數據庫,動態的更改廣播變量"
sc.broadCast(obj)
2.6SparkStreaming--Driver HA
2.6.1Driver也有可能掛掉,如何實現它的高可用?

當一個Driver掛掉后,(回憶:當初的Master是由zookeeper進行托管),另外啟動一個Driver,它就需要從上一個Driver中獲得相關的信息(包括batch的進度,data的位置,job執行進度,
DStream的Graph(基於DStream的業務邏輯))
如何實現Driver的高可用-->基於HDFS上面的元數據(Driver的信息)進行恢復,
注意!不會重新new SparkContext,因為這樣相當於又創建了一個全新的Driver
2.6.2Driver HA的代碼套路
1.指定了去哪一個目錄下面尋找Driver的元數據信息
2.
提交Application到集群中執行的時候,必須使用cluster模式,同時必須指定一個參數 --supervise(當某一個Driver掛掉,新的Driver需要另一個Driver中的信息來繼續job的執行)
2.6.3監控HDFS上指定目錄下文件數量的變化
示例代碼
SparkStreamingOnHDFS
1.為了狀態的保存和容錯,開啟了checkpoint機制,Driver HA
2.
ssc.textFileStream("hdfs://node1:8020/userhdfs/")
//監控hdfs上/user/hdfs的變化
命令:hadoop fs -put wc /user/hdfs
2.6.4SparkStreaming 監控 HDFS 上文件數量的變化,並將變化寫入到MySql中
示例代碼
SparkStreamingOnHDFSToMySQL
1.為了狀態的保存和容錯,開啟了checkpoint機制,Driver HA
2.
ssc.textFileStream("hdfs://node1:8020/userhdfs/")
//監控hdfs上/user/hdfs的變化
3.Kafka
3.1Kafka定義
Apache Kafka是一個高吞吐的集發布與訂閱與一體的分布式消息系統
流式處理的數據源是kafka,批處理的數據源是hive,也就是hdfs;
3.2消息隊列常見的場景
1.系統之間的解耦合
-queue模型
-publish-subscribe模型

2.峰值壓力緩沖,如果高峰期日志大量到SparkSreaming,那么會造成計算時間超過BatchInterval),可以在日志服務器和SparkStreaming中間加入Kafka,起到緩沖的作用
3.異步通信

3.3Kafka的架構


消費者的
消費偏移量存儲在
zookeeper中,生產者生產數據,消費者消費數據,kafka並不會生產數據,但kafka默認一周刪除一次數據。
broker就是代理,在kafka cluster這一層這里,其實里面是有很多個broker
topic就相當於
Queue,
Queue里面有生產者消費者模型
3.4Kafka的消息存儲和生產消費模型

topic:一個kafka集群中可以划分n多的topic,一個topic可以分成多個partition(
這個是為了做並行的
),
每個partition內部消息強有序,其
中的每個消息都有一個序號叫offset,一個partition對應一個broker,一個broker可以管多個partition,
這個partition可以很簡單想象為一個文件,當數據發過來的時候它就往這個
partition上面append,追加就行,kafka和很多消息系統不一樣,很多消息系統是消費完了我就把它刪掉,而kafka是根據時間策略刪除,而不是消費完就刪除,在
kafka里面沒有一個消費完這么個概念,只有過期這樣一個概念
生產者自己決定往哪個partition寫消息
(輪循的負載均衡或者是基於hash的partition策略)
消費者可以訂閱某一個topic,這個topic一旦有數據,會將數據推送給消費者
3.5kafka 組內queue消費模型 || 組間publish-subscribe消費模型

3.6kafka有哪些特點

3.7為什么Kafka的吞吐量高?
3.7.1 什么是Zero Copy?
零拷貝”是指計算機操作的過程中,CPU不需要為數
據在內存之間的拷貝消耗資源。而
它通常是指計算機在網絡上發送文件時,不需要
將文件內容拷貝到用戶空間(User Space)而直接在內核空間(Kernel Space)中
傳輸到網絡的方式
3.7.2
kafka采用零拷貝Zero Copy的方式


從上圖中可以清楚的看到,
Zero Copy的模式中,避免了數據在內存空間和用戶空間
之間的拷貝,從而提高了系統的整體性能。
Linux中的sendfile()以及
Java NIO中
的FileChannel.transferTo()方法都實現了零拷貝的功能,而在Netty中也通過在
FileRegion中包裝了NIO的FileChannel.transferTo()方法實現了零拷貝
3.8搭建Kafka集群--leader的均衡機制


Kafka中leader的均衡機制
Kafka中一個topic有多個partition,如上圖,kfk有0,1,2共三個partition,每個partition都有對應的leader來進行管理,對於leader1來說它來管理partition0,當leader1掛掉之后,因為partition0配置了副本數(在broker0,broker2還存在partition0的副本),那么此時會在broker0,broker2上選出一台當做leader繼續管理partition0(比如說選取了broker2當做partition0的leader),這時候如果我們配置了leader均衡機制,重新恢復了broker1,那么partition0的leader就會從broker2轉移到broker1,減輕了broker2的讀取壓力,實現了負載均衡。當然如果不開啟leader均衡機制的話,重新恢復broker1,那么partition0的leader仍就是broker2。
Kafka中leader的均衡機制在哪里配置?

在server.properties添加如下一句話
auto.leader.rebalance.enable=true
3.9Kafka_code注意事項
注意一:
向kafka中寫數據的時候我們
必須要指定所配置kafka的所有brokers節點,而不能只配置一個節點,因為我們寫的話,是不知道這個topic最終存放在什么地方,所以必須指定全,
讀取Kafka中的數據的時候是需要指定zk的節點,只需要指定一個節點就可以了;目前我們使用的在代碼中直接寫上這些節點,以后全部要寫到配置文件中
注意二:
kafka中存儲的是鍵值對,即使我們沒有明確些出來key,
獲取的時候也是需要利用tuple的方式獲取值的;而對於放到一個kafka中的數據,
這個數據到底存放到那個partition中呢?這個就需要使用hashPartition方式或者普通的輪詢方式存放;對於沒有明確指定key的發往kafka的數據,使用的就是輪詢方式;
4.SparkStreaming + Kafka 兩種模式--Receive模式 || Direct模式
Receive模式--SparkStreaming + Kafka 整體架構

注意!每一步都是阻塞的,上一步完成之后才能進行下一步
流程:
1.接收數據(
SparkStreaming作為
消費者,如果訂閱了一個topic,那么topic一有數據就會主動推送給SparkSreaming)
2.Executor將接收來的數據備份到其他Executor中(Executor中執行的job作為一個receiver,里面的task一直在接收kafka推送來的數據,然后將接收來的數據進行持久化,
默認的持久化級別是
MEMORY_DISK_SER_2)
3.Executor備份完成之后,向
Driver中的
ReceiverTracker匯報數據存儲在哪一些的
Executor中(
Driver{ReceiverTracker,DAGScheduler,TaskScheduler})
4.在zookeeper中更新消費偏移量
5.Driver負責分發
task到數據所在的
Executor中執行(達到移動計算,而不是移動數據)
注意!
1.在SparkStreaming中Driver一旦掛掉,它下面的Executor也會掛掉
如果在第四步完成后,Driver掛掉了,會有什么問題?
其實數據並沒有被處理,數據就丟了,因此kafka的事務機制並不完善
因此對於如上這種情況,提供了一個解決方案,就是WAL機制(
WriteAheadLog--預寫機制)
但是WAL機制有什么問題?(每一次接收來的數據,都要往HDFS上寫一份,性能會有所下降)
代碼示例
SparkStreamingOnKafkaReceiver
SparkStreamingDataManuallyProducerForKafka
需要啟動HDFS
Direct模式
SparkStreaming和
Kafka直接連接,SparkStreaming去Kafka去
pull數據,這個
消費偏移量由SparkStreaming自己來維護(實際上通過checkpoint來管理的,checkpoint和job是異步的,總的來時SparkStreaming的事務機制並不是很完善),避免了數據的丟失(相對而言,不是絕對的)
並行度:
1.
linesDStream里面封裝的是RDD,RDD里面有partition,RDD里面的partition與這個topic的partition是一一對應的
2.從kafka中讀來的數據,封裝到一個DStream中,可以
對這個DStream重分區,lines.repartition(10),增加partition的數量,提高並行度。
並行度:
batch->rdd->DStream
batchInterval 5s
blockInterval = 200ms
batch = 25block
將一個
blockInterval設置的小一些,有更多的block,對應更多的split,也就有更多的partition,從而提高並行度
官方建議:blockInterval不要低於50ms,否則
batchInterval/
blockInterval 得到的block過多,partition就過多,啟動多個線程並行計算,影響執行job的性能
Receive模式 || Direct模式
最大的不同:消費偏移量誰來管理
附件列表
©All rights reserved