spark面試問題
1、spark中的RDD是什么,有哪些特性
-
RDD(Resilient Distributed Dataset)叫做彈性分布式數據集,是Spark中最基本的數據抽象,它代表一個不可變、可分區、里面的元素可並行計算的集合。
- Dataset:就是一個集合,用於存放數據的
- Distributed:分布式,可以並行在集群計算
- Resilient:表示彈性的
- RDD的彈性體現在哪里?
- 1、自動的進行內存和磁盤數據存儲的切換;
- 2、基於lineage的高效容錯
- 3、task如果失敗會特定次數的重試 (默認4次)
- 4、stage如果失敗會自動進行特定次數的重試,而且只會只計算失敗的分片
- 5、checkpoint【每次對RDD操作都會產生新的RDD,如果鏈條比較長,計算比較笨重,就把數據放在硬盤中】和persist 【內存或磁盤中對數據進行復用】(檢查點、持久化)
- 6、任務調度彈性:DAG TASK 和資源管理無關
- spark程序就是一個計算任務的邏輯,哪里可以給當前這個任務提供計算資源,就可以把任務提交到哪里去運行
- standAlone
- yarn
- yarn-cluster
- yarn-client
- mesos
- spark程序就是一個計算任務的邏輯,哪里可以給當前這個任務提供計算資源,就可以把任務提交到哪里去運行
- 7、數據分片的高度彈性repartition
- RDD的彈性體現在哪里?
-
RDD五大特性:
- A list of partitions
一個分區列表,RDD中的數據都存在一個分區列表里面 - A function for computing each split
作用在每一個分區中的函數 - A list of dependencies on other RDDs
一個RDD依賴於其他多個RDD,這個點很重要,RDD的容錯機制就是依據這個特性而來的 - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
可選的,針對於kv類型的RDD才具有這個特性,作用是決定了數據的來源以及數據處理后的去向 - Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
可選項,數據本地性,數據位置最優
- A list of partitions
2、概述一下spark中的常用算子區別(map、mapPartitions、foreach、foreachPartition)
- map:用於遍歷RDD,將函數f應用於每一個元素,返回新的RDD(transformation算子)。
- foreach:用於遍歷RDD,將函數f應用於每一個元素,無返回值(action算子)。
- mapPartitions:用於遍歷操作RDD中的每一個分區,返回生成一個新的RDD(transformation算子)。
- foreachPartition: 用於遍歷操作RDD中的每一個分區。無返回值(action算子)。
- 總結:一般使用mapPartitions或者foreachPartition算子比map和foreach更加高效,推薦使用。
3、簡述reduceByKey和groupByKey區別
- reduceByKey
- 會使用用戶自定義的函數在每個節點先進行預聚合,減少數據的拉取量,減少數據的網絡傳輸
- 類似於mapreduce中的combiner
- groupByKey
- 不會進行數據的聚合,全量拉取數據,性能較差
4、yarn-cluster和yarn-client的區別
- yarn-cluster
- driver運行在ApplicationMaster上,並且后期從yarn中申請資源,該進程會運行在yarn的container內,所有啟動AM的client可以立即關閉而不用持續到Application的生命周期
- yarn-client
- 程序在哪里提交任務,就在當前機器產生driver,driver需要跟后面的task進行通信,這個時候的client不可以干掉,干掉client就相當於把整個Application的driver干掉了。
5、談談spark中的寬窄依賴
- RDD和它依賴的父RDD(s)的關系有兩種不同的類型,即窄依賴(narrow dependency)和寬依賴(wide dependency)。
- 寬依賴:指的是多個子RDD的Partition會依賴同一個父RDD的Partition
- 窄依賴:指的是每一個父RDD的Partition最多被子RDD的一個Partition使用。
6、spark中如何划分stage
- 1.Spark Application中可以因為不同的Action觸發眾多的job,一個Application中可以有很多的job,每個job是由一個或者多個Stage構成的,后面的Stage依賴於前面的Stage,也就是說只有前面依賴的Stage計算完畢后,后面的Stage才會運行。
- 2.Stage划分的依據就是寬依賴,何時產生寬依賴,例如reduceByKey,groupByKey的算子,會導致寬依賴的產生。
- 3.由Action(例如collect)導致了SparkContext.runJob的執行,最終導致了DAGScheduler中的submitJob的執行,其核心是通過發送一個case class JobSubmitted對象給eventProcessLoop。
eventProcessLoop是DAGSchedulerEventProcessLoop的具體實例,而DAGSchedulerEventProcessLoop是eventLoop的子類,具體實現EventLoop的onReceive方法,onReceive方法轉過來回調doOnReceive - 4.在doOnReceive中通過模式匹配的方法把執行路由到
- 5.在handleJobSubmitted中首先創建finalStage,創建finalStage時候會建立父Stage的依賴鏈條
- 總結:以來是從代碼的邏輯層面上來展開說的,可以簡單點說:寫介紹什么是RDD中的寬窄依賴,然后在根據DAG有向無環圖進行划分,從當前job的最后一個算子往前推,遇到寬依賴,那么當前在這個批次中的所有算子操作都划分成一個stage,然后繼續按照這種方式在繼續往前推,如在遇到寬依賴,又划分成一個stage,一直到最前面的一個算子。最后整個job會被划分成多個stage,而stage之間又存在依賴關系,后面的stage依賴於前面的stage。
7、spark-submit的時候如何引入外部jar包
- 在通過spark-submit提交任務時,可以通過添加配置參數來指定
- --driver-class-path 外部jar包
- 它會把jar下發到driver端
- --jars 外部jar包
- 它會把jar包下發到每一個executor進程中
- --driver-class-path 外部jar包
8、spark 如何防止內存溢出
- driver端的內存溢出
- 可以增大driver的內存參數:spark.driver.memory (default 1g)
- 這個參數用來設置Driver的內存。在Spark程序中,SparkContext,DAGScheduler都是運行在Driver端的。對應rdd的Stage切分也是在Driver端運行,如果用戶自己寫的程序有過多的步驟,切分出過多的Stage,這部分信息消耗的是Driver的內存,這個時候就需要調大Driver的內存。
- map過程產生大量對象導致內存溢出
- 這種溢出的原因是在單個map中產生了大量的對象導致的,例如:rdd.map(x=>for(i <- 1 to 10000) yield i.toString),這個操作在rdd中,每個對象都產生了10000個對象,這肯定很容易產生內存溢出的問題。針對這種問題,在不增加內存的情況下,可以通過減少每個Task的大小,以便達到每個Task即使產生大量的對象Executor的內存也能夠裝得下。具體做法可以在會產生大量對象的map操作之前調用repartition方法,分區成更小的塊傳入map。例如:rdd.repartition(10000).map(x=>for(i <- 1 to 10000) yield i.toString)。
面對這種問題注意,不能使用rdd.coalesce方法,這個方法只能減少分區,不能增加分區,不會有shuffle的過程。
- 這種溢出的原因是在單個map中產生了大量的對象導致的,例如:rdd.map(x=>for(i <- 1 to 10000) yield i.toString),這個操作在rdd中,每個對象都產生了10000個對象,這肯定很容易產生內存溢出的問題。針對這種問題,在不增加內存的情況下,可以通過減少每個Task的大小,以便達到每個Task即使產生大量的對象Executor的內存也能夠裝得下。具體做法可以在會產生大量對象的map操作之前調用repartition方法,分區成更小的塊傳入map。例如:rdd.repartition(10000).map(x=>for(i <- 1 to 10000) yield i.toString)。
- 數據不平衡導致內存溢出
- 數據不平衡除了有可能導致內存溢出外,也有可能導致性能的問題,解決方法和上面說的類似,就是調用repartition重新分區。這里就不再累贅了。
- shuffle后內存溢出
- shuffle內存溢出的情況可以說都是shuffle后,單個文件過大導致的。在Spark中,join,reduceByKey這一類型的過程,都會有shuffle的過程,在shuffle的使用,需要傳入一個partitioner,大部分Spark中的shuffle操作,默認的partitioner都是HashPatitioner,默認值是父RDD中最大的分區數,這個參數通過spark.default.parallelism控制(在spark-sql中用spark.sql.shuffle.partitions) , spark.default.parallelism參數只對HashPartitioner有效,所以如果是別的Partitioner或者自己實現的Partitioner就不能使用spark.default.parallelism這個參數來控制shuffle的並發量了。如果是別的partitioner導致的shuffle內存溢出,就需要從partitioner的代碼增加partitions的數量。
- standalone模式下資源分配不均勻導致內存溢出
- 在standalone的模式下如果配置了--total-executor-cores 和 --executor-memory 這兩個參數,但是沒有配置--executor-cores這個參數的話,就有可能導致,每個Executor的memory是一樣的,但是cores的數量不同,那么在cores數量多的Executor中,由於能夠同時執行多個Task,就容易導致內存溢出的情況。這種情況的解決方法就是同時配置--executor-cores或者spark.executor.cores參數,確保Executor資源分配均勻。
- 使用rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)代替rdd.cache()
- rdd.cache()和rdd.persist(Storage.MEMORY_ONLY)是等價的,在內存不足的時候rdd.cache()的數據會丟失,再次使用的時候會重算,而rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)在內存不足的時候會存儲在磁盤,避免重算,只是消耗點IO時間。
9、spark中cache和persist的區別
- cache:緩存數據,默認是緩存在內存中,其本質還是調用persist
- persist:緩存數據,有豐富的數據緩存策略。數據可以保存在內存也可以保存在磁盤中,使用的時候指定對應的緩存級別就可以了。
- 什么時候設置緩存
- 1、某個rdd后期被使用了多次
- val rdd2=rdd1.map()
- val rdd2=rdd1.map()
- 1、某個rdd后期被使用了多次
- 什么時候設置緩存
10、簡要描述Spark分布式集群搭建的步驟
- 地球人都知道
- 這里可以概述下如何搭建高可用的spark集群(HA)
- 主要是引入了zookeeper
11、spark中的數據傾斜的現象、原因、后果
- (1)、數據傾斜的現象
- 多數task執行速度較快,少數task執行時間非常長,或者等待很長時間后提示你內存不足,執行失敗。
- (2)、數據傾斜的原因
- 數據問題
- 1、key本身分布不均衡(包括大量的key為空)
- 2、key的設置不合理
- spark使用問題
- 1、shuffle時的並發度不夠
- 2、計算方式有誤
- 數據問題
- (3)、數據傾斜的后果
- 1、spark中的stage的執行時間受限於最后那個執行完成的task,因此運行緩慢的任務會拖垮整個程序的運行速度(分布式程序運行的速度是由最慢的那個task決定的)。
- 2、過多的數據在同一個task中運行,將會把executor撐爆。
12、如何解決spark中的數據傾斜問題
- 發現數據傾斜的時候,不要急於提高executor的資源,修改參數或是修改程序,首先要檢查數據本身,是否存在異常數據。
- 1、數據問題造成的數據傾斜
- 找出異常的key
- 如果任務長時間卡在最后最后1個(幾個)任務,首先要對key進行抽樣分析,判斷是哪些key造成的。
選取key,對數據進行抽樣,統計出現的次數,根據出現次數大小排序取出前幾個。 - 比如: df.select("key").sample(false,0.1).(k=>(k,1)).reduceBykey(+).map(k=>(k.2,k.1)).sortByKey(false).take(10)
- 如果發現多數數據分布都較為平均,而個別數據比其他數據大上若干個數量級,則說明發生了數據傾斜。
- 如果任務長時間卡在最后最后1個(幾個)任務,首先要對key進行抽樣分析,判斷是哪些key造成的。
- 經過分析,傾斜的數據主要有以下三種情況:
- 1、null(空值)或是一些無意義的信息()之類的,大多是這個原因引起。
- 2、無效數據,大量重復的測試數據或是對結果影響不大的有效數據。
- 3、有效數據,業務導致的正常數據分布。
- 解決辦法
- 第1,2種情況,直接對數據進行過濾即可(因為該數據對當前業務不會產生影響)。
- 第3種情況則需要進行一些特殊操作,常見的有以下幾種做法
- (1) 隔離執行,將異常的key過濾出來單獨處理,最后與正常數據的處理結果進行union操作。
- (2) 對key先添加隨機值,進行操作后,去掉隨機值,再進行一次操作。
- (3) 使用reduceByKey 代替 groupByKey(reduceByKey用於對每個key對應的多個value進行merge操作,最重要的是它能夠在本地先進行merge操作,並且merge操作可以通過函數自定義.)
- (4) 使用map join。
- 案例
- 如果使用reduceByKey因為數據傾斜造成運行失敗的問題。具體操作流程如下:
- (1) 將原始的 key 轉化為 key + 隨機值(例如Random.nextInt)
- (2) 對數據進行 reduceByKey(func)
- (3) 將 key + 隨機值 轉成 key
- (4) 再對數據進行 reduceByKey(func)
- 如果使用reduceByKey因為數據傾斜造成運行失敗的問題。具體操作流程如下:
- 案例操作流程分析:
- 假設說有傾斜的Key,我們給所有的Key加上一個隨機數,然后進行reduceByKey操作;此時同一個Key會有不同的隨機數前綴,在進行reduceByKey操作的時候原來的一個非常大的傾斜的Key就分而治之變成若干個更小的Key,不過此時結果和原來不一樣,怎么破?進行map操作,目的是把隨機數前綴去掉,然后再次進行reduceByKey操作。(當然,如果你很無聊,可以再次做隨機數前綴),這樣我們就可以把原本傾斜的Key通過分而治之方案分散開來,最后又進行了全局聚合
- 注意1: 如果此時依舊存在問題,建議篩選出傾斜的數據單獨處理。最后將這份數據與正常的數據進行union即可。
- 注意2: 單獨處理異常數據時,可以配合使用Map Join解決。
- 找出異常的key
- 2、spark使用不當造成的數據傾斜
- 提高shuffle並行度
- dataFrame和sparkSql可以設置spark.sql.shuffle.partitions參數控制shuffle的並發度,默認為200。
- rdd操作可以設置spark.default.parallelism控制並發度,默認參數由不同的Cluster Manager控制。
- 局限性: 只是讓每個task執行更少的不同的key。無法解決個別key特別大的情況造成的傾斜,如果某些key的大小非常大,即使一個task單獨執行它,也會受到數據傾斜的困擾。
- 使用map join 代替reduce join
- 在小表不是特別大(取決於你的executor大小)的情況下使用,可以使程序避免shuffle的過程,自然也就沒有數據傾斜的困擾了.(詳細見http://blog.csdn.net/lsshlsw/article/details/50834858、http://blog.csdn.net/lsshlsw/article/details/48694893)
- 局限性: 因為是先將小數據發送到每個executor上,所以數據量不能太大。
- 提高shuffle並行度
- 1、數據問題造成的數據傾斜
13、flume整合sparkStreaming問題
- (1)、如何實現sparkStreaming讀取flume中的數據
- 可以這樣說:
- 前期經過技術調研,查看官網相關資料,發現sparkStreaming整合flume有2種模式,一種是拉模式,一種是推模式,然后在簡單的聊聊這2種模式的特點,以及如何部署實現,需要做哪些事情,最后對比兩種模式的特點,選擇那種模式更好。
- 推模式:Flume將數據Push推給Spark Streaming
- 拉模式:Spark Streaming從flume 中Poll拉取數據
- 前期經過技術調研,查看官網相關資料,發現sparkStreaming整合flume有2種模式,一種是拉模式,一種是推模式,然后在簡單的聊聊這2種模式的特點,以及如何部署實現,需要做哪些事情,最后對比兩種模式的特點,選擇那種模式更好。
- 可以這樣說:
- (2)、在實際開發的時候是如何保證數據不丟失的
- 可以這樣說:
- flume那邊采用的channel是將數據落地到磁盤中,保證數據源端安全性(可以在補充一下,flume在這里的channel可以設置為memory內存中,提高數據接收處理的效率,但是由於數據在內存中,安全機制保證不了,故選擇channel為磁盤存儲。整個流程運行有一點的延遲性)
- sparkStreaming通過拉模式整合的時候,使用了FlumeUtils這樣一個類,該類是需要依賴一個額外的jar包(spark-streaming-flume_2.10)
- 要想保證數據不丟失,數據的准確性,可以在構建StreamingConext的時候,利用StreamingContext.getOrCreate(checkpoint, creatingFunc: () => StreamingContext)來創建一個StreamingContext,使用StreamingContext.getOrCreate來創建StreamingContext對象,傳入的第一個參數是checkpoint的存放目錄,第二參數是生成StreamingContext對象的用戶自定義函數。如果checkpoint的存放目錄存在,則從這個目錄中生成StreamingContext對象;如果不存在,才會調用第二個函數來生成新的StreamingContext對象。在creatingFunc函數中,除了生成一個新的StreamingContext操作,還需要完成各種操作,然后調用ssc.checkpoint(checkpointDirectory)來初始化checkpoint功能,最后再返回StreamingContext對象。
這樣,在StreamingContext.getOrCreate之后,就可以直接調用start()函數來啟動(或者是從中斷點繼續運行)流式應用了。如果有其他在啟動或繼續運行都要做的工作,可以在start()調用前執行。 - 流式計算中使用checkpoint的作用:
- 保存元數據,包括流式應用的配置、流式沒崩潰之前定義的各種操作、未完成所有操作的batch。元數據被存儲到容忍失敗的存儲系統上,如HDFS。這種ckeckpoint主要針對driver失敗后的修復。
- 保存流式數據,也是存儲到容忍失敗的存儲系統上,如HDFS。這種ckeckpoint主要針對window operation、有狀態的操作。無論是driver失敗了,還是worker失敗了,這種checkpoint都夠快速恢復,而不需要將很長的歷史數據都重新計算一遍(以便得到當前的狀態)。
- 設置流式數據checkpoint的周期
- 對於一個需要做checkpoint的DStream結構,可以通過調用DStream.checkpoint(checkpointInterval)來設置ckeckpoint的周期,經驗上一般將這個checkpoint周期設置成batch周期的5至10倍。
- 使用write ahead logs功能
- 這是一個可選功能,建議加上。這個功能將使得輸入數據寫入之前配置的checkpoint目錄。這樣有狀態的數據可以從上一個checkpoint開始計算。開啟的方法是把spark.streaming.receiver.writeAheadLogs.enable這個property設置為true。另外,由於輸入RDD的默認StorageLevel是MEMORY_AND_DISK_2,即數據會在兩台worker上做replication。實際上,Spark Streaming模式下,任何從網絡輸入數據的Receiver(如kafka、flume、socket)都會在兩台機器上做數據備份。如果開啟了write ahead logs的功能,建議把StorageLevel改成MEMORY_AND_DISK_SER。修改的方法是,在創建RDD時由參數傳入。
- 使用以上的checkpoint機制,確實可以保證數據0丟失。但是一個前提條件是,數據發送端必須要有緩存功能,這樣才能保證在spark應用重啟期間,數據發送端不會因為spark streaming服務不可用而把數據丟棄。而flume具備這種特性,同樣kafka也具備。
- 可以這樣說:
- (3)Spark Streaming的數據可靠性
- 有了checkpoint機制、write ahead log機制、Receiver緩存機器、可靠的Receiver(即數據接收並備份成功后會發送ack),可以保證無論是worker失效還是driver失效,都是數據0丟失。原因是:如果沒有Receiver服務的worker失效了,RDD數據可以依賴血統來重新計算;如果Receiver所在worker失敗了,由於Reciever是可靠的,並有write ahead log機制,則收到的數據可以保證不丟;如果driver失敗了,可以從checkpoint中恢復數據重新構建。
14、kafka整合sparkStreaming問題
- (1)、如何實現sparkStreaming讀取kafka中的數據
- 可以這樣說:在kafka0.10版本之前有二種方式與sparkStreaming整合,一種是基於receiver,一種是direct,然后分別闡述這2種方式分別是什么
- receiver:是采用了kafka高級api,利用receiver接收器來接受kafka topic中的數據,從kafka接收來的數據會存儲在spark的executor中,之后spark streaming提交的job會處理這些數據,kafka中topic的偏移量是保存在zk中的。
- 基本使用: val kafkaStream = KafkaUtils.createStream(streamingContext,
[ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume]) - 還有幾個需要注意的點:
- 在Receiver的方式中,Spark中的partition和kafka中的partition並不是相關的,所以如果我們加大每個topic的partition數量,僅僅是增加線程來處理由單一Receiver消費的主題。但是這並沒有增加Spark在處理數據上的並行度.
- 對於不同的Group和topic我們可以使用多個Receiver創建不同的Dstream來並行接收數據,之后可以利用union來統一成一個Dstream。
- 在默認配置下,這種方式可能會因為底層的失敗而丟失數據. 因為receiver一直在接收數據,在其已經通知zookeeper數據接收完成但是還沒有處理的時候,executor突然掛掉(或是driver掛掉通知executor關閉),緩存在其中的數據就會丟失. 如果希望做到高可靠, 讓數據零丟失,如果我們啟用了Write Ahead Logs(spark.streaming.receiver.writeAheadLog.enable=true)該機制會同步地將接收到的Kafka數據寫入分布式文件系統(比如HDFS)上的預寫日志中. 所以, 即使底層節點出現了失敗, 也可以使用預寫日志中的數據進行恢復. 復制到文件系統如HDFS,那么storage level需要設置成 StorageLevel.MEMORY_AND_DISK_SER,也就是KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)
- 基本使用: val kafkaStream = KafkaUtils.createStream(streamingContext,
- direct:在spark1.3之后,引入了Direct方式。不同於Receiver的方式,Direct方式沒有receiver這一層,其會周期性的獲取Kafka中每個topic的每個partition中的最新offsets,之后根據設定的maxRatePerPartition來處理每個batch。(設置spark.streaming.kafka.maxRatePerPartition=10000。限制每秒鍾從topic的每個partition最多消費的消息條數)。
- receiver:是采用了kafka高級api,利用receiver接收器來接受kafka topic中的數據,從kafka接收來的數據會存儲在spark的executor中,之后spark streaming提交的job會處理這些數據,kafka中topic的偏移量是保存在zk中的。
- 可以這樣說:在kafka0.10版本之前有二種方式與sparkStreaming整合,一種是基於receiver,一種是direct,然后分別闡述這2種方式分別是什么
- (2) 對比這2中方式的優缺點:
- 采用receiver方式:這種方式可以保證數據不丟失,但是無法保證數據只被處理一次,WAL實現的是At-least-once語義(至少被處理一次),如果在寫入到外部存儲的數據還沒有將offset更新到zookeeper就掛掉,這些數據將會被反復消費. 同時,降低了程序的吞吐量。
- 采用direct方式:相比Receiver模式而言能夠確保機制更加健壯. 區別於使用Receiver來被動接收數據, Direct模式會周期性地主動查詢Kafka, 來獲得每個topic+partition的最新的offset, 從而定義每個batch的offset的范圍. 當處理數據的job啟動時, 就會使用Kafka的簡單consumer api來獲取Kafka指定offset范圍的數據。
- 優點:
- 1、簡化並行讀取
- 如果要讀取多個partition, 不需要創建多個輸入DStream然后對它們進行union操作. Spark會創建跟Kafka partition一樣多的RDD partition, 並且會並行從Kafka中讀取數據. 所以在Kafka partition和RDD partition之間, 有一個一對一的映射關系.
- 2、高性能
- 如果要保證零數據丟失, 在基於receiver的方式中, 需要開啟WAL機制. 這種方式其實效率低下, 因為數據實際上被復制了兩份, Kafka自己本身就有高可靠的機制, 會對數據復制一份, 而這里又會復制一份到WAL中. 而基於direct的方式, 不依賴Receiver, 不需要開啟WAL機制, 只要Kafka中作了數據的復制, 那么就可以通過Kafka的副本進行恢復.
- 3、一次且僅一次的事務機制
- 基於receiver的方式, 是使用Kafka的高階API來在ZooKeeper中保存消費過的offset的. 這是消費Kafka數據的傳統方式. 這種方式配合着WAL機制可以保證數據零丟失的高可靠性, 但是卻無法保證數據被處理一次且僅一次, 可能會處理兩次. 因為Spark和ZooKeeper之間可能是不同步的. 基於direct的方式, 使用kafka的簡單api, Spark Streaming自己就負責追蹤消費的offset, 並保存在checkpoint中. Spark自己一定是同步的, 因此可以保證數據是消費一次且僅消費一次。不過需要自己完成將offset寫入zk的過程,在官方文檔中都有相應介紹.
*簡單代碼實例:- messages.foreachRDD(rdd=>{
val message = rdd.map(.2)//對數據進行一些操作
message.map(method)//更新zk上的offset (自己實現)
updateZKOffsets(rdd)
}) - sparkStreaming程序自己消費完成后,自己主動去更新zk上面的偏移量。也可以將zk中的偏移量保存在mysql或者redis數據庫中,下次重啟的時候,直接讀取mysql或者redis中的偏移量,獲取到上次消費的偏移量,接着讀取數據。
- messages.foreachRDD(rdd=>{
- 基於receiver的方式, 是使用Kafka的高階API來在ZooKeeper中保存消費過的offset的. 這是消費Kafka數據的傳統方式. 這種方式配合着WAL機制可以保證數據零丟失的高可靠性, 但是卻無法保證數據被處理一次且僅一次, 可能會處理兩次. 因為Spark和ZooKeeper之間可能是不同步的. 基於direct的方式, 使用kafka的簡單api, Spark Streaming自己就負責追蹤消費的offset, 並保存在checkpoint中. Spark自己一定是同步的, 因此可以保證數據是消費一次且僅消費一次。不過需要自己完成將offset寫入zk的過程,在官方文檔中都有相應介紹.
- 1、簡化並行讀取
- 優點: