數據抽象
sparkSQL 的數據抽象是 DataFrame,df 相當於表格,它的每一行是一條信息,形成了一個 Row
Row
它是 sparkSQL 的一個抽象,用於表示一行數據,從表現形式上看,相當於一個 tuple 或者 表中的一行;
from pyspark.sql import Row ##### 創建 Row #### method 1 row = Row(name="Alice", age=11) print row # Row(age=11, name='Alice') print row['name'], row['age'] # ('Alice', 11) print row.name, row.age # ('Alice', 11) print 'name' in row # True print 'wrong_key' in row # False #### method 2 Person = Row("name", "age") print Person # <Row(name, age)> print 'name' in Person # True print 'wrong_key' in Person # False print Person("Alice", 11) # Row(name='Alice', age=11)
DataFrame (DF)
與 RDD 類似,df 也是分布式的數據容器,不同的是,df 更像一個 二維數據表,除了數據本身外,還包含了數據的結構信息,即 schema;

df 的 API 提供了更高層的關系操作,比函數式的 RDD API 更加友好;
df 的底層仍是 RDD,所以 df 也是惰性執行的,但值得注意的是,它比 RDD 性能更高;
問題來了:為什么底層實現是 RDD,卻比 RDD 更快,不合常理啊
其實是這樣的,因為 df 是由 spark 自己轉換成 RDD 的,那么 spark 自然會用最合適的、最優化的方式轉換成 RDD,因為它比任何人都清楚怎么才能更高效,
對比我們自己操作 RDD 去實現各種功能,大部分情況下我們的作法可能不是最優,自己玩不如作者玩,所以說 df 性能高於 RDD
舉個簡單例子:
data1 = sc.parallelize([('1','a'), ('2', 'b'), ('3', 'c')]) data2 = sc.parallelize([('1','1'), ('2', '2'), ('3', '3')]) ### 找到兩個list中 key 為 1 的對應值的集合 ## 自己寫可能這么寫 data1.join(data2).collect() # [('1', ('a', '1')), ('3', ('c', '3')), ('2', ('b', '2'))] data1.join(data2).filter(lambda x: x[0] == '1').collect() # [('1', ('a', '1'))] ## spark 可能這么寫 data1.filter(lambda x: x[0] == '1').join(data2.filter(lambda x: x[0] == '1')).collect() # [('1', ('a', '1'))]
為什么 spark 這么寫快呢?這里簡單解釋下
join 是把 兩個元素做 笛卡爾內積,生成了 3x3=9 個元素,然后 shuffle,每個分區分別比較 key 是否相同,如果相同,合並,然后合並分區結果;
我們自己寫的就是這樣,shuffle 了 9 個元素;
而 spark 是先 filter,每個 list 變成了 一個元素,然后 join,join 的結果直接就是所需,不用 shuffle;
shuffle 本身是耗時的,而 filter 無需 shuffle,所以效率高 【join 是個 低效方法的原因】
小結
1. df 也是一個查詢優化的手段
2. df 允許我們像操作數據庫一樣操作它
DataSet
DataSet 是 DataFrame 的擴展,是 spark 最新的數據抽象;
dataSet 像個對象,允許我們像操作類一樣操作它,通過屬性查看數據;
實際上 DataSet 是在 df 的基礎上增加了數據類型;
df 只指定了字段名,而沒有指定字段類型,sparkSQL 需要自動推斷數據集的格式,這也是一種消耗,而 dataSet 直接指定了字段名和字段屬性,效率更高
python 目前不支持 dataSet,所以后續支持了再說
SparkSession
在老版本中,sparkSQL 提供了兩種 SQL 查詢的起始點:
SQLContext,用於 spark 自己提供的 SQL 查詢;
HiveContext,用於連接 hive 的查詢
sparkSession 是新版的 SQL 查詢起始點,實質上是組合了 SQLContext 和 HiveContext;
sparkSession 只是封裝了 sparkContext,sparkContext 包含 SQLContext 和 HiveContext;
所以 sparkSession 實際上還是 依靠 sparkContext 實現了 SQLContext 和 HiveContext,故老版本用法也適用新版本。
DataFrame 的創建
sparkSession 直接生成 df
df 的創建有 3 種方式
從 spark 的數據源創建:讀取 spark 支持的文件
從內部 RDD 創建:RDD 轉換成 df
從 hive 創建:hive 查詢
spark 數據源創建 DF
spark 支持的文件格式都有統一的入口
>>> dir(spark.read) [ 'csv', 'format', 'jdbc', 'json', 'load', 'option', 'options', 'orc', 'parquet', 'schema', 'table', 'text']
sparkSQL 定義了一個 DataFrameReader 的類,在這個類中定義了所有數據源的接口,spark.read 是這個類的入口
創建 df 的方法也是惰性的
json
json 文件必須每行是一個 json 對象
## json 文件如下 # {'age': '10','name': 'zhangsan'} # {'age': '20','name': 'lisi'} #### method 1 df1 = spark.read.json('data.json') # 相對路徑 # >>> df1 # DataFrame[age: string, name: string] 可以看到 df 具備了字段名和字段屬性 df1.show() # +---+--------+ # |age| name| # +---+--------+ # | 10|zhangsan| # | 20| lisi| # +---+--------+ df2 = spark.read.json('file:///usr/lib/spark/data.json') # 絕對路徑 #### method 2 spark.read.format('json').load('/data.json').show()
其他文件讀取方式與 json 完全相同
jdbc
spark.read.jdbc('jdbc:postgresql://172.16.89.80:5432/postgres', 'subtable', 'max_lng', 5, 10, 3, properties={'user':'postgres', 'password':'postgres'}).show()
注意
sparkSQL 會自動推斷數據集的格式,以 json 為例,sparkSQL 會掃描數據集的每一項從而推斷數據格式,
如果我們已經知道數據格式,可以在創建 df 時指定數據格式,從而加速創建過程且避免掃描數據集
from pyspark.sql.types import StructType, StructField, StringType, IntegerType schema = StructType([StructField("age", StringType(), True), StructField("name", StringType(), True)]) spark.read.schema(schema).json('/data.json').show()
RDD 創建 DF
見下面的格式互轉
hive 創建 DF
有兩種方式從 hive 創建 df
1. 使用 DataFrameReader 中定義的 table 方法
注意這種方式不只適用於 hive,也用於其他表
spark.read.table('hive1101.person').show()
2. 使用 HiveContext 或者 SparkSession 中的 sql 方法,直接運行 hql
DF 操作
sparkSQL 對 DF 的操作有兩種風格,一種是類 sql 的方式,一種是 領域專屬語言 DSL
SQL 風格操作 DF
df 並不是一張數據表,而 sql 風格需要一張表;
如果有 hive 環境,可以直接用 hive 中的表,
如果沒有,需要把 df 當成一個臨時表注冊到應用上,而且只有注冊到的應用正在運行,這個臨時表才可以使用
注冊方法不止一種,比如 createTempView、registerTempTable、
### 創建臨時視圖 df1.createTempView('student') # df1.createOrReplaceTempView('student') # ok spark.sql('select * from student').show() # +---+--------+ # |age| name| # +---+--------+ # | 10|zhangsan| # | 20| lisi| # +---+--------+ spark.sql('select age from student').show() spark.sql('select avg(age) from student').show() # +------------------------+ # |avg(CAST(age AS DOUBLE))| # +------------------------+ # | 15.0| # +------------------------+
關閉 SparkSession 后這張表無法使用
session
這里穿插講下 session 的概念;
session 的本意是會話,我們在多個場合都見過 session,如 web,如 tensorflow,但是在 web 中貌似不是 會話啊;
其實是這樣的,session 有廣義和狹義之分
廣義 session:就是我們說的會話
狹義 session:它是一個存儲位置,和 cookie 相對,cookie 是把某個信息存在客戶度,session 是把 某個信息存在服務器上
全局表
臨時表是在 session 范圍內的,session 關閉后,臨時表失效,如果想應用范圍內有效,可以使用全局表,
全局表需要全路徑訪問
### 為了在應用范圍內使用數據表,創建全局表 df1.createGlobalTempView('people') ## 查詢 spark.sql('select * from global_temp.people').show() # global_temp.people 全路徑訪問表 ## 在另一個 session 中查詢該表 spark.newSession().sql('select * from global_temp.people').show()
DSL 風格操作 DF
df 知道每列的名字和數據類型,可以提供用於數據處理的領域專屬語言DSL 【這種方式不常用】
df1.printSchema() # 打印表結構 # root # |-- age: string (nullable = true) # |-- name: string (nullable = true) df1.select('name').show() # 查詢name字段 df1.select("name", df1.age + 1).show() # age 字段的值都 加1,scala 中是用 $'age' 代替 df.age # +--------+---------+ # | name|(age + 1)| # +--------+---------+ # |zhangsan| 11.0| # | lisi| 21.0| # +--------+---------+ df1.filter(df1.age > 15).show() # 查看 age 大於 15 # +---+----+ # |age|name| # +---+----+ # | 20|lisi| # +---+----+
RDD-DF-dataSet
rdd、dataFrame、dataSet 相當於 spark 中三種數據類型,簡單總結幾點:
1. rdd 是 df、ds 的底層實現
2. df 在 rdd 的基礎上添加了結構,可以像數據表一個進行字段操作,易用,且高效
3. ds 在 df 的基礎上添加了數據類型,並且可以像操作類一樣進行屬性操作,目前 python 不支持
4. 三者可互相轉換
5. df、ds 是 sparkSQL 中的數據類型,准確的說叫數據抽象,在 sparkSQL 中他們被轉換成 table,進行 sql 操作
6. 三者的計算邏輯並無差異,也就是說相同的數據,結果是相同的
7. 三者的計算效率和執行方式不同
8. 在未來 spark 演進過程中, ds 會逐步取代 df、rdd
發展歷程
RDD(spark1.0) ===> DataFrame(spark1.3) ===> DataSet(spark1.6)
轉換邏輯
rdd + 表結構 = df
rdd + 表結構 + 數據類型 = ds
df + 數據類型 = ds
ds - 數據類型 - 表結構 = rdd
ds - 數據類型 = df
df - 表結構 = rdd
轉換方法

RDD to DF 之 toDF
dataFrame 類似於數據表,數據表有行的概念,df 也有 Row 的概念,也就是說 df 必須是有行有列,二維的概念;
如果 RDD 不是二維,或者說沒有 Row 的概念,需要顯示的構建 Row 的格式;
## 手動構建 Row 的概念 rdd1 = sc.parallelize(range(5)) df1_1 = rdd1.map(lambda x: Row(id = x)).toDF() # 先加入結構,即字段,或者說 key,然后調用 toDF # >>> df1 # DataFrame[id: bigint]
RDD to DF 之 spark.createDataFrame
該方法有兩個輸入:一個由行構成的RDD,一個數據格式;
數據格式可以是一個 StructType 類實例;
一個 StructType 對象包含一個 StructField 對象序列;
一個StructField 對象用於指定一列的名字、數據類型,並可選擇的指定這一列是否包含空值及其元數據
### 方法2 rdd1 = sc.parallelize([range(5)]) # 注意必須是 二維 的,sc.parallelize(range(5)) 是不行的 df2_1 = spark.createDataFrame(rdd1).collect() # 沒有顯示地添加字段,以 默認值為 字段名 # [Row(_1=0, _2=1, _3=2, _4=3, _5=4)] rdd2 = sc.parallelize([('a', 1), ('b', 2)]) # 二維數據 df2_2 = spark.createDataFrame(rdd2, ['label', 'num']).collect() # 顯示地添加字段 # [Row(label=u'a', num=1), Row(label=u'b', num=2)] ### 方法3 rdd3 = sc.parallelize([('zhangsan', 20), ('lisi', 30)]) Person = Row('name', 'age') # 格式化 Row,每行代表一個 Person person = rdd3.map(lambda x: Person(*x)) # 把 RDD 格式化成 新的 RDD,並加入 Row 的概念 df3_1 = spark.createDataFrame(person).show() # +--------+---+ # | name|age| # +--------+---+ # |zhangsan| 20| # | lisi| 30| # +--------+---+ ### 方法4 from pyspark.sql.types import StructType, StructField, StringType, IntegerType schema = StructType([StructField("name", StringType(), True), StructField("age", IntegerType(), True)]) df3 = spark.createDataFrame(rdd3, schema).collect() # [Row(name=u'zhangsan', age=20), Row(name=u'lisi', age=30)]
toDF() vs createDataFrame()
1. 前者需要自己推斷數據集的數據格式,因為並沒有指定,后者則需要指定數據格式;
2. 前者易用;
3. 后者更加靈活,可以根據需要對同一數據設定多個數據格式,滿足不同需求
DF to RDD
只需調用 rdd 屬性即可
rdd = sc.parallelize([('a', 1), ('b', 2)]) # 二維數據 df = spark.createDataFrame(rdd2, ['label', 'num']) df.rdd.collect() # [Row(label=u'a', num=1), Row(label=u'b', num=2)]
參考資料:
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.SQLContext 官網 rdd to df
