kafka的message包括哪些信息
一個Kafka的Message由一個固定長度的header和一個變長的消息體body組成
header部分由一個字節的magic(文件格式)和四個字節的CRC32(用於判斷body消息體是否正常)構成。當magic的值為1的時候,會在magic和crc32之間多一個字節的數據:attributes(保存一些相關屬性,比如是否壓縮、壓縮格式等等);如果magic的值為0,那么不存在attributes屬性
body是由N個字節構成的一個消息體,包含了具體的key/value消息
怎么查看kafka的offset
0.9版本以上,可以用最新的Consumer client 客戶端,有consumer.seekToEnd() / consumer.position() 可以用於得到當前最新的offset:
hadoop的shuffle過程
一、Map端的shuffle
Map端會處理輸入數據並產生中間結果,這個中間結果會寫到本地磁盤,而不是HDFS。每個Map的輸出會先寫到內存緩沖區中,當寫入的數據達到設定的閾值時,系統將會啟動一個線程將緩沖區的數據寫到磁盤,這個過程叫做spill。
在spill寫入之前,會先進行二次排序,首先根據數據所屬的partition進行排序,然后每個partition中的數據再按key來排序。partition的目是將記錄划分到不同的Reducer上去,以期望能夠達到負載均衡,以后的Reducer就會根據partition來讀取自己對應的數據。接着運行combiner(如果設置了的話),combiner的本質也是一個Reducer,其目的是對將要寫入到磁盤上的文件先進行一次處理,這樣,寫入到磁盤的數據量就會減少。最后將數據寫到本地磁盤產生spill文件(spill文件保存在{mapred.local.dir}指定的目錄中,Map任務結束后就會被刪除)。
最后,每個Map任務可能產生多個spill文件,在每個Map任務完成前,會通過多路歸並算法將這些spill文件歸並成一個文件。至此,Map的shuffle過程就結束了。
二、Reduce端的shuffle
Reduce端的shuffle主要包括三個階段,copy、sort(merge)和reduce。
首先要將Map端產生的輸出文件拷貝到Reduce端,但每個Reducer如何知道自己應該處理哪些數據呢?因為Map端進行partition的時候,實際上就相當於指定了每個Reducer要處理的數據(partition就對應了Reducer),所以Reducer在拷貝數據的時候只需拷貝與自己對應的partition中的數據即可。每個Reducer會處理一個或者多個partition,但需要先將自己對應的partition中的數據從每個Map的輸出結果中拷貝過來。
接下來就是sort階段,也成為merge階段,因為這個階段的主要工作是執行了歸並排序。從Map端拷貝到Reduce端的數據都是有序的,所以很適合歸並排序。最終在Reduce端生成一個較大的文件作為Reduce的輸入。
最后就是Reduce過程了,在這個過程中產生了最終的輸出結果,並將其寫到HDFS上。
spark集群運算的模式
Spark 有很多種模式,最簡單就是單機本地模式,還有單機偽分布式模式,復雜的則運行在集群中,目前能很好的運行在 Yarn和 Mesos 中,當然 Spark 還有自帶的 Standalone 模式,對於大多數情況 Standalone 模式就足夠了,如果企業已經有 Yarn 或者 Mesos 環境,也是很方便部署的。
standalone(集群模式):典型的Mater/slave模式,不過也能看出Master是有單點故障的;Spark支持ZooKeeper來實現 HA
on yarn(集群模式): 運行在 yarn 資源管理器框架之上,由 yarn 負責資源管理,Spark 負責任務調度和計算
on mesos(集群模式): 運行在 mesos 資源管理器框架之上,由 mesos 負責資源管理,Spark 負責任務調度和計算
on cloud(集群模式):比如 AWS 的 EC2,使用這個模式能很方便的訪問 Amazon的 S3;Spark 支持多種分布式存儲系統:HDFS 和 S3
HDFS讀寫數據的過程
讀:
1、跟namenode通信查詢元數據,找到文件塊所在的datanode服務器
2、挑選一台datanode(就近原則,然后隨機)服務器,請求建立socket流
3、datanode開始發送數據(從磁盤里面讀取數據放入流,以packet為單位來做校驗)
4、客戶端以packet為單位接收,現在本地緩存,然后寫入目標文件
寫:
1、根namenode通信請求上傳文件,namenode檢查目標文件是否已存在,父目錄是否存在
2、namenode返回是否可以上傳
3、client請求第一個 block該傳輸到哪些datanode服務器上
4、namenode返回3個datanode服務器ABC
5、client請求3台dn中的一台A上傳數據(本質上是一個RPC調用,建立pipeline),A收到請求會繼續調用B,然后B調用C,將真個pipeline建立完成,逐級返回客戶端
6、client開始往A上傳第一個block(先從磁盤讀取數據放到一個本地內存緩存),以packet為單位,A收到一個packet就會傳給B,B傳給C;A每傳一個packet會放入一個應答隊列等待應答
7、當一個block傳輸完成之后,client再次請求namenode上傳第二個block的服務器。
RDD中reduceBykey與groupByKey哪個性能好,為什么
reduceByKey:reduceByKey會在結果發送至reducer之前會對每個mapper在本地進行merge,有點類似於在MapReduce中的combiner。這樣做的好處在於,在map端進行一次reduce之后,數據量會大幅度減小,從而減小傳輸,保證reduce端能夠更快的進行結果計算。
groupByKey:groupByKey會對每一個RDD中的value值進行聚合形成一個序列(Iterator),此操作發生在reduce端,所以勢必會將所有的數據通過網絡進行傳輸,造成不必要的浪費。同時如果數據量十分大,可能還會造成OutOfMemoryError。
通過以上對比可以發現在進行大量數據的reduce操作時候建議使用reduceByKey。不僅可以提高速度,還是可以防止使用groupByKey造成的內存溢出問題。
spark sql怎么取數據的差集
好像不支持
spark2.0的了解
更簡單:ANSI SQL與更合理的API
速度更快:用Spark作為編譯器
更智能:Structured Streaming
rdd 怎么分區寬依賴和窄依賴
寬依賴:父RDD的分區被子RDD的多個分區使用 例如 groupByKey、reduceByKey、sortByKey等操作會產生寬依賴,會產生shuffle
窄依賴:父RDD的每個分區都只被子RDD的一個分區使用 例如map、filter、union等操作會產生窄依賴
spark streaming 讀取kafka數據的兩種方式
這兩種方式分別是:
Receiver-base
使用Kafka的高層次Consumer API來實現。receiver從Kafka中獲取的數據都存儲在Spark Executor的內存中,然后Spark Streaming啟動的job會去處理那些數據。然而,在默認的配置下,這種方式可能會因為底層的失敗而丟失數據。如果要啟用高可靠機制,讓數據零丟失,就必須啟用Spark Streaming的預寫日志機制(Write Ahead Log,WAL)。該機制會同步地將接收到的Kafka數據寫入分布式文件系統(比如HDFS)上的預寫日志中。所以,即使底層節點出現了失敗,也可以使用預寫日志中的數據進行恢復。
Direct
Spark1.3中引入Direct方式,用來替代掉使用Receiver接收數據,這種方式會周期性地查詢Kafka,獲得每個topic+partition的最新的offset,從而定義每個batch的offset的范圍。當處理數據的job啟動時,就會使用Kafka的簡單consumer api來獲取Kafka指定offset范圍的數據。
kafka的數據存在內存還是磁盤
Kafka最核心的思想是使用磁盤,而不是使用內存,可能所有人都會認為,內存的速度一定比磁盤快,我也不例外。在看了Kafka的設計思想,查閱了相應資料再加上自己的測試后,發現磁盤的順序讀寫速度和內存持平。
而且Linux對於磁盤的讀寫優化也比較多,包括read-ahead和write-behind,磁盤緩存等。如果在內存做這些操作的時候,一個是JAVA對象的內存開銷很大,另一個是隨着堆內存數據的增多,JAVA的GC時間會變得很長,使用磁盤操作有以下幾個好處:
磁盤緩存由Linux系統維護,減少了程序員的不少工作。
磁盤順序讀寫速度超過內存隨機讀寫。
JVM的GC效率低,內存占用大。使用磁盤可以避免這一問題。
系統冷啟動后,磁盤緩存依然可用。
怎么解決kafka的數據丟失
producer端:
宏觀上看保證數據的可靠安全性,肯定是依據分區數做好數據備份,設立副本數。
broker端:
topic設置多分區,分區自適應所在機器,為了讓各分區均勻分布在所在的broker中,分區數要大於broker數。
分區是kafka進行並行讀寫的單位,是提升kafka速度的關鍵。
Consumer端
consumer端丟失消息的情形比較簡單:如果在消息處理完成前就提交了offset,那么就有可能造成數據的丟失。由於Kafka consumer默認是自動提交位移的,所以在后台提交位移前一定要保證消息被正常處理了,因此不建議采用很重的處理邏輯,如果處理耗時很長,則建議把邏輯放到另一個線程中去做。為了避免數據丟失,現給出兩點建議:
enable.auto.commit=false 關閉自動提交位移
在消息被完整處理之后再手動提交位移
---------------------
作者:godblesspl
來源:CSDN
原文:https://blog.csdn.net/godblesspl/article/details/79393958
版權聲明:本文為博主原創文章,轉載請附上博文鏈接!