一。
從Spark2.0以上版本開始,Spark使用全新的SparkSession接口替代Spark1.6中的SQLContext及HiveContext接口來實現其對數據加載、轉換、處理等功能。SparkSession實現了SQLContext及HiveContext所有功能。
SparkSession支持從不同的數據源加載數據,並把數據轉換成DataFrame,並且支持把DataFrame轉換成SQLContext自身中的表,然后使用SQL語句來操作數據。SparkSession亦提供了HiveQL以及其他依賴於Hive的功能的支持。
下面我們就介紹如何使用SparkSession來創建DataFrame。
請進入Linux系統,打開“終端”,進入Shell命令提示符狀態。
首先,請找到樣例數據。 Spark已經為我們提供了幾個樣例數據,就保存在“/usr/local/spark/examples/src/main/resources/”這個目錄下,這個目錄下有兩個樣例數據people.json和people.txt。
people.json文件的內容如下:
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
people.txt文件的內容如下:
Michael, 29
Andy, 30
Justin, 19
下面我們就介紹如何從people.json文件中讀取數據並生成DataFrame並顯示數據(從people.txt文件生成DataFrame需要后面將要介紹的另外一種方式)。
請使用如下命令打開pyspark:
cd /usr/local/spark
./bin/pyspark
進入到pyspark狀態后執行下面命令:
>>> spark=SparkSession.builder.getOrCreate()
>>> df = spark.read.json("file:///usr/local/spark/examples/src/main/resources/people.json")
>>> df.show()
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
現在,我們可以執行一些常用的DataFrame操作。
// 打印模式信息
>>> df.printSchema()
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
// 選擇多列
>>> df.select(df.name,df.age + 1).show()
+-------+---------+
| name|(age + 1)|
+-------+---------+
|Michael| null|
| Andy| 31|
| Justin| 20|
+-------+---------+
// 條件過濾
>>> df.filter(df.age > 20 ).show()
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
// 分組聚合
>>> df.groupBy("age").count().show()
+----+-----+
| age|count|
+----+-----+
| 19| 1|
|null| 1|
| 30| 1|
+----+-----+
// 排序
>>> df.sort(df.age.desc()).show()
+----+-------+
| age| name|
+----+-------+
| 30| Andy|
| 19| Justin|
|null|Michael|
+----+-------+
//多列排序
>>> df.sort(df.age.desc(), df.name.asc()).show()
+----+-------+
| age| name|
+----+-------+
| 30| Andy|
| 19| Justin|
|null|Michael|
+----+-------+
//對列進行重命名
>>> df.select(df.name.alias("username"),df.age).show()
+--------+----+
|username| age|
+--------+----+
| Michael|null|
| Andy| 30|
| Justin| 19|
+--------+----+
二。由RDD轉換到DataFrame。
Spark官網提供了兩種方法來實現從RDD轉換得到DataFrame,第一種方法是,利用反射來推斷包含特定類型對象的RDD的schema,適用對已知數據結構的RDD轉換;第二種方法是,使用編程接口,構造一個schema並將其應用在已知的RDD上。
1.利用反射機制推斷RDD模式
>>> from pyspark.sql.types import Row >>> def f(x): ... rel = {} ... rel['name'] = x[0] ... rel['age'] = x[1] ... return rel ... >>> peopleDF = sc.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt").map(lambda line : line.split(',')).map(lambda x: Row(**f(x))).toDF() >>> peopleDF.createOrReplaceTempView("people") //必須注冊為臨時表才能供下面的查詢使用 >>> personsDF = spark.sql("select * from people") >>> personsDF.rdd.map(lambda t : "Name:"+t[0]+","+"Age:"+t[1]).foreach(print) Name: 19,Age:Justin Name: 29,Age:Michael Name: 30,Age:Andy
2.使用編程方式定義RDD模式
>>> from pyspark.sql.types import Row >>> from pyspark.sql.types import StructType >>> from pyspark.sql.types import StructField >>> from pyspark.sql.types import StringType //生成 RDD >>> peopleRDD = sc.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt") //定義一個模式字符串 >>> schemaString = "name age" //根據模式字符串生成模式 >>> fields = list(map( lambda fieldName : StructField(fieldName, StringType(), nullable = True), schemaString.split(" "))) >>> schema = StructType(fields) //從上面信息可以看出,schema描述了模式信息,模式中包含name和age兩個字段 >>> rowRDD = peopleRDD.map(lambda line : line.split(',')).map(lambda attributes : Row(attributes[0], attributes[1])) >>> peopleDF = spark.createDataFrame(rowRDD, schema) //必須注冊為臨時表才能供下面查詢使用 scala> peopleDF.createOrReplaceTempView("people") >>> results = spark.sql("SELECT * FROM people") >>> results.rdd.map( lambda attributes : "name: " + attributes[0]+","+"age:"+attributes[1]).foreach(print) name: Michael,age: 29 name: Andy,age: 30 name: Justin,age: 19
三。保存成文件
>>> peopleDF = spark.read.format("json").load("file:///usr/local/spark/examples/src/main/resources/people.json") >>> peopleDF.select("name", "age").write.format("csv").save("file:///usr/local/spark/mycode/newpeople.csv")
>>> peopleDF = spark.read.format("json").load("file:///usr/local/spark/examples/src/main/resources/people.json" >>> peopleDF.rdd.saveAsTextFile("file:///usr/local/spark/mycode/newpeople.txt")