問題描述:根據用戶標識和歷史庫的匹配結果,識別是否是新增用戶,單位:天
要求:歷史庫每天累加更新,要考慮用戶歷史數據庫的冪等性及回補數據策略
輸出:
- 用戶pushid
- pushid對應的uid(如果當天沒有登錄就沒有對應的pushid,則從歷史庫中匹配)
- pushid當天和uid是否有對應關系
- 用戶新增時間
- 用戶歷史所有投資次數
- 當天用戶投資次數
- 用戶每次投資時間(rechargeTime)
說明:
用戶標識有兩個 pushid、uid,pushid表示用戶的注冊id,登錄的時候才會存在,uid是用戶訪問的cookie(會頻繁變化)。
因此在業務中要關聯兩者之間的關系。
創建歷史庫:
-- History table: one row per (pushid, rechargeTime) observation, versioned by ver.
-- ReplacingMergeTree keeps, for rows sharing the key (pushid, rechargeTime), the row
-- with the largest ver; the replacement happens when data parts are merged.
-- NOTE(review): rows written with the default rechargeTime for the same pushid but a
-- different uid / appkey / day share the same key and can therefore be collapsed into
-- one row by a merge -- confirm this is intended before relying on per-day uid lookups.
CREATE TABLE IF NOT EXISTS `db_name`.`table_name` (
    partition    Date     DEFAULT '1970-01-01',
    pushid       String   DEFAULT '',
    opTime       DateTime DEFAULT 0,
    -- A non-default rechargeTime is the time at which the user made an investment.
    rechargeTime DateTime DEFAULT 0,
    appkey       String   DEFAULT '',
    uid          String   DEFAULT '',
    -- Version used by ReplacingMergeTree to choose the surviving row.
    ver          UInt64   DEFAULT 0
) ENGINE = ReplacingMergeTree(partition, (pushid, rechargeTime), 8192, ver)
利用ReplacingMergeTree實現數據冪等性:重複入庫的數據(排序鍵相同的行)會在後台合併(merge)時被去除,只保留一行。注意去重只在合併時發生,合併前查詢仍可能讀到重複行,因此查詢時需要使用FINAL或在聚合中自行去重,才能保證數據執行多次時結果不重複。
ver表示版本號,當數據重複時,會以最大的版本號為準,版本號可以是一個遞增的數字,業務中數據的版本號是插入時的時間戳。
*其中:(pushid, rechargeTime)中的rechargeTime表示用戶復投時間,如果業務中沒有對用戶發生某一個行為特殊要求則可以刪除。
歷史庫更新代碼(每天更新):
-- Daily load of the history table for partition day 2017-03-28.
-- For each source (ncf_h5, ncf_ws) two kinds of rows are written:
--   * investment rows: one per matching page event, rechargeTime = event time;
--   * first-seen rows: one per (appkey, partition, pushid, uid) with the earliest
--     opTime of the day and the default rechargeTime ('1970-01-01 00:00:00').
-- Only events with a non-empty jhd_pushid are loaded.
-- ver is a literal here (835664); every row of one run carries the same version.
-- The target column list is spelled out so the load does not depend on the
-- physical column order of the table.
INSERT INTO db_name.table_name (partition, pushid, opTime, rechargeTime, appkey, uid, ver)
SELECT
    partition,
    pushid,
    opTime,
    rechargeTime,
    appkey,
    uid,
    ver
FROM
(
    -- ncf_h5 investment events: page views whose uri contains '/pay_result'.
    SELECT
        partition,
        jhd_pushid AS pushid,
        jhd_opTime AS opTime,
        jhd_opTime AS rechargeTime,
        jhd_datatype AS appkey,
        jhd_userkey AS uid,
        CAST(835664 AS UInt64) AS ver
    FROM ncf_h5.userevent
    WHERE (partition = toDate('2017-03-28'))
      AND (jhd_pushid != '')
      AND (jhd_opType = 'page')
      AND (visitParamExtractString(jhd_map, 'uri') LIKE '%/pay_result%')

    UNION ALL

    -- ncf_h5 first-seen rows: earliest event time of the day per pushid/uid pair.
    SELECT
        partition,
        jhd_pushid AS pushid,
        min(jhd_opTime) AS opTime,
        toDateTime('1970-01-01 00:00:00') AS rechargeTime,
        jhd_datatype AS appkey,
        jhd_userkey AS uid,
        CAST(835664 AS UInt64) AS ver
    FROM ncf_h5.userevent
    WHERE (partition = toDate('2017-03-28'))
      AND (jhd_pushid != '')
    GROUP BY
        jhd_datatype,
        partition,
        pushid,
        jhd_userkey

    UNION ALL

    -- ncf_ws investment events: page views whose uri contains '/success'.
    SELECT
        partition,
        jhd_pushid AS pushid,
        jhd_opTime AS opTime,
        jhd_opTime AS rechargeTime,
        jhd_datatype AS appkey,
        jhd_userkey AS uid,
        CAST(835664 AS UInt64) AS ver
    FROM ncf_ws.userevent
    WHERE (partition = toDate('2017-03-28'))
      AND (jhd_pushid != '')
      AND (jhd_opType = 'page')
      AND (visitParamExtractString(jhd_map, 'uri') LIKE '%/success%')

    UNION ALL

    -- ncf_ws first-seen rows: earliest event time of the day per pushid/uid pair.
    SELECT
        partition,
        jhd_pushid AS pushid,
        min(jhd_opTime) AS opTime,
        toDateTime('1970-01-01 00:00:00') AS rechargeTime,
        jhd_datatype AS appkey,
        jhd_userkey AS uid,
        CAST(835664 AS UInt64) AS ver
    FROM ncf_ws.userevent
    WHERE (partition = toDate('2017-03-28'))
      AND (jhd_pushid != '')
    GROUP BY
        jhd_datatype,
        partition,
        pushid,
        jhd_userkey
)
數據導出代碼:
數據格式:
pushid、是否當天登錄、uid、新增時間、用戶歷史所有投資次數、當天用戶投資次數、用戶每次投資時間
-- Daily export for report day 2017-04-04.
-- Output columns, in order:
--   pushid, isfind, uids (one uid per row after ARRAY JOIN), earliest,
--   recharge_n, recharge_today, recharge_arr
--
-- Investment counts and the investment-time list are computed over DISTINCT
-- rechargeTime values per pushid (uniqExactIf / groupUniqArray) instead of raw row
-- counts. Raw counts are inflated by (a) duplicate rows left by a re-run or backfill
-- until ReplacingMergeTree merges them, and (b) history rows repeated by the
-- ALL INNER JOIN in the second branch. Distinct counting matches the table's own
-- (pushid, rechargeTime) replacement key.

