hive 匯率拉鏈表轉日連續流水表


1.什么是拉鏈表

拉鏈表是針對數據倉庫設計中表存儲數據的方式而定義的,顧名思義,所謂拉鏈,就是記錄歷史。記錄一個事物從開始,一直到當前狀態的所有變化的信息。

我們先看一個示例,這就是一張拉鏈表,存儲的是匯率以及每條記錄的生命周期。我們可以使用這張表拿到最新的當天的最新數據以及之前的歷史數據。
我們首先介紹一下我們公司用到的匯率分區拉鏈表

每個公司的拉鏈表設計可能並不相同但是拉鏈表以記錄生命周期的設計目的是不會改變的。

 2.匯率拉鏈表轉日連續流水表

進行對間斷的時間序列補全,然后對null補全(這里的規則是取同類上一條數據的非空值)

 3.匯率拉鏈表轉日連續流水表

代碼實現思路是

step1.使用utf生成連續的時間序列 left join exchangeRate拉鏈表

step2.使用開窗函數解決補空值問題

 

為了簡單我們用下面這個表代替

 

1.udtf函數

public class GenDay extends GenericUDTF {
    private PrimitiveObjectInspector poi1;
    private PrimitiveObjectInspector poi2;
    @Override
    public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
        if (argOIs.getAllStructFieldRefs().size() != 2) {
            throw new UDFArgumentException("參數個數只能為2");
        }
        //如果輸入字段類型非String,則拋異常
        ObjectInspector oi1 = argOIs.getAllStructFieldRefs().get(0).getFieldObjectInspector();
        if (oi1.getCategory() != ObjectInspector.Category.PRIMITIVE) {
            throw new UDFArgumentException("參數非基本類型,需要基本類型");
        }
        //如果輸入字段類型非String,則拋異常
        ObjectInspector oi2 = argOIs.getAllStructFieldRefs().get(1).getFieldObjectInspector();
        if (oi2.getCategory() != ObjectInspector.Category.PRIMITIVE) {
            throw new UDFArgumentException("參數非基本類型,需要基本類型");
        }
        //強轉為基本類型對象檢查器
        poi1 = (PrimitiveObjectInspector) oi1;
        if (poi1.getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING) {
            throw new UDFArgumentException("參數1非string,需要基本類型string");
        }
        poi2 = (PrimitiveObjectInspector) oi2;
        if (poi2.getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING) {
            throw new UDFArgumentException("參數1非string,需要基本類型string");
        }

        //構造字段名,word
        List<String> fieldNames = new ArrayList<String>();
        fieldNames.add("everyday");


        //構造字段類型,string
        List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
        //通過基本數據類型工廠獲取java基本類型oi
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);


        //構造對象檢查器
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames,
                fieldOIs);

    }

    @Override
    public void process(Object[] args) throws HiveException {
        Date dBegin=null;
        Date dEnd=null;


        //得到一行數據
        String start = (String) poi1.getPrimitiveJavaObject(args[0]);
        String end  = (String) poi2.getPrimitiveJavaObject(args[1]);
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
        try {
             dBegin = sdf.parse(start);
             dEnd = sdf.parse(end);
        } catch (ParseException e) {
            e.printStackTrace();
        }

        assert dEnd != null;
        List<String> lDate=getDatesBetweenTwoDate(dBegin,dEnd);
        StringBuilder stringBuffer = new StringBuilder();
        for (int i=0;i<lDate.size(); i += 1) {
          if (i!=0){
              stringBuffer.append(" ").append(lDate.get(i));
          }else {
              stringBuffer.append(lDate.get(i));

          }

        }
        String s = stringBuffer.toString();
        Object[] objs = new Object[1];
        objs[0]= s;
        forward(objs);
        
    }

    @Override
    public void close() throws HiveException {

    }

    public  List<String> getDatesBetweenTwoDate(Date beginDate, Date endDate) {
        List<String> lDate = new ArrayList<String>();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");

        lDate.add(sdf.format(beginDate));
        Calendar cal = Calendar.getInstance();
        // 使用給定的 Date 設置此 Calendar 的時間
        cal.setTime(beginDate);
        while (true) {
            // 根據日歷的規則,為給定的日歷字段添加或減去指定的時間量
            cal.add(Calendar.DAY_OF_MONTH, 1);
            // 測試此日期是否在指定日期之后
            if (endDate.after(cal.getTime())) {
                lDate.add(sdf.format(cal.getTime()));
            } else {
                break;
            }
        }
        lDate.add(sdf.format(endDate));// 把結束時間加入集合
        return lDate;
    }

}

2.先用笛卡爾積找到所有的uid和連續完全的時間序列的組合,然后left join得到 時間連續但有空值的 序列。

select c.uid,c.everyday,d.event
from
(select a.uid,b.everyday from 
(select uid from group by big12.test) a 
join  (select expode(split(everyday,' ')) as everyday select everyday from GenDay('2018-01-01','2018-12-31'))b
--笛卡爾積
on 1=1) c
left join test d
on c.uid=d.uid and c.everyday=d.time;

像是這樣:

 

3.1用上一條數據補充字段空值(我自己想的)

不過必須單節點 對於匯率來說,一般我的口徑里只用到3-5個匯率,這樣最多1500條。數據量不大。有風險(自己玩吧別去生產)

package udf;

import org.apache.hadoop.hive.ql.exec.UDF;

public class GetNotNull extends UDF {

    private static String lrkey = null;
    private static String lrvalue = null;


    public String evaluate(String key, String value) {
        if (key.equals(lrkey)) {
            if (value.isEmpty()) {
                value = lrvalue;
            }else{
                lrvalue=value;
            }
        } else {
            lrkey = key;
            lrvalue = value;
        }
        return value;
    }
}

使用靜態類保存上一條非空值。

3.2用上一條數據補充字段空值

drop table if exists big12.test;
create table big12.test( 
uid int,
time string,
event string
)comment ''
row format delimited
fields terminated by '\031'
stored as textfile
;

insert into big12.test values(1,'2018-12-02 11:00:29','1');
insert into big12.test values(1,'2018-12-02 11:00:30','');
insert into big12.test values(1,'2018-12-02 11:00:31','2');
insert into big12.test values(1,'2018-12-02 11:00:32','');
insert into big12.test values(1,'2018-12-02 11:00:33','');
insert into big12.test values(2,'2018-12-02 11:00:40','3');
insert into big12.test values(2,'2018-12-02 11:00:41','');
insert into big12.test values(2,'2018-12-02 11:00:42','4');
insert into big12.test values(2,'2018-12-02 11:00:44','');


use big12;
select
      t1.uid,
      t1.time,
      t2.event
from 
(
    select
          uid,
          time,
          event,
          row,
          all_row
      from 
      (
      select
      uid,
      time,
      event,
      row_number()over(partition by case when event is not null and trim(event)<>'' then 1 else 0 end order by time asc) as row,
      row_number()over( order by time asc) as all_row
      from test
      )t 
      where event is  null or trim(event)=''
)t1 
left join 
(
    select
          uid,
          time,
          event,
          row,
          all_row
    from 
     (
      select
      uid,
      time,
      event,
      row_number()over(partition by case when event is not null and trim(event)<>'' then 1 else 0 end order by time asc) as row,
      row_number()over( order by time asc) as all_row
      from test
      )t 
     where event is not  null and trim(event)<>''
)t2 
on  t1.all_row-t1.row=t2.row
union all
select
uid,
time,
event
from test
where event is not  null and trim(event)<>'';

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM