spark DataFrame新增一列id列(單調遞增,不重復)的幾種方法


方案一:使用functions里面的monotonically_increasing_id(),生成單調遞增,不保證連續,最大64bit,的一列.分區數不變。

import org.apache.spark.sql.functions._
val df1 = spark.range(0,1000).toDF("col1")
val df2 = df1.withColumn("id", monotonically_increasing_id())

注意:有多個分區的時候,每個分區里面是單調遞增,step為1,分區之間不保證連續,如一共兩個分區,0分區id是0-499,1分區id可能99000-99499,甚至更大,最大64bit的integer。

如果想要整體連續,可以先repartition(1),操作完后在repartition(n)

 

方案二:使用row_number().over(Windo.orderBy(ColName)),生成按某列排序后,新增單調遞增,連續的一列。操作完后分區數變為1,id列從1開始。

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number
 
val df1 = spark.range(0,1000).toDF("col1")
 
println(df.rdd.getNumPartitions)
 
val w = Window.orderBy("col1")
val df2 = df.withColumn("id", row_number().over(w))
 
println(df2.rdd.getNumPartitions

 

方案三:將DataFrame轉成RDD,使用RDD的方法zipWithIndex()/zipWithUniqueId(),分區數不變。

val df1: DataFrame = spark.range(0,1000000).toDF("col1")
 
 
//轉成rdd並使用zipWithIndex()
var tempRDD: RDD[(Row, Long)] = df1.rdd.zipWithIndex()
//使用map
val record: RDD[Row] = tempRDD.map(x => {
      Row(x._1.get(0), x._2)
    })
 
val schema= new StructType().add("col1","long")
        .add("id","long")
spark.createDataFrame(record,schema).show()

zipWithIndex():首先基於分區索引排序,然后是每個分區中的項的排序。所以第一個分區中的第一項得到索引0,最后一個分區中的最后一項得到最大的索引。從0開始

zipWithUniqueId(): 每個分區是一個等差數列,等差為分區數n,每個分區的第一個值為分區id(id從0開始)。第k個分區:num*n+k,num在每個分區都是從0開始,step為1

3個分區,abc 0分區   def  1分區   ghi  2分區
col1         id
a         0*3+0=0
b         1*3+0=3
c         2*3+0=6
d         0*3+1=1
e         1*3+1=4
f         2*3+1=7
g         0*3+2=2
h         1*3+2=5
i         2*3+2=8

  


原文鏈接:https://blog.csdn.net/liaodaoluyun/article/details/86232639


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM