spark DataFrame的創建幾種方式和存儲


一。

從Spark2.0以上版本開始,Spark使用全新的SparkSession接口替代Spark1.6中的SQLContext及HiveContext接口來實現其對數據加載、轉換、處理等功能。SparkSession實現了SQLContext及HiveContext所有功能。

SparkSession支持從不同的數據源加載數據,並把數據轉換成DataFrame,並且支持把DataFrame轉換成SQLContext自身中的表,然后使用SQL語句來操作數據。SparkSession亦提供了HiveQL以及其他依賴於Hive的功能的支持。

下面我們就介紹如何使用SparkSession來創建DataFrame。
請進入Linux系統,打開“終端”,進入Shell命令提示符狀態。
首先,請找到樣例數據。 Spark已經為我們提供了幾個樣例數據,就保存在“/usr/local/spark/examples/src/main/resources/”這個目錄下,這個目錄下有兩個樣例數據people.json和people.txt。
people.json文件的內容如下:

{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

people.txt文件的內容如下:

Michael, 29
Andy, 30
Justin, 19

下面我們就介紹如何從people.json文件中讀取數據並生成DataFrame並顯示數據(從people.txt文件生成DataFrame需要后面將要介紹的另外一種方式)。
請使用如下命令打開pyspark:

  cd /usr/local/spark
  ./bin/pyspark

進入到pyspark狀態后執行下面命令:

  

>>> spark=SparkSession.builder.getOrCreate()
>>> df = spark.read.json("file:///usr/local/spark/examples/src/main/resources/people.json")
>>> df.show()
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+

 

現在,我們可以執行一些常用的DataFrame操作。

// 打印模式信息
>>> df.printSchema()
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)

// 選擇多列
>>> df.select(df.name,df.age + 1).show()
+-------+---------+
| name|(age + 1)|
+-------+---------+
|Michael| null|
| Andy| 31|
| Justin| 20|
+-------+---------+

// 條件過濾
>>> df.filter(df.age > 20 ).show()
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+

// 分組聚合
>>> df.groupBy("age").count().show()
+----+-----+
| age|count|
+----+-----+
| 19| 1|
|null| 1|
| 30| 1|
+----+-----+

// 排序
>>> df.sort(df.age.desc()).show()
+----+-------+
| age| name|
+----+-------+
| 30| Andy|
| 19| Justin|
|null|Michael|
+----+-------+

//多列排序
>>> df.sort(df.age.desc(), df.name.asc()).show()
+----+-------+
| age| name|
+----+-------+
| 30| Andy|
| 19| Justin|
|null|Michael|
+----+-------+

//對列進行重命名
>>> df.select(df.name.alias("username"),df.age).show()
+--------+----+
|username| age|
+--------+----+
| Michael|null|
| Andy| 30|
| Justin| 19|
+--------+----+

 

二。由RDD轉換到DataFrame。

  Spark官網提供了兩種方法來實現從RDD轉換得到DataFrame,第一種方法是,利用反射來推斷包含特定類型對象的RDD的schema,適用對已知數據結構的RDD轉換;第二種方法是,使用編程接口,構造一個schema並將其應用在已知的RDD上。

  1.利用反射機制推斷RDD模式

   

>>> from pyspark.sql.types import Row
>>> def f(x):
...     rel = {}
...     rel['name'] = x[0]
...     rel['age'] = x[1]
...     return rel
... 
>>> peopleDF = sc.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt").map(lambda line : line.split(',')).map(lambda x: Row(**f(x))).toDF()
>>> peopleDF.createOrReplaceTempView("people")  //必須注冊為臨時表才能供下面的查詢使用
 
>>> personsDF = spark.sql("select * from people")
>>> personsDF.rdd.map(lambda t : "Name:"+t[0]+","+"Age:"+t[1]).foreach(print)
 
Name: 19,Age:Justin
Name: 29,Age:Michael
Name: 30,Age:Andy

 

  2.使用編程方式定義RDD模式

  

>>>  from pyspark.sql.types import Row
>>>  from pyspark.sql.types import StructType
>>> from pyspark.sql.types import StructField
>>> from pyspark.sql.types import StringType
 
//生成 RDD
>>> peopleRDD = sc.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt")
 
//定義一個模式字符串
>>> schemaString = "name age"
 
//根據模式字符串生成模式
>>> fields = list(map( lambda fieldName : StructField(fieldName, StringType(), nullable = True), schemaString.split(" ")))
>>> schema = StructType(fields)
//從上面信息可以看出,schema描述了模式信息,模式中包含name和age兩個字段
 
 
>>> rowRDD = peopleRDD.map(lambda line : line.split(',')).map(lambda attributes : Row(attributes[0], attributes[1]))
 
>>> peopleDF = spark.createDataFrame(rowRDD, schema)
 
//必須注冊為臨時表才能供下面查詢使用
scala> peopleDF.createOrReplaceTempView("people")
 
>>> results = spark.sql("SELECT * FROM people")
>>> results.rdd.map( lambda attributes : "name: " + attributes[0]+","+"age:"+attributes[1]).foreach(print)
 
name: Michael,age: 29
name: Andy,age: 30
name: Justin,age: 19
 

 

 

三。保存成文件

  

>>> peopleDF = spark.read.format("json").load("file:///usr/local/spark/examples/src/main/resources/people.json")
 
>>> peopleDF.select("name", "age").write.format("csv").save("file:///usr/local/spark/mycode/newpeople.csv")

 

>>> peopleDF = spark.read.format("json").load("file:///usr/local/spark/examples/src/main/resources/people.json"
>>> peopleDF.rdd.saveAsTextFile("file:///usr/local/spark/mycode/newpeople.txt")
 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM