1.什么是拉鏈表
拉鏈表是針對數據倉庫設計中表存儲數據的方式而定義的,顧名思義,所謂拉鏈,就是記錄歷史。記錄一個事物從開始,一直到當前狀態的所有變化的信息。
我們先看一個示例,這就是一張拉鏈表,存儲的是匯率以及每條記錄的生命周期。我們可以使用這張表拿到當天的最新數據以及之前的歷史數據。
我們首先介紹一下我們公司用到的匯率分區拉鏈表
每個公司的拉鏈表設計可能並不相同但是拉鏈表以記錄生命周期的設計目的是不會改變的。
2.匯率拉鏈表轉日連續流水表
對間斷的時間序列進行補全,然後對null值進行補全(這裡的規則是取同類上一條數據的非空值)
3.匯率拉鏈表轉日連續流水表
代碼實現思路是
step1.使用udtf生成連續的時間序列,再left join exchangeRate拉鏈表
step2.使用開窗函數解決補空值問題
為了簡單我們用下面這個表代替
1.udtf函數
public class GenDay extends GenericUDTF { private PrimitiveObjectInspector poi1; private PrimitiveObjectInspector poi2; @Override public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException { if (argOIs.getAllStructFieldRefs().size() != 2) { throw new UDFArgumentException("參數個數只能為2"); } //如果輸入字段類型非String,則拋異常 ObjectInspector oi1 = argOIs.getAllStructFieldRefs().get(0).getFieldObjectInspector(); if (oi1.getCategory() != ObjectInspector.Category.PRIMITIVE) { throw new UDFArgumentException("參數非基本類型,需要基本類型"); } //如果輸入字段類型非String,則拋異常 ObjectInspector oi2 = argOIs.getAllStructFieldRefs().get(1).getFieldObjectInspector(); if (oi2.getCategory() != ObjectInspector.Category.PRIMITIVE) { throw new UDFArgumentException("參數非基本類型,需要基本類型"); } //強轉為基本類型對象檢查器 poi1 = (PrimitiveObjectInspector) oi1; if (poi1.getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING) { throw new UDFArgumentException("參數1非string,需要基本類型string"); } poi2 = (PrimitiveObjectInspector) oi2; if (poi2.getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING) { throw new UDFArgumentException("參數1非string,需要基本類型string"); } //構造字段名,word List<String> fieldNames = new ArrayList<String>(); fieldNames.add("everyday"); //構造字段類型,string List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>(); //通過基本數據類型工廠獲取java基本類型oi fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector); //構造對象檢查器 return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs); } @Override public void process(Object[] args) throws HiveException { Date dBegin=null; Date dEnd=null; //得到一行數據 String start = (String) poi1.getPrimitiveJavaObject(args[0]); String end = (String) poi2.getPrimitiveJavaObject(args[1]); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); try { dBegin = sdf.parse(start); dEnd = sdf.parse(end); } catch (ParseException e) { e.printStackTrace(); } assert dEnd != null; List<String> 
lDate=getDatesBetweenTwoDate(dBegin,dEnd); StringBuilder stringBuffer = new StringBuilder(); for (int i=0;i<lDate.size(); i += 1) { if (i!=0){ stringBuffer.append(" ").append(lDate.get(i)); }else { stringBuffer.append(lDate.get(i)); } } String s = stringBuffer.toString(); Object[] objs = new Object[1]; objs[0]= s; forward(objs); } @Override public void close() throws HiveException { } public List<String> getDatesBetweenTwoDate(Date beginDate, Date endDate) { List<String> lDate = new ArrayList<String>(); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); lDate.add(sdf.format(beginDate)); Calendar cal = Calendar.getInstance(); // 使用給定的 Date 設置此 Calendar 的時間 cal.setTime(beginDate); while (true) { // 根據日歷的規則,為給定的日歷字段添加或減去指定的時間量 cal.add(Calendar.DAY_OF_MONTH, 1); // 測試此日期是否在指定日期之后 if (endDate.after(cal.getTime())) { lDate.add(sdf.format(cal.getTime())); } else { break; } } lDate.add(sdf.format(endDate));// 把結束時間加入集合 return lDate; } }
2.先用笛卡爾積找到所有的uid和連續完全的時間序列的組合,然后left join得到 時間連續但有空值的 序列。
-- Build every (uid, day) combination and attach the event recorded for that uid on that day.
-- Days with no matching record keep a NULL event, giving a continuous series with gaps.
select
    c.uid,
    c.everyday,
    d.event
from (
    select
        a.uid,
        b.everyday
    from (
        -- one row per distinct uid
        select uid from big12.test group by uid
    ) a
    join (
        -- GenDay returns all days of the range as one space-separated string;
        -- split + explode turns that string into one row per day
        select explode(split(g.everyday, ' ')) as everyday
        from (
            select GenDay('2018-01-01', '2018-12-31') as everyday
        ) g
    ) b
    -- Cartesian product: every uid is paired with every day
    on 1=1
) c
-- NOTE(review): this equality only matches when test.time holds a bare yyyy-MM-dd value;
-- confirm the format of the time column before relying on it.
left join big12.test d
    on c.uid = d.uid and c.everyday = d.time;
像是這樣:
3.1用上一條數據補充字段空值(我自己想的)
不過必須單節點 對於匯率來說,一般我的口徑里只用到3-5個匯率,這樣最多1500條。數據量不大。有風險(自己玩吧別去生產)
package udf;

import org.apache.hadoop.hive.ql.exec.UDF;

/**
 * Hive UDF that replaces a missing value with the value remembered for the same key.
 *
 * <p>A value is treated as missing when it is NULL or the empty string. Only the most recently
 * seen key and its remembered value are kept, in static fields, so the result is only meaningful
 * when rows of the same key arrive consecutively and in the desired order.
 *
 * <p>NOTE(review): the static fields are shared by every instance in the JVM and are not
 * synchronized; this presumably relies on the query running in a single task — confirm before
 * using it beyond small, single-node experiments.
 */
public class GetNotNull extends UDF {
    /** Key of the most recently processed row. */
    private static String lrkey = null;
    /** Value remembered for {@link #lrkey}; used to fill later missing values of that key. */
    private static String lrvalue = null;

    /**
     * Returns {@code value}, or the remembered value of the same key when {@code value} is missing.
     *
     * @param key grouping key of the current row; may be NULL
     * @param value value of the current row; NULL and "" both count as missing
     * @return {@code value} if it is present or starts a new key, otherwise the remembered value
     *     (which may itself be NULL or empty if the key has had no usable value yet)
     */
    public String evaluate(String key, String value) {
        // Null-safe comparisons: after a left join the columns passed in are often SQL NULL,
        // which previously made key.equals(...) / value.isEmpty() throw a NullPointerException.
        boolean sameKey = (key == null) ? (lrkey == null) : key.equals(lrkey);
        boolean missing = (value == null) || value.isEmpty();

        if (!sameKey) {
            // First row of a new key: start a fresh memory, even if its value is missing.
            lrkey = key;
            lrvalue = value;
            return value;
        }
        if (missing) {
            return lrvalue;
        }
        lrvalue = value;
        return value;
    }
}
使用靜態變量保存上一條非空值。
3.2用上一條數據補充字段空值
-- Sample table: one event per (uid, time); an empty event marks a gap to be filled.
drop table if exists big12.test;
create table big12.test(
    uid int,
    time string,
    event string
) comment ''
row format delimited fields terminated by '\031'
stored as textfile;

insert into big12.test values(1,'2018-12-02 11:00:29','1');
insert into big12.test values(1,'2018-12-02 11:00:30','');
insert into big12.test values(1,'2018-12-02 11:00:31','2');
insert into big12.test values(1,'2018-12-02 11:00:32','');
insert into big12.test values(1,'2018-12-02 11:00:33','');
insert into big12.test values(2,'2018-12-02 11:00:40','3');
insert into big12.test values(2,'2018-12-02 11:00:41','');
insert into big12.test values(2,'2018-12-02 11:00:42','4');
insert into big12.test values(2,'2018-12-02 11:00:44','');

use big12;

-- Fill each empty event with the latest non-empty event of the SAME uid.
--   all_row : position of the row among all rows of its uid, ordered by time
--   grp_row : position of the row inside its own group (empty / non-empty) of that uid
-- For an empty row, all_row - grp_row is the number of non-empty rows before it, which is
-- exactly the grp_row of the non-empty row it should copy from. When a uid starts with empty
-- rows the difference is 0, nothing matches, and the event stays NULL.
-- The windows and the join are keyed by uid so a value never leaks from one uid to another.
-- The alias is grp_row rather than row because ROW is a reserved word in recent Hive versions.
select
    t1.uid,
    t1.time,
    t2.event
from (
    -- rows whose event is missing
    select uid, time, event, grp_row, all_row
    from (
        select
            uid,
            time,
            event,
            row_number() over (
                partition by uid,
                             case when event is not null and trim(event)<>'' then 1 else 0 end
                order by time asc
            ) as grp_row,
            row_number() over (partition by uid order by time asc) as all_row
        from test
    ) t
    where event is null or trim(event)=''
) t1
left join (
    -- rows whose event is present
    select uid, time, event, grp_row, all_row
    from (
        select
            uid,
            time,
            event,
            row_number() over (
                partition by uid,
                             case when event is not null and trim(event)<>'' then 1 else 0 end
                order by time asc
            ) as grp_row,
            row_number() over (partition by uid order by time asc) as all_row
        from test
    ) t
    where event is not null and trim(event)<>''
) t2
    on t1.uid = t2.uid
   and t1.all_row - t1.grp_row = t2.grp_row
union all
-- rows that already carry an event are passed through unchanged
select
    uid,
    time,
    event
from test
where event is not null and trim(event)<>'';