方案一:使用functions里面的monotonically_increasing_id(),生成單調遞增,不保證連續,最大64bit,的一列.分區數不變。
import org.apache.spark.sql.functions._ val df1 = spark.range(0,1000).toDF("col1") val df2 = df1.withColumn("id", monotonically_increasing_id())
注意:有多個分區的時候,每個分區里面是單調遞增,step為1,分區之間不保證連續,如一共兩個分區,0分區id是0-499,1分區id可能99000-99499,甚至更大,最大64bit的integer。
如果想要整體連續,可以先repartition(1),操作完后在repartition(n)
方案二:使用row_number().over(Windo.orderBy(ColName)),生成按某列排序后,新增單調遞增,連續的一列。操作完后分區數變為1,id列從1開始。
import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.functions.row_number val df1 = spark.range(0,1000).toDF("col1") println(df.rdd.getNumPartitions) val w = Window.orderBy("col1") val df2 = df.withColumn("id", row_number().over(w)) println(df2.rdd.getNumPartitions
方案三:將DataFrame轉成RDD,使用RDD的方法zipWithIndex()/zipWithUniqueId(),分區數不變。
val df1: DataFrame = spark.range(0,1000000).toDF("col1") //轉成rdd並使用zipWithIndex() var tempRDD: RDD[(Row, Long)] = df1.rdd.zipWithIndex() //使用map val record: RDD[Row] = tempRDD.map(x => { Row(x._1.get(0), x._2) }) val schema= new StructType().add("col1","long") .add("id","long") spark.createDataFrame(record,schema).show()
zipWithIndex():首先基於分區索引排序,然后是每個分區中的項的排序。所以第一個分區中的第一項得到索引0,最后一個分區中的最后一項得到最大的索引。從0開始
zipWithUniqueId(): 每個分區是一個等差數列,等差為分區數n,每個分區的第一個值為分區id(id從0開始)。第k個分區:num*n+k,num在每個分區都是從0開始,step為1
3個分區,abc 0分區 def 1分區 ghi 2分區 col1 id a 0*3+0=0 b 1*3+0=3 c 2*3+0=6 d 0*3+1=1 e 1*3+1=4 f 2*3+1=7 g 0*3+2=2 h 1*3+2=5 i 2*3+2=8
原文鏈接:https://blog.csdn.net/liaodaoluyun/article/details/86232639