Spark中對DataFrame的基礎操作:列增加,列刪除,行增加,列名更換,排序等等


可以使用select和selectExpr來操作DataFrame中的列

例如查詢:DEST_COUNTRY,ORIGIN_COUNTRY

    val path="/Volumes/Data/BigData_code/data/flight-data/json/2015-summary.json"
    val dataDF = spark.read.format("json").load(path)
    val dataSelect = dataDF.select("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME")
    dataSelect.show(2)

新增一列

判斷目的國家和起飛國家是否是同一個。

    //創建一個新的列,用來表示目的國家和源國家是否是同一國家
    dataDF.selectExpr(
      "*", "(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as inCountry"
    ).show(30)

使用withColumn添加列

    //添加1列
    dataDF.withColumn("numberOne", lit(1)).show(10)

刪除一列

    //刪除列
    //方法1:
    dataDF.selectExpr("DEST_COUNTRY_NAME", "count").show(2)
    //方法2:
    dataDF.drop("ORIGIN_COUNTRY_NAME").show(2)

連接和追加行(聯合操作)

注意:DataFrame是不可變的,這意味着用戶不能追加行,只能將想要添加的行生成ROW對象,然后再生成DataFrame,再將兩個DataFrame進行拼接

    dataDF.printSchema()
    //將需要連接的Schema賦值給需要創建的DataFrame中(因為兩個DataFrame連接需要Schema模式相同)
    val mySchema = dataDF.schema
    //創建Row對象的list
    val rowList = List(Row("NewCountry", "OtherCountry", 2L), Row("NewCountry2", "OtherCountry", 5L))
    //創建RDD
    val myrdd = sc.makeRDD(rowList)
    //創建新的DataFrame
    val newDF = spark.createDataFrame(myrdd, mySchema)
    //將兩者進行連接
    newDF.union(dataDF).show(20)

會用select語句,我們還可以使用系統預定義好的聚合函數來指定在整個DataFrame上的聚合操作。

    //使用系統已經有的函數,求所有數據的count的平均值,並求出有多少不同的目的國家
    dataDF.selectExpr("avg(count)", "count(distinct(DEST_COUNTRY_NAME))").show(2)

字面量

dataDF.select(expr("*"), lit(1).as("one")).show(3)

注意:無法一次性添加多個列。一次性添加多個列,可以先將新列生成一個DataFrame,然后再進行連接即可。

修改列名:

   //修改列的名字:
    //方法1:
    dataDF.select(expr("DEST_COUNTRY_NAME as dest"), expr("ORIGIN_COUNTRY_NAME"), expr("count")).show(2)
    //方法2:
    dataDF.selectExpr("DEST_COUNTRY_NAME as dect", "ORIGIN_COUNTRY_NAME", "count").show(2)
    //方法3:
    dataDF.withColumnRenamed("DEST_COUNTRY_NAME", "dest").show(2)

行操作

過濾行

在DataFrame上實現過濾有兩種方法:

  • 創建一個字符串表達式:使用where
  • 通過列操作來構建表達式:使用filter
    下面我們只要count<2的所有行
    val dataDF = spark.read.format("json").load(path)
    dataDF.where("count < 2").show(20)

多個條件之間的關系為‘and’時

dataDF.where("count > 2").where("count < 4").show(20)

去重操作

dataDF.selectExpr("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME").distinct().show(20)

排序

    //排序
    //方法1:
    dataDF.sort("count", "DEST_COUNTRY_NAME").show(2)   //默認升序
    //方法2:
    dataDF.sort(asc("count"), desc("DEST_COUNTRY_NAME")).show(2)  //asc指定升序,desc指定降序
    //方法3:
    dataDF.sort(expr("count asc"), expr("DEST_COUNTRY_NAME desc")).show(2)
    //方法4:
    dataDF.orderBy(desc("count"), asc("DEST_COUNTRY_NAME")).show(2)
    //方法5:
    dataDF.orderBy(expr("count asc"), expr("DEST_COUNTRY_NAME desc")).show(2)

一個高級技巧是你可以指定空值在排序列表中的位置,使用asc_nulls_first指示空值安排在升序排列的前面,使用desc_nulls_first指示空值安排在降序排列的前面,使用asc_nulls_last指示空值安排在升序排列的后面,使用desc_nulls_last指示空值安排在降序排列的后面。
還有一種排序是分區內進行排序。這樣做能夠大大提高性能。使用的函數是sortWithinPartitions


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM