DWD(Data Warehouse Detail):數據明細層,結構和粒度與原始表保持一致,對ODS層數據進行清洗(去除空值、臟數據、超過極限范圍的數據)。
DWD層的數據來源於ODS原始數據層,在原始數據層的Hive表里,只有一個字段,存儲了原始的一條條日志信息,下面以事件(如商品點擊事件,展示詳情事件)日志來說明,原始日志如下:
1593095829089|{ "cm":{ "ln":"-89.3", "sv":"V2.6.6", "os":"8.0.3", "g":"SU1Z29ZJ@gmail.com", "mid":"1", "nw":"3G", "l":"en", "vc":"3", "hw":"640*1136", "ar":"MX", "uid":"1", "t":"1593002588300", "la":"-16.2", "md":"sumsung-3", "vn":"1.2.2", "ba":"Sumsung", "sr":"D" }, "ap":"app", "et":[ { "ett":"1593077273840", "en":"display", "kv":{ "goodsid":"0", "action":"2", "extend1":"2", "place":"1", "category":"93" } }, { "ett":"1593052169678", "en":"loading", "kv":{ "extend2":"", "loading_time":"54", "action":"1", "extend1":"", "type":"1", "type1":"102", "loading_way":"1" } }, { "ett":"1593013890514", "en":"notification", "kv":{ "ap_time":"1593003516686", "action":"4", "type":"2", "content":"" } }, { "ett":"1592999171192", "en":"error", "kv":{ "errorDetail":"java.lang.NullPointerException\\n at cn.lift.appIn.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)\\n at cn.lift.dfdf.web.AbstractBaseController.validInbound", "errorBrief":"at cn.lift.appIn.control.CommandUtil.getInfo(CommandUtil.java:67)" } }, { "ett":"1593002958311", "en":"comment", "kv":{ "p_comment_id":1, "addtime":"1593079427374", "praise_count":188, "other_id":0, "comment_id":9, "reply_count":193, "userid":3, "content":"塗士震嫩廟胞洪郵騙具捶贛鍺塌舅捕瀝爺" } }, { "ett":"1593052803303", "en":"favorites", "kv":{ "course_id":4, "id":0, "add_time":"1593044515996", "userid":7 } }, { "ett":"1593095771819", "en":"praise", "kv":{ "target_id":8, "id":5, "type":4, "add_time":"1593000096852", "userid":8 } }] }
數據格式為服務器時間|事件json,json中又包括公共字段cm,數據來源ap,以及事件數組et。由於事件是一段時間提交一次,是一個包含了多個不同類型事件的json數組,用en字段區分不同的事件,如display表示商品點擊事件。因此在這里的處理需要經過兩步,首先將ODS表中的長串json解析成一個個字段的DWD層的基礎明細表,並且利用UDTF函數,將事件數組中的每個事件炸裂開,這些數據全部放在基礎明細表里。然后針對不同的事件,將某一類事件過濾出來,並且取出事件中的kv值,放在特定的某一事件的DWD明細表中。
一 基礎事件明細表
基礎事件明細表包含了所有類型的事件數據,需要定義一個UDF函數,用來拆分長串的日志,將其處理成一個規則的格式,即以\t分隔的字符串,后續可以通過hive自帶的split函數轉化成數組,利用下標取值。
public class BaseFieldUDF extends UDF { public String evaluate(String line,String keysStr){ String[] keysArr = keysStr.split(","); //原始時間日志格式:時間|json日志 String[] logContent = line.split("\\|"); if (logContent.length != 2 || StringUtils.isBlank(logContent[1])){ return ""; } StringBuffer sb = new StringBuffer(); try { //拼接公共字段 JSONObject jsonObject = new JSONObject(logContent[1]); JSONObject cm = jsonObject.getJSONObject("cm"); for (int i = 0; i < keysArr.length; i++) { String key = keysArr[i].trim(); if (cm.has(key)){ sb.append(cm.getString(key)).append("\t"); } } //拼接事件字段 sb.append(jsonObject.getString("et")).append("\t"); //拼接服務器時間 sb.append(logContent[0]).append("\t"); } catch (JSONException e) { e.printStackTrace(); } return sb.toString(); } }
然后定義一個UDTF函數,用來對事件數組進行炸裂。傳入的是1行1列的事件數組,返回的是2列多行的數據(數組中的每個事件對應一行),第1列是事件名,稍后利用這個事件名過濾出不同的事件明細表,第2列是該事件完整的json串(包含ett、en以及事件詳情kv),后續再用get_json_object從中取出kv里的值。
public class EventJsonUDTF extends GenericUDTF { @Override public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException { List<String> fieldNames = new ArrayList<>(); List<ObjectInspector> fieldTypes = new ArrayList<>(); fieldNames.add("event_name"); fieldTypes.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); fieldNames.add("event_json"); fieldTypes.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames,fieldTypes); } @Override public void process(Object[] objects) throws HiveException { //獲取輸入數據 String input = objects[0].toString(); if (StringUtils.isBlank(input)){ return; }else { try { JSONArray ja = new JSONArray(input); String[] result = new String[2]; for (int i = 0; i < ja.length(); i++) { try { result[0] = ja.getJSONObject(i).getString("en"); result[1] = ja.getString(i); } catch (JSONException e) { //防止因為某個數據的錯誤結束整個循環 continue; } } //進來一行數據,返回2列多行數據 forward(result); } catch (JSONException e) { e.printStackTrace(); } } } @Override public void close() throws HiveException { } }
接下來就是創建存儲事件基礎明細需要的表。event_name和event_json字段就是利用UDTF函數得到的結果。
-- Base event detail table: common fields plus one row per event.
-- event_name and event_json are the two columns produced by the UDTF.
DROP TABLE IF EXISTS dwd_base_event_log;

CREATE EXTERNAL TABLE dwd_base_event_log (
    `mid_id`       string,
    `user_id`      string,
    `version_code` string,
    `version_name` string,
    `lang`         string,
    `source`       string,
    `os`           string,
    `area`         string,
    `model`        string,
    `brand`        string,
    `sdk_version`  string,
    `gmail`        string,
    `height_width` string,
    `app_time`     string,
    `network`      string,
    `lng`          string,
    `lat`          string,
    `event_name`   string,
    `event_json`   string,
    `server_time`  string
)
PARTITIONED BY (`dt` string)
STORED AS PARQUET
LOCATION '/warehouse/gmall/dwd/dwd_base_event_log/';
然后利用腳本將數據導入到基礎明細表。
①需要在執行的sql中添加自定義的UDF函數base_analizer,和UDTF函數flat_analizer。
②where條件中加了 base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la')<>'',是因為在我們自定義的UDF函數中如果數據錯誤,會返回"",所以在這里將其過濾掉。
③因為分區字段dt在語句中已經直接賦值為do_date(即靜態分區),所以這里其實並不需要開啟動態分區的非嚴格模式(hive.exec.dynamic.partition.mode=nonstrict),保留該設置也不會影響結果。
④UDTF函數返回2列的寫法 lateral view flat_analizer(ops) tmp_k as event_name, event_json
⑤因為我們建的是分區表,因此insert overwrite只會覆蓋當前分區的數據,並不會覆蓋表中的全部分區的數據。
#!/bin/bash
# Loads one day of ODS event logs into the dwd_base_event_log partition for
# that day: each raw line is flattened by the base_analizer UDF, then the event
# array is exploded into one row per event by the flat_analizer UDTF.
#
# Usage: pass the date (yyyy-MM-dd) as the first argument; without an
# argument the previous day is used.

# Variables kept at the top so they are easy to change.
APP=gmall
hive=/opt/module/hive/bin/hive

# Use the given date if there is one, otherwise yesterday.
if [ -n "$1" ]; then
    do_date=$1
else
    do_date=$(date -d "-1 day" +%F)
fi

# Common-field keys, in the order the UDF writes them. The array indices used
# in the query below depend on this order.
keys='mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'
# SQL fragments reused in the query so the key list is written only once.
parsed="base_analizer(line,'$keys')"
fields="split($parsed,'\t')"

# The outer select list follows the column order of dwd_base_event_log
# exactly, because Hive matches insert columns by position, not by name.
sql="
add jar /opt/module/hive/hivefunction-1.0-SNAPSHOT.jar;

create temporary function base_analizer as 'com.atguigu.udf.BaseFieldUDF';
create temporary function flat_analizer as 'com.atguigu.udtf.EventJsonUDTF';

set hive.exec.dynamic.partition.mode=nonstrict;

insert overwrite table ${APP}.dwd_base_event_log
PARTITION (dt='$do_date')
select
    mid_id,
    user_id,
    version_code,
    version_name,
    lang,
    source,
    os,
    area,
    model,
    brand,
    sdk_version,
    gmail,
    height_width,
    app_time,
    network,
    lng,
    lat,
    event_name,
    event_json,
    server_time
from
(
    select
        ${fields}[0]  as mid_id,
        ${fields}[1]  as user_id,
        ${fields}[2]  as version_code,
        ${fields}[3]  as version_name,
        ${fields}[4]  as lang,
        ${fields}[5]  as source,
        ${fields}[6]  as os,
        ${fields}[7]  as area,
        ${fields}[8]  as model,
        ${fields}[9]  as brand,
        ${fields}[10] as sdk_version,
        ${fields}[11] as gmail,
        ${fields}[12] as height_width,
        ${fields}[13] as app_time,
        ${fields}[14] as network,
        ${fields}[15] as lng,
        ${fields}[16] as lat,
        ${fields}[17] as ops,
        ${fields}[18] as server_time
    from ${APP}.ods_event_log
    where dt='$do_date' and ${parsed}<>''
) sdk_log lateral view flat_analizer(ops) tmp_k as event_name, event_json;
"

$hive -e "$sql"
二 特定事件明細表
特定事件明細表與基礎事件明細表的字段大體一樣,只是有2處改動
①去掉event_name字段,因為此表中存的就是這一類事件,不再需要event_name來區分。
②將event_json中描述事件詳情的kv取出來,形成新的字段。
以商品點擊表為例,建表語句如下。去掉了event_name字段,新增了kv信息中的action,goodsid,place,extend1,category五個字段。
-- Detail table for the display event: the common fields, the five values
-- taken from the event's kv object, and the server time.
DROP TABLE IF EXISTS dwd_display_log;

CREATE EXTERNAL TABLE dwd_display_log (
    `mid_id`       string,
    `user_id`      string,
    `version_code` string,
    `version_name` string,
    `lang`         string,
    `source`       string,
    `os`           string,
    `area`         string,
    `model`        string,
    `brand`        string,
    `sdk_version`  string,
    `gmail`        string,
    `height_width` string,
    `app_time`     string,
    `network`      string,
    `lng`          string,
    `lat`          string,
    `action`       string,
    `goodsid`      string,
    `place`        string,
    `extend1`      string,
    `category`     string,
    `server_time`  string
)
PARTITIONED BY (dt string)
LOCATION '/warehouse/gmall/dwd/dwd_display_log/';
然后是利用腳本,將數據從事件基礎明細表,導入到特定的事件明細表。下面是一個包含了商品點擊,詳情,列表,廣告,消息通知等事件的完整腳本,雖然很長,但是每一種事件的處理邏輯都是一樣的。
①get_json_object(event_json,'$.kv.action')是一個hive內置的函數,可以從json串中取值,$符號表示此json本身。
②where dt='$do_date' and event_name='display' 通過上一步處理的事件名稱來區分,以導入不同的事件明細表。
#!/bin/bash
# Splits one day of dwd_base_event_log into the per-event DWD detail tables.
# Every statement follows the same pattern: keep the common columns, pull the
# event-specific values out of event_json with get_json_object, and filter on
# event_name.
#
# Usage: pass the date (yyyy-MM-dd) as the first argument; without an
# argument the previous day is used.

# Variables kept at the top so they are easy to change.
APP=gmall
hive=/opt/module/hive/bin/hive

# Use the given date if there is one, otherwise yesterday.
if [ -n "$1" ]; then
    do_date=$1
else
    do_date=$(date -d "-1 day" +%F)
fi

# Common columns shared by every per-event table, in table column order.
common_cols="mid_id, user_id, version_code, version_name, lang, source, os, area, model, brand, sdk_version, gmail, height_width, app_time, network, lng, lat"

# Note: '$.kv.xxx' stays literal inside the double quotes because '$.' is not
# a shell variable reference.
sql="
set hive.exec.dynamic.partition.mode=nonstrict;

insert overwrite table ${APP}.dwd_display_log
PARTITION (dt='$do_date')
select
    ${common_cols},
    get_json_object(event_json,'$.kv.action') action,
    get_json_object(event_json,'$.kv.goodsid') goodsid,
    get_json_object(event_json,'$.kv.place') place,
    get_json_object(event_json,'$.kv.extend1') extend1,
    get_json_object(event_json,'$.kv.category') category,
    server_time
from ${APP}.dwd_base_event_log
where dt='$do_date' and event_name='display';

insert overwrite table ${APP}.dwd_newsdetail_log
PARTITION (dt='$do_date')
select
    ${common_cols},
    get_json_object(event_json,'$.kv.entry') entry,
    get_json_object(event_json,'$.kv.action') action,
    get_json_object(event_json,'$.kv.goodsid') goodsid,
    get_json_object(event_json,'$.kv.showtype') showtype,
    get_json_object(event_json,'$.kv.news_staytime') news_staytime,
    get_json_object(event_json,'$.kv.loading_time') loading_time,
    get_json_object(event_json,'$.kv.type1') type1,
    get_json_object(event_json,'$.kv.category') category,
    server_time
from ${APP}.dwd_base_event_log
where dt='$do_date' and event_name='newsdetail';

insert overwrite table ${APP}.dwd_loading_log
PARTITION (dt='$do_date')
select
    ${common_cols},
    get_json_object(event_json,'$.kv.action') action,
    get_json_object(event_json,'$.kv.loading_time') loading_time,
    get_json_object(event_json,'$.kv.loading_way') loading_way,
    get_json_object(event_json,'$.kv.extend1') extend1,
    get_json_object(event_json,'$.kv.extend2') extend2,
    get_json_object(event_json,'$.kv.type') type,
    get_json_object(event_json,'$.kv.type1') type1,
    server_time
from ${APP}.dwd_base_event_log
where dt='$do_date' and event_name='loading';

insert overwrite table ${APP}.dwd_ad_log
PARTITION (dt='$do_date')
select
    ${common_cols},
    get_json_object(event_json,'$.kv.entry') entry,
    get_json_object(event_json,'$.kv.action') action,
    get_json_object(event_json,'$.kv.content') content,
    get_json_object(event_json,'$.kv.detail') detail,
    get_json_object(event_json,'$.kv.source') ad_source,
    get_json_object(event_json,'$.kv.behavior') behavior,
    get_json_object(event_json,'$.kv.newstype') newstype,
    get_json_object(event_json,'$.kv.show_style') show_style,
    server_time
from ${APP}.dwd_base_event_log
where dt='$do_date' and event_name='ad';

insert overwrite table ${APP}.dwd_notification_log
PARTITION (dt='$do_date')
select
    ${common_cols},
    get_json_object(event_json,'$.kv.action') action,
    get_json_object(event_json,'$.kv.type') noti_type,
    get_json_object(event_json,'$.kv.ap_time') ap_time,
    get_json_object(event_json,'$.kv.content') content,
    server_time
from ${APP}.dwd_base_event_log
where dt='$do_date' and event_name='notification';

insert overwrite table ${APP}.dwd_active_foreground_log
PARTITION (dt='$do_date')
select
    ${common_cols},
    get_json_object(event_json,'$.kv.push_id') push_id,
    get_json_object(event_json,'$.kv.access') access,
    server_time
from ${APP}.dwd_base_event_log
where dt='$do_date' and event_name='active_foreground';

insert overwrite table ${APP}.dwd_active_background_log
PARTITION (dt='$do_date')
select
    ${common_cols},
    get_json_object(event_json,'$.kv.active_source') active_source,
    server_time
from ${APP}.dwd_base_event_log
where dt='$do_date' and event_name='active_background';

insert overwrite table ${APP}.dwd_comment_log
PARTITION (dt='$do_date')
select
    ${common_cols},
    get_json_object(event_json,'$.kv.comment_id') comment_id,
    get_json_object(event_json,'$.kv.userid') userid,
    get_json_object(event_json,'$.kv.p_comment_id') p_comment_id,
    get_json_object(event_json,'$.kv.content') content,
    get_json_object(event_json,'$.kv.addtime') addtime,
    get_json_object(event_json,'$.kv.other_id') other_id,
    get_json_object(event_json,'$.kv.praise_count') praise_count,
    get_json_object(event_json,'$.kv.reply_count') reply_count,
    server_time
from ${APP}.dwd_base_event_log
where dt='$do_date' and event_name='comment';

insert overwrite table ${APP}.dwd_favorites_log
PARTITION (dt='$do_date')
select
    ${common_cols},
    get_json_object(event_json,'$.kv.id') id,
    get_json_object(event_json,'$.kv.course_id') course_id,
    get_json_object(event_json,'$.kv.userid') userid,
    get_json_object(event_json,'$.kv.add_time') add_time,
    server_time
from ${APP}.dwd_base_event_log
where dt='$do_date' and event_name='favorites';

insert overwrite table ${APP}.dwd_praise_log
PARTITION (dt='$do_date')
select
    ${common_cols},
    get_json_object(event_json,'$.kv.id') id,
    get_json_object(event_json,'$.kv.userid') userid,
    get_json_object(event_json,'$.kv.target_id') target_id,
    get_json_object(event_json,'$.kv.type') type,
    get_json_object(event_json,'$.kv.add_time') add_time,
    server_time
from ${APP}.dwd_base_event_log
where dt='$do_date' and event_name='praise';

insert overwrite table ${APP}.dwd_error_log
PARTITION (dt='$do_date')
select
    ${common_cols},
    get_json_object(event_json,'$.kv.errorBrief') errorBrief,
    get_json_object(event_json,'$.kv.errorDetail') errorDetail,
    server_time
from ${APP}.dwd_base_event_log
where dt='$do_date' and event_name='error';
"

$hive -e "$sql"