前言
ETL是 Extract-Transform-Load的縮寫,也就是抽取-轉換-加載,在數據工作中是非常重要的部分。實際上,ETL就是一個對數據進行批處理的過程,一個ETL程序就是一個批處理腳本,執行時能將一堆數據轉化成我們需要的形式。
每個接觸過數據批處理的工程師,都走過ETL的流程,只是沒有意識到而已。按照ETL過程的框架來重新認識數據批處理,有利於我們更清晰地編寫批處理腳本。
在單機范圍內的數據量下,使用python的pandas包就可以非常方便地完成數據批處理工作。但當數據量達到1G以上時,pandas處理起來就有些力不從心了,到數據量達到1T以上,只能以分塊的方式存儲在分布式系統上時,pandas就無能為力了。在當前的技術背景下,典型的場景就是數據存儲在Hive on HDFS上。要做ETL,就需要新的工具。Hadoop生態下,原生的工具是MapReduce計算模型,通常用Java編寫,比較復雜,每次計算的中間結果也需要進行磁盤存取,非常費時。Spark是一個MPP架構的計算引擎,相比MapReduce,Spark 有DataFrame(又名 Schema RDD), 以表的形式來儲存數據,無論是理解還是操作,都更為簡單,還支持Python,在許多需要使用函數作參數的場合,非常好用。
本教程將介紹如何使用pyspark.sql模塊,操作Spark DataFrame,從Hive中讀取數據,經過一系列轉換,最后存入Hive中。Spark的DataFrame和pandas的DataFrame的概念很相似,只是操作略有不同,如果讀者有pandas的使用經驗,很容易就能快速上手。
教程只是為了方便讀者快速入門,想要更好地開發Spark程序,仍然需要詳細了解Spark的API接口,對python環境下,Hive的ETL來說,研究pyspark.sql模塊下的內容就足夠了,可以參考官方文檔。
環境:Spark的API隨版本不同會有較大變化,目前比較流行的版本是1.6和2.2,本文使用Spark 1.6.0,語言為Python 2.7。默認數據都儲存在Hive中,Hadoop集群帶有yarn。
冒煙測試
學習一門語言或者軟件的第一步,永遠都是冒煙測試。最經典的冒煙測試就是輸出Hello World。但對ETL來說,一個打印"Hello World"的Spark程序是沒什么用的。所以我們這里講講如何打印一張表,這張表有一行數據,列名為t,值為"Hello World"。
Spark的核心是SparkContext,它提供了Spark程序的運行環境。而SqlContext則是由SparkContext產生,提供了對數據庫表的訪問接口。因為這里數據庫的環境是Hive,通常使用SqlContext的派生類HiveContext。在Spark提供的交互式環境中,會在啟動時自動創建環境,生成SparkContext和HiveContext的實例。在pyspark的交互式環境中,SparkContext實例名為sc,HiveContext實例名為sqlContext。
交互式操作只在學習和調試時使用,實際工作中還是要靠命令行執行腳本。在腳本中我們就需要自己生成SparkContext和HiveContext了。基本操作代碼如下:
# -*- coding: UTF-8 -*-
from pyspark import SparkContext,HiveContext
sc = SparkContext(appName="Hello World") # appName就是這個Spark程序的名字,在DEBUG時有用
hc = HiveContext(sc)
df = hc.createDataFrame([["Hello World"]],['t']) # 創建一個DataFrame,第一個參數是數據,一個二維列表,第二個參數是表頭,一個列表)
first_cell = df.collect()[0][0] # 取第一個單元格的值
df.show() # 將表打印到屏幕上
print(first_cell)
將這段代碼保存成文件hello.py,在終端中進入到該文件所在目錄,輸入命令spark-submit --master yarn hello.py
,然后就可以看到屏幕上輸出如下,冒煙測試就算完成了。
+-----------+
| t|
+-----------+
|Hello World|
+-----------+
Hello World
指令解釋:spark-submit
就是spark的執行程序,master yarn
是spark-submit的參數,指定yarn作為計算調度的中心。最后hello.py就是我們的ETL程序。
Extract 抽取
ETL的第一步就是從數據源抽取數據,在Spark中就是從Hive里讀取數據。
Hive雖然實質上是個MapReduce接口的封裝,但從上層抽象模型來看,有最基本的Schema、Table和Column,還有一套類SQL語法,可以說就是一個典型的關系數據庫模型,因此在ETL過程中,我們完全可以把Hive當成一個關系數據庫來看待。
抽取的常用方法由兩種,一種是直接讀取Hive表,一種是通過Hive QL讀取。
都需要以HiveContext的實例作為入口,結果返回一個Spark DataFrame,為了檢查結果,可以使用show方法查看DataFrame的數據。
假設我們有一個名為test 的庫,里面有一張表為t1,數據結構如下:
a | b | c |
---|---|---|
1 | 2 | 3 |
4 | 5 | 6 |
7 | 8 | 9 |
直接讀取Hive表
HiveContext對讀取操作提供統一的接口- DataFrameReader,HiveContext的實例的read屬性就可以獲取到這個接口。
當然,這個接口也能用來讀取Hive的數據,read.table
就可獲取到表的數據,代碼如下
# -*- coding: UTF-8 -*-
from pyspark import SparkContext, HiveContext
sc = SparkContext(appName="extract")
hc = HiveContext(sc) # 生成HiveContext實例
t =hc.read.table("test.t1")
t.show()
Hive QL讀取
實質是讓HiveContext將HiveQL傳給Hive,讓Hive執行后,將查詢結果封裝成Spark DataFrame返回。在處理過程比較簡單,或者需要大量設置別名時,比較有用(因為Spark批量設置別名不太方便),但不推薦寫太過復雜的Hive QL,因為Hive 執行Hive QL的實質是把Hive QL轉成MapReduce執行,在計算效率上是不如Spark的。
# -*- coding: UTF-8 -*-
from pyspark import SparkContext, HiveContext
sc = SparkContext(appName="extract")
hc = HiveContext(sc)
hc.sql("use test")
t = hc.sql("select * from t1")
t.show()
Load 加載
為什么不先講Trasform呢?因為Trasform的操作很多,先講Load有助於快速上手完成一個初級的ETL程序。
類似於讀取,HiveContext也提供了統一的寫接口,名為DataFrameWriter.調用write屬性即可獲取。
寫入的具體方式也很多,不過為了快速上手,只講最關鍵的一些東西。
mode 寫入方式
如果表已經存在,該如何操作。
- append 追加: 在尾部追加數據
- overwrite 覆寫: 覆蓋原有數據
- error 錯誤: 拋出異常
- ignore忽略 : 自動跳過
因為Hive on HDFS的關系,更新表最快的方式是全表覆寫。對於需要更新原有的ETL,一般都是全表重寫,只需要追加的,就可以用追加。
format 文件格式
在Hive on HDFS中,數據實質上是以文件的形式保存的。不同的文件格式,在壓縮容量、支持數據類型和查詢速度上都有所不同。textfile,avro,sequence,parquet,json等。目前我常用的格式是text和parquet,如果不設置文件格式,默認會使用Hive表的文件格式,如果Hive表不存在,則使用Hive表的默認格式textfile
加載新表
了解了上面的操作之后,我們就可以開始寫加載部分的代碼了,只需要使用一個saveAsTable方法就行了,非常簡單。
# -*- coding: UTF-8 -*-
from pyspark import SparkContext, HiveContext
sc = SparkContext(appName="load")
hc = HiveContext(sc)
hc.sql("use test")
t1 = hc.sql("select a as a1,b as b1,c as c1 from t1")
t1.write.saveAsTable("test.t2",format="parquet",mode="overwrite") # 將t1的三個列改名后存成t2表
t2.read.table("test.t2")
t2.show()
轉換
轉換是ETL過程中最復雜的部分,去掉抽取和加載,剩下的全都是轉換,包含的內容是非常多的,常見的有篩選、聚合、多列合並或計算,列賦值,根據不同的需要有不同的處理方法。由於Spark的轉換操作較為啰嗦,所以推薦把部分簡單的操作通過Hive QL的方式,在抽取步驟中交由Hive完成,這樣有助於精簡代碼,提高可讀性,降低維度難度。
下面就講一講Spark DataFrame 轉換部分的基本概念和操作。
向量化編程
對於日常用Java來做數據批處理的工程師來說,可能更習慣用for循環來逐條處理數據。但這樣做在操作上是很不方便的,也不太利於閱讀理解。在科學計算的語境下,數據總是以DataFrame的形式儲存,也就是一張表。數據處理操作通常是對這張表的某些行或者某些列來進行處理。比如,“令t1表的a列中數字大於2的值的,全部都等於2”,或者“給t1表新加一常數列d,值為99”,這樣的操作在向量化編程的語境下,就是一個調用API接口的操作,比for循環容易被理解。
可以類比pandas。在pandas中,也主要是通過向量化編程的方式來處理數據,雖然提供了迭代器的接口,可以一行行地讀取數據,但一般以表作為修改對象的操作,主要是以API接口來完成,不推薦使用迭代器來做行級修改。一來操作不方便,二來運算速度未必能比優化過的API接口快。
Spark是分布式執行的,數據分散在各個機器上,背后有一套調度系統來控制數據計算負載。如果用for循環來處理,就是把負載都加在了執行腳本的機器上,一般來說執行腳本的機器都是不儲存數據的master,實際上這一過程就會導致需要把數據從slave傳到master上,無謂地增加了網絡負擔。所以,在Spark腳本里,嚴禁使用原生的python for循環來處理SparkData Frame,即使要用,也應該使用Spark提供的API接口。
基本操作對象
在Spark DataFrame語境下,操作對象主要有三個:DataFrame,Row,Column。
- DataFrame: DataFrame就是一張表,有表頭和若干行數據。這張表是一個有序、可迭代的集合。
- Row:DataFrame 集合中的元素就是Row。每個Row儲存一行數據,有相同的屬性,這些屬性和表頭同名。DataFrame沒有API接口可以直接獲取到某個Row,但可以通過Colect方法獲取到Row對象的list,再從中獲取指定的Row。
- Column:Column與數據的實際結構無關,是一個操作上的概念。在實際的轉換操作中,絕大多數都是對若干列進行數學運算、拼接、映射等等。取DataFrame中的一列,得到的就是一個Column對象。
事實上,最常用的主要是DataFrame和Column,Row很少用到。其中,DataFrame是核心,一個ETl過程,實質就是從抽取一個DataFrame開始,經過一系列的DataFrame變換,得到一個與目標一致的DataFrame,然后寫入到目標數據庫中去。Column在其中扮演着中間點的角色,比如取DataFrame的多個列,拼接合成一個新列,然后把這個新列加到原本的DataFrame中去。
基本操作分類
上面提到了,DataFrame是核心操作對象。其實在Spark中,真正意義上的核心操作對象是RDD,一個有序的,分布式儲存在內存中的操作對象。DataFrame就是一個特殊的RDD——Schema RDD。所有的DataFrame操作,都可以歸類為兩種基本操作:轉化(Transformation)和行動(action)。轉換操作是不會觸發Spark的實際計算的,即使轉換過程中出現了錯誤,在執行到這一行代碼時,也不會報錯。直到執行了行動操作之后,才會真正讓Spark執行計算,這時候才會拋出在轉化過程中出現的錯誤。這在DEBU時,尤其是交互式編程環境下,可能會導致問題代碼定位錯誤,需要特別注意。
- Transform:典型的轉換操作有讀(read),篩選(filter)、拼接(union)等等,只要這個過程只改變DataFrame的形態,而不需要實際取出DataFrame的數據進行計算,都屬於轉換。理論上來說,ETL過程中的Transfrom過程,主干流程只會有轉換操作,不會有Action操作。
- Action:典型的動作操作有計數(count),打印表(show),寫(write)等,這些操作都需要真正地取出數據,就會觸發Spark的計算。
篩選
filter(cond):篩選出滿足條件cond的行。cond可以填字符串,格式和SQL中的where子句一樣,也可以填Bool類型的Column對象,比如 df['a']>1。
# -*- coding: UTF-8 -*-
from pyspark import SparkContext, HiveContext
sc = SparkContext(appName="transform")
hc = HiveContext(sc)
df = hc.createDataFrame([[1,2,3],[4,5,6],[7,8,9]],['a','b','c'])
t1 = df.filter("a > 1 and c < 9")
t1.show() # 輸出 4,5,6 這一行
t2 = df.filter( (df['b']<5) & (df['c']<8)) # 可以使用&或|對兩個bool列進行邏輯運算,但必須要用圓括號括起,限定運算順序。
t2.show() # 輸出 1,2,3 這一行
賦值,加列
withColumn(col_name,col):col_name是列名,col是列值,必須是一個Column對象。
賦值和加列操作是相同的,col_name存在,就是賦值,否則就是加列。
# -*- coding: UTF-8 -*-
from pyspark import SparkContext, HiveContext
sc = SparkContext(appName="transform")
hc = HiveContext(sc)
df = hc.createDataFrame([[1,2,3],[4,5,6],[7,8,9]],['a','b','c'])
t1 = df.withColumn("c",df['c']+1)
t1.show() # c的值全都增加了1
t2 = df.withColumn("d",df['a']+1)
t2.show() # 增加了新一列d
刪除列
drop(col_name):col_name為列名。該方法會返回一個刪除col_name列的DataFrame
# -*- coding: UTF-8 -*-
from pyspark import SparkContext, HiveContext
sc = SparkContext(appName="transform")
hc = HiveContext(sc)
df = hc.createDataFrame([[1,2,3],[4,5,6],[7,8,9]],['a','b','c'])
t = df.drop("c")
t.show() # 只有 a,b兩列
給列取名
alias(col_name):通常和select配合使用,請看下面的例子
選取列
select(*cols):cols為列名或列對象。
賦值和刪除操作,每次只能改加減一列數據,如果想要批量地改變,尤其是調整列順序的時候,就非常有用了。在ETL中,當需要計算的列很多時,通常就是逐個計算出不同的列對象,最后用select把它們排好順序。
# -*- coding: UTF-8 -*-
from pyspark import SparkContext, HiveContext
sc = SparkContext(appName="transform")
hc = HiveContext(sc)
df = hc.createDataFrame([[1,2,3],[4,5,6],[7,8,9]],['a','b','c'])
a1 = (df['a']+1).alias("a1") # 新增一個列對象,取名為a1
t = df.select("a",a1,"b") # 如果用字符串,必須是df中存在的列名。
t.show() # 顯示a, a_1,b 三列
生成Column對象
在賦值的例子里,Column對象是由原DataFrame的Column經過簡單的數學運算或邏輯運算得到的,但如果我們想生成一些更特殊的Column呢?比如常數列或者自己定義復雜的規則。
Spark提供了pyspark.sql.functions,含有豐富的接口,其中就有我們需要的東西。篇幅有限,只能介紹一些常用的,更多的還是需要去看官方文檔。
常數列
lit(value):value數必須是必須為pyspark.sql.types
支持的類型,比如int,double,string,datetime等
# -*- coding: UTF-8 -*-
from pyspark import SparkContext, HiveContext
from pyspark.sql.functions import lit
from datetime import datetime
sc = SparkContext(appName="transform")
hc = HiveContext(sc)
df = hc.createDataFrame([[1,2,3],[4,5,6],[7,8,9]],['a','b','c'])
t = df.withColumn("constant",lit(datetime(2018,1,1,2,3,4,999)))
t.show(truncate=False)
取整
round、floor:和Python的標准函數用法一致,只是數字換成列名
條件分支
when(cond,value):符合cond就取value,value可以是常數也可以是一個列對象,連續可以接when構成多分支
otherwise(value):接在when后使用,所有不滿足when的行都會取value,若不接這一項,則取Null。
# -*- coding: UTF-8 -*-
from pyspark import SparkContext, HiveContext
from pyspark.sql.functions import when
sc = SparkContext(appName="transform")
hc = HiveContext(sc)
df = hc.createDataFrame([[1,2,3],[4,5,6],[7,8,9]],['a','b','c'])
t = df.withColumn("when",when(df['a']==1,"a=1").when(df['b']==5,df['b']%5).otherwise("other"))
t.show() # 生成when列,值分別為 a=1,0,other
日期和時間
current_date():當前日期,返回一個date列
current_timestamp():當前時刻,返回一個timestamp列
date_add(start, days):日期正向偏移,start為開始時間,必須是Column或字符串對象,指向一個date或timestamp列,days為偏移天數。
date_sub(start, days):類似date_add,但是負向偏移。
date_format(date, format): 日期格式化,date為要格式化的時間,必須是Column或字符串對象,指向一個date或timestamp列,days為偏移天數,format為格式化的字符串,具體參考Hive QL的date_format函數。
datediff(end, start):計算天數差
自定義規則
udf(f, returnType=StringType): 自定義處理函數,f為自定義的處理函數,returnType為f的返回類型,必須為pyspark.sql.types
支持的類型,如果不填,會默認自動轉化為String類型。udf會返回一個函數,可以當做列函數使用。
這在處理邏輯非常復雜時很有用。比如對身份證號進行校驗計算,然后取出有效的身份證號的第1,4,10位,這個復雜流程很難用Spark提供的API拼接起來,只能自己寫。
作為教程,就不寫太復雜的函數了。
自定義函數f的傳入參數為列的值。
# -*- coding: UTF-8 -*-
from pyspark import SparkContext, HiveContext
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
sc = SparkContext(appName="transform")
hc = HiveContext(sc)
df = hc.createDataFrame([[1,2,3],[4,5,6],[7,8,9]],['a','b','c'])
def f(a,b,c):
r=0
if a==1:
r=1
elif b==5:
r=2
return r
col_match = udf(f,IntegerType())
t = df.withColumn("col_match",col_match("a","b","c"))
t.show() # 生成col_match列,值分別為 a=1,2,0
排序
Spark支持多字段,升降序排序。
可以使用orderBy和sort,因為操作比較簡單也符合直覺,這里略去例子,詳情可以看文檔。
聚合
Spark 支持直接聚合,也支持分組聚合。聚合的表達方式非常多,這里僅選取常用的。
直接聚合
# -*- coding: UTF-8 -*-
from pyspark import SparkContext, HiveContext
from pyspark.sql.functions import sum
sc = SparkContext(appName="transform")
hc = HiveContext(sc)
df = hc.createDataFrame([[1,1,3],[4,1,2],[7,2,9]],['a','b','c'])
t = df.agg(sum("a"))
print(t.collect()[0][0]) # 打印 12
分組聚合
# -*- coding: UTF-8 -*-
from pyspark import SparkContext, HiveContext
from pyspark.sql.functions import sum,max
sc = SparkContext(appName="transform")
hc = HiveContext(sc)
df = hc.createDataFrame([[1,1,3],[4,1,2],[7,2,9]],['a','b','c'])
t = df.groupBy("b").agg(sum("a"),max("c"))
t.show()
輸出:
+---+------+------+
| b|sum(a)|max(c)|
+---+------+------+
| 1| 5| 3|
| 2| 7| 9|
+---+------+------+
窗口函數
有一類分析需求,是需要分組計算,但保持數據的粒度不變的。比如通過成績表,按班計算的學生的成績排名,加一列到原本的成績表中,整個表的每一行仍然表示一名學生。這種分析需求稱為窗口分析,比如說每個班,就是一個窗口,在這個窗口中,計算出班級成績排名,再並到原表中。
這種分析,首先要創建一個窗口,然后再使用窗口函數來進行計算。Spark提供了豐富的窗口函數,可以滿足各類分析需求。
創建窗口
使用pyspark.sql.Window
對象可以創建一個窗口,最簡單的窗口可以什么都沒有,但一般不推薦這樣做。可以使用partitionBy進行分組,使用orderBy進行排序,比如
from pyspark.sql import Window
window = Window.partitionBy("a").orderBy("b")
窗口函數使用示例
rank():根據窗口中partitionBy進行分組,以orderBy排序
# -*- coding: UTF-8 -*-
from pyspark import SparkContext, HiveContext
from pyspark.sql.functions import rank,desc
from pyspark.sql import Window
sc = SparkContext(appName="transform")
hc = HiveContext(sc)
score = [
['a','a_1',90],
['a','a_2',80],
['a','a_3',85],
['b','b_1',70],
['b','b_2',80],
['b','b_3',75],
['c','c_1',90]
]
df = hc.createDataFrame(score,['class','student','score'])
class_window = Window.partitionBy("class").orderBy(desc("score")) #降序排列
class_rank = rank().over(class_window)
class_row_number = row_number().over(class_window) #窗口函數(xxx).over(window),就是一般的用法
t = df.withColumn("rank",class_rank)
t.show()
按班級,分數從高到低,生成排名
+-----+-------+-----+----+
|class|student|score|rank|
+-----+-------+-----+----+
| a| a_1| 90| 1|
| a| a_3| 85| 2|
| a| a_2| 80| 3|
| b| b_2| 80| 1|
| b| b_3| 75| 2|
| b| b_1| 70| 3|
| c| c_1| 90| 1|
+-----+-------+-----+----+
緩存
在實際業務中,常常會碰到這種需求:需要把一個計算結果,稍加不同的改動,分別存為不同的表。比如,ETL中為了保證出錯后能重試,就會要求除了保存轉換計算結果之外,還要備份一份到備份表里。備份表通常是按天分區的,每個區存當天的轉換計算結果。而應用表則不分區,只存最新一天的計算結果。
在完成這一需求時,如果是先保存應用表,然后再添加分區列后添加到分區表,就會觸發兩次完整的計算流程,時間翻倍。而如果有緩存,就不一樣了。我們可以在計算到最終結果時,緩存一下這張表,然后把它保存為應用表,再添加分區列保存為分區表。那么,實際計算中,到緩存操作為止的計算,只會觸發一次,實際消耗時間為1次到最終結果的計算+1次加分區列,遠小於2次計算的時間。當某些中間結果需要反復使用時,緩存可以給我們帶來極大的效率提升。當然,相應地,內存也會占用更多,還是應該根據具體情況決定如何取舍。緩存的方法很簡單,只要讓DataFrame對象執行cache方法就行了:df.cache()