摘要
Spark2.0 推出了一個新功能pandas_udf,本文結合spark 官方文檔和自己的使用情況,講解pandas udf的基本知識,並添加實例,方便初學的同學快速上手和理解。
Apche Arrow
ApacheArrow 是一種內存中的列式數據格式,用於在 Spark 中 JVM 和 Python 進程之間數據的高效傳輸。這對於使用 pandas/numpy 數據的 python 用戶來說是最有利的。它的使用不是自動的,可能需要對配置或代碼進行一些細微的更改,以充分利用並確保兼容性。
Apche Arrow 的安裝
在pyspark安裝的時候,Apche Arrow就已經安裝了,可能安裝的版本比較低,在你使用pandas udf的時候會報如下的錯誤,
1 |
"it was not found." % minimum_pyarrow_version) |
可以從報錯信息中發現是Arrow的版本過低了,可以通過pip install pyspark進行安裝或更新。
使用 Arrow 對 spark df 與 pandas df 的轉換
Arrow能夠優化spark df與pandas df的相互轉換,在調用Arrow之前,需要將 spark 配置spark.sql.execution.arrow.enabled設置為`true。這在默認情況下是禁用的。
此外,如果在 spark 實際計算之前發生錯誤,spark.sql.execution.arrow.enabled啟用的優化會自動回退到非 Arrow 優化實現。這可以有spark.sql.execution.arrow.fallback.enabled來控制。
對 arrow 使用優化將產生與未啟用 arrow 時相同的結果。但是,即使使用 arrow,toPandas()也會將數據中的所有記錄收集到驅動程序中,所以應該在數據的一小部分中使用。
目前,並非所有 spark 數據類型都受支持,如果列的類型不受支持,則可能會引發錯誤,請參閱受支持的 SQL 類型。如果在create dataframe()期間發生錯誤,spark 將返回非 Arrow 優化實現的數據。
設置與轉換
1 |
import numpy as np |
Pandas UDFs (a.k.a Vectorized UDFs)
pandas udf是用戶定義的函數,是由 spark 用arrow傳輸數據,pandas去處理數據。我們可以使用pandas_udf作為decorator或者registor來定義一個pandas udf函數,不需要額外的配置。目前,pandas udf有三種類型:標量映射(Scalar)和分組映射(Grouped Map)和分組聚合(Grouped Aggregate)。
-
Scalar
其用於向量化標量操作。它們可以與
select和withColumn等函數一起使用。python 函數應該以pandas.series作為輸入,並返回一個長度相同的pandas.series。在內部,spark 將通過將列拆分為batch,並將每個batch的函數作為數據的子集調用,然后將結果連接在一起,來執行 padas UDF。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType
# Declare the function and create the UDF
def (a, b):
return a * b
# or multiply = pandas_udf(multiply_func, returnType=LongType())
# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))
# Execute function as a Spark vectorized UDF
df.select(multiply_func(col("x"), col("x"))).show()
# +-------------------+
# |multiply_func(x, x)|
# +-------------------+
# | 1|
# | 4|
# | 9|
# +-------------------+
-
Grouped Map
Grouped Map
pandas_udf與groupBy().apply()一起使用,后者實現了split-apply-combine模式。拆分應用組合包括三個步驟:df.groupBy()對數據分組apply()對每個組進行操作,輸入和輸出都是 dataframe 格式- 匯總所有結果到一個 dataframe 中
使用
groupBy().apply(),用戶需要定義以下內容:- 一個函數,放在
apply()里 - 一個輸入輸出的
schema,兩者必須相同
請注意,在應用函數之前,組的所有數據都將加載到內存中。這可能導致內存不足異常,尤其是當組的大小
skwed的時候。maxRecordsPerBatch不適用於這里。所以,用戶需要來確保分組的數據適合可用內存。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23from pyspark.sql.functions import pandas_udf, PandasUDFType
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
# or df.schema
@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
# pdf is a pandas.DataFrame
v = pdf.v
return pdf.assign(v = v - v.mean())
df.groupby("id").apply(subtract_mean).show()
# +---+----+
# | id| v|
# +---+----+
# | 1|-0.5|
# | 1| 0.5|
# | 2|-3.0|
# | 2|-1.0|
# | 2| 4.0|
# +---+----+
-
Grouped Aggregate
其類似於 Spark 聚合函數。使用
groupby().agg()和pyspark.sql.Window一起使用。它定義從一個或多個pandas.series到一個標量值的聚合,其中每個pandas.series表示組中的一列或窗口。請注意,這種類型的 UDF 不支持部分聚合,組或窗口的所有數據都將加載到內存中。此外,這種類型只接受
unbounded window,也就是說,我們不能定義window size。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql import Window
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def mean_udf(v):
return v.mean()
df.groupby("id").agg(mean_udf(df['v'])).show()
# +---+-----------+
# | id|mean_udf(v)|
# +---+-----------+
# | 1| 1.5|
# | 2| 6.0|
# +---+-----------+
w = Window
.partitionBy('id')
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
# +---+----+------+
# | id| v|mean_v|
# +---+----+------+
# | 1| 1.0| 1.5|
# | 1| 2.0| 1.5|
# | 2| 3.0| 6.0|
# | 2| 5.0| 6.0|
# | 2|10.0| 6.0|
# +---+----+------+ -
結合使用
如果想用
agg()的思想,又想定義window size,我們可以用 Group Map,並在pandas udf function中使用 pandas 的rolling()來實現。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23from pyspark.sql.functions import lit, pandas_udf, PandasUDFType
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
df = df.withColumn("mv", f.lit(0.))
@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
def moving_mean(pdf):
v = pdf.v
pdf['mv'] = v.rolling(3,1).mean()
return pdf
df.groupby("id").apply(moving_mean).show()
# +---+----+---+
# | id| v| mv|
# +---+----+---+
# | 1| 1.0|1.0|
# | 1| 2.0|1.5|
# | 2| 3.0|3.0|
# | 2| 5.0|4.0|
# | 2|10.0|6.0|
# +---+----+---+
其他使用說明
-
支持的 SQL 類型
目前,Arrow-base 的轉換支持所有的 spark sql 數據類型,除了
MapType,ArrayTpye中的TimestampType和nested StructType。BinaryType僅在 Arrow 版本大於等於0.10.0時被支持。 -
設置
Arrow Batch Sizespark 中的數據分區被轉換成 arrow 記錄批處理,這會暫時導致 JVM 中的高內存使用率。為了避免可能的內存不足異常,可以通過 conf 的
spark.sql.execution.arrow.maxRecordsPerBatch設置為一個整數來調整 Arrow 記錄 batch 的大小,該整數將確定每個 batch 的最大行數。默認值為 10000 條記錄。如果列數較大,則應相應調整該值。使用這個限制,每個數據分區將被制成一個或多個記錄 batch 處理。 -
Timestamp 的時區問題
Spark 內部將
Timestamp存儲為 UTC 值,在沒有指定時區的情況下引入的Timestamp數據將轉換為具有微秒分辨率的本地時間到 UTC。在 spark 中導出或顯示Timestamp數據時,會話時區用於本地化Timestamp值。會話時區是使用配置spark.sql.session.time zone設置的,如果不設置,則默認為 JVM 系統本地時區。pandas 使用具有納秒分辨率的datetime64類型,datetime64[ns],每個列上都有可選的時區。當
Timestamp數據從 spark 傳輸到 pandas 時,它將被轉換為納秒,並且每一列將被轉換為 spark 會話時區,然后本地化到該時區,該時區將刪除時區並將值顯示為本地時間。當使用Timestamp列調用toPandas()或pandas_udf時,會發生這種情況。當
Timestamp數據從 pandas 傳輸到 spark 時,它將轉換為 UTC 微秒。當使用 pandas dataframe 調用CreateDataFrame或從 pandas dataframe 返回Timestamp時,會發生這種情況。這些轉換是自動完成的,以確保 Spark 具有預期格式的數據,因此不需要自己進行這些轉換。任何納秒值都將被截斷。請注意,標准 UDF(非 PANDAS)將以 python 日期時間對象的形式加載
Timestamp數據,這與 PANDAS 的Timestamp不同。建議使用 pandas 的Timestamp時使用 pandas 的時間序列功能,以獲得最佳性能,有關詳細信息,請參閱此處。 -
Pandas udf 其他使用案例
