References:
Spark Chinese documentation site: http://spark.apachecn.org/#/
https://www.iteblog.com/archives/1674.html
I. Key points:
1. Adding a new column to a DataFrame: https://www.cnblogs.com/itboys/p/9762808.html
Methods 4 and 5 add a unique-ID column.
Method 1: use createDataFrame; the new column is built while constructing the RDD and the schema.
Method 2: use withColumn; the new column is computed inside a UDF.
Method 3: use SQL; the new column is written directly into the SQL statement.
Method 4: the three methods above add a column derived from a condition; to add a column of unique sequence numbers, use monotonically_increasing_id.
Method 5: use zipWithUniqueId to obtain ids and rebuild the DataFrame.
// Method 1: add a column with createDataFrame; the new column is built while constructing the RDD and schema
val trdd = input.select(targetColumns).rdd.map(x => {
  if (x.get(0).toString().toDouble > critValueR || x.get(0).toString().toDouble < critValueL)
    Row(x.get(0).toString().toDouble, "F")
  else
    Row(x.get(0).toString().toDouble, "T")
})
val schema = input.select(targetColumns).schema.add("flag", StringType, true)
val sample3 = ss.createDataFrame(trdd, schema).distinct().withColumnRenamed(targetColumns, "idx")

// Method 2: add a column with withColumn; the new column is computed inside a UDF
val code: (Int => String) = (arg: Int) => { if (arg > critValueR || arg < critValueL) "F" else "T" }
val addCol = udf(code)
val sample3 = input.select(targetColumns)
  .withColumn("flag", addCol(input(targetColumns)))
  .withColumnRenamed(targetColumns, "idx")

// Method 3: add a column directly in the SQL statement
input.select(targetColumns).createOrReplaceTempView("tmp")
val sample3 = ss.sqlContext.sql("select distinct " + targetColname +
  " as idx, case when " + targetColname + " > " + critValueR + " then 'F'" +
  " when " + targetColname + " < " + critValueL + " then 'F' else 'T' end as flag from tmp")

// Method 4: add a sequence-number column with monotonically_increasing_id
import org.apache.spark.sql.functions.monotonically_increasing_id
val inputnew = input.withColumn("idx", monotonically_increasing_id)
// The generated id is unique, but it does not start from zero for every partition
// and is not consecutive; treat it as an opaque unique identifier.
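The non-consecutive ids from monotonically_increasing_id follow from how the value is encoded: per the Spark SQL documentation, the partition id goes into the upper 31 bits and the per-partition record number into the lower 33 bits. A plain-Scala sketch of that encoding (monotonicId is an illustrative helper written here, not a Spark API):

```scala
// Sketch of the documented encoding behind monotonically_increasing_id:
// partition id in the upper 31 bits, record number within the partition
// in the lower 33 bits. This is why ids are unique and increasing within
// a partition, but jump between partitions instead of being consecutive.
def monotonicId(partitionId: Int, recordIndex: Long): Long =
  (partitionId.toLong << 33) | recordIndex

// Records in partition 0 get ids 0, 1, 2, ...
assert(monotonicId(0, 0) == 0L)
assert(monotonicId(0, 2) == 2L)
// ...but the first record of partition 1 jumps to 2^33:
assert(monotonicId(1, 0) == (1L << 33))
```

This explains the note above: the ids look "random" only because of the partition-id prefix; within one partition they are sequential.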
// Method 5: use zipWithUniqueId to obtain ids and rebuild the DataFrame
import spark.implicits._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, LongType}
val df = Seq(("a", -1.0), ("b", -2.0), ("c", -3.0)).toDF("foo", "bar")
// capture the schema of df
val s = df.schema
// Convert the original table to an RDD, zip it with unique ids,
// and expand each row into a Seq so it can be rebuilt as a DataFrame
val rows = df.rdd.zipWithUniqueId.map { case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq) }
// Rebuild the DataFrame, prepending an id field to the original schema
val dfWithPK = spark.createDataFrame(rows, StructType(StructField("id", LongType, false) +: s.fields))
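Like monotonically_increasing_id, zipWithUniqueId yields ids that are unique but generally not consecutive: per its documented contract, the k-th item of partition p (out of n partitions) receives id k*n + p. A plain-Scala sketch of that scheme (uniqueIds is an illustrative helper operating on an in-memory stand-in for partitions, not a Spark API):

```scala
// Mimic RDD.zipWithUniqueId's documented id assignment on a plain collection:
// the k-th item of partition p out of n partitions gets id k*n + p.
def uniqueIds(partitions: Seq[Seq[String]]): Seq[(String, Long)] = {
  val n = partitions.length
  partitions.zipWithIndex.flatMap { case (part, p) =>
    part.zipWithIndex.map { case (item, k) => (item, k.toLong * n + p) }
  }
}

// Two partitions: ("a","b") and ("c","d") -> ids 0,2 and 1,3
val ids = uniqueIds(Seq(Seq("a", "b"), Seq("c", "d")))
// → Seq(("a",0), ("b",2), ("c",1), ("d",3))
```

If consecutive ids starting from 0 are required, RDD.zipWithIndex can be used instead, at the cost of an extra Spark job to count the elements of each partition.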
2. Adding an ID column: https://blog.csdn.net/liaodaoluyun/article/details/86232639
II. WordCount
package com.qihoo.spark.examles

import com.qihoo.spark.app.SparkAppJob
import org.apache.spark.SparkContext
import org.kohsuke.args4j.{Option => ArgOption}
import org.apache.spark.sql.functions.monotonically_increasing_id

class WordCount extends SparkAppJob {
  // input
  @ArgOption(name = "-i", required = true, aliases = Array("--input"), usage = "input")
  var input: String = _

  // output
  @ArgOption(name = "-o", required = true, aliases = Array("--output"), usage = "output")
  var output: String = _

  override protected def run(sc: SparkContext): Unit = {
    import sparkSession.implicits._
    val showDasouSegment = sparkSession.read.text(input).as[String].filter(_.trim.length() != 0)
    showDasouSegment.show()
    val words = showDasouSegment
      .map(line => line.split("\t"))
      .flatMap(line => line(1).split(" "))
      .groupByKey(value => value)
    // val counts = words.count()  // this line is what actually performs the word count
    // counts.show()               // print the result
    // The code below instead adds an ID column for each distinct word.
    val res = words.keys.withColumn("ID", monotonically_increasing_id)
    res.show()
    // res.write.text(output)  // this would fail: the text format requires all
    // columns to be merged into a single string column first.
    // val concatDf = res.select(concat_ws("\t", $"value", $"ID"))  // merge the word column
    // (named "value" by keys) and the ID column into one; needs
    // import org.apache.spark.sql.functions.concat_ws
  }
}
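The core transformation of the job above can be checked without a Spark cluster by mimicking it on a plain Scala collection (wordCount is an illustrative helper; the assumption, matching the job, is that each input line is "key<TAB>space-separated words"):

```scala
// Mimic the WordCount job's pipeline on an in-memory collection:
// drop blank lines, split each line on tab, take the second field,
// split it into words, and count occurrences per word.
def wordCount(lines: Seq[String]): Map[String, Int] =
  lines
    .filter(_.trim.nonEmpty)
    .map(_.split("\t"))
    .flatMap(fields => fields(1).split(" "))
    .groupBy(identity)
    .map { case (word, occurrences) => (word, occurrences.length) }

val counts = wordCount(Seq("q1\thello world", "q2\thello spark"))
// → Map("hello" -> 2, "world" -> 1, "spark" -> 1)
```

In the Spark job, filter/map/flatMap/groupByKey play the same roles on a Dataset, with count() on the grouped Dataset replacing the final per-group length.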
