Creating a DataFrame

Starting with Spark 2.0, Spark uses the new SparkSession interface in place of the SQLContext and HiveContext interfaces from Spark 1.6 for loading, transforming, and processing data. SparkSession provides all of the functionality of both SQLContext and HiveContext.
SparkSession can load data from different data sources and turn the data into a DataFrame, and it supports registering a DataFrame as a table so that the data can then be queried with SQL statements. SparkSession also supports HiveQL and other Hive-dependent features.
A SparkSession object can be created with the following statements:

scala> import org.apache.spark.sql.SparkSession
scala> val spark=SparkSession.builder().getOrCreate()
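
In spark-shell a SparkSession named spark is already available, so getOrCreate() simply returns it. In a standalone application, the builder would typically also set an application name and master URL. A minimal sketch, where the appName and master values are only illustrative:

import org.apache.spark.sql.SparkSession

// Sketch for a standalone program; "DataFrameExample" and "local[*]" are illustrative values
val spark = SparkSession.builder().
  appName("DataFrameExample").   // hypothetical application name
  master("local[*]").            // run locally on all cores; use a cluster URL in production
  getOrCreate()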

Before creating a DataFrame, import spark.implicits._ to enable the implicit conversions needed to turn RDDs into DataFrames and to support the SQL operations that follow.
When creating a DataFrame, spark.read can be used to load data from different types of files, for example (a sketch with additional reader options follows this list):

spark.read.json("people.json"): read the people.json file to create a DataFrame; when reading a local or HDFS file, make sure the file path is correct;
spark.read.parquet("people.parquet"): read the people.parquet file to create a DataFrame;
spark.read.csv("people.csv"): read the people.csv file to create a DataFrame.
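
The calls above use the reader's default settings. Additional options can be passed before the format-specific call; a minimal sketch for CSV input, assuming a file with a header line (the path is illustrative):

// Read a CSV file whose first line holds column names; the path is a hypothetical example
val csvDF = spark.read.
  option("header", "true").        // treat the first line as column names
  option("inferSchema", "true").   // let Spark infer the column types
  csv("file:///usr/local/spark/mycode/sql/people.csv")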

The directory "/usr/local/spark/examples/src/main/resources/" contains two sample data files, people.json and people.txt. The contents of people.json are as follows:
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
The contents of people.txt are as follows:
Michael, 29
Andy, 30
Justin, 19

scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession

scala> val spark=SparkSession.builder().getOrCreate()
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@2bdab835

//Enable implicit conversion of RDDs to DataFrames and the SQL operations that follow
scala> import spark.implicits._
import spark.implicits._

scala> val df = spark.read.json("file:///usr/local/spark/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.show()
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+

Saving a DataFrame

A DataFrame can be saved to files of different formats through its write method. For example, a DataFrame named df can be written out as follows:

df.write.json("people.json")
df.write.parquet("people.parquet")
df.write.csv("people.csv")

The following code creates a DataFrame from the sample file people.json and then saves it as a CSV file:

scala> val peopleDF = spark.read.format("json").load("file:///usr/local/spark/examples/src/main/resources/people.json")
scala> peopleDF.select("name", "age").write.format("csv").save("file:///usr/local/spark/mycode/sql/newpeople.csv")
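
By default save() refuses to write into a directory that already exists. A save mode and writer options can be added; a short sketch building on peopleDF above (the output path is illustrative):

// Overwrite any existing output and include a header line in the CSV files; the path is a hypothetical example
peopleDF.select("name", "age").write.
  mode("overwrite").               // other modes: "append", "ignore", "error"
  option("header", "true").        // write column names as the first line
  csv("file:///usr/local/spark/mycode/sql/newpeople_header.csv")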

Common DataFrame Operations

//Print the schema
scala> df.printSchema()
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)

//Select columns (here name, and age plus one)
scala> df.select(df("name"), df("age")+1).show()

//Filter by a condition
scala> df.filter(df("age") > 20).show()

//Group and aggregate
scala> df.groupBy("age").count().show()

//Sort
scala> df.sort(df("age").desc).show()

//Sort by multiple columns
scala> df.sort(df("age").desc, df("name").asc).show()

//Rename a column
scala> df.select(df("name").as("username"), df("age")).show()
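
These operations can also be chained into a single expression; a small sketch that filters, groups, and sorts the same df (the exact output depends on the people.json data loaded above):

//Filter, group, count, and sort in one chain
scala> df.filter(df("age") > 20).groupBy("age").count().sort("age").show()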

The directory "/usr/local/spark/examples/src/main/resources/" also contains people.txt, a sample file that ships with Spark, with the following contents:
Michael, 29
Andy, 30
Justin, 19
We now want to load people.txt into memory as a DataFrame and query its data.
When the schema of an RDD is inferred through reflection, a case class must be defined first, because only a case class can be implicitly converted by Spark into a DataFrame.

scala> import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

scala> import org.apache.spark.sql.Encoder
import org.apache.spark.sql.Encoder

scala> import spark.implicits._ //Import to support implicit conversion of an RDD to a DataFrame
import spark.implicits._

scala> case class Person(name: String, age: Long) //Define a case class
defined class Person

scala> val peopleDF = spark.sparkContext.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt").map(_.split(",")).map(attributes => Person(attributes(0), attributes(1).trim.toInt)).toDF()
peopleDF: org.apache.spark.sql.DataFrame = [name: string, age: bigint]

scala> peopleDF.createOrReplaceTempView("people") //The DataFrame must be registered as a temporary view before it can be queried below

scala> val personsRDD = spark.sql("select name,age from people where age > 20")
//The result is a DataFrame; the system returns the following information
personsRDD: org.apache.spark.sql.DataFrame = [name: string, age: bigint]

scala> personsRDD.map(t => "Name: "+t(0)+ ","+"Age: "+t(1)).show() //Each element of the DataFrame is a row with two fields, name and age, accessed as t(0) and t(1)

//The system returns the following information
+------------------+ 
| value|
+------------------+
|Name:Michael,Age:29|
| Name:Andy,Age:30|
+------------------+
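
Because the case class Person is defined, the same query result can also be handled as a strongly typed Dataset instead of indexing into Row objects with t(0) and t(1). A brief sketch, assuming the imports and the people temporary view created above:

//Convert the result to a Dataset[Person] and access fields by name
scala> val personsDS = spark.sql("select name, age from people where age > 20").as[Person]
scala> personsDS.map(p => "Name: " + p.name + ", Age: " + p.age).show()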

When a case class cannot be defined in advance, the RDD schema has to be specified programmatically.
For example, suppose people.txt needs to be loaded programmatically to produce a DataFrame and then queried with SQL.

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row
//Define the fields

scala> val fields = Array(StructField("name",StringType,true), StructField("age",IntegerType,true))
fields: Array[org.apache.spark.sql.types.StructField] = Array(StructField(name,StringType,true), StructField(age,IntegerType,true))

scala> val schema = StructType(fields)
schema: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(age, IntegerType,true))
//The output above shows that schema describes the schema information, which contains two fields, name and age
//The schema is the "table header"

//Load the file to create an RDD
scala> val peopleRDD = spark.sparkContext.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt")
peopleRDD: org.apache.spark.rdd.RDD[String] = file:///usr/local/spark/examples/src/main/resources/people.txt MapPartitionsRDD[1] at textFile at <console>:26

//Parse every line of the peopleRDD RDD
scala> val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0), attributes(1).trim.toInt))
rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[3] at map at <console>:29
//The rowRDD obtained above holds the "table records"

//Combine the "table header" with the "table records"
scala> val peopleDF = spark.createDataFrame(rowRDD, schema)
peopleDF: org.apache.spark.sql.DataFrame = [name: string, age: int]
//The DataFrame must be registered as a temporary view before it can be queried below
scala> peopleDF.createOrReplaceTempView("people")

scala> val results = spark.sql("SELECT name,age FROM people")
results: org.apache.spark.sql.DataFrame = [name: string, age: int] 

scala> results.map(attributes => "name: " + attributes(0)+","+"age:"+attributes(1)).show()
+--------------------+
| value|
+--------------------+
|name: Michael,age:29|
| name: Andy,age:30|
| name: Justin,age:19|
+--------------------+
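
For a simple delimited file such as people.txt, the same schema can also be handed directly to the DataFrameReader instead of assembling a Row RDD by hand. A minimal sketch reusing the schema defined above; the ignoreLeadingWhiteSpace option is set because each value after the comma starts with a space:

//Read people.txt directly with the programmatic schema
val peopleDF2 = spark.read.
  schema(schema).
  option("ignoreLeadingWhiteSpace", "true").   // strip the blank after each comma
  csv("file:///usr/local/spark/examples/src/main/resources/people.txt")
peopleDF2.show()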

Connecting to a Database via JDBC
Start the MySQL server in Linux:
$ service mysql start
$ mysql -u root -p
#You will be prompted for the password
Run the following SQL statements to create the database and table:

mysql> create database spark;
mysql> use spark;
mysql> create table student (id int(4), name char(20), gender char(4), age int(4));
mysql> insert into student values(1,'Xueqian','F',23);
mysql> insert into student values(2,'Weiliang','M',24);
mysql> select * from student;

Download the MySQL JDBC driver, for example mysql-connector-java-5.1.40.tar.gz.
Copy the driver to Spark's installation directory "/usr/local/spark/jars".
When starting spark-shell, the MySQL connector jar must be specified:
$ cd /usr/local/spark
$ ./bin/spark-shell \
--jars /usr/local/spark/jars/mysql-connector-java-5.1.40/mysql-connector-java-5.1.40-bin.jar \
--driver-class-path /usr/local/spark/jars/mysql-connector-java-5.1.40/mysql-connector-java-5.1.40-bin.jar
Reading data from MySQL

scala> val jdbcDF = spark.read.format("jdbc").
| option("url","jdbc:mysql://localhost:3306/spark").
| option("driver","com.mysql.jdbc.Driver").
| option("dbtable", "student").
| option("user", "root").
| option("password", "hadoop").
| load()
scala> jdbcDF.show()
+---+--------+------+---+
| id| name|gender|age|
+---+--------+------+---+
| 1| Xueqian| F| 23|
| 2|Weiliang| M| 24|
+---+--------+------+---+
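
For larger tables, the JDBC source can read in parallel by range-partitioning on a numeric column. A sketch using the same connection settings, with illustrative partitioning bounds:

//Read the student table in parallel, split into 4 partitions on the numeric id column
val jdbcParallelDF = spark.read.format("jdbc").
  option("url", "jdbc:mysql://localhost:3306/spark").
  option("driver", "com.mysql.jdbc.Driver").
  option("dbtable", "student").
  option("user", "root").
  option("password", "hadoop").
  option("partitionColumn", "id").   // numeric column used to split the read
  option("lowerBound", "1").         // illustrative lower bound of id
  option("upperBound", "100").       // illustrative upper bound of id
  option("numPartitions", "4").
  load()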

Writing data to MySQL

import java.util.Properties
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
 
//Create two records representing two students
val studentRDD = spark.sparkContext.parallelize(Array("3 Rongcheng M 26","4 Guanhua M 27")).map(_.split(" "))
 
//Define the schema
val schema = StructType(List(StructField("id", IntegerType, true),StructField("name", StringType, true),StructField("gender", StringType, true),StructField("age", IntegerType, true)))
//Create Row objects; each Row is one row of rowRDD
val rowRDD = studentRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).trim, p(3).toInt))
 
//Combine the Row objects with the schema, i.e. attach the schema to the data
val studentDF = spark.createDataFrame(rowRDD, schema)
 
//Create a prop variable to hold the JDBC connection parameters
val prop = new Properties()
prop.put("user", "root") //the user name is root
prop.put("password", "hadoop") //the password is hadoop
prop.put("driver","com.mysql.jdbc.Driver") //the JDBC driver class is com.mysql.jdbc.Driver
 
//Connect to the database in append mode, i.e. append the records to the student table in the spark database
studentDF.write.mode("append").jdbc("jdbc:mysql://localhost:3306/spark", "spark.student", prop) 
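
To confirm that the two rows were appended, the table can be read back with the same URL and connection properties; a short sketch:

//Read the table back through JDBC to verify that the new records were written
val checkDF = spark.read.jdbc("jdbc:mysql://localhost:3306/spark", "spark.student", prop)
checkDF.show()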

Connecting to Hive to Read and Write Data
Creating a database and table in Hive
Enter Hive, create a database named sparktest, create a table named student in that database, and insert two records:

hive> create database if not exists sparktest; //Create the sparktest database
hive> show databases; //Check that the sparktest database was created
//Create a table student in the sparktest database
hive> create table if not exists sparktest.student(
> id int,
> name string,
> gender string,
> age int);
hive> use sparktest; //Switch to sparktest
hive> show tables; //List the tables in the sparktest database
hive> insert into student values(1,'Xueqian','F',23); //Insert one record
hive> insert into student values(2,'Weiliang','M',24); //Insert another record
hive> select * from student; //Show the records in the student table

Reading and writing Hive data
The configuration file "/usr/local/sparkwithhive/conf/spark-env.sh" needs to be modified as follows:

export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath)
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
export CLASSPATH=$CLASSPATH:/usr/local/hive/lib
export SCALA_HOME=/usr/local/scala
export HADOOP_CONF_DIR=/usr/local/hadoop/etc/hadoop
export HIVE_CONF_DIR=/usr/local/hive/conf
export SPARK_CLASSPATH=$SPARK_CLASSPATH:/usr/local/hive/lib/mysql-connector-java-5.1.40-bin.jar

Run the following commands in spark-shell (built with Hive support) to read data from Hive:

scala> import org.apache.spark.sql.Row
scala> import org.apache.spark.sql.SparkSession
scala> case class Record(key: Int, value: String)
// warehouseLocation points to the default location for managed databases and tables
scala> val warehouseLocation = "spark-warehouse"
scala> val spark = SparkSession.builder().appName("Spark Hive Example").config("spark.sql.warehouse.dir", warehouseLocation).enableHiveSupport().getOrCreate()
scala> import spark.implicits._
scala> import spark.sql
//The query below returns the following result
scala> sql("SELECT * FROM sparktest.student").show()
+---+--------+------+---+
| id| name|gender|age|
+---+--------+------+---+
| 1| Xueqian| F| 23|
| 2|Weiliang| M| 24|
+---+--------+------+---+
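
With Hive support enabled, a Hive table can also be loaded as a DataFrame through spark.table and then processed with ordinary DataFrame operations; a small sketch:

//Load the Hive table as a DataFrame and apply a DataFrame filter instead of SQL
scala> val hiveStudentDF = spark.table("sparktest.student")
scala> hiveStudentDF.filter("age > 23").show()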

Now write code to insert two records into the sparktest.student table in Hive:

scala> import java.util.Properties
scala> import org.apache.spark.sql.types._
scala> import org.apache.spark.sql.Row 
//Create two records representing two students
scala> val studentRDD = spark.sparkContext.parallelize(Array("3 Rongcheng M 26","4 Guanhua M 27")).map(_.split(" ")) 
//Define the schema
scala> val schema = StructType(List(StructField("id", IntegerType, true),StructField("name", StringType, true),StructField("gender", StringType, true),StructField("age", IntegerType, true)))
//Create Row objects; each Row is one row of rowRDD
scala> val rowRDD = studentRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).trim, p(3).toInt)) 
//Combine the Row objects with the schema, i.e. attach the schema to the data
scala> val studentDF = spark.createDataFrame(rowRDD, schema)
//Inspect studentDF
scala> studentDF.show()
+---+---------+------+---+
| id| name|gender|age|
+---+---------+------+---+
| 3|Rongcheng| M| 26|
| 4| Guanhua| M| 27|
+---+---------+------+---+
//Register a temporary table (createOrReplaceTempView is the newer equivalent of registerTempTable)
scala> studentDF.registerTempTable("tempTable")
 
scala> sql("insert into sparktest.student select * from tempTable")
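
As an alternative to going through a temporary table and an INSERT statement, the DataFrame writer can append directly into the Hive table (columns are matched by position, so the DataFrame's column order must match the table definition); a sketch:

//Append studentDF straight into the Hive table, then re-query to confirm
scala> studentDF.write.mode("append").insertInto("sparktest.student")
scala> sql("SELECT * FROM sparktest.student").show()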

  

