SparkSQL 實驗


Spark SQL

  • Spark SQL里面最重要的就是DataFrame結構,與Spark的RDD結構相比,差別就在於是否已知元素里面的內容結構,舉個栗子,RDD比作"{name:'lihua',age:18}",而DataFrame就是{name:'lihua',age:18}。
  • 在對DataFrame操作上沒有講很多的內容,畢竟可以利用createOrReplaceTempView語句在創建臨時表並且使用sql語句進行操作,所以學會簡單的操作就可以了
  • Spark SQL是Spark對於Shark(Hive的Spark版)的兩種改進方式之一,另外一種是Hive on Spark。目的是可以通過SQL語句使用不同的數據源,然后利用不同數據源的數據方便使用Spark進行數據分析挖掘(機器學習)。

1.為employee.json創建DataFrame,並寫出Python語句完成下列操作:

將下列JSON格式數據復制到Linux系統中,並保存命名為employee.json。
{ "id":1 , "name":" Ella" , "age":36 }
{ "id":2, "name":"Bob","age":29 }
{ "id":3 , "name":"Jack","age":29 }
{ "id":4 , "name":"Jim","age":28 }
{ "id":4 , "name":"Jim","age":28 }
{ "id":5 , "name":"Damon" }
{ "id":5 , "name":"Damon" }

(1) 查詢所有數據;

emp = spark.read.format('json').load('file:///home/hadoop/Desktop/SparkPractice/sparkSQL/dataset1/employee.json')
emp.show()

(2) 查詢所有數據,並去除重復的數據;

emp.groupBy('name','id','age').count().select('name','id','age').show()

(3) 查詢所有數據,打印時去除id字段;

emp.select('name','age').show()

(4) 篩選出age>30的記錄;

emp.filter(emp['age']>30).show()

(5) 將數據按age分組;

emp.groupBy('age')
emp.groupBy('age').count().show()

(6) 將數據按name升序排列;

emp.sort(emp['name'].asc()).show()

(7) 取出前3行數據;

emp.take(3)

(8) 查詢所有記錄的name列,並為其取別名為username;

emp.select(emp['name'].alias('username')).show()

(9) 查詢年齡age的平均值;

emp.agg({'age':'mean'}).show()

(10) 查詢年齡age的最小值。

emp.agg({'age':'min'}).show()

2.編程實現將RDD轉換為DataFrame

源文件內容如下(包含id,name,age):
1,Ella,36
2,Bob,29
3,Jack,29
請將數據復制保存到Linux系統中,命名為employee.txt,實現從RDD轉換得到DataFrame,並按“id:1,name:Ella,age:36”的格式打印出DataFrame的所有數據。請寫出程序代碼。

from pyspark.sql.types import *
from pyspark.sql import SparkSession,Row
from pyspark import SparkConf,SparkContext

spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()
data = spark.sparkContext.textFile('file:///home/hadoop/Desktop/SparkPractice/sparkSQL/dataset2/employee.txt').map(lambda x:x.split(',')).map(lambda x:Row(int(x[0].strip()),x[1].strip(),int(x[2].strip())))
schema = StructType([StructField('id',IntegerType(),True),StructField('name',StringType(),True),StructField('age',IntegerType(),True)])

df = spark.createDataFrame(data,schema)

lateRdd = df.rdd.map(lambda x:'id:{},name:{},age:{}'.format(x.id,x.name,x.age))
lateRdd.foreach(print)

3. 編程實現利用DataFrame讀寫MySQL的數據

(1)在MySQL數據庫中新建數據庫sparktest,再創建表employee,包含如表5-2所示的兩行數據。
5-2 employee表原有數據

id name gender Age
1 Alice F 22
2 John M 25
create database sparktest;
use sparktest;
create table employee (id int(4),name char(10),gender char(4),age int(4));
insert employee values(1,'Alice','F',22);
insert employee values(2,'John','M',25);

(2)配置Spark通過JDBC連接數據庫MySQL,編程實現利用DataFrame插入如表5-3所示的兩行數據到MySQL中,最后打印出age的最大值和age的總和。
5-3 employee表新增數據

id name gender age
3 Mary F 26
4 Tom M 23
from pyspark.sql.types import *
from pyspark.sql import SparkSession,Row
from pyspark import SparkConf,SparkContext

spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()
# 插入數據
schema = StructType([StructField('id',IntegerType(),True),\
                     StructField('name',StringType(),True),\
                    StructField('gender',StringType(),True),\
                    StructField('age',IntegerType(),True)])
data = spark.sparkContext.parallelize(['3,Mary,F,26','4,Tom,M,23']).\
map(lambda x:x.split(',')).\
map(lambda x:Row(int(x[0]),x[1],x[2],int(x[3])))

df = spark.createDataFrame(data,schema)

prop = {'user':'root','password':'root','driver':'com.mysql.jdbc.Driver'}
df.write.jdbc('jdbc:mysql://localhost:3306/sparktest?useSSL=False','employee','append',prop)
print('上傳成功')

# 讀入數據
newDf = spark.read.\
format('jdbc').\
option('driver','com.mysql.jdbc.Driver').\
option('url','jdbc:mysql://localhost:3306/sparktest?useSSL=False').\
option('dbtable','employee').\
option('user','root').\
option('password','root').\
load()

newDf.agg({'age':'sum'}).show()
newDf.agg({'age':'max'}).show()


人生此處,絕對樂觀


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM