1.摘要
如果要將企業應用系統按照技術或數據按時間進行划分的話,那么可以以2008年Google推出的分布式文件系統DFS為一個划分標准,2008年之前,由於通信信息技術的弊端,還屬於PC互聯網時代,整個互聯網產生的數據和現在相比只是量級分之一,所以基本上是傳統的企業應用系統,將數據存儲在RDBMS數據庫中,再通過諸如J2EE的軟件技術架構去操作這些數據。2008年之后,隨着通信技術4G的誕生,迎來了在PC互聯網時代基礎上疊加的移動互聯網時代,這個時候產生的數據是巨大的,從最開始的PB級到EB級,甚至對於很多如Google,Alibaba,Tencent等巨頭企業,他們的數據可以達到ZB級。隨着2019年5G被逐漸商用,未來可能會迎來萬物互聯的下一個互聯網時代,數據的量可能還會出現量級的增長,甚至很多專家或學者將移動互聯網和萬物互聯合稱為大數據時代。但不管是2008年前的PC互聯網時代,還是之后的移動互聯網時代,甚至已經到來或即將到來的萬物互聯時代,或者說大數據時代,對數據的操作的要求都是很高,這里的操作具體體現在數據的讀(查詢本質上也是讀,只是帶計算的讀)和寫,以及計算,那么對數據操作的優化是非常有必要的。就目前而言,應用任何一個軟件系統或產品,除網絡狀態以外,如果操作一個系統,需要5到10秒才能響應。那么這個軟件系統基本上失敗的,甚至是垃圾產品。而一個系統的響應速度,很大部分是數據操作的速度決定的。而對於大數據,對數據的操作的響應會要求更高,因為大數據時代在數據的存儲,操作上和傳統的有所不同。對於傳統的系統而言,數據量少,在RDBMS中是直接存儲原始數據,在系統中通過編程進行一些特定的出來就能達到系統的使用效果。而對於大數據時代,數據量的巨大,如果將原始數據直接存儲在數據庫中顯然會帶來很多問題,所在在大數據領域中,往往是先對數據預處理,所以有了ETL,數據治理,數據倉庫,數據湖,數據集市等等一些概念和方案的誕生。不管是RDBMS,還是大數據或者大數據組件,優化是開發一個企業應用系統必不可少的工作。本篇博文主要講述大數據的綜合優化的一些思想,包括宏觀的優化思想,從哪些方面優化及基於目前常用的大數據生態組件某些組件的具體優化方案和手段。
2.宏觀的優化思想
宏觀的優化思想是每個大數據工程師必須具備的常識。例如更高計算性能的服務器資源,分而治之等,接下來會從每個方面具體介紹
2.1.高性能的算力芯片
在現行的對於字符的計算一般服務器都是采用CPU計算,而不同代的CPU的算力是不一樣的,CPU的性能越好,意味着對同一計算的時間更短,而優化本身就是在計算過程中讓耗時達到極限的少。當然對於某些領域如圖計算而言(這里補充一點,萬物皆可量化為數據,圖自然也是數據的一種),會采用CPU+GPU組合作計算
2.2.更充裕的內存
在很多大數據開發場景中,開發者會遇到集群中的某些機器內存被耗光或者內存空間不足導致計算數據的性能變慢,從而導致系統的響應時間不盡人意。想要弄清楚為什么會出現這種情況就必須明白內存,CPU,磁盤(也被稱作外存)之間的關系了。內存是CPU與磁盤的溝通橋梁,計算機中所有程序的運行都是在內存中進行的,因此內存的性能對計算機的影響非常大,內存的作用是將要把磁盤的數據給CPU計算的緩存之地,同時內存的讀寫效率遠遠高於磁盤。那么就意味着在CPU的算力固定下,內存越大,一次算的數據就越大,而內存的讀寫又高,可以整體將計算數據提升。相信很多開發者明白,很多數據庫或者計算引擎會將內存作為數據的暫存之地,如Redis,Spark以提高數據的查詢和計算性能。
2.3.分而治之,治之匯之
個人覺得分而治之的思想是一個大數據開發者入門的必要思想,如果不能悟出分而治之的思想,或者在開發過程中不能很好的將分而治之的思想應用業務以提高產品性能,是一個不合格的大數據開發工程師。下面來舉兩個通俗的示例來解釋分而治之:
a.第一個示例
現在假設你要把200斤的玉米從地里(A地)搬到家里(B地),兩百斤你可以勉強搬得起,但是因為太重你就走得慢,假設需要一個小時。現在假設你搬20斤很輕松五分鍾就可以從A地搬到B地了,那么分五次搬只需要50分鍾了,這個時候就節省了10分鍾,這就叫分而治之。同時因為勞累,你不確定自己搬了幾次,你只能將已搬了的數一下,然后匯聚在一起,才能確定真的搬了200斤全部的。這就叫治之匯之。
b.第二個示例
現在假設有600斤玉米,你一個人從A搬到B,你為了能全部搬完,一次只能搬100斤,所以你要搬六次,假設搬100斤的時間是40分鍾,你一個人搬需要需要6次,總共需要240分鍾。你嫌太累太廢時間,就叫了其他五個你的小伙伴幫你一起搬,這也叫分而治之,很輕松而且耗時很短的就搬完了,但你要確定每個小伙伴是不是真的搬了100斤,有沒有只搬了90斤,所以需要將每個小伙伴的掂量一下確定總數,這就叫治之匯之。
上面兩個示例中把搬運當成一種大數據開發中的一種計算就可以明白,分而治之是有不同的場景的,但對於大數據開發而言,一般都是基於實體(數據),時間的分而治之及匯之。參照下面示例圖:
2.4.空間換時間
不管是上面的高性能的計算芯片,充裕的內存,分而治之還是更大的磁盤空間,本質上都是以空間換時間。眾所周知,一個高性能的系統或產品,帶來的經濟效益是巨大的。而高性能的計算芯片,內存和磁盤在巨大的經濟效益下是微不足道的。目前大數據開發實踐上,本質上也遵循了空間換時間的標准。但是空間不是越大越好,比如一個小的大數據系統,采用了過多的磁盤空間,過多的內存空間等等,是有些浪費的。所以空間換時間更多的應該結合業務,如數據量大小,數據的計算的復雜度等等因素而定的,並不是盲目的增大。
2.5.數據結構的特性
在算力,內存,磁盤約定下,同時遵循了分而治之的思路,另一個方面就是數據結構也能對性能有很好的作用。目前大數據存儲組件常用的大約有十幾種,大數據計算引擎也有四五種,這些組件和計算引擎在很大程度上都是有獨特的數據結構從而在特定的業務上提高性能。比如Hbase在key-value上針對數據量龐大的效果是很好的,那么在業務場景上,就可以應用在比如通過唯一標識找出數據信息,在比如說Clickhouse,將數據按照每一列存儲,也稱為列式存儲。那么一般在數據字段多,但實際應用中不會全查,而是大部分只查部分字段的場景上,應用效果和性能都是很好的。再比如計算引擎Flink,特有的流式數據集就可以很好的提供計算引擎的計算效率。針對於這些事例很多,就不一一計算了,下面以兩張圖展現列式存儲和行式存儲的比較及流行的計算引擎的比較。
列式存儲和行式存儲:
計算引擎的比較:
關於計算引擎之間的原理及對比,個人覺得這篇博文(https://www.cnblogs.com/zdz8207/p/hadoop-spark-flink.html) 寫得很好可以參考
2.6.總結
上述所說的一些宏觀的優化思想,在理論上是很容易理解的。但日常的業務開發可能不僅僅是這么簡單,需要受到實際的情況而定。在實踐業務開發的過程中,往往是對應用的組件,計算引擎,數據庫等各方面,再遵循這些宏觀的思想,之后根據各個組件,計算引擎和數據庫底層設計原理具體的去優化,當然這部分工作是很細致化的。當然,有過大數據開發實踐經驗的開發者都知道,基本上所有的大數據應用組件,在算力,內存,磁盤約定的情況下,都是遵循分而治之和空間換時間的標准的,只是每個組件在數據結構,底層的設計原理上有自己的設計思想和架構從而達到應有的特性及效果。
3.大數據中影響性能的某些關鍵點
3.1.數據傾斜
數據傾斜是很常見的現象,著名的二八理論本質上也是數據傾斜,80%的財富集中在20%的人手中, 80%的用戶只使用20%的功能 , 20%的用戶貢獻了80%的訪問量。數據傾斜簡而言之是不同唯一標識值key的數據分布不均勻,一個key有十萬條數據記錄,而另一個key只有幾百條數據記錄。在實際的大數據業務開發中,針對數據的處理計算是並行計算的,為了簡化理解,假設以一個key為一個任務,很多key組成了多個並行任務,那么數據記錄多的key自然比數據少的key處理計算的時間要長,數據記錄少的key處理完畢后要等待數據記錄多的key,但是在開始任務時申請的資源又是大致相同的。那么會造成兩個問題,第一個問題是數據記錄多的key會拖累整體的性能,第二個問題是數據資源少的key的任務在一定程度上浪費了資源。這兩個問題都是開發者不願看到的。
對於數據傾斜的解決方案就是對每個任務的數據進行均衡,對key比較多的數據記錄進行打散,這依然是分而治之的思想。當然這里是給了初略的方案思路,在后面會針對大數據每個組件解決數據傾斜的具體方案細節。
3.2.一批次數據量過大
一批次數據量過大時非常常見的問題,而在實際的實踐中可以知道,在資源一定的情況下,對數據的處理和計算數據量和計算時間並不是成正比的,假設計算處理1G的數據需要時間是1min,那么處理10G的數據消耗的時間大多情況下不可能是10min,可能是20分鍾甚至更多。這就意味着數據量大會整體影響性能。針對一批數據量過大的問題,現在目前采用的對數據按時間或其他的業務含義進行分區處理,當然最常用的是時間分區,因為時間可以具體的細化,那么久可以很好的將數據量大分為多個小數據的分區。當然數據小不是絕對的,如果小數據太多會產生很多任務,所以數據量應當通過測試評估出來,比如HDFS建議一批的數據接近128M,過多的超過這個數據量過大或過小都會帶來性能問題。
3.3.數據字段過多
數據字段過多,這是針對於格式化的數據格式或數據表。數據字段意味着復雜的計算,那么復雜的計算必然帶來資源的消耗和時間的花費。當然數據字段過多意味着數據架構設計不合理,對於數據字段過多應該依據業務的角度進行表的拆分。
3.4.過多的復雜關聯查詢
在大數據開發中,查詢是必不可少的。而站在業務的角度,復雜的查詢是必然的,復雜的查詢同樣意味着復雜的計算。當然查詢主要以SQL為主,比如在SQL中常用的復雜的關聯查詢,子查詢,去重匯總查詢都會一定程度上影響性能。在這一方面,,沒有具體的方案。具體指導是針對業務對數據表結構和數據優化建模。關於這一點,后面會以某些組件為例,詳細介紹具體的方案。
3.5.數據表設計不合理
在大數據中,數據建模,數據表設計不合理不但會影響性能,同時會對業務系統的正常運行,開發效率帶來很多問題。在這方面,大數據采用的方案是建立合理的數據倉庫。而在數據倉庫中,對數據根據業務進行維度和事實划分,就產生了維表和事實表,這樣就產生了星型模型的設計理念。星型模型一定上能夠對性能,開發效率,系統的正常運行有很好的提升。如果采用雪花模型就會帶來很多問題了。下面是星型模型和雪花模型的概念
a.星型模型
星座模型,是對星型模型的擴展延伸,多張事實表共享維度表。數倉模型建設后期,當一個星型模型為一個實體,又有多個是實體,實體間又共用維表(這個是很常見的),就自然成了星座模型了。大部分維度建模都是星座模型。
b.雪花模型
雪花模型,在星型模型的基礎上,維度表上又關聯了其他維度表。這種模型使用過程中會造成大量的join,維護成本高,性能方面也較差,所以一般不建議使用。尤其是基於hadoop體系構建數倉,減少join就是減少shuffle,性能差距會很大。
4.Hive優化
4.1.Hive造成性能低下下的根源
Hive是hadoop生態下的組件,HiveQL本質上是格式化數據的MapReduce,即從MapReduce的運行角度來考慮優化性能,從更底層思考如何優化運算性能,而不僅僅局限於邏輯代碼層面。所以利用Hive處理數據有以下幾個顯著特征:
1>.數據的大規模並不是負載重點,造成運行壓力過大是因為運行數據的傾斜。
2>.jobs數比較多的作業運行效率相對比較低,比如即使有幾百行的表,如果多次關聯對此匯總,產生幾十個jobs,將會很多時間且大部分時間被用於作業分配,初始化和數據輸出。MapReduce作業初始化的時間是比較耗時間資源的一個部分。
3>.在使用SUM,COUNT,MAX,MIN等UDAF函數時,不怕數據傾斜問題,Hadoop在Map端的匯總合並優化過,使數據傾斜不成問題。
4>.COUNT(DISTINCT)在數據量大的情況下,效率較低,如果多COUNT(DISTINCT)效率更低,因為COUNT(DISTINCT)是按GROUP BY字段分組,按DISTINCT字段排序,一般這種分布式方式是很傾斜的;比如:男UV,女UV,淘寶一天30億的PV,如果按性別分組,分配2個reduce,每個reduce處理15億數據。
5>.數據傾斜是導致效率大幅降低的主要原因,可以采用多一次 Map/Reduce 的方法, 避免傾斜。
4.2.配置角度優化
Hive作為一個成熟的組件,在配置上自然提供了一些優化參數。Hive系統內部已針對不同的查詢預設定了優化方法,用戶可以通過調整配置進行控制,以下舉例介紹部分優化的策略以及優化控制選項。
1>.列裁剪
Hive 在讀數據的時候,可以只讀取查詢中所需要用到的列,而忽略其它列。例如,若有以下查詢:
SELECT a,b FROM q WHERE e<10;
在實施此項查詢中,Q 表有 5 列(a,b,c,d,e),Hive 只讀取查詢邏輯中真實需要 的 3 列 a、b、e,而忽略列 c,d;這樣做節省了讀取開銷,中間表存儲開銷和數據整合開銷。
裁剪所對應的參數項為:hive.optimize.cp=true
(默認值為真)
2>.分區裁剪
可以在查詢的過程中減少不必要的分區。例如,若有以下查詢:
SELECT * FROM (SELECTT a1,COUNT(1) FROM T GROUP BY a1) subq WHERE subq.prtn=100; #(多余分區)
SELECT * FROM T1 JOIN (SELECT * FROM T2) subq ON (T1.a1=subq.a2) WHERE subq.prtn=100;
查詢語句若將“subq.prtn=100”條件放入子查詢中更為高效,可以減少讀入的分區 數目。Hive自動執行這種裁剪優化。
分區參數為:hive.optimize.pruner=true
(默認值為真)
3>Join操作
a.Join操作原則
:
在使用寫有 Join 操作的查詢語句時有一條原則:應該將條目少的表/子查詢放在 Join 操作符的左邊。原因是在 Join 操作的 Reduce 階段,位於 Join 操作符左邊的表的內容會被加載進內存,將條目少的表放在左邊,可以有效減少發生 OOM 錯誤的幾率。對於一條語句中有多個 Join 的情況,如果 Join 的條件相同,比如查詢:
INSERT OVERWRITE TABLE pv_users
SELECT pv.pageid, u.age FROM page_view p
JOIN user u ON (pv.userid = u.userid)
JOIN newuser x ON (u.userid = x.userid);
如果Join的key相同,不管有多少個表,都會則會合並為一個Map-Reduce,一個Map-Reduce任務,而不是n個,在做OUTER JOIN的時候也是一樣
如果 Join 的條件不相同,比如:
INSERT OVERWRITE TABLE pv_users
SELECT pv.pageid, u.age FROM page_view p
JOIN user u ON (pv.userid = u.userid)
JOIN newuser x on (u.age = x.age);
Map-Reduce的任務數目和Join操作的數目是對應的,上述查詢和以下查詢是等價的:
INSERT OVERWRITE TABLE tmptable
SELECT * FROM page_view p JOIN user u
ON (pv.userid = u.userid);
INSERT OVERWRITE TABLE pv_users
SELECT x.pageid, x.age FROM tmptable x
JOIN newuser y ON (x.age = y.age);
b.MAP JOIN
Join操作在Map階段完成,不再需要Reduce,前提條件是需要的數據在 Map 的過程中可以訪問到。比如查詢:
INSERT OVERWRITE TABLE pv_users
SELECT /*+ MAPJOIN(pv) */ pv.pageid, u.age
FROM page_view pv
JOIN user u ON (pv.userid = u.userid);
可以在Map階段完成Join
相關的參數為:
hive.join.emit.interval = 1000
hive.mapjoin.size.key = 10000
hive.mapjoin.cache.numrows = 10000
c.GROUP BY
進行GROUP BY操作時需要注意一下幾點:
Map端部分聚合
事實上並不是所有的聚合操作都需要在reduce部分進行,很多聚合操作都可以先在Map端進行部分聚合,然后reduce端得出最終結果。
這里需要修改的參數為:
hive.map.aggr=true
(用於設定是否在 map 端進行聚合,默認值為真)
hive.groupby.mapaggr.checkinterval=100000
(用於設定 map 端進行聚合操作的條目數)
有數據傾斜時進行負載均衡:
此處需要設定hive.groupby.skewindata
,當選項設定為 true 是,生成的查詢計划有兩個MapReduce任務。在第一個MapReduce中,map的輸出結果集合會隨機分布到reduce中,每個reduce 做部分聚合操作,並輸出結果。這樣處理的結果是,相同的Group By Key有可能分發到不同的reduce中,從而達到負載均衡的目的;第二個 MapReduce 任務再根據預處 理的數據結果按照 Group By Key 分布到 reduce 中(這個過程可以保證相同的 Group By Key 分布到同一個 reduce 中),最后完成最終的聚合操作。
d.合並小文件
我們知道文件數目小,容易在文件存儲端造成瓶頸,給 HDFS 帶來壓力,影響處理效率。對此,可以通過合並Map和Reduce的結果文件來消除這樣的影響。
用於設置合並屬性的參數有:
是否合並Map輸出文件:hive.merge.mapfiles=true
(默認值為真)
是否合並Reduce端輸出文件:hive.merge.mapredfiles=false
(默認值為假)
合並文件的大小:hive.merge.size.per.task=256*1000*1000
(默認值為 256000000)
4.3.SQL語句的優化
熟練地使用 SQL,能寫出高效率的查詢語句。
1>.場景
有一張 user 表,為賣家每天收到表,user_id,ds(日期)為 key,屬性有主營類目,指標有交易金額,交易筆數。每天要取前10天的總收入,總筆數,和最近一天的主營類目。
2>.解決方法1
如下所示:常用方法
INSERT OVERWRITE TABLE t1
SELECT user_id,substr(MAX(CONCAT(ds,cat),9) AS main_cat) FROM users
WHERE ds=20120329 // 20120329 為日期列的值,實際代碼中可以用函數表示出當天日期 GROUP BY user_id;
INSERT OVERWRITE TABLE t2
SELECT user_id,sum(qty) AS qty,SUM(amt) AS amt FROM users
WHERE ds BETWEEN 20120301 AND 20120329
GROUP BY user_id
SELECT t1.user_id,t1.main_cat,t2.qty,t2.amt FROM t1
JOIN t2 ON t1.user_id=t2.user_id
下面給出方法1的思路,實現步驟如下:
第一步:利用分析函數,取每個user_id最近一天的主營類目,存入臨時表t1。
第二步:匯總10天的總交易金額,交易筆數,存入臨時表 t2。
第三步:關聯t1 t2得到最終的結果。
3>.解決方法2
如下所示:優化方法
SELECT user_id,substr(MAX(CONCAT(ds,cat)),9) AS main_cat,SUM(qty),SUM(amt) FROM users
WHERE ds BETWEEN 20120301 AND 20120329
GROUP BY user_id
在工作中我們總結出:
方案 2 的開銷等於方案 1 的第二步的開銷,性能提升,由原有的 25 分鍾完成,縮短為 10 分鍾以內完成。節省了兩個臨時表的讀寫是一個關鍵原因,這種方式也適用於 Oracle 中的數據查找工作。
SQL具有普遍性,很多 SQL 通用的優化方案在 Hadoop 分布式計算方式中也可以達到效果。
4>.無效ID在關聯時的數據傾斜問題
問題:日志中常會出現信息丟失,比如每日約為 20 億的全網日志,其中的 user_id 為主鍵,在日志收集過程中會丟失,出現主鍵為null的情況,如果取其中的user_id和bmw_users 關聯,就會碰到數據傾斜的問題。原因是Hive中,主鍵為null值的項會被當做相同的Key而分配進同一個計算Map。
解決方法1:user_id 為空的不參與關聯,子查詢過濾 null
SELECT * FROM log a
JOIN bmw_users b ON a.user_id IS NOT NULL AND a.user_id=b.user_id
UNION All SELECT * FROM log a WHERE a.user_id IS NULL
解決方法2 如下所示:函數過濾 null
SELECT * FROM log a LEFT OUTER
JOIN bmw_users b ON
CASE WHEN a.user_id IS NULL THEN CONCAT(‘dp_hive’,RAND()) ELSE a.user_id END =b.user_id;
這個優化適合無效 id(比如-99、 ‘’,null 等)產生的傾斜問題。把空值的 key 變成一個字符串加上隨機數,就能把傾斜的 數據分到不同的Reduce上,從而解決數據傾斜問題。因為空值不參與關聯,即使分到不同 的 Reduce 上,也不會影響最終的結果。附上 Hadoop 通用關聯的實現方法是:關聯通過二次排序實現的,關聯的列為 partion key,關聯的列和表的 tag 組成排序的 group key,根據 pariton key分配Reduce。同一Reduce內根據group key排序。
5.Hbase優化
Hbase在大數據中通過唯一標識key找數據的場景是很常用的,而這種場景也是很多的。另外Hbase對於超大數據的存儲和超大數據下單條數據命中上也是支持很好的。所以對於Hbase的優化也是非常有必要的。Hbase的優化主要分為以下幾個方面:
1>.表的設計優化
2>.寫表操作優化
3>.讀表操作優化
接下來將從以上三個方面給出具體的優化方案
5.1.表的設計優化
5.1.1.預分區
默認情況下,在創建HBase表的時候會自動創建一個region分區,當導入數據的時候,所有的HBase客戶端都向這一個region寫數據,直到這個region足夠大了才進行切分。一種可以加快批量寫入速度的方法是通過預先創建一些空的regions,這樣當數據寫入HBase時,會按照region分區情況,在集群內做數據的負載均衡。如下通過Java API寫預先創建Regions的代碼示例
public static boolean createTable(HBaseAdmin admin, HTableDescriptor table, byte[][] splits)
throws IOException {
try {
admin.createTable(table, splits);
return true;
} catch (TableExistsException e) {
logger.info("table " + table.getNameAsString() + " already exists");
// the table already exists...
return false;
}
}
public static byte[][] getHexSplits(String startKey, String endKey, int numRegions) { //start:001,endkey:100,10region [001,010]
[011,020]
byte[][] splits = new byte[numRegions-1][];
BigInteger lowestKey = new BigInteger(startKey, 16);
BigInteger highestKey = new BigInteger(endKey, 16);
BigInteger range = highestKey.subtract(lowestKey);
BigInteger regionIncrement = range.divide(BigInteger.valueOf(numRegions));
lowestKey = lowestKey.add(regionIncrement);
for(int i=0; i < numRegions-1;i++) {
BigInteger key = lowestKey.add(regionIncrement.multiply(BigInteger.valueOf(i)));
byte[] b = String.format("%016x", key).getBytes();
splits[i] = b;
}
return splits;
}
5.1.2.Row Key設計
HBase中row key用來檢索表中的記錄,支持以下三種方式:
通過單個row key訪問:即按照某個row key鍵值進行get操作;
通過row key的range進行scan:即通過設置startRowKey和endRowKey,在這個范圍內進行掃描;
全表掃描:即直接掃描整張表中所有行記錄。
在HBase中,row key可以是任意字符串,最大長度64KB,實際應用中一般為10~100bytes,存為byte[]字節數組,一般設計成定長的。
row key是按照字典序存儲,因此,設計row key時,要充分利用這個排序特點,將經常一起讀取的數據存儲到一塊,將最近可能會被訪問的數據放在一塊。
舉個例子:如果最近寫入HBase表中的數據是最可能被訪問的,可以考慮將時間戳作為row key的一部分,由於是字典序排序,所以可以使用Long.MAX_VALUE - timestamp作為row key,這樣能保證新寫入的數據在讀取時可以被快速命中。
Rowkey規則:
1>.越小越好
2>.Rowkey的設計是要根據實際業務來
3>.散列性
取反001 002 100 200
Hash
5.1.3.列族的設計
不要在一張表里定義太多的column family。目前Hbase並不能很好的處理超過2~3個column family的表。因為某個column family在flush的時候,它鄰近的column family也會因關聯效應被觸發flush,最終導致系統產生更多的I/O。在實際開發中,為了規避這一點往往將多個列簇字段拼接成一個列簇。
5.1.4.In Memory
創建表的時候,可以通過HColumnDescriptor.setInMemory(true)將表放到RegionServer的緩存中,保證在讀取的時候被cache命中。(讀緩存)
5.1.5.Max Version
創建表的時候,可以通過HColumnDescriptor.setMaxVersions(int maxVersions)設置表中數據的最大版本,如果只需要保存最新版本的數據,那么可以設置setMaxVersions(1)。
5.1.6.Time To Live
創建表的時候,可以通過HColumnDescriptor.setTimeToLive(int timeToLive)設置表中數據的存儲生命期,過期數據將自動被刪除,例如如果只需要存儲最近兩天的數據,那么可以設置setTimeToLive(2 * 24 * 60 * 60)。(相當於Linux中的Crontab任務)
5.1.7.Compact & Split
在HBase中,數據在更新時首先寫入WAL 日志(HLog)和內存(MemStore)中,MemStore中的數據是排序的,當MemStore累計到一定閾值時,就會創建一個新的MemStore,並且將老的MemStore添加到flush隊列,由單獨的線程flush到磁盤上,成為一個StoreFile。於此同時, 系統會在zookeeper中記錄一個redo point,表示這個時刻之前的變更已經持久化了(minor compact)。
StoreFile是只讀的,一旦創建后就不可以再修改。因此Hbase的更新其實是不斷追加的操作。當一個Store中的StoreFile達到一定的閾值后,就會進行一次合並(major compact),將對同一個key的修改合並到一起,形成一個大的StoreFile,當StoreFile的大小達到一定閾值后,又會對 StoreFile進行分割(split),等分為兩個StoreFile。
由於對表的更新是不斷追加的,處理讀請求時,需要訪問Store中全部的StoreFile和MemStore,將它們按照row key進行合並,由於StoreFile和MemStore都是經過排序的,並且StoreFile帶有內存中索引,通常合並過程還是比較快的。
實際應用中,可以考慮必要時手動進行major compact,將同一個row key的修改進行合並形成一個大的StoreFile。同時,可以將StoreFile設置大些,減少split的發生。
hbase為了防止小文件(被刷到磁盤的menstore)過多,以保證保證查詢效率,hbase需要在必要的時候將這些小的store file合並成相對較大的store file,這個過程就稱之為compaction。在hbase中,主要存在兩種類型的compaction:minor compaction和major compaction。
minor compaction:的是較小、很少文件的合並。
major compaction 的功能是將所有的store file合並成一個,觸發major compaction的可能條件有:major_compact 命令、majorCompact() API、region server自動運行(相關參數:hbase.hregion.majoucompaction 默認為24 小時、hbase.hregion.majorcompaction.jetter 默認值為0.2 防止region server 在同一時間進行major compaction)。
hbase.hregion.majorcompaction.jetter參數的作用是:對參數hbase.hregion.majoucompaction 規定的值起到浮動的作用,假如兩個參數都為默認值24和0,2,那么major compact最終使用的數值為:19.2~28.8 這個范圍。實際開發中可以關閉自動major compaction,手動編寫major compaction,如下參考:
Timer類,contab
minor compaction的運行機制要復雜一些,它由一下幾個參數共同決定:
hbase.hstore.compaction.min :默認值為 3,表示至少需要三個滿足條件的store file時,minor compaction才會啟動
hbase.hstore.compaction.max 默認值為10,表示一次minor compaction中最多選取10個store file
hbase.hstore.compaction.min.size 表示文件大小小於該值的store file 一定會加入到minor compaction的store file中
hbase.hstore.compaction.max.size 表示文件大小大於該值的store file 一定會被minor compaction排除
hbase.hstore.compaction.ratio 將store file 按照文件年齡排序(older to younger),minor compaction總是從older store file開始選擇
5.2.寫表操作優化
5.2.1.多HTable並發寫
創建多個HTable客戶端用於寫操作,提高寫數據的吞吐量,一個例子:
static final Configuration conf = HBaseConfiguration.create();
static final String table_log_name = “user_log”;
wTableLog = new HTable[tableN];
for (int i = 0; i < tableN; i++) {
wTableLog[i] = new HTable(conf, table_log_name);
wTableLog[i].setWriteBufferSize(5 * 1024 * 1024); //5MB
wTableLog[i].setAutoFlush(false);
5.2.2.HTable參數設置
aAuto Flush
通過調用HTable.setAutoFlush(false)
方法可以將HTable寫客戶端的自動flush關閉,這樣可以批量寫入數據到HBase,而不是有一條put就執行一次更新,只有當put填滿客戶端寫緩存時,才實際向HBase服務端發起寫請求。默認情況下auto flush
是開啟的。
b.Write Buffer
通過調用HTable.setWriteBufferSize(writeBufferSize)
方法可以設置HTable客戶端的寫buffer大小,如果新設置的buffer小於當前寫buffer中的數據時,buffer將會被flush到服務端。其中,writeBufferSize的單位是byte字節數,可以根據實際寫入數據量的多少來設置該值。
c.WAL Flag
(慎用除非導入測試數據)
在HBae中,客戶端向集群中的RegionServer提交數據時(Put/Delete操作),首先會先寫WAL(Write Ahead Log)日志(即HLog,一個RegionServer上的所有Region共享一個HLog),只有當WAL日志寫成功后,再接着寫MemStore,然后客戶端被通知提交數據成功;如果寫WAL日志失敗,客戶端則被通知提交失敗。這樣做的好處是可以做到RegionServer宕機后的數據恢復。
因此,對於相對不太重要的數據,可以在Put/Delete操作時,通過調用Put.setWriteToWAL(false)或Delete.setWriteToWAL(false)函數,放棄寫WAL日志,從而提高數據寫入的性能。
值得注意的是:謹慎選擇關閉WAL日志,因為這樣的話,一旦RegionServer宕機,Put/Delete的數據將會無法根據WAL日志進行恢復。
5.2.3.批量寫
通過調用HTable.put(Put)方法可以將一個指定的row key記錄寫入HBase,同樣HBase提供了另一個方法:通過調用HTable.put(List
5.2.3.多線程並發寫
在客戶端開啟多個HTable寫線程,每個寫線程負責一個HTable對象的flush操作,這樣結合定時flush和寫buffer(writeBufferSize),可以既保證在數據量小的時候,數據可以在較短時間內被flush(如1秒內),同時又保證在數據量大的時候,寫buffer一滿就及時進行flush。下面給個具體的例子:
for (int i = 0; i < threadN; i++) {
Thread th = new Thread() {
public void run() {
while (true) {
try {
sleep(1000); //1 second
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (wTableLog[i]) {
try {
wTableLog[i].flushCommits();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
};
th.setDaemon(true);
th.start();
}
當然這種多線程並發寫在Hadoop體現下就沒必要了
5.3.讀表操作優化
5.3.1.HTable參數設置
a.Scanner Caching
hbase.client.scanner.caching
配置項可以設置HBase scanner
一次從服務端抓取的數據條數,默認情況下一次一條。通過將其設置成一個合理的值,可以減少scan過程中next()
的時間開銷,代價是scanner需要通過客戶端的內存來維持這些被cache的行記錄。有三個地方可以進行配置:
1>.在HBase的conf配置文件中進行配置;(一般不用該全局配置)
2>.通過調用HTable.setScannerCaching(int scannerCaching)
進行配置
3>.通過調用Scan.setCaching(int caching)
進行配置。三者的優先級越來越高。
b.Scan Attribute Selection
scan時指定需要的Column Family
,可以減少網絡傳輸數據量,否則默認scan操作會返回整行所有Column Family
的數據。
c.Close ResultScanner
通過scan取完數據后,記得要關閉ResultScanner
,否則RegionServer
可能會出現問題(對應的Server資源無法釋放)。
5.3.2.批量讀
通過調用HTable.get(Get)
方法可以根據一個指定的row key獲取一行記錄,同樣HBase提供了另一個方法:通過調用HTable.get(List<Get>)
方法可以根據一個指定的row key列表,批量獲取多行記錄,這樣做的好處是批量執行,只需要一次網絡I/O開銷,這對於對數據實時性要求高而且網絡傳輸RTT高的情景下可能帶來明顯的性能提升。
5.3.3.多線程並發讀
在客戶端開啟多個HTable讀線程,每個讀線程負責通過HTable對象進行get操作。當然這種多線程並發寫在Hadoop體現下就沒必要了,下面是一個多線程並發讀取HBase,獲取店鋪一天內各分鍾PV值的例子
public class DataReaderServer {
//獲取店鋪一天內各分鍾PV值的入口函數
public static ConcurrentHashMap<String, String> getUnitMinutePV(long uid, long startStamp, long endStamp){
long min = startStamp;
int count = (int)((endStamp - startStamp) / (60*1000));
List<String> lst = new ArrayList<String>();
for (int i = 0; i <= count; i++) {
min = startStamp + i * 60 * 1000;
lst.add(uid + "_" + min);
}
return parallelBatchMinutePV(lst);
}
//多線程並發查詢,獲取分鍾PV值
private static ConcurrentHashMap<String, String> parallelBatchMinutePV(List<String> lstKeys){
ConcurrentHashMap<String, String> hashRet = new ConcurrentHashMap<String, String>();
int parallel = 3;
List<List<String>> lstBatchKeys = null;
if (lstKeys.size() < parallel ){
lstBatchKeys = new ArrayList<List<String>>(1);
lstBatchKeys.add(lstKeys);
}
else{
lstBatchKeys = new ArrayList<List<String>>(parallel);
for(int i = 0; i < parallel; i++ ){
List<String> lst = new ArrayList<String>();
lstBatchKeys.add(lst);
}
for(int i = 0 ; i < lstKeys.size() ; i ++ ){
lstBatchKeys.get(i%parallel).add(lstKeys.get(i));
}
}
List<Future< ConcurrentHashMap<String, String> >> futures = new ArrayList<Future< ConcurrentHashMap<String, String> >>(5);
ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
builder.setNameFormat("ParallelBatchQuery");
ThreadFactory factory = builder.build();
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(lstBatchKeys.size(), factory);
for(List<String> keys : lstBatchKeys){
Callable< ConcurrentHashMap<String, String> > callable = new BatchMinutePVCallable(keys);
FutureTask< ConcurrentHashMap<String, String> > future = (FutureTask< ConcurrentHashMap<String, String> >) executor.submit(callable);
futures.add(future);
}
executor.shutdown();
// Wait for all the tasks to finish
try {
boolean stillRunning = !executor.awaitTermination(
5000000, TimeUnit.MILLISECONDS);
if (stillRunning) {
try {
executor.shutdownNow();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
} catch (InterruptedException e) {
try {
Thread.currentThread().interrupt();
} catch (Exception e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
}
// Look for any exception
for (Future f : futures) {
try {
if(f.get() != null)
{
hashRet.putAll((ConcurrentHashMap<String, String>)f.get());
}
} catch (InterruptedException e) {
try {
Thread.currentThread().interrupt();
} catch (Exception e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
} catch (ExecutionException e) {
e.printStackTrace();
}
}
return hashRet;
}
//一個線程批量查詢,獲取分鍾PV值
protected static ConcurrentHashMap<String, String> getBatchMinutePV(List<String> lstKeys){
ConcurrentHashMap<String, String> hashRet = null;
List<Get> lstGet = new ArrayList<Get>();
String[] splitValue = null;
for (String s : lstKeys) {
splitValue = s.split("_");
long uid = Long.parseLong(splitValue[0]);
long min = Long.parseLong(splitValue[1]);
byte[] key = new byte[16];
Bytes.putLong(key, 0, uid);
Bytes.putLong(key, 8, min);
Get g = new Get(key);
g.addFamily(fp);
lstGet.add(g);
}
Result[] res = null;
try {
res = tableMinutePV[rand.nextInt(tableN)].get(lstGet);
} catch (IOException e1) {
logger.error("tableMinutePV exception, e=" + e1.getStackTrace());
}
if (res != null && res.length > 0) {
hashRet = new ConcurrentHashMap<String, String>(res.length);
for (Result re : res) {
if (re != null && !re.isEmpty()) {
try {
byte[] key = re.getRow();
byte[] value = re.getValue(fp, cp);
if (key != null && value != null) {
hashRet.put(String.valueOf(Bytes.toLong(key,
Bytes.SIZEOF_LONG)), String.valueOf(Bytes
.toLong(value)));
}
} catch (Exception e2) {
logger.error(e2.getStackTrace());
}
}
}
}
return hashRet;
}
}
//調用接口類,實現Callable接口
class BatchMinutePVCallable implements Callable<ConcurrentHashMap<String, String>>{
private List<String> keys;
public BatchMinutePVCallable(List<String> lstKeys ) {
this.keys = lstKeys;
}
public ConcurrentHashMap<String, String> call() throws Exception {
return DataReadServer.getBatchMinutePV(keys);
}
5.3.4.緩存查詢結果
對於頻繁查詢HBase的應用場景,可以考慮在應用程序中做緩存,當有新的查詢請求時,首先在緩存中查找,如果存在則直接返回,不再查詢HBase;否則對HBase發起讀請求查詢,然后在應用程序中將查詢結果緩存起來。至於緩存的替換策略,可以考慮LRU等常用的策略。
5.3.5.Blockcache
設置讀緩存,在服務器端
HBase上Regionserver的內存分為兩個部分,一部分作為Memstore,主要用來寫;另外一部分作為BlockCache,主要用於讀。
寫請求會先寫入Memstore,Regionserver會給每個region提供一個Memstore,當Memstore滿64MB以后,會啟動 flush刷新到磁盤。當Memstore的總大小超過限制時(heapsize * hbase.regionserver.global.memstore.upperLimit * 0.9),會強行啟動flush進程,從最大的Memstore開始flush直到低於限制。
讀請求先到Memstore中查數據,查不到就到BlockCache中查,再查不到就會到磁盤上讀,並把讀的結果放入BlockCache。由於BlockCache采用的是LRU策略,因此BlockCache達到上限(heapsize * hfile.block.cache.size * 0.85)后,會啟動淘汰機制,淘汰掉最老的一批數據。
一個Regionserver上有一個BlockCache和N個Memstore,它們的大小之和不能大於等於heapsize * 0.8,否則HBase不能啟動。默認BlockCache為0.2,而Memstore為0.4。對於注重讀響應時間的系統,可以將 BlockCache設大些,比如設置BlockCache=0.4,Memstore=0.39,以加大緩存的命中率。
6.Spark優化
Spark作為現在最流行的數據分布式計算引擎,優化是必然的。當然對於計算引擎,優化的方案就相對於數據存儲引擎較為簡單,下面介紹幾個優化的方案
6.1.參數級的優化
spark_driver_memory=4g
spark_num_executors=6
spark_executor_memory=4g
spark_executor_cores=1
spark_executor_memory_over_head=1024
spark_sql_shuffle_partitions=18
spark.default.parallelism=18
主要是這七個參數,這七個個參數的說明如下
spark_driver_memory
設置driver的內存大小
spark_num_executors
設置executors的個數
spark_executor_memory
設置每個spark_executor_cores
的內存大小
spark_executor_cores
設置每個executor
的cores數目
spark_executor_memory_over_head
設置executor執行的時候,用的內存可能會超過executor-memoy
,所以會為executor額外預留一部分內存。該參數代表了這部分內存
spark_sql_shuffle_partitions
設置executor的partitions個數,注意這個參數只對SparkSQL有用
spark.default.parallelism
設置executor的partitions個數,注意這個參數只對SparkRDD有用
對於這七個參數,需要充分理解Spark執行的邏輯才能明白並合適的配置,Spark的執行邏輯如下(這里不再細講),可以參照這邊博客https://www.cnblogs.com/cxxjohnson/p/8909578.html 或官方API
其中關於內存的配置要結合hadoop yarn的集群的資源情況而定,不是越大越好。而對於spark_num_executors,spark_executor_cores,spark_sql_shuffle_partitions這三個參數,根據實際的經驗需滿足spark_sql_shuffle_partitions=spark_num_executorsspark_executor_cores3,而spark_executor_cores一般保持在1
再提交任務時:
spark-submit \
--master yarn \
--deploy-mode cluster \
--num-executors $spark_num_executors \
--driver-memory $spark_driver_memory \
--executor-memory $spark_executor_memory \
--executor-cores $spark_executor_cores \
--queue yarn_queue_test \
--conf spark.app.name=spark_name_test \
--conf spark.yarn.executor.memoryOverhead=$spark_executor_memory_over_head \
--conf spark.core.connection.ack.wait.timeout=300 \
--conf spark.dynamicAllocation.enabled=false \
--jars test.jar
6.2.Task數據分布的優化
在一般情況下Task數據分配是隨機默認的,這樣會帶來一個問題,如果多大的Task,而只是部分的Task數據處理量大,大部分很小,那么如果能做到將小部分的Task數據處理量優化到和大部分的大致相等,那么性能自然就提升上去了。這樣優化分為兩步:
a.在執行的Java代碼中獲取num_executors參數的值,上面的例子是spark_num_executors=6
int rddPartition = Integer.parseInt(parameterParse.getNum_executors()) * 3;
b.不管是rdd的遍歷還是直接的session.sql("sql").foreachPartition()
在遍歷之前加上一個方法repartition(partition)
session.sql(sqlStr).repartition(partition).foreachPartition(iterator -> {
while (iterator.hasNext()) {
Row row = iterator.next();
//邏輯處理
}
});
這樣做后,在任務的管理頁面看到的executor數據分布式非常均勻的,從而提高性能
6.3.分而治之
分而治之是貫穿整個大數據計算的核心,不管是MapReduce,Spark,Flink等等,而這里要說的分而治之可以初略的物理流程上的分而治之,而不是對Spark的driver,executor,Task分而治之,因為本身就是分布式的分而治之。假設經過反復的性能壓力測試,得出Spark在現有規定資源上只有1000000條/s的性能,而現在的數據有一億條。現在不做任何處理提交session.sql("sql").foreachPartition()
或rdd.foreachPartition()
,雖然最終會處理完,但發現時間是比預定的100000000/1000000s多得多,這樣會拖累整體性能,這個時候是可以對現有的一億條數據做以1000000條為組的組合切割分配成100000000/1000000個集合,對集合數據依次執行,這樣性能上會有所提升。當然這種優化方式還是需要跟實際業務邏輯來定
7.總結
本文開始講了大數據優化的宏觀優化思想,接着講了影響性能的一些核心點,最后以在企業開發中最常用的三個組件Hive、Hbase和Spark的一些優化策略和優化方案,這三個組件其實代表了三個領域,分別是數據倉庫組件,針對某一業務的數據存儲組件和數據計算引擎。希望這三個組件的優化點可以在讀者在企業業務開發時候得到幫助和指導。當然最重要的是知道影響大數據每個環節性能的核心點,再結合優化的思想去具體的優化。授人以魚不如授人以漁,就是這個道理。當然在后面的博文中會陸續對大數據常用的每個組件給出優化方案和具體的優化實踐。