【題外話】
感覺自己很沮喪。我自己認為,我的內心很純凈。
廢話說了那么多,我想總結一下最近我學習spark的經驗。
最近使用spark做了一個數據分析的項目。
項目采用的基礎環境是:spark 1.6.0 hbase 0.98.17 hadoop2.5.2
項目的構建工具是:maven
項目采用的語言是 :java
這個項目分兩個方面:
1.架構方面
2.實現方面
這也是我第一次正式的接觸設計分布式存儲和分布式計算的項目。
本次簡單介紹一下分布式存儲的簡單知識:hbase的數據存儲 spark並發計算框架
先假設存在N台機器,然后假設存在N1個虛擬的邏輯上的存放數據的槽,N和N1沒有什么關系。
在處理數據前,先把N1個虛擬的槽和N個機器建立好對應關系。
在處理數據時,把數據按照某一種規則存入一個虛擬的槽內,這時,就能知道數據存入那一台機器。
1.首先預分配region,讓每個region負責存儲一部分數據,預創建 N
region個,以(負無窮,001),{001,002),...{N,正無窮)這樣創建
region個,以(負無窮,001),{001,002),...{N,正無窮)這樣創建
2.拿到具體數據后,先對數據進行hash,生成hashcode,然后使用hashcode%(N 1),得到的數據肯定會落在上述的某一個區間,然后將數據存入
假設我們計算的時候需要目標表A,該表在hbase中有3個region。
在假設該表的region1的startKey endKey為:負無窮,001
假設該表的region2的startKey
endKey為:001 , 002
endKey為:001 , 002
假設該表的region3的startKey
endKey為:002 ,正無窮
endKey為:002 ,正無窮
再次假設,該表存儲了10條數據:(partition:time:deviceid,這是rowkey的組成格式)
rowKey 000:2016031700:0012
rowKey 000:2016031700:0013
rowKey 000:2016031800:0013
rowKey 001:2016031700:0015
rowKey 001:2016031720:0016
rowKey 001:2016031800:0022
rowKey 002:2016031200:0015
rowKey 002:2016031800:0033
rowKey 002:2016031900:0022
rowKey 002:2016031900:0033
那么以前綴為000的數據會放在region1,也就是前面的那三條,以前綴001的數據會放在region2,也就是中間的那三條,以前綴為002的數據會放在region3,也就是最后的四條。
此時,在上述的假設的基礎上,我們要計算2016031700時間對應的deviceid分別出現了多少次。計算方法如下:
1.拿到table
A對應的region的startkey和endkey對應的值,也就是上述的region1,region2,region3對應的startKey
endKey對應的值:ListKey
A對應的region的startkey和endkey對應的值,也就是上述的region1,region2,region3對應的startKey
endKey對應的值:ListKey
2.遍歷ListKey,拿出KeyPair(first:startKey,second:endKey),假設第一次循環拿到region1的keyPair,也就是{負無窮,001),這時生成startKey
000:2016031700,生成endKey:001,即KeyPair(first:000:2016031700,second:001),放入SplitList
000:2016031700,生成endKey:001,即KeyPair(first:000:2016031700,second:001),放入SplitList
3.依次遍歷ListKey后,生成SplitList
4.spark使用上述的SplitList進行assign
task.
task.
我這里主要進行spark的task的生產策略的簡單描述和rdd
partition的簡單描述。
partition的簡單描述。
先普及二個知識:
1.spark的並行度
spark中,task是運行在worker進程中的,一個worker占用一個或者多個core。worker的實例又可以設置多個。故一個集群的task最大並行度為:SPARK_WORKER_INSTANCES*SPARK_WORKER_CORES有過storm開發經驗的同學,可能對這點很熟悉(worker--executor--task),
storm里面對cpu
core的這點不知道是否支持的良好。 希望對storm精通的同學能幫忙深度剖析一下這個storm
task並發的知識。
core的這點不知道是否支持的良好。 希望對storm精通的同學能幫忙深度剖析一下這個storm
task並發的知識。
2.sparkRdd的partition的個數和spark的並行度的關系:spark
rdd的partition的個數和spark的task總數的關系是對應的。
rdd的partition的個數和spark的task總數的關系是對應的。
上面介紹了數據的存儲,現在派上用場了。我們針對上面的數據的存儲模型,進行相應的數據計算。由於我是前天晚上才開始接觸spark的,所有現在概念還不是特別熟悉,可能很多地方都存在錯誤的描述。如果能及時發現,請幫忙指正,多謝啊。
我們采用java編程。(spark的源碼是scala編寫的)
JavaPairRDD hBaseRDD = sparkContext.newAPIHadoopRDD(hbaseConf,SDKTableInputFormat.class, ImmutableBytesWritable.class, Result.class);
生成一個RDD。這里面有一個類:TableInputFormat。他是TableInputFormater的派生類。追蹤到最低層的話就是重寫hadoop的InputFormat和RecordRead這兩個。有興趣的同學可以去看看這個。我這里提供一個demo的網址:
http://blog.csdn.net/zhongyifly/article/details/25156145。這個rdd的partition的個數其實就是其內部生成的tableSplitList的鏈表的長度。也就決定了partition的數量。這個partition的數據跟上面兩篇郵件中的partition的數量是一致的。
這時,我們就可以拿到數據進行分析了。

![[spark]使用spark進行hbase數據分析 [spark]使用spark進行hbase數據分析](/image/aHR0cDovL3MxLnNpbmFpbWcuY24vbXc2OTAvMDAyV2RVSzF6eTcwRzdNb1NkaWYwJjY5MA==.png)
![[spark]使用spark進行hbase數據分析 [spark]使用spark進行hbase數據分析](/image/aHR0cDovL3MyLnNpbmFpbWcuY24vbXc2OTAvMDAyV2RVSzF6eTcwRzhTMUFJaDIxJjY5MA==.png)