摘要
Spark2.0 推出了一個新功能pandas_udf
,本文結合spark 官方文檔和自己的使用情況,講解pandas udf
的基本知識,並添加實例,方便初學的同學快速上手和理解。
Apche Arrow
ApacheArrow 是一種內存中的列式數據格式,用於在 Spark 中 JVM 和 Python 進程之間數據的高效傳輸。這對於使用 pandas/numpy 數據的 python 用戶來說是最有利的。它的使用不是自動的,可能需要對配置或代碼進行一些細微的更改,以充分利用並確保兼容性。
Apche Arrow 的安裝
在pyspark
安裝的時候,Apche Arrow
就已經安裝了,可能安裝的版本比較低,在你使用pandas udf
的時候會報如下的錯誤,
1 |
"it was not found." % minimum_pyarrow_version) |
可以從報錯信息中發現是Arrow
的版本過低了,可以通過pip install pyspark
進行安裝或更新。
使用 Arrow 對 spark df 與 pandas df 的轉換
Arrow
能夠優化spark df
與pandas df
的相互轉換,在調用Arrow
之前,需要將 spark 配置spark.sql.execution.arrow.enabled
設置為`true。這在默認情況下是禁用的。
此外,如果在 spark 實際計算之前發生錯誤,spark.sql.execution.arrow.enabled
啟用的優化會自動回退到非 Arrow 優化實現。這可以有spark.sql.execution.arrow.fallback.enabled
來控制。
對 arrow 使用優化將產生與未啟用 arrow 時相同的結果。但是,即使使用 arrow,toPandas()
也會將數據中的所有記錄收集到驅動程序中,所以應該在數據的一小部分中使用。
目前,並非所有 spark 數據類型都受支持,如果列的類型不受支持,則可能會引發錯誤,請參閱受支持的 SQL 類型。如果在create dataframe()
期間發生錯誤,spark 將返回非 Arrow 優化實現的數據。
設置與轉換
1 |
import numpy as np |
Pandas UDFs (a.k.a Vectorized UDFs)
pandas udf
是用戶定義的函數,是由 spark 用arrow
傳輸數據,pandas
去處理數據。我們可以使用pandas_udf
作為decorator
或者registor
來定義一個pandas udf
函數,不需要額外的配置。目前,pandas udf
有三種類型:標量映射(Scalar
)和分組映射(Grouped Map
)和分組聚合(Grouped Aggregate
)。
-
Scalar
其用於向量化標量操作。它們可以與
select
和withColumn
等函數一起使用。python 函數應該以pandas.series
作為輸入,並返回一個長度相同的pandas.series
。在內部,spark 將通過將列拆分為batch
,並將每個batch
的函數作為數據的子集調用,然后將結果連接在一起,來執行 padas UDF。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType
# Declare the function and create the UDF
def (a, b):
return a * b
# or multiply = pandas_udf(multiply_func, returnType=LongType())
# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))
# Execute function as a Spark vectorized UDF
df.select(multiply_func(col("x"), col("x"))).show()
# +-------------------+
# |multiply_func(x, x)|
# +-------------------+
# | 1|
# | 4|
# | 9|
# +-------------------+
-
Grouped Map
Grouped Map
pandas_udf
與groupBy().apply()
一起使用,后者實現了split-apply-combine
模式。拆分應用組合包括三個步驟:df.groupBy()
對數據分組apply()
對每個組進行操作,輸入和輸出都是 dataframe 格式- 匯總所有結果到一個 dataframe 中
使用
groupBy().apply()
,用戶需要定義以下內容:- 一個函數,放在
apply()
里 - 一個輸入輸出的
schema
,兩者必須相同
請注意,在應用函數之前,組的所有數據都將加載到內存中。這可能導致內存不足異常,尤其是當組的大小
skwed
的時候。maxRecordsPerBatch
不適用於這里。所以,用戶需要來確保分組的數據適合可用內存。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23from pyspark.sql.functions import pandas_udf, PandasUDFType
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
# or df.schema
@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
# pdf is a pandas.DataFrame
v = pdf.v
return pdf.assign(v = v - v.mean())
df.groupby("id").apply(subtract_mean).show()
# +---+----+
# | id| v|
# +---+----+
# | 1|-0.5|
# | 1| 0.5|
# | 2|-3.0|
# | 2|-1.0|
# | 2| 4.0|
# +---+----+
-
Grouped Aggregate
其類似於 Spark 聚合函數。使用
groupby().agg()
和pyspark.sql.Window
一起使用。它定義從一個或多個pandas.series
到一個標量值的聚合,其中每個pandas.series
表示組中的一列或窗口。請注意,這種類型的 UDF 不支持部分聚合,組或窗口的所有數據都將加載到內存中。此外,這種類型只接受
unbounded window
,也就是說,我們不能定義window size
。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql import Window
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def mean_udf(v):
return v.mean()
df.groupby("id").agg(mean_udf(df['v'])).show()
# +---+-----------+
# | id|mean_udf(v)|
# +---+-----------+
# | 1| 1.5|
# | 2| 6.0|
# +---+-----------+
w = Window
.partitionBy('id')
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
# +---+----+------+
# | id| v|mean_v|
# +---+----+------+
# | 1| 1.0| 1.5|
# | 1| 2.0| 1.5|
# | 2| 3.0| 6.0|
# | 2| 5.0| 6.0|
# | 2|10.0| 6.0|
# +---+----+------+ -
結合使用
如果想用
agg()
的思想,又想定義window size
,我們可以用 Group Map,並在pandas udf function
中使用 pandas 的rolling()
來實現。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23from pyspark.sql.functions import lit, pandas_udf, PandasUDFType
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
df = df.withColumn("mv", f.lit(0.))
@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
def moving_mean(pdf):
v = pdf.v
pdf['mv'] = v.rolling(3,1).mean()
return pdf
df.groupby("id").apply(moving_mean).show()
# +---+----+---+
# | id| v| mv|
# +---+----+---+
# | 1| 1.0|1.0|
# | 1| 2.0|1.5|
# | 2| 3.0|3.0|
# | 2| 5.0|4.0|
# | 2|10.0|6.0|
# +---+----+---+
其他使用說明
-
支持的 SQL 類型
目前,Arrow-base 的轉換支持所有的 spark sql 數據類型,除了
MapType
,ArrayTpye
中的TimestampType
和nested StructType
。BinaryType
僅在 Arrow 版本大於等於0.10.0
時被支持。 -
設置
Arrow Batch Size
spark 中的數據分區被轉換成 arrow 記錄批處理,這會暫時導致 JVM 中的高內存使用率。為了避免可能的內存不足異常,可以通過 conf 的
spark.sql.execution.arrow.maxRecordsPerBatch
設置為一個整數來調整 Arrow 記錄 batch 的大小,該整數將確定每個 batch 的最大行數。默認值為 10000 條記錄。如果列數較大,則應相應調整該值。使用這個限制,每個數據分區將被制成一個或多個記錄 batch 處理。 -
Timestamp 的時區問題
Spark 內部將
Timestamp
存儲為 UTC 值,在沒有指定時區的情況下引入的Timestamp
數據將轉換為具有微秒分辨率的本地時間到 UTC。在 spark 中導出或顯示Timestamp
數據時,會話時區用於本地化Timestamp
值。會話時區是使用配置spark.sql.session.time zone
設置的,如果不設置,則默認為 JVM 系統本地時區。pandas 使用具有納秒分辨率的datetime64
類型,datetime64[ns]
,每個列上都有可選的時區。當
Timestamp
數據從 spark 傳輸到 pandas 時,它將被轉換為納秒,並且每一列將被轉換為 spark 會話時區,然后本地化到該時區,該時區將刪除時區並將值顯示為本地時間。當使用Timestamp
列調用toPandas()
或pandas_udf
時,會發生這種情況。當
Timestamp
數據從 pandas 傳輸到 spark 時,它將轉換為 UTC 微秒。當使用 pandas dataframe 調用CreateDataFrame
或從 pandas dataframe 返回Timestamp
時,會發生這種情況。這些轉換是自動完成的,以確保 Spark 具有預期格式的數據,因此不需要自己進行這些轉換。任何納秒值都將被截斷。請注意,標准 UDF(非 PANDAS)將以 python 日期時間對象的形式加載
Timestamp
數據,這與 PANDAS 的Timestamp
不同。建議使用 pandas 的Timestamp
時使用 pandas 的時間序列功能,以獲得最佳性能,有關詳細信息,請參閱此處。 -
Pandas udf 其他使用案例