Data Warehouse: DWD Detail Layer Walkthrough for a User Behavior Warehouse


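DWD (Data Warehouse Detail): the data detail layer. Its structure and granularity stay the same as the source tables, and it cleans the ODS-layer data by removing null values, dirty data, and values outside valid ranges.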
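As a rough illustration of these three cleaning rules, the Java sketch below filters rows that have already been parsed into string maps. The rules are assumptions chosen for the example: mid must be non-blank, and lng and lat (the ln/la fields of the sample log) must be numeric and fall within [-180, 180] and [-90, 90]. Real projects define their own rules.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical DWD cleaning rules, for illustration only.
class DwdCleaner {

    // True if the value is null or contains only whitespace.
    static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }

    // True if s parses as a number within [min, max].
    static boolean inRange(String s, double min, double max) {
        if (isBlank(s)) {
            return false;
        }
        try {
            double v = Double.parseDouble(s.trim());
            return v >= min && v <= max;
        } catch (NumberFormatException e) {
            return false; // dirty data: not a number
        }
    }

    // Keeps a row only if it passes every rule.
    static boolean keep(Map<String, String> row) {
        return !isBlank(row.get("mid"))
                && inRange(row.get("lng"), -180, 180)
                && inRange(row.get("lat"), -90, 90);
    }

    static List<Map<String, String>> clean(List<Map<String, String>> rows) {
        List<Map<String, String>> out = new ArrayList<>();
        for (Map<String, String> row : rows) {
            if (keep(row)) {
                out.add(row);
            }
        }
        return out;
    }

    // Helper that builds a row from the three fields used by the rules.
    static Map<String, String> row(String mid, String lng, String lat) {
        Map<String, String> m = new HashMap<>();
        m.put("mid", mid);
        m.put("lng", lng);
        m.put("lat", lat);
        return m;
    }

    public static void main(String[] args) {
        List<Map<String, String>> rows = new ArrayList<>();
        rows.add(row("1", "-89.3", "-16.2")); // values from the sample log: kept
        rows.add(row("", "10.0", "10.0"));    // blank mid: dropped
        rows.add(row("2", "abc", "10.0"));    // non-numeric lng: dropped
        rows.add(row("3", "10.0", "95.0"));   // lat out of range: dropped
        System.out.println(clean(rows).size()); // prints 1
    }
}
```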

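The DWD layer's data comes from the ODS (raw data) layer. The raw-layer Hive table has only one column, which stores each raw log line as-is. The example below uses event logs (such as product click events and detail-view events). A raw log looks like this: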

1593095829089|{
    "cm":{
        "ln":"-89.3",
        "sv":"V2.6.6",
        "os":"8.0.3",
        "g":"SU1Z29ZJ@gmail.com",
        "mid":"1",
        "nw":"3G",
        "l":"en",
        "vc":"3",
        "hw":"640*1136",
        "ar":"MX",
        "uid":"1",
        "t":"1593002588300",
        "la":"-16.2",
        "md":"sumsung-3",
        "vn":"1.2.2",
        "ba":"Sumsung",
        "sr":"D"
    },
    "ap":"app",
    "et":[
        {
            "ett":"1593077273840",
            "en":"display",
            "kv":{
                "goodsid":"0",
                "action":"2",
                "extend1":"2",
                "place":"1",
                "category":"93"
            }
        },
        {
            "ett":"1593052169678",
            "en":"loading",
            "kv":{
                "extend2":"",
                "loading_time":"54",
                "action":"1",
                "extend1":"",
                "type":"1",
                "type1":"102",
                "loading_way":"1"
            }
        },
        {
            "ett":"1593013890514",
            "en":"notification",
            "kv":{
                "ap_time":"1593003516686",
                "action":"4",
                "type":"2",
                "content":""
            }
        },
        {
            "ett":"1592999171192",
            "en":"error",
            "kv":{
                "errorDetail":"java.lang.NullPointerException\\n    at cn.lift.appIn.web.AbstractBaseController.validInbound(AbstractBaseController.java:72)\\n at cn.lift.dfdf.web.AbstractBaseController.validInbound",
                "errorBrief":"at cn.lift.appIn.control.CommandUtil.getInfo(CommandUtil.java:67)"
            }
        },
        {
            "ett":"1593002958311",
            "en":"comment",
            "kv":{
                "p_comment_id":1,
                "addtime":"1593079427374",
                "praise_count":188,
                "other_id":0,
                "comment_id":9,
                "reply_count":193,
                "userid":3,
                "content":"塗士震嫩廟胞洪郵騙具捶贛鍺塌舅捕瀝爺"
            }
        },
        {
            "ett":"1593052803303",
            "en":"favorites",
            "kv":{
                "course_id":4,
                "id":0,
                "add_time":"1593044515996",
                "userid":7
            }
        },
        {
            "ett":"1593095771819",
            "en":"praise",
            "kv":{
                "target_id":8,
                "id":5,
                "type":4,
                "add_time":"1593000096852",
                "userid":8
            }
        }]
}

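The format is server time|event JSON. The JSON contains the common fields cm, the data source ap, and the event array et. Events are submitted in batches, so et is a JSON array holding several events of different types, distinguished by the en field (for example, display denotes a product click event). Processing therefore takes two steps. First, parse the long JSON string in the ODS table into the individual columns of a DWD base detail table, using a UDTF to explode each event in the event array into its own row; all of this data goes into the base detail table. Then, for each event type, filter out that type's events, extract the kv values, and write them into the DWD detail table for that specific event.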
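The second step is essentially a group-by on the event name. The toy model below is plain Java, not Hive code. It assumes the base detail rows have already been exploded into (event_name, event_json) pairs, and each map key stands in for one dwd_<event>_log table.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of step 2: route exploded base rows into per-event "tables".
class EventRouter {

    // One row of the base detail table, reduced to the two exploded columns.
    static final class BaseRow {
        final String eventName;
        final String eventJson;

        BaseRow(String eventName, String eventJson) {
            this.eventName = eventName;
            this.eventJson = eventJson;
        }
    }

    // Groups rows by event name; each key plays the role of one event detail table.
    static Map<String, List<String>> route(List<BaseRow> baseRows) {
        Map<String, List<String>> tables = new LinkedHashMap<>();
        for (BaseRow r : baseRows) {
            tables.computeIfAbsent(r.eventName, k -> new ArrayList<>()).add(r.eventJson);
        }
        return tables;
    }

    public static void main(String[] args) {
        List<BaseRow> base = new ArrayList<>();
        base.add(new BaseRow("display", "{\"en\":\"display\",\"kv\":{\"goodsid\":\"0\"}}"));
        base.add(new BaseRow("loading", "{\"en\":\"loading\",\"kv\":{\"loading_time\":\"54\"}}"));
        base.add(new BaseRow("display", "{\"en\":\"display\",\"kv\":{\"goodsid\":\"1\"}}"));
        Map<String, List<String>> tables = route(base);
        System.out.println(tables.keySet());              // prints [display, loading]
        System.out.println(tables.get("display").size()); // prints 2
    }
}
```

In Hive the same effect comes from running one INSERT per event type with `where event_name='...'`, as the second script in this article does.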

1 Base Event Detail Table

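The base event detail table holds event data of every type. First, define a UDF that splits the long log line into a regular format: a \t-separated string. Downstream, Hive's built-in split function turns that string into an array whose elements are read by index.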

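package com.atguigu.udf;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.json.JSONException;
import org.json.JSONObject;

public class BaseFieldUDF extends UDF {

    public String evaluate(String line, String keysStr) {
        if (StringUtils.isBlank(line) || StringUtils.isBlank(keysStr)) {
            return "";
        }
        String[] keysArr = keysStr.split(",");

        // Raw log format: server time|JSON log
        String[] logContent = line.split("\\|");
        if (logContent.length != 2 || StringUtils.isBlank(logContent[1])) {
            return "";
        }

        StringBuilder sb = new StringBuilder();
        try {
            JSONObject jsonObject = new JSONObject(logContent[1]);

            // Common fields, in the order given by keysStr. A missing key still gets
            // an (empty) slot so that the positions read by split(...)[i] never shift.
            JSONObject cm = jsonObject.getJSONObject("cm");
            for (String rawKey : keysArr) {
                sb.append(cm.optString(rawKey.trim())).append("\t");
            }

            // Event array, kept as a JSON string for the UDTF to explode later
            sb.append(jsonObject.getJSONArray("et").toString()).append("\t");

            // Server time
            sb.append(logContent[0]).append("\t");
        } catch (JSONException e) {
            // Malformed JSON: return "" so the caller's where clause filters the line out
            return "";
        }

        return sb.toString();
    }
}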
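The SQL reads the UDF output strictly by position (split(...)[i]), so the output needs one slot per requested key even when a key is absent from cm; otherwise every later index shifts. The stdlib-only sketch below shows that contract. It assumes the cm fields are already in a Map (JSON parsing is omitted), and it splits with a limit of -1 so that Java also keeps trailing empty fields.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the positional "\t"-joined contract between the UDF and split(...)[i].
class TabContract {

    // Joins the values for the requested keys, leaving an empty slot for any missing key.
    static String join(Map<String, String> cm, String keysStr) {
        StringBuilder sb = new StringBuilder();
        for (String rawKey : keysStr.split(",")) {
            String v = cm.get(rawKey.trim());
            sb.append(v == null ? "" : v).append("\t");
        }
        return sb.toString();
    }

    // Reads field i back; limit -1 keeps trailing empty strings, so indexes never shift.
    static String field(String joined, int i) {
        return joined.split("\t", -1)[i];
    }

    public static void main(String[] args) {
        Map<String, String> cm = new HashMap<>();
        cm.put("mid", "1");
        cm.put("ln", "-89.3"); // "uid" deliberately missing
        String joined = join(cm, "mid,uid,ln");
        System.out.println(field(joined, 2)); // prints -89.3, still at index 2
    }
}
```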

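Next, define a UDTF that explodes the event array. Its input is the event array (one row, one column), and its output is two columns and multiple rows. Column 1 is the event name, used later to filter events into the different event detail tables. Column 2 is the event's JSON, including its kv details.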

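package com.atguigu.udtf;

import java.util.ArrayList;
import java.util.List;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;

public class EventJsonUDTF extends GenericUDTF {

    @Override
    public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
        if (argOIs.length != 1) {
            throw new UDFArgumentException("EventJsonUDTF takes exactly one argument: the event array");
        }

        // Output schema: two string columns, event_name and event_json
        List<String> fieldNames = new ArrayList<>();
        List<ObjectInspector> fieldTypes = new ArrayList<>();

        fieldNames.add("event_name");
        fieldTypes.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

        fieldNames.add("event_json");
        fieldTypes.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldTypes);
    }

    @Override
    public void process(Object[] objects) throws HiveException {
        // Input: the event array as a JSON string
        if (objects[0] == null) {
            return;
        }
        String input = objects[0].toString();
        if (StringUtils.isBlank(input)) {
            return;
        }

        JSONArray ja;
        try {
            ja = new JSONArray(input);
        } catch (JSONException e) {
            // Not a valid JSON array: emit no rows for this input
            return;
        }

        for (int i = 0; i < ja.length(); i++) {
            try {
                JSONObject event = ja.getJSONObject(i);
                String[] result = new String[2];
                result[0] = event.getString("en");
                result[1] = event.toString();
                // One input row in, one output row (event_name, event_json) per event
                forward(result);
            } catch (JSONException e) {
                // Skip a malformed event instead of dropping the whole array
            }
        }
    }

    @Override
    public void close() throws HiveException {
        // No resources to release
    }
}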
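A UDTF turns one input row into zero or more output rows by calling forward once per output row, so the call belongs inside the loop over events. The sketch below models this in plain Java. It uses a hypothetical forward() that collects rows into a list in place of Hive's collector, and it assumes each array element has already been parsed into an {en, json} pair. Malformed elements are skipped rather than aborting the whole array.

```java
import java.util.ArrayList;
import java.util.List;

// Stdlib-only model of a UDTF: one input row in, one output row per event out.
class ExplodeSketch {

    // Stand-in for Hive's forward(): collects emitted rows.
    private final List<String[]> collected = new ArrayList<>();

    private void forward(String[] row) {
        collected.add(row);
    }

    // Each element is a pre-parsed {en, json} pair (an assumption of this sketch).
    void process(List<String[]> eventArray) {
        if (eventArray == null) {
            return; // nothing to emit for a null input
        }
        for (String[] event : eventArray) {
            if (event == null || event.length < 2 || event[0] == null) {
                continue; // skip one bad element, keep the rest
            }
            // Emit inside the loop: one output row per event.
            forward(new String[] {event[0], event[1]});
        }
    }

    List<String[]> rows() {
        return collected;
    }

    public static void main(String[] args) {
        List<String[]> input = new ArrayList<>();
        input.add(new String[] {"display", "{\"en\":\"display\"}"});
        input.add(new String[] {null, "{broken}"});
        input.add(new String[] {"loading", "{\"en\":\"loading\"}"});
        ExplodeSketch udtf = new ExplodeSketch();
        udtf.process(input);
        System.out.println(udtf.rows().size()); // prints 2
    }
}
```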

Next, create the table that stores the base event details. The event_name and event_json columns are populated from the UDTF's output.

drop table if exists dwd_base_event_log;
CREATE EXTERNAL TABLE dwd_base_event_log(
`mid_id` string,
`user_id` string, 
`version_code` string, 
`version_name` string, 
`lang` string, 
`source` string, 
`os` string, 
`area` string, 
`model` string,
`brand` string, 
`sdk_version` string, 
`gmail` string, 
`height_width` string, 
`app_time` string, 
`network` string, 
`lng` string, 
`lat` string, 
`event_name` string, 
`event_json` string, 
`server_time` string)
PARTITIONED BY (`dt` string)
stored as parquet
location '/warehouse/gmall/dwd/dwd_base_event_log/';

Then use a script to load the data into the base detail table.

① The SQL must register the custom UDF base_analizer and the custom UDTF flat_analizer before using them.

② The where clause includes base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la')<>'' because the custom UDF returns "" when a line is malformed; such lines are filtered out here.

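③ The partition column is given a static value (dt='$do_date'), so hive.exec.dynamic.partition.mode=nonstrict is not actually required. That setting only matters for dynamic-partition inserts.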

④ To receive the UDTF's two output columns, write: lateral view flat_analizer(ops) tmp_k as event_name, event_json

⑤ Because the table is partitioned, insert overwrite replaces only the data in the target partition, not the data in every partition of the table.
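Point ⑤ can be pictured with a toy model where a table is a map from partition value (dt) to rows. This is only an analogy for the semantics, not how Hive stores partitions (those are directories under the table location). The dates used are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model: a partitioned table as dt -> rows.
class PartitionedTable {

    private final Map<String, List<String>> partitions = new TreeMap<>();

    // Analogy for INSERT INTO ... PARTITION (dt=...): appends to one partition.
    void insertInto(String dt, List<String> rows) {
        partitions.computeIfAbsent(dt, k -> new ArrayList<>()).addAll(rows);
    }

    // Analogy for INSERT OVERWRITE ... PARTITION (dt=...): replaces only that partition.
    void insertOverwrite(String dt, List<String> rows) {
        partitions.put(dt, new ArrayList<>(rows));
    }

    List<String> partition(String dt) {
        return partitions.getOrDefault(dt, new ArrayList<>());
    }

    public static void main(String[] args) {
        PartitionedTable t = new PartitionedTable();
        t.insertInto("2020-06-24", Arrays.asList("a", "b"));
        t.insertInto("2020-06-25", Arrays.asList("c"));
        // Re-running the job for 2020-06-25 replaces only that day.
        t.insertOverwrite("2020-06-25", Arrays.asList("d", "e"));
        System.out.println(t.partition("2020-06-24")); // prints [a, b]
        System.out.println(t.partition("2020-06-25")); // prints [d, e]
    }
}
```

This is also why the daily script can be re-run for the same do_date without duplicating data.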

#!/bin/bash

# Variables, defined here so they are easy to change
APP=gmall
hive=/opt/module/hive/bin/hive

# Use the date passed as the first argument; if none is given, default to the day before today
if [ -n "$1" ] ;then
        do_date=$1
else 
        do_date=`date -d "-1 day" +%F`  
fi 

sql="
        add jar /opt/module/hive/hivefunction-1.0-SNAPSHOT.jar;

        create temporary function base_analizer as 'com.atguigu.udf.BaseFieldUDF';
        create temporary function flat_analizer as 'com.atguigu.udtf.EventJsonUDTF';

        set hive.exec.dynamic.partition.mode=nonstrict;

        insert overwrite table "$APP".dwd_base_event_log 
        PARTITION (dt='$do_date')
        select
        mid_id,
        user_id,
        version_code,
        version_name,
        lang,
        source ,
        os ,
        area ,
        model ,
        brand ,
        sdk_version ,
        gmail ,
        height_width ,
        app_time ,
        network ,
        lng ,
        lat ,
        event_name , 
        event_json , 
        server_time  
        from
        (
        select
        split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[0]   as mid_id,
        split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[1]   as user_id,
        split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[2]   as version_code,
        split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[3]   as version_name,
        split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[4]   as lang,
        split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[5]   as source,
        split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[6]   as os,
        split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[7]   as area,
        split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[8]   as model,
        split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[9]   as brand,
        split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[10]   as sdk_version,
        split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[11]  as gmail,
        split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[12]  as height_width,
        split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[13]  as app_time,
        split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[14]  as network,
        split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[15]  as lng,
        split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[16]  as lat,
        split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[17]  as ops,
        split(base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la'),'\t')[18]  as server_time
        from "$APP".ods_event_log where dt='$do_date'  and base_analizer(line,'mid,uid,vc,vn,l,sr,os,ar,md,ba,sv,g,hw,t,nw,ln,la')<>'' 
        ) sdk_log lateral view flat_analizer(ops) tmp_k as event_name, event_json;
"

$hive -e "$sql"
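The date handling at the top of the script uses the first argument as do_date when one is given, and otherwise falls back to yesterday (`date -d "-1 day" +%F`). If the job were launched from a JVM-based scheduler instead, the same rule could be expressed as the hypothetical helper below. Like `date`, it depends on the local time zone (here, the JVM default zone).

```java
import java.time.LocalDate;

// Hypothetical helper mirroring the script: use the argument if given, else yesterday.
class DoDate {

    static String resolve(String[] args, LocalDate today) {
        if (args.length > 0 && !args[0].isEmpty()) {
            return args[0];
        }
        // LocalDate.toString() is ISO-8601 (yyyy-MM-dd), the same shape as date +%F.
        return today.minusDays(1).toString();
    }

    public static void main(String[] args) {
        // LocalDate.now() uses the JVM's default time zone.
        System.out.println(resolve(args, LocalDate.now()));
    }
}
```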

 

2 Specific Event Detail Tables

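The specific event detail tables have largely the same columns as the base event detail table, with two changes:

① The event_name column is dropped. Each table holds only one type of event, so event_name is no longer needed to tell events apart.

② The kv pairs in event_json that describe the event are extracted into columns of their own, replacing event_json.

Taking the product click table as an example, the DDL is as follows. Compared with the base table, event_name and event_json are removed, and five columns from the kv data are added: action, goodsid, place, extend1, and category.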

drop table if exists dwd_display_log;
CREATE EXTERNAL TABLE dwd_display_log(
`mid_id` string,
`user_id` string,
`version_code` string,
`version_name` string,
`lang` string,
`source` string,
`os` string,
`area` string,
`model` string,
`brand` string,
`sdk_version` string,
`gmail` string,
`height_width` string,
`app_time` string,
`network` string,
`lng` string,
`lat` string,
`action` string,
`goodsid` string,
`place` string,
`extend1` string,
`category` string,
`server_time` string
)
PARTITIONED BY (dt string)
location '/warehouse/gmall/dwd/dwd_display_log/';
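INSERT ... SELECT fills columns by position, so the extracted kv values must come out in the same order as the DDL columns, and a missing key ends up as NULL. The sketch below shows that projection for the five kv columns of dwd_display_log, assuming kv has already been parsed into a Map.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Projects an event's kv pairs onto the fixed, ordered kv columns of dwd_display_log.
class KvProjection {

    // kv columns in the same order as the dwd_display_log DDL.
    static final List<String> DISPLAY_KV_COLUMNS =
            Arrays.asList("action", "goodsid", "place", "extend1", "category");

    // Missing keys become null, analogous to a NULL column value in Hive.
    static List<String> project(Map<String, String> kv, List<String> columns) {
        List<String> row = new ArrayList<>();
        for (String col : columns) {
            row.add(kv.get(col));
        }
        return row;
    }

    public static void main(String[] args) {
        // kv of the display event in the sample log, minus "category" to show a null.
        Map<String, String> kv = new HashMap<>();
        kv.put("goodsid", "0");
        kv.put("action", "2");
        kv.put("extend1", "2");
        kv.put("place", "1");
        System.out.println(project(kv, DISPLAY_KV_COLUMNS)); // prints [2, 0, 1, 2, null]
    }
}
```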

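Next, a script loads data from the base event detail table into the specific event detail tables. Below is the complete script, covering product click (display), detail view, list loading, ad, notification, and other events. It is long, but every event type is handled with the same logic.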

① get_json_object(event_json,'$.kv.action') is a Hive built-in function that extracts a value from a JSON string; $ denotes the root of the JSON itself.

② where dt='$do_date' and event_name='display' uses the event name produced in the previous step to route each event into its own detail table.
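To make the path in ① concrete: `$` is the whole event JSON, `.kv` steps into the kv object, and `.action` selects the field. If any step is missing, get_json_object returns NULL. The sketch below mimics only this dot-separated subset over already-parsed nested Maps. Parsing is omitted, and array subscripts, which get_json_object also supports, are not modelled.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal evaluator for dot-only paths such as "$.kv.action" over nested Maps.
class DotPath {

    static Object get(Map<String, Object> root, String path) {
        if (path == null || !path.startsWith("$")) {
            return null; // every path starts at the root "$"
        }
        Object current = root;
        String rest = path.substring(1);
        if (rest.isEmpty()) {
            return current; // "$" alone is the whole object
        }
        if (!rest.startsWith(".")) {
            return null; // only ".key" steps are supported in this sketch
        }
        for (String step : rest.substring(1).split("\\.")) {
            if (!(current instanceof Map)) {
                return null; // cannot step into a non-object value
            }
            current = ((Map<?, ?>) current).get(step);
            if (current == null) {
                return null; // missing key yields null, like get_json_object
            }
        }
        return current;
    }

    public static void main(String[] args) {
        Map<String, Object> kv = new HashMap<>();
        kv.put("action", "2");
        kv.put("goodsid", "0");
        Map<String, Object> event = new HashMap<>();
        event.put("en", "display");
        event.put("kv", kv);
        System.out.println(get(event, "$.kv.action"));  // prints 2
        System.out.println(get(event, "$.kv.missing")); // prints null
    }
}
```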

#!/bin/bash

# Variables, defined here so they are easy to change
APP=gmall
hive=/opt/module/hive/bin/hive

# Use the date passed as the first argument; if none is given, default to the day before today
if [ -n "$1" ] ;then
        do_date=$1
else 
        do_date=`date -d "-1 day" +%F`  
fi 

sql="
set hive.exec.dynamic.partition.mode=nonstrict;

insert overwrite table "$APP".dwd_display_log
PARTITION (dt='$do_date')
select 
        mid_id,
        user_id,
        version_code,
        version_name,
        lang,
        source,
        os,
        area,
        model,
        brand,
        sdk_version,
        gmail,
        height_width,
        app_time,
        network,
        lng,
        lat,
        get_json_object(event_json,'$.kv.action') action,
        get_json_object(event_json,'$.kv.goodsid') goodsid,
        get_json_object(event_json,'$.kv.place') place,
        get_json_object(event_json,'$.kv.extend1') extend1,
        get_json_object(event_json,'$.kv.category') category,
        server_time
from "$APP".dwd_base_event_log 
where dt='$do_date' and event_name='display';


insert overwrite table "$APP".dwd_newsdetail_log
PARTITION (dt='$do_date')
select 
        mid_id,
        user_id,
        version_code,
        version_name,
        lang,
        source,
        os,
        area,
        model,
        brand,
        sdk_version,
        gmail,
        height_width,
        app_time,
        network,
        lng,
        lat,
        get_json_object(event_json,'$.kv.entry') entry,
        get_json_object(event_json,'$.kv.action') action,
        get_json_object(event_json,'$.kv.goodsid') goodsid,
        get_json_object(event_json,'$.kv.showtype') showtype,
        get_json_object(event_json,'$.kv.news_staytime') news_staytime,
        get_json_object(event_json,'$.kv.loading_time') loading_time,
        get_json_object(event_json,'$.kv.type1') type1,
        get_json_object(event_json,'$.kv.category') category,
        server_time
from "$APP".dwd_base_event_log 
where dt='$do_date' and event_name='newsdetail';


insert overwrite table "$APP".dwd_loading_log
PARTITION (dt='$do_date')
select 
        mid_id,
        user_id,
        version_code,
        version_name,
        lang,
        source,
        os,
        area,
        model,
        brand,
        sdk_version,
        gmail,
        height_width,
        app_time,
        network,
        lng,
        lat,
        get_json_object(event_json,'$.kv.action') action,
        get_json_object(event_json,'$.kv.loading_time') loading_time,
        get_json_object(event_json,'$.kv.loading_way') loading_way,
        get_json_object(event_json,'$.kv.extend1') extend1,
        get_json_object(event_json,'$.kv.extend2') extend2,
        get_json_object(event_json,'$.kv.type') type,
        get_json_object(event_json,'$.kv.type1') type1,
        server_time
from "$APP".dwd_base_event_log 
where dt='$do_date' and event_name='loading';


insert overwrite table "$APP".dwd_ad_log
PARTITION (dt='$do_date')
select 
        mid_id,
        user_id,
        version_code,
        version_name,
        lang,
        source,
        os,
        area,
        model,
        brand,
        sdk_version,
        gmail,
        height_width,
        app_time,
        network,
        lng,
        lat,
        get_json_object(event_json,'$.kv.entry') entry,
        get_json_object(event_json,'$.kv.action') action,
        get_json_object(event_json,'$.kv.content') content,
        get_json_object(event_json,'$.kv.detail') detail,
        get_json_object(event_json,'$.kv.source') ad_source,
        get_json_object(event_json,'$.kv.behavior') behavior,
        get_json_object(event_json,'$.kv.newstype') newstype,
        get_json_object(event_json,'$.kv.show_style') show_style,
        server_time
from "$APP".dwd_base_event_log 
where dt='$do_date' and event_name='ad';


insert overwrite table "$APP".dwd_notification_log
PARTITION (dt='$do_date')
select 
        mid_id,
        user_id,
        version_code,
        version_name,
        lang,
        source,
        os,
        area,
        model,
        brand,
        sdk_version,
        gmail,
        height_width,
        app_time,
        network,
        lng,
        lat,
        get_json_object(event_json,'$.kv.action') action,
        get_json_object(event_json,'$.kv.type') noti_type,
        get_json_object(event_json,'$.kv.ap_time') ap_time,
        get_json_object(event_json,'$.kv.content') content,
        server_time
from "$APP".dwd_base_event_log 
where dt='$do_date' and event_name='notification';


insert overwrite table "$APP".dwd_active_foreground_log
PARTITION (dt='$do_date')
select 
        mid_id,
        user_id,
        version_code,
        version_name,
        lang,
        source,
        os,
        area,
        model,
        brand,
        sdk_version,
        gmail,
        height_width,
        app_time,
        network,
        lng,
        lat,
        get_json_object(event_json,'$.kv.push_id') push_id,
        get_json_object(event_json,'$.kv.access') access,
        server_time
from "$APP".dwd_base_event_log 
where dt='$do_date' and event_name='active_foreground';


insert overwrite table "$APP".dwd_active_background_log
PARTITION (dt='$do_date')
select 
        mid_id,
        user_id,
        version_code,
        version_name,
        lang,
        source,
        os,
        area,
        model,
        brand,
        sdk_version,
        gmail,
        height_width,
        app_time,
        network,
        lng,
        lat,
        get_json_object(event_json,'$.kv.active_source') active_source,
        server_time
from "$APP".dwd_base_event_log 
where dt='$do_date' and event_name='active_background';


insert overwrite table "$APP".dwd_comment_log
PARTITION (dt='$do_date')
select 
        mid_id,
        user_id,
        version_code,
        version_name,
        lang,
        source,
        os,
        area,
        model,
        brand,
        sdk_version,
        gmail,
        height_width,
        app_time,
        network,
        lng,
        lat,
        get_json_object(event_json,'$.kv.comment_id') comment_id,
        get_json_object(event_json,'$.kv.userid') userid,
        get_json_object(event_json,'$.kv.p_comment_id') p_comment_id,
        get_json_object(event_json,'$.kv.content') content,
        get_json_object(event_json,'$.kv.addtime') addtime,
        get_json_object(event_json,'$.kv.other_id') other_id,
        get_json_object(event_json,'$.kv.praise_count') praise_count,
        get_json_object(event_json,'$.kv.reply_count') reply_count,
        server_time
from "$APP".dwd_base_event_log 
where dt='$do_date' and event_name='comment';


insert overwrite table "$APP".dwd_favorites_log
PARTITION (dt='$do_date')
select 
        mid_id,
        user_id,
        version_code,
        version_name,
        lang,
        source,
        os,
        area,
        model,
        brand,
        sdk_version,
        gmail,
        height_width,
        app_time,
        network,
        lng,
        lat,
        get_json_object(event_json,'$.kv.id') id,
        get_json_object(event_json,'$.kv.course_id') course_id,
        get_json_object(event_json,'$.kv.userid') userid,
        get_json_object(event_json,'$.kv.add_time') add_time,
        server_time
from "$APP".dwd_base_event_log 
where dt='$do_date' and event_name='favorites';


insert overwrite table "$APP".dwd_praise_log
PARTITION (dt='$do_date')
select 
        mid_id,
        user_id,
        version_code,
        version_name,
        lang,
        source,
        os,
        area,
        model,
        brand,
        sdk_version,
        gmail,
        height_width,
        app_time,
        network,
        lng,
        lat,
        get_json_object(event_json,'$.kv.id') id,
        get_json_object(event_json,'$.kv.userid') userid,
        get_json_object(event_json,'$.kv.target_id') target_id,
        get_json_object(event_json,'$.kv.type') type,
        get_json_object(event_json,'$.kv.add_time') add_time,
        server_time
from "$APP".dwd_base_event_log 
where dt='$do_date' and event_name='praise';


insert overwrite table "$APP".dwd_error_log
PARTITION (dt='$do_date')
select 
        mid_id,
        user_id,
        version_code,
        version_name,
        lang,
        source,
        os,
        area,
        model,
        brand,
        sdk_version,
        gmail,
        height_width,
        app_time,
        network,
        lng,
        lat,
        get_json_object(event_json,'$.kv.errorBrief') errorBrief,
        get_json_object(event_json,'$.kv.errorDetail') errorDetail,
        server_time
from "$APP".dwd_base_event_log 
where dt='$do_date' and event_name='error';
"

$hive -e "$sql"
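Each statement in this script differs only in the target table, the event name, and the list of kv keys, so the script could also be generated. The sketch below shows one hypothetical way to do that in Java; the class and method names are illustrative. It keeps `$do_date` as a literal placeholder for the shell to substitute and follows the script's naming convention dwd_<event>_log.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical generator for the repetitive per-event INSERT statements.
class EventSqlGenerator {

    // Columns shared by the base table and every event table, in DDL order.
    static final List<String> COMMON = Arrays.asList(
            "mid_id", "user_id", "version_code", "version_name", "lang", "source",
            "os", "area", "model", "brand", "sdk_version", "gmail", "height_width",
            "app_time", "network", "lng", "lat");

    // kvColumns holds {kvKey, columnAlias} pairs, e.g. {"source", "ad_source"}.
    static String insertFor(String app, String eventName, List<String[]> kvColumns) {
        StringBuilder sb = new StringBuilder();
        sb.append("insert overwrite table ").append(app)
          .append(".dwd_").append(eventName).append("_log\n");
        sb.append("PARTITION (dt='$do_date')\nselect\n");
        for (String c : COMMON) {
            sb.append("        ").append(c).append(",\n");
        }
        for (String[] kv : kvColumns) {
            sb.append("        get_json_object(event_json,'$.kv.").append(kv[0])
              .append("') ").append(kv[1]).append(",\n");
        }
        sb.append("        server_time\n");
        sb.append("from ").append(app).append(".dwd_base_event_log\n");
        sb.append("where dt='$do_date' and event_name='").append(eventName).append("';\n");
        return sb.toString();
    }

    public static void main(String[] args) {
        List<String[]> kv = Arrays.asList(
                new String[] {"errorBrief", "errorBrief"},
                new String[] {"errorDetail", "errorDetail"});
        System.out.print(insertFor("gmall", "error", kv));
    }
}
```

The trade-off is one more build step in exchange for a single place to keep the common column list in sync with the DDL.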

 

