Spark_總結五


 轉載請標明出處http://www.cnblogs.com/haozhengfei/p/e353daff460b01a5be13688fe1f8c952.html 


Spark_總結五

1.Storm 和 SparkStreaming區別

Storm                      純實時的流式處理,來一條數據就立即進行處理
SparkStreaming 微批處理,每次處理的都是一批非常小的數據
Storm支持動態調整並行度(動態的資源分配),SparkStreaming(粗粒度, 比較消耗資源)
 
Storm 優點 || 缺點
Storm 流式計算(扶梯)
    優點: 數據延遲度很低Storm的事務機制要比SparkStreaming的事務機制要完善 什么是事務機制?對於一條數據,不多處理也不少處理,對於一條數據恰好處理一次,比如金融,股票等要求實時性比較高,那么就需要選Storm
    缺點: 一直持有着資源, 每一條數據都要在集群中某一台節點處理,要計算的數據會進行網絡傳輸,吞吐量小,另外Storm不適合做復雜的業務邏輯(適合匯總)

SparkStreaming 優點 || 缺點

SparkStreaming 微批處理(類似於電梯), 它並不是純的批處理
    優點: 吞吐量大,可以 做復雜的業務邏輯(保證每個job的處理小於batch interval)
    缺點: 數據延遲度較高
 
公司中為什么選用SparkStreaming要多一些?
    1.秒級別延遲,通常應用程序是可以接受的,
    2.可以應用機器學習,SparkSQL...可擴展性比較好,數據吞吐量較高

2.SparkStreaming

2.1什么是SparkStreaming?

     SparkStreaming是一個流式處理框架,處理的模式是微批處理(微批有多大?通過時間來設置這個批有多大 [For example:Batch Interval 5s]
     SparkStreaming 基於DStream(Discretized Streams:離散的數據流 )來進行編程,處理的是一個流,這個流什么時候切成一個rdd-->根據batchinterval來決定何時切割成一個RDD。

SparkStreaming 架構圖

    job的個數是由output operator決定的, StreamContext底層封裝了SparkContext
 

2.2圖解SparkStreaming   ||   SparkStreaming執行流程

   從圖上可以看到,Batch Interval的間隔是5s,也就是說每經過5s,SparkStreaming會將這5s內的信息封裝成一個DStream,然后提交到Spark集群進行計算
執行流程
    第一個 DStream 里面是 0-5s 的數據,在第6s的時候會觸發 DStream 的job執行,這時會另啟動一個線程執行這個job (假設這個job只需要3s) ,同時在6-10s期間繼續接受數據,在第11s的時候會觸發 DStream 的job的執行, 這時會另啟動一個線程執行這個job (假設這個job只需要3s) 同時在11-15s期間繼續接受數據...
 
注意!
     如果這個job執行的時間大於5s會有什么問題?
    數據在5s內處理不完,又啟動一個job,導致數據越積越多, 從而導致  SparkStreaming down

2.3SparkStreaming代碼 TransformOperator

案例:過濾黑名單
    這里模擬了一份黑名單,SparkStreaming監控服務器中指定端口,時間設定為每5秒處理一次數據。每當job觸發時,用戶輸入的數據與黑名單中的數據進行左外連接,然后過濾
 
node1 創建一個Socket Server
    nc -lk 8888 (頁面停下了,開始輸入數據進入8888端口,此時SparkStreaming監聽這個端口)
    hello world
    hello jack
    hello tom( 過濾tom)
result:
 
注意事項!
1.為什么會沒有數據?
    因為只開啟了一條線程(這里只有接收數據的線程),所以local的模擬SparkStreaming必須至少設置兩個線程, new SparkConf().setMaster("local[2]").setAppName("TransformBlacklist");
 
2.Durations時間的設置 --接收數據的延遲時間,多久觸發一次job
final JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
 
3.創建JavaStreamingContext有兩種方式(sparkconf、sparkcontext)
 
4.業務邏輯完成后,需要有一個output operator,將SparkStreaming處理后的數據輸出(HDFS,DBMS)
 
5.關於  JavaStreamingContext 的  start()   ||   stop()
        JavaStreamingContext.start() //straming框架啟動之后是不能再添加業務邏輯
        JavaStreamingContext.stop() //參的stop方法會將sparkContext一同關閉,解決辦法:stop(false)
        JavaStreamingContext.stop()  // 停止之后是不能在調用start()
 
6. DStreams(Discretized Streams--離散的流), 應用在每個DStream的算子操作 ,應用在RDD,應用在Partition,應用在Partition中的一條條數據, 所以最終應用到每一條記錄上
 

2.4Window窗口操作

window operation
普通的  每隔 多長時間切割RDD
基於窗口的操作: 每隔 多長時間切割RDD, 每隔 多長時間計算一次,每次計算的量是多少
 
 
為什么需要有窗口操作?
   比如 別人要求能夠實時看到此刻之前一段時間的數據情況,如果使用批處理的話,那么我們只能固定一個整段時間然后對這個整段時間進行spark core的計算,但是別人的要求是每一個時刻都需要有結果,那么就需要窗口操作?但是窗口操作肯定會有很多的 重復計算,這里有一個優化的地方這個優化也不是必須的視具體情況而定,比如說我們要查看最近30分鍾最熱門的頭條,我們在設計的時候不可能每隔30分鍾計算一次,這里定義了滑動窗口時間是1分鍾,然而計算量是30分鍾內的數據,那么肯定會有29分鍾重復的數據計算);但是 優化的話就會有一個前提,必須要checkpoint
 
                                               每次計算都是最近15s的數據,基於這個特性(微博熱點:最近30分鍾內最熱門的頭條)
問題一:batch interval 5s,窗口大小可以是8s么?
    不行,有的batch就不能被窗口所包含,必須是batch interval的整數倍
問題二:滑動窗口時間 8s 可以么?
    必須是batch interval的整數倍
 
優化:如何避免time3被重復計算(圖中time3在兩個window中都被計算了),可以沒有,但是有的話,就需要這種優化
                                                            Batch Interval 1s    ||     窗口大小 5s    ||     滑動窗口 1s
 
思考:計算一個趨勢的時候,需要基於滑動窗口的操作,是否必須要優化,避免重復計算?(未必
For example:
    1.查看微博中每小時的熱門微博,每隔1分鍾計算一次,相當於重復計算了59分鍾的內容
    2.商家想看前5分鍾的銷售額,每隔30秒看一次,也需要基於窗口的操作

2.5UpdateStateByKey

updateStateByKey 的使用需要 checkpoint ,隔幾次記錄一次到磁盤中
UpdateStateByKey的主要功能
   1、Spark Streaming中為 每一個Key維護一份state狀態,這個 state類型可以是任意類型的的, 可以是一個自定義的對象,那么更新函數也可以是任意類型的。
    2、通過更新函數對該key的狀態不斷更新對於每個新的batch而言,Spark Streaming會在使用updateStateByKey的時候為已經存在的key進行state的狀態更新 (對於每個新出現的key,會同樣的執行state的更新函數操作)
    3、 如果要不斷的更新每個key的state,就涉及到了狀態的保存和容錯,這個時候就需要開啟checkpoint機制和功能 (   one.checkpoint(Durations.seconds(10)) //容錯更好就需要犧牲性能,容錯不需要太高,時間可以設置的長一些,(比如這里將checkpoint設置為10s,也就是每隔10s會將標記有checkpoint的RDD計算的結果持久化到磁盤,如果我們設置的Batch Interval = 5s, 在第15s時觸發job進行計算, 但是在第17s時down, 這時候只能恢復到10s的數據,那么10s至17s的數據就丟失了,具體設置多少,視具體情況而定) )
 
UpdateStateByKey用處: 統計廣告點擊流量,統計這一天的車流量...
 
案例:全面的廣告點擊分析 UpdateStateByKeyOperator
    這里做了 checkpoint操作, jsc.checkpoint("hdfs://ndoe1:8020/user/sscheckpoint");
     node1創建一個Socket Server,指定8888端口,SparkStreaming與服務器這個端口建立通信,那么用戶的數據從這里流向SparkStreaming進行計算。
    在這個案例中, 用以空格分割的單詞來模擬用戶點擊的廣告,每個單詞代表一個廣告, 統計每一個廣告(單詞)出現的次數(就是WordCount)
 
    最后的 conunts.print()    //output operator類型的算子
 
result:   利用SparkStreaming做到了微批處理,近似實時計算

查看hdfs,發現設置checkpoint會將SparkStreaming的處理結果進行了持久化
 

2.6reduceByKeyAndWindow

基於滑動窗口的熱點搜索詞實時統計-- WindowOperator
    1.未優化的
    2.優化的必須要設置checkpoint的目錄
以下是優化的過的reduceByKeyAndWindow
 
補充:
1.Spark master 8080端口 監控資源
              Drive  4040端口 監控任務,可以看到有一個Streaming job(它里面有一個線程,是一直運行的,負責接收我們的數據)
 
2.transform 和  foreachRDD 的區別?
    transform Transformation類型算子, transform非常厲害,可以拿到每一個DStream的rdd;這樣就可以利用每一個rdd的所有算子來轉換甚至在里面應用spark core,或者將rdd轉換成DataFrame來使用SparkSQL操作
    foreachRDD Action類型算子,對於每個RDD進行操作, 什么時候會用?最后存結果的時候會用
 
3. transform取出DStream中的RDD
    使用transform將DStream中的RDD抽取出來,調用了RDD的Action類的算子(是可以執行的) 是在Driver端執行的,如果不在Driver端執行,調用Action類的算子就不會觸發一個job了
 
    對RDD的操作才會發送到Executor端執行transform是對 DStream進行操作(transform中抽取RDD,對這個RDD執行collect類型的數據,在job Generator時執行的,生成了多個job,以 jobSet的形式發送給 jobSecheduler),這樣的話就可以 預警:對數據的預警,與標准進行比較,如果超過了這個標准就進行報警(一旦發現某個黑名單就立即進行報警,),整體的代碼是在Driver端執行的,但是部分代碼對RDD的操作是在Executor段執行的
    SparkContext sc = userClickRDD.context();
    Object obj = "可以來源於數據庫,動態的更改廣播變量"
    sc.broadCast(obj)

2.6SparkStreaming--Driver HA

2.6.1Driver也有可能掛掉,如何實現它的高可用?

 
        當一個Driver掛掉后,(回憶:當初的Master是由zookeeper進行托管),另外啟動一個Driver,它就需要從上一個Driver中獲得相關的信息(包括batch的進度,data的位置,job執行進度, DStream的Graph(基於DStream的業務邏輯))
 
       如何實現Driver的高可用-->基於HDFS上面的元數據(Driver的信息)進行恢復, 注意!不會重新new SparkContext,因為這樣相當於又創建了一個全新的Driver
2.6.2Driver HA的代碼套路
    1.指定了去哪一個目錄下面尋找Driver的元數據信息
    2. 提交Application到集群中執行的時候,必須使用cluster模式,同時必須指定一個參數 --supervise(當某一個Driver掛掉,新的Driver需要另一個Driver中的信息來繼續job的執行)
2.6.3監控HDFS上指定目錄下文件數量的變化
示例代碼 SparkStreamingOnHDFS
    1.為了狀態的保存和容錯,開啟了checkpoint機制,Driver HA
    2. ssc.textFileStream("hdfs://node1:8020/userhdfs/")    //監控hdfs上/user/hdfs的變化
 
命令:hadoop fs -put wc /user/hdfs
2.6.4SparkStreaming 監控 HDFS 上文件數量的變化,並將變化寫入到MySql中
示例代碼 SparkStreamingOnHDFSToMySQL
    1.為了狀態的保存和容錯,開啟了checkpoint機制,Driver HA
    2. ssc.textFileStream("hdfs://node1:8020/userhdfs/")    //監控hdfs上/user/hdfs的變化

3.Kafka

3.1Kafka定義

  Apache Kafka是一個高吞吐的集發布與訂閱與一體的分布式消息系統
 
     流式處理的數據源是kafka,批處理的數據源是hive,也就是hdfs;

3.2消息隊列常見的場景

    1.系統之間的解耦合
        -queue模型
        -publish-subscribe模型
 
    2.峰值壓力緩沖,如果高峰期日志大量到SparkSreaming,那么會造成計算時間超過BatchInterval),可以在日志服務器和SparkStreaming中間加入Kafka,起到緩沖的作用
    3.異步通信
 
 

3.3Kafka的架構

    消費者的 消費偏移量存儲在 zookeeper中,生產者生產數據,消費者消費數據,kafka並不會生產數據,但kafka默認一周刪除一次數據。
     broker就是代理,在kafka cluster這一層這里,其實里面是有很多個broker
     topic就相當於 QueueQueue里面有生產者消費者模型

3.4Kafka的消息存儲和生產消費模型

 
   topic:一個kafka集群中可以划分n多的topic,一個topic可以分成多個partition( 這個是為了做並行的 ,  每個partition內部消息強有序,其 中的每個消息都有一個序號叫offset,一個partition對應一個broker,一個broker可以管多個partition, 這個partition可以很簡單想象為一個文件,當數據發過來的時候它就往這個 partition上面append,追加就行,kafka和很多消息系統不一樣,很多消息系統是消費完了我就把它刪掉,而kafka是根據時間策略刪除,而不是消費完就刪除,在 kafka里面沒有一個消費完這么個概念,只有過期這樣一個概念
   生產者自己決定往哪個partition寫消息 (輪循的負載均衡或者是基於hash的partition策略)
    消費者可以訂閱某一個topic,這個topic一旦有數據,會將數據推送給消費者

3.5kafka   組內queue消費模型   ||   組間publish-subscribe消費模型

 

3.6kafka有哪些特點

3.7為什么Kafka的吞吐量高?

3.7.1 什么是Zero Copy?
   零拷貝”是指計算機操作的過程中,CPU不需要為數 據在內存之間的拷貝消耗資源。而 它通常是指計算機在網絡上發送文件時,不需要 將文件內容拷貝到用戶空間(User Space)而直接在內核空間(Kernel Space)中 傳輸到網絡的方式
 
3.7.2  kafka采用零拷貝Zero Copy的方式
 
 
   從上圖中可以清楚的看到, Zero Copy的模式中,避免了數據在內存空間和用戶空間 之間的拷貝,從而提高了系統的整體性能。 Linux中的sendfile()以及 Java NIO中
的FileChannel.transferTo()方法都實現了零拷貝的功能,而在Netty中也通過在
FileRegion中包裝了NIO的FileChannel.transferTo()方法實現了零拷貝

3.8搭建Kafka集群--leader的均衡機制

 
 
Kafka中leader的均衡機制
     Kafka中一個topic有多個partition,如上圖,kfk有0,1,2共三個partition,每個partition都有對應的leader來進行管理,對於leader1來說它來管理partition0,當leader1掛掉之后,因為partition0配置了副本數(在broker0,broker2還存在partition0的副本),那么此時會在broker0,broker2上選出一台當做leader繼續管理partition0(比如說選取了broker2當做partition0的leader),這時候如果我們配置了leader均衡機制,重新恢復了broker1,那么partition0的leader就會從broker2轉移到broker1,減輕了broker2的讀取壓力,實現了負載均衡。當然如果不開啟leader均衡機制的話,重新恢復broker1,那么partition0的leader仍就是broker2。
 
Kafka中leader的均衡機制在哪里配置?
在server.properties添加如下一句話
auto.leader.rebalance.enable=true

3.9Kafka_code注意事項

注意一:
     向kafka中寫數據的時候我們 必須要指定所配置kafka的所有brokers節點,而不能只配置一個節點,因為我們寫的話,是不知道這個topic最終存放在什么地方,所以必須指定全,
    讀取Kafka中的數據的時候是需要指定zk的節點,只需要指定一個節點就可以了;目前我們使用的在代碼中直接寫上這些節點,以后全部要寫到配置文件中
注意二:
      kafka中存儲的是鍵值對,即使我們沒有明確些出來key, 獲取的時候也是需要利用tuple的方式獲取值的;而對於放到一個kafka中的數據, 這個數據到底存放到那個partition中呢?這個就需要使用hashPartition方式或者普通的輪詢方式存放;對於沒有明確指定key的發往kafka的數據,使用的就是輪詢方式;
 

4.SparkStreaming + Kafka    兩種模式--Receive模式    ||    Direct模式

Receive模式--SparkStreaming + Kafka 整體架構

 
                                                注意!每一步都是阻塞的,上一步完成之后才能進行下一步
流程:
1.接收數據( SparkStreaming作為 消費者,如果訂閱了一個topic,那么topic一有數據就會主動推送給SparkSreaming)
 
2.Executor將接收來的數據備份到其他Executor中(Executor中執行的job作為一個receiver,里面的task一直在接收kafka推送來的數據,然后將接收來的數據進行持久化, 默認的持久化級別MEMORY_DISK_SER_2
 
3.Executor備份完成之后,Driver中的 ReceiverTracker匯報數據存儲在哪一些的 Executor中( Driver{ReceiverTracker,DAGScheduler,TaskScheduler}
 
4.在zookeeper中更新消費偏移量
 
5.Driver負責分發 task到數據所在的 Executor中執行(達到移動計算,而不是移動數據)
 
注意!
    1.在SparkStreaming中Driver一旦掛掉,它下面的Executor也會掛掉
        如果在第四步完成后,Driver掛掉了,會有什么問題?
            其實數據並沒有被處理,數據就丟了,因此kafka的事務機制並不完善
       因此對於如上這種情況,提供了一個解決方案,就是WAL機制( WriteAheadLog--預寫機制 
       但是WAL機制有什么問題?(每一次接收來的數據,都要往HDFS上寫一份,性能會有所下降)
 
代碼示例
SparkStreamingOnKafkaReceiver
SparkStreamingDataManuallyProducerForKafka
    需要啟動HDFS

Direct模式

    SparkStreamingKafka直接連接,SparkStreaming去Kafka去 pull數據,這個 消費偏移量由SparkStreaming自己來維護(實際上通過checkpoint來管理的,checkpoint和job是異步的,總的來時SparkStreaming的事務機制並不是很完善),避免了數據的丟失(相對而言,不是絕對的)
 
並行度:
      1. linesDStream里面封裝的是RDD,RDD里面有partition,RDD里面的partition與這個topic的partition是一一對應的
      2.從kafka中讀來的數據,封裝到一個DStream中,可以 對這個DStream重分區,lines.repartition(10),增加partition的數量,提高並行度。
 
並行度:
    batch->rdd->DStream
    batchInterval 5s
    blockInterval = 200ms
    
    batch = 25block
    將一個 blockInterval設置的小一些,有更多的block,對應更多的split,也就有更多的partition,從而提高並行度
官方建議:blockInterval不要低於50ms,否則 batchInterval/ blockInterval 得到的block過多,partition就過多,啟動多個線程並行計算,影響執行job的性能
 
Receive模式    ||    Direct模式     最大的不同:消費偏移量誰來管理
 

 ©All rights reserved


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM