使用zipwithindex 算子給dataframe增加自增列 row_number函數實現自增,udf函數實現自增


DataFrame df = ...
StructType schema = df.schema().add(DataTypes.createStructField("id", DataTypes.LongType, false));

使用RDD的zipWithIndex得到索引,作為ID值:
JavaRDD<Row> rdd = df
.javaRDD() // 轉為JavaRDD
.zipWithIndex() // 添加索引,結果為JavaPairRDD<Row, Long>,即行數據和對應的索引
.map(new Function<Tuple2<Row, Long>, Row>() {
@Override
public Row call(Tuple2<Row, Long> v1) throws Exception {
Object[] objects = new Object[v1._1.size() + 1];
for (int i = 0; i < v1._1.size(); i++) {
objects[i] = v1._1.get(i);
}
objects[objects.length - 1] = v1._2;
return RowFactory.create(objects);
}
}); // 把索引值作為ID字段值,構造新的行數據

將RDD再轉回DataFrame
df = sqlContext.createDataFrame(rdd, schema);

多維數據倉庫中的維度表和事實表一般都需要有一個代理鍵,作為這些表的主鍵,代理鍵一般由單列的自增數字序列構成。Hive沒有關系數據庫中的自增列,但它也有一些對自增序列的支持,通常有兩種方法生成代理鍵:使用row_number()窗口函數或者使用一個名為UDFRowSequence的用戶自定義函數(UDF)。
  假設有維度表tbl_dim和過渡表tbl_stg,現在要將tbl_stg的數據裝載到tbl_dim,裝載的同時生成維度表的代理鍵。
用row_number()函數生成代理鍵
insert into tbl_dim
select row_number() over (order by tbl_stg.id) + t2.sk_max, tbl_stg.*
from tbl_stg
cross join (select coalesce(max(sk),0) sk_max from tbl_dim) t2;
        上面語句中,先查詢維度表中已有記錄最大的代理鍵值,如果維度表中還沒有記錄,利用coalesce函數返回0。然后使用cross join連接生成過渡表和最大代理鍵值的笛卡爾集,最后使用row_number()函數生成行號,並將行號與最大代理鍵值相加的值,作為新裝載記錄的代理鍵。
用UDFRowSequence生成代理鍵
add jar hdfs:///user/hive-contrib-2.0.0.jar;
create temporary function row_sequence as 'org.apache.hadoop.hive.contrib.udf.udfrowsequence';

insert into tbl_dim
select row_sequence() + t2.sk_max, tbl_stg.*
from tbl_stg
cross join (select coalesce(max(sk),0) sk_max from tbl_dim) t2;
        hive-contrib-2.0.0.jar中包含一個生成記錄序號的自定義函數udfrowsequence。上面的語句先加載JAR包,然后創建一個名為row_sequence()的臨時函數作為調用UDF的接口,這樣可以為查詢的結果集生成一個自增偽列。之后就和row_number()寫法類似了,只不過將窗口函數row_number()替換為row_sequence()函數。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM