在做大數據開發和分析時,其實流程和以前做數據倉庫有類似的地方。第一步總是需要做數據收集、其次做數據清洗,然后才會進行后續的處理分析。那么首要的任務就是要把數據收集到一個適合的地方,如果是采用阿里雲的大數據平台,這個適合的地方就ODPS。ODPS的作用就是用來存儲數據和計算數據的。從我個人目前接觸的數據分析項目來看,數據來源一般來自兩處,一個是日志文件,一個是業務數據庫中的表。針對這兩種數據源,阿里雲提供了兩種解決方案:
1.使用“日志服務”來進行日志文件的收集
2.使用“數加服務”中的數據開發—任務開發來采集業務數據庫中的表的數據
以上兩種方式收集到的數據最終都會落到ODPS中的表中。本文主要介紹“采集業務數據庫中的表的數據”的方式,即如何使用“數加服務-數據開發-任務開發”。先截張圖,來看下在數加服務的數據開發中都提供了哪些功能,如下圖:
從圖中可看到有“任務開發”、“腳本開發”、“資源管理”、“函數管理”、“表查詢”這幾個功能。各個功能作用如下:
表查詢:提供了UI界面對ODPS中的表進行方便的管理以及表中數據預覽功能
函數管理:系統函數及自定義函數的管理以及使用幫助的查看。
腳本管理:這里是管理、存儲、調試SQL腳本的地方,這些SQL是針對ODPS中的表的,其實也類似於hive的語法。
資源管理:管理ODPS需要使用的額外jar,比如我們的mapreduce的job或者是UDF的函數的jar
我們做業務數據庫中的數據收集和清洗時一般會先把業務數據庫的表的數據同步到ODPS相應的表上,然后通過執行計算,把產生的數據插入到另一張表A中,因為ODPS不適合做實時查詢,這個A表的數據我們通常就會導入到阿里雲另一個產品“分析型數據庫”(也叫ADS)中。然后通過使用“BI報表”就能做出我們想要的報表和圖表。下面我們分別來介紹如何做同步和如何做計算。
數據同步
任務開發中包含了兩種任務類型,一個是“工作流任務”,另一個是“節點任務”。
工作流任務中可以配置多個任務,任務和任務之間有依賴關系,可以通過流程圖的方式進行配置。如圖:
節點任務就是單一任務。具體的任務又分三種:ODPS_SQL、數據同步和機器學習。ODPS_SQL任務是用來做數據計算用的(或稱數據清洗也可以),數據同步任務就是用來完成ODPS與RDS(關系型數據庫)中的表數據的雙向同步的,機器學習任務是用來做數據深入分析和挖掘用的。本文只介紹數據同步任務,其實這個任務的配置很容易理解,選中源表和目的表,然后對兩邊字段做個映射配置,設好調度執行的周期就可以了,還是截個圖直觀的看下:
源表和目的表的映射配置:
調度執行的配置:
參數的配置:
在數據同步任務中,參數主要是用在配置增量同步時,在數據過濾這里配置,通過數據過濾我們能控制同步哪些數據。並且控制條件是可以動態的,所以參數在這里就起作用了。參數分兩類,系統參數阿里雲提供了兩個,分別是:
${bdp.system.bizdate}
格式yyyymmdd,日常調度實例定時時間的前一天(年月日)。節點每天自動調度實例定時時間年月日減1天。
${bdp.system.cyctime}
格式yyyymmddhh24miss,日常調度實例定時時間(年月日時分秒)。yyyy表示4位數年份,mm表示2位數月份,dd表示2位數天,hh24表示24小時制的時,mi表示2位數分鍾,ss表示2位數秒。
自定義參數
聲明變量方式:${變量名}
自定義參數支持代碼中自定義變量名,調度的時候需要給自定義變量名指定賦值。在大數據開發套件中以${變量名}方式來聲明變量,在參數中以變量名=調度參數或常量的方式來賦值。
$[…]基於系統參數${bdp.system.cyctime}進行自定義格式
${bdp.system.cyctime}=$[yyyymmddhh24miss]可以任意分解組合,以及指定格式,如定時時間年月日$[yyyymmdd]、$[yyyy-mm-dd]等格式可自定義,定時時間時分秒$[hh24miss]、$[hh24:mi:ss]等格式可自定義。
【注意】:所有參數都不允許包含空格!
獲取+/-周期的方法:
-
后N年:$[add_months(yyyymmdd,12*N)]
-
前N年:$[add_months(yyyymmdd,-12*N)]
-
后N月:$[add_months(yyyymmdd,N)]
-
前N月:$[add_months(yyyymmdd,-N)]
-
后N周:$[yyyymmdd+7*N]
-
前N周:$[yyyymmdd-7*N]
-
后N天:$[yyyymmdd+N]
-
前N天:$[yyyymmdd-N]
-
后N小時:$[hh24miss+N/24]
-
前N小時:$[hh24miss-N/24]
同步時我們即可以做全量同步,也可以做增量同步,其實就需要源表能有類似於“創建時間”、“最后更新時間”、“是否失效”、“失效時間”這樣的字段來結合系統提供了兩個參數可配合使用,就能達到增量同步的目的。但這里要多說幾句,直接的增量同步是做不到的,只能間接的做,因為數據同步任務只會往“目的表”中插入數據,並不會對目的表數據做更新和刪除。所以做增量同步時,目的表應該是一張臨時表,這樣我們再通過SQL任務把從臨時表中提取增量數據,然后對ODPS的表進行批量更新。具體怎么做就不細說了,懂的人一聽就明白了。
數據同步任務配置完成后,就得進行測試,所以也提供了測試按鈕進行測試操作,在測試時需要指定同步數據的“業務日期”,如果測試是成功的,那設定的日務日期的數據就真的會同步過來。如果測試通了,我們還需要提交一下任務,這樣整個配置才會生效。但是真正的調度一定是要在次日才會產生。那么如果此時我們也可進入“運維中心”通過補數據的功能,把歷史數據同步過來。
日常的時候我們就可以通過運維中心去監控任務的運行情況,如果失敗了,也能夠在運維中心查看到日志。