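Method 1: use createDataFrame. The new column is produced while building the RDD and the schema.
Method 2: use withColumn. The logic for the new column lives in a UDF.
Method 3: use SQL. The new column is written directly into the SQL statement.
Method 4: the three methods above add a column computed from a condition. To add a column of unique IDs instead, use monotonically_increasing_id. The generated IDs are unique and increasing, but they are not guaranteed to be consecutive.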
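The code in this post relies on several names it never defines: ss, input, targetColumns, critValueL and critValueR. A minimal setup sketch for trying the methods locally (for example in spark-shell) might look like the following. It assumes ss is a SparkSession; the sample values, the column name "value" and both thresholds are hypothetical and only serve as an illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Assumption: ss is a SparkSession; a local one is enough for experimenting.
val ss: SparkSession = SparkSession.builder()
  .master("local[*]")
  .appName("add-column-demo")
  .getOrCreate()
import ss.implicits._

// Hypothetical sample data: a single Double column named "value".
val targetColumns = "value"
val input: DataFrame = Seq(1.0, 5.0, 9.0, 12.0, -3.0).toDF(targetColumns)

// Hypothetical thresholds: values outside [critValueL, critValueR] get the flag "F".
val critValueL = 0.0
val critValueR = 10.0
```

With these definitions, 12.0 and -3.0 fall outside the thresholds and would be flagged "F", while the other three values would be flagged "T".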
Code:
// Methods 1 to 3 are alternatives that each define sample3; use only one of them.
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{monotonically_increasing_id, udf}
import org.apache.spark.sql.types.StringType

// Method 1: add a column with createDataFrame.
// Each Row holds a Double, so the target column is assumed to be DoubleType.
val trdd = input.select(targetColumns).rdd.map { x =>
  val v = x.get(0).toString.toDouble
  if (v > critValueR || v < critValueL) Row(v, "F") else Row(v, "T")
}
val schema = input.select(targetColumns).schema.add("flag", StringType, true)
val sample3 = ss.createDataFrame(trdd, schema).distinct().withColumnRenamed(targetColumns, "idx")

// Method 2: add a column with withColumn and a UDF.
// Double matches the values produced in method 1; adjust it to the column's actual type.
val code: Double => String = (arg: Double) => if (arg > critValueR || arg < critValueL) "F" else "T"
val addCol = udf(code)
val sample3 = input.select(targetColumns)
  .withColumn("flag", addCol(input(targetColumns)))
  .withColumnRenamed(targetColumns, "idx")

// Method 3: add a column in SQL.
input.select(targetColumns).createOrReplaceTempView("tmp")
val sample3 = ss.sql(
  s"select distinct $targetColumns as idx, " +
  s"case when $targetColumns > $critValueR then 'F' " +
  s"when $targetColumns < $critValueL then 'F' else 'T' end as flag from tmp")

// Method 4: add an ID column.
val inputnew = input.withColumn("idx", monotonically_increasing_id())
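
monotonically_increasing_id encodes the partition ID in the upper bits of each value, so its IDs are unique and increasing but usually have gaps between partitions. If a gap-free 0, 1, 2, ... sequence is needed, one possible approach (not one of the four methods above) is to combine the RDD method zipWithIndex with createDataFrame, in the style of method 1. The sketch below assumes a local SparkSession and hypothetical sample data; ss and input stand in for the names used in the code above.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.LongType

// Assumption: a local SparkSession named ss, as in the code above.
val ss = SparkSession.builder().master("local[*]").appName("zip-index-demo").getOrCreate()
import ss.implicits._

// Hypothetical input, spread over three partitions.
val input = Seq(1.0, 5.0, 9.0, 12.0, -3.0).toDF("value").repartition(3)

// zipWithIndex pairs every row with a consecutive Long index starting at 0.
val indexedRdd = input.rdd.zipWithIndex().map { case (row, i) =>
  Row.fromSeq(row.toSeq :+ i)
}

// Extend the original schema with the new non-nullable index column.
val indexedSchema = input.schema.add("idx", LongType, false)
val inputIndexed = ss.createDataFrame(indexedRdd, indexedSchema)

inputIndexed.show()
```

zipWithIndex has to run an extra Spark job when the RDD has more than one partition, so it costs more than monotonically_increasing_id. It is worth using only when consecutive numbers actually matter.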