總論:
大數據計算服務 ( MaxCompute,原名 ODPS ) 是一種快速、完全托管的 TB/PB 級數據倉庫解決方案 。MaxCompute 向用戶提供了完善的數據導入方案以及多種經典的分布式計算模型,能夠更快速的解決用戶海量數據計算問題,有效降低企業成本,並保障數據安全 。同時,大數據開發套件和 MaxCompute關系緊密,大數據開發套件為 MaxCompute 提供了一站式的數據同步,任務開發,數據工作流開發,數據管理和數據運維等功能,您可以參見 大數據開發套件簡介 來對其進行深入了解 。
MaxCompute 主要服務於批量結構化數據的存儲和計算,可以提供海量數據倉庫的解決方案以及針對大數據的分析建模服務 。隨着社會數據收集手段的不斷豐富及完善,越來越多的行業數據被積累下來 。數據規模已經增長到了傳統軟件行業無法承載的海量數據(百 GB、TB、乃至 PB)級別 。
在分析海量數據場景下,由於單台服務器的處理能力限制,數據分析者通常采用分布式計算模式 。但分布式的計算模型對數據分析人員提出了較高的要求,且不易維護 。使用分布式模型,數據分析人員不僅需要了解業務需求,同時還需要熟悉底層計算模型 。MaxCompute 的目的是為用戶提供一種便捷的分析處理海量數據的手段 。用戶可以不必關心分布式計算細節,從而達到分析大數據的目的 。
首先MaxCompute不同於普通的mysql,oracle這樣的關系型數據庫,它其實是一個綜合性的數據服務平台,它並不能在毫秒級甚至秒級返回查詢結果,一條odps命令的執行通常需要經過如下流程:
提交作業(簡易流程):
- 提交一個SQL語句,發送 RESTful 請求給HTTP服務器
- HTTP 服務器做用戶認證。認證通過后,請求就會以 Kuafu通信協議方式發送給 Worker。
- Worker判斷該請求作業是否需要啟動Fuxi Job。如果不需要,本地執行並返回結果。如果需要,則生成一個 instance, 發送給 Scheduler。
- Scheduler把instance信息注冊到 OTS,將其狀態置成 Running。Scheduler 把 instance 添加到 instance 隊列。
- Worker把 Instance ID返回給客戶端。
運行作業(簡易流程):
- Scheduler會把instance拆成多個Task,並生成任務流DAG圖。
- 把可運行的Task 放入到優先級隊列TaskPool中。
- Scheduler 有一個后台線程定時對TaskPool 中的任務進行排序。Scheduler 有一個后台線程定時查詢計算集群的資源狀況。Executor在資源未滿的情況下,輪詢TaskPool,請求Task。Scheduler判斷計算資源。若集群有資源,就將該Task發給Executor。
- Executor調用SQL Parse Planner,生成SQL Plan。Executor 將 SQL Plan 轉換成計算層的 FuXi Job 描述文件。Executor 將該描述文件提交給計算層運行,並查詢 Task 執行狀態。Task 執行完成后,Executor更新 OTS 中的 Task信息,並匯報給Scheudler。
- Schduler 判斷 instance 結束,更新 OTS 中 instance 信息,置為 Terminated。
查詢狀態:
客戶端接收到返回的 Instance ID 后,可以通過 Instance ID 來查詢作業狀態:
- 客戶端會發送另一個 REST 的請求,查詢作業狀態。
- HTTP 服務器根據配置信息做用戶認證。用戶認證通過后,把查詢的請求發送給 Worker。
- Worker 根據 InstanceID 去 OTS 中查詢該作業的執行狀態。Worker 將查詢到的執行狀態返回給客戶端。
其實MaxCompute是一個透明的數據服務平台,用戶不需要了解分布式數據處理的細節,就可以在client上比較方便的處理PB級別的數據了。所以,在了解了以上內容之后,對於MaxCompute只能在分鍾級別返回結果就有一個比較清楚的理解了。
ps:以上這些內容在大數據開發套件中都是透明的。
MaxCompute SQL基礎:
MaxCompute SQL與普通關系型數據庫的SQL大體類似,不同在於MaxCompute不支持如事務、主鍵約束、索引等,可以看成標准SQL的子集。
DDL:data define language
MaxCompute 操作以表為基礎,ddl中涉及到對表的一系列操作,包括create,drop,alter
我們以大數據開發套件上的一張表為例:
CREATE TABLE IF NOT EXISTS xxxx ( aa STRING COMMENT 'xxxx', bb STRING COMMENT 'xxxx', cc STRING COMMENT 'xxxx', dd STRING COMMENT 'xxxx', ee STRING COMMENT 'xxxx', ff STRING COMMENT 'xxx', gg BIGINT COMMENT 'xxx' ) COMMENT 'xxxx' PARTITIONED BY (dt STRING COMMENT '') LIFECYCLE 10;
在工作流中我們希望任務能夠順利的執行,所以不管是DDL和DML中我們都盡量希望語句返回成功(if not exist,overwrite)
comment包括對應字段的注釋和對應表的注釋,這些都可以alter
與傳統的SQL不同,MaxCompute面向全域數據,所以即使是用create xxxx select xxxx from xxx的方式也需要as加上列的名稱。
分區字段注明,由於MaxCompute操作的數據量很大,通常來說分區字段需要特別關注
生命周期:非常方便的屬性,便於用戶釋放存儲空間,簡化回收數據的流程,不需要傳統的繁雜的空間維護。靈活運用LastDataModifiedTime與touch(修改為當前時間),關注分區表和非分區表的區別。
對於大表結構的復制,odps提供非常靈活的create like語句。
drop一張表,將會把表及表中的數據丟入回收站中,加上purge關鍵字,會被直接刪除,不可恢復。
alter幾乎可以對表的所有屬性進行更改,包括列,注釋,分區,分區屬性,生命周期等等。
Archive可以用來減少大表空間占用,壓縮空間。
DML:data manipulation language
常見的比如insert,select,join
insert overwrite|into table tablename
[partition (partcol1=val1, partcol2=val2 ...)] select_statement from from_statement;
靜態分區,分區字段常量;動態分區,可以不指定值,適用select字句中的分區列值
multi-insert 單次讀入,多次寫入,減少數據讀取。
select [all | distinct] select_expr, select_expr, ... from table_reference [where where_condition] [group by col_list] [order by order_condition] [distribute by distribute_condition [sort by sort_condition] ] [limit number]
與傳統的SQL不同的是,distinct作用所有select字段
編譯過程group > select > order/sort/distribute,理解了編譯順序也就理解了各個字句間別名的使用規范。
distribute by:對數據按照某幾列的值做hash分片。
sort by:局部排序,語句前必須加distribute by。實際上sort by是對distribute by的結果進行局部排序。
從功能的理解可知:order by不和distribute by/sort by共用,同時group by也不和distribute by/sort by共用。
join和傳統sql的表現較為一致,odps支持left outer join,right outer join,full outer join,inner join。
mapjoin hint:當大表和小表join的情況下利用mapjoin將用戶指定的小表全部加載到內存中,從而加快join的執行速度,同時支持非等值連接,full join 不可用,連接的主表需為大表
內建函數:
主要包括數學與統計函數,字符串操作函數,時間函數,窗口函數,聚合函數,轉置函數等
就不一一列舉了,功能強大。
UDF:user defined function
包括udf,udtf,udaf
udf:用戶自定義標量函數
udtf:用戶自定義表值函數(返回多個字段)
udaf:用戶自定義聚合函數
UDF:
package org.alidata.odps.udf.examples; import com.aliyun.odps.udf.UDF; public final class Lower extends UDF { public String evaluate(String s) { if (s == null) { return null; } return s.toLowerCase(); } }
繼承UDF類,實現evaluate方法即可。evaluate方法可以有多個,滿足多態特性。
UDAF:
繼承com.aliyun.odps.udf.Aggregator,主要實現iterate,merge和terminate三個接口,UDAF的主要邏輯依賴於這三個接口的實現。此外,還需要用戶實現自定義的Writable buffer,因為UDAF的主要邏輯是將數據進行分片后遍歷,處理完之后進行merge。
UDTF:
繼承com.aliyun.odps.udf.UDTF類,主要實現process和forward兩個接口,SQL中每一條記錄都會對應調用一次process,process的參數為UDTF的輸入參數。輸入參數以Object[]的形式傳入,輸出結果通過調用forward函數輸出。
UDF統一添加方法:
add jar xxx
create function xxx as packagename.classname using 'jarname'
PL:存儲過程
與傳統sql類型,區別在於變量引用時前面加$
DECLARE var_name var_type; BEGIN 可執行語句 END;
其他命令:
explain,show instance,merge smallfile,添加/移除/顯示統計信息(add/remove/show statistc 統計值或者符合某個表達式的值)
MaxCompute SQL優化與大數據開發套件:
選表原則:
- 選擇滿足需求的小表,比如匯總表。維表盡量選擇全量表,事實表盡量選擇增量表;
- 選擇產出早的表;
- 選擇可回滾的表,比如使用加購事件表代替加購全流程表;
- 依賴的N個上游表,盡量保證上游產出時間要均勻,如果有差異,考慮換依賴表;
小表原則:
- 行數小於100萬的表認為是小表,這個時候使用mapjoin性能會提高很多;
- 讀取數據的時候要加上分區等過濾條件,大表變小表。常用過濾條件字段,做成動態分區,方便下游過濾;
- 不得不讀取N天大表的時候,使用unionall方式合並多天數據;
代碼原則:
- Join關聯要盡可能是主鍵關聯。關聯字段類型要一致;
- 多天匯總,先生成1天輕度匯總表,多天使用1天數據再匯總;
- multiinsert,實現一次讀取多次寫入;
- 使用系統UDF代替自己的寫的UDF;
調度原則:
- 依賴max_pt的,要排除當天依賴;
- 上游是小時任務,使用max_pt要慎重;
- 執行超過1個小時任務要關注;
大數據開發套件提供了直觀的數據操作入口,數據研發過程代碼的編寫,調試,優化,發布都可以在大數據開發套件中進行。
拿一個任務耗時過長作例子,看看在大數據開發套件上我們是怎么處理碰到的問題的。
一個task執行時間過長,除掉本身代碼的性能問題,那么有兩種比較大的可能:
一種是等待問題,一種是數據傾斜問題
等待問題可能是由於系統資源不足,系統繁忙,優先級不夠,數據量太大,碰到了壞盤等原因導致的
我們可以通過調整優先級,重跑,過濾初始數據等方法來處理。
傾斜問題則一般是數據本身的問題,常見的數據傾斜是怎么造成的?
Shuffle的時候,將各個節點上相同的key拉取到某個節點的一個task進行處理,比如按照key進行聚合或join等操作,如果某個key對應的數據量特別大的話,就會發生數據傾斜現象。數據傾斜就成為了整個task運行時間的短板。
觸發shuffle的常見算子:distinct、groupBy、join等。
要解決數據傾斜的問題,首先要定位數據傾斜發生在什么地方,首先是哪個stage,直接在D2 UI上看就可以,查看數據是否傾斜了
logview--odps task--detail--stage--longtail
根據stage日志,判斷出數據傾斜發生在哪個算子上。
根據傾斜發生的階段,我們又可以把它們分為map傾斜,reduce傾斜,join傾斜
通常來說,對於傾斜現象,我們首先查看導致數據傾斜的key的數據分布情況,接下來大概有幾種處理方案:
1:過濾數據
過濾掉某些臟數據,比如說是否可以去掉null,去掉某些條件對應的值
2:加大並行度
給任務添加處理資源,加大instance的數量,暴力
3:對數據進行拆分,分而治之
如果大表join小表,我們可以用mapjoin,將小表cache進內存
二次分發,加上隨機前綴(數據膨脹),拆分數據集為熱點+非熱點再進一步處理
大表join超大表,還可以考慮bloomfilter
4:組合使用
上述方法,組合使用
5:修改業務
實在沒有進步空間,從業務上過濾數據