-- Branch 1 (isfind = 1): pushids that have rows in the history table on the report
-- day; their uids come from that day's rows, their statistics from the last 365 days.
SELECT
    pushid,
    1 AS isfind,
    uids,
    earliest,
    recharge_n,
    recharge_today,
    recharge_arr
FROM
(
    SELECT
        pushid,
        CAST(earliest AS String) AS earliest,
        recharge_n,
        recharge_today,
        -- Drop the default (non-investment) timestamp, then render times as strings.
        arrayMap(x -> CAST(x AS String), arrayFilter(x -> x != '1970-01-01 00:00:00', recharge_arr)) AS recharge_arr,
        -- Empty uids are removed; a pushid left with no uid yields no output row.
        arrayFilter(x -> x != '', uids) AS uids
    FROM
    (
        SELECT
            pushid,
            groupUniqArray(uid) AS uids
        FROM ncf_common.user_pushid
        WHERE partition = toDate('2017-04-04')
        GROUP BY pushid
    )
    ANY LEFT JOIN
    (
        SELECT
            pushid,
            min(opTime) AS earliest,
            uniqExactIf(rechargeTime, rechargeTime != '1970-01-01 00:00:00') AS recharge_n,
            uniqExactIf(rechargeTime, toDate(rechargeTime) = toDate('2017-04-04')) AS recharge_today,
            groupUniqArray(rechargeTime) AS recharge_arr
        FROM ncf_common.user_pushid
        WHERE (partition <= toDate('2017-04-04'))
          AND (partition >= (toDate('2017-04-04') - 365))
        GROUP BY pushid
    ) USING (pushid)
)
ARRAY JOIN uids

UNION ALL

-- Branch 2 (isfind = 0): uids that on the report day appear only with an empty
-- pushid in ncf_ws or ncf_h5, matched by uid against history rows from earlier days.
SELECT
    pushid,
    0 AS isfind,
    uids,
    CAST(earliest AS String) AS earliest,
    recharge_n,
    recharge_today,
    arrayMap(x -> CAST(x AS String), recharge_arr) AS recharge_arr
FROM
(
    SELECT
        pushid,
        groupUniqArray(uid) AS uids,
        min(opTime) AS earliest,
        uniqExactIf(rechargeTime, rechargeTime != '1970-01-01 00:00:00') AS recharge_n,
        uniqExactIf(rechargeTime, toDate(rechargeTime) = toDate('2017-04-04')) AS recharge_today,
        -- Default (non-investment) timestamps are removed here, once.
        arrayFilter(x -> x != '1970-01-01 00:00:00', groupUniqArray(rechargeTime)) AS recharge_arr
    FROM
    (
        SELECT
            pushid,
            uid,
            opTime,
            rechargeTime
        FROM
        (
            -- A uid seen in both sources produces two rows here, so the join below
            -- repeats its history rows; the distinct aggregates above absorb that.
            SELECT
                jhd_userkey AS uid,
                groupUniqArray(jhd_pushid) AS pushids,
                'ncf_ws' AS appkey
            FROM ncf_ws.userevent
            WHERE partition = toDate('2017-04-04')
            GROUP BY jhd_userkey
            HAVING (length(pushids) = 1) AND has(pushids, '')

            UNION ALL

            SELECT
                jhd_userkey AS uid,
                groupUniqArray(jhd_pushid) AS pushids,
                'ncf_h5' AS appkey
            FROM ncf_h5.userevent
            WHERE partition = toDate('2017-04-04')
            GROUP BY jhd_userkey
            HAVING (length(pushids) = 1) AND has(pushids, '')
        )
        ALL INNER JOIN
        (
            SELECT
                pushid,
                uid,
                opTime,
                rechargeTime,
                appkey
            FROM ncf_common.user_pushid
            WHERE partition < toDate('2017-04-04')
        ) USING (uid)
    )
    GROUP BY pushid
)
ARRAY JOIN uids
