1. Pandas_UDF介紹
PySpark和Pandas之間改進性能和互操作性的其核心思想是將Apache Arrow作為序列化格式,以減少PySpark和Pandas之間的開銷。
Pandas_UDF是在PySpark2.3中新引入的API,由Spark使用Arrow傳輸數據,使用Pandas處理數據。Pandas_UDF是使用關鍵字pandas_udf作為裝飾器或包裝函數來定義的,不需要額外的配置。目前,有兩種類型的Pandas_UDF,分別是Scalar(標量映射)和Grouped Map(分組映射)。
1.1 Scalar
Scalar Pandas UDF用於向量化標量操作。常常與select和withColumn等函數一起使用。其中調用的Python函數需要使用pandas.Series作為輸入並返回一個具有相同長度的pandas.Series。具體執行流程是,Spark將列分成批,並將每個批作為數據的子集進行函數的調用,進而執行panda UDF,最后將結果連接在一起。
下面的示例展示如何創建一個scalar panda UDF,計算兩列的乘積:
import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType
# 聲明函數並創建UDF
def multiply_func(a, b):
return a * b
multiply = pandas_udf(multiply_func, returnType=LongType())
x = pd.Series([1, 2, 3])
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))
# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()
# +-------------------+
# |multiply_func(x, x)|
# +-------------------+
# | 1|
# | 4|
# | 9|
# +-------------------+
1.2 Grouped Map
Grouped map(分組映射)panda_udf與groupBy().apply()一起使用,后者實現了“split-apply-combine”模式。“split-apply-combine”包括三個步驟:
- 使用DataFrame.groupBy將數據分成多個組。
- 對每個分組應用一個函數。函數的輸入和輸出都是pandas.DataFrame。輸入數據包含每個組的所有行和列。
- 將結果合並到一個新的DataFrame中。
要使用groupBy().apply(),需要定義以下內容:
- 定義每個分組的Python計算函數,這里可以使用pandas包或者Python自帶方法。
- 一個StructType對象或字符串,它定義輸出DataFrame的格式,包括輸出特征以及特征類型。
需要注意的是,StructType對象中的Dataframe特征順序需要與分組中的Python計算函數返回特征順序保持一致。
此外,在應用該函數之前,分組中的所有數據都會加載到內存,這可能導致內存不足拋出異常。
下面的例子展示了如何使用groupby().apply() 對分組中的每個值減去分組平均值。
from pyspark.sql.functions import pandas_udf, PandasUDFType
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
# pdf is a pandas.DataFrame
v = pdf.v
return pdf.assign(v=v - v.mean())
df.groupby("id").apply(subtract_mean).show()
# +---+----+
# | id| v|
# +---+----+
# | 1|-0.5|
# | 1| 0.5|
# | 2|-3.0|
# | 2|-1.0|
# | 2| 4.0|
# +---+----+
1.3 Grouped Aggregate
Grouped aggregate Panda UDF類似於Spark聚合函數。Grouped aggregate Panda UDF常常與groupBy().agg()和pyspark.sql.window一起使用。它定義了來自一個或多個的聚合。級數到標量值,其中每個pandas.Series表示組或窗口中的一列。 需要注意的是,這種類型的UDF不支持部分聚合,組或窗口的所有數據都將加載到內存中。此外,目前只支持Grouped aggregate Pandas UDFs的無界窗口。 下面的例子展示了如何使用這種類型的UDF來計算groupBy和窗口操作的平均值:
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql import Window
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def mean_udf(v):
return v.mean()
df.groupby("id").agg(mean_udf(df['v'])).show()
# +---+-----------+
# | id|mean_udf(v)|
# +---+-----------+
# | 1| 1.5|
# | 2| 6.0|
# +---+-----------+
w = Window \
.partitionBy('id') \
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
# +---+----+------+
# | id| v|mean_v|
# +---+----+------+
# | 1| 1.0| 1.5|
# | 1| 2.0| 1.5|
# | 2| 3.0| 6.0|
# | 2| 5.0| 6.0|
# | 2|10.0| 6.0|
# +---+----+------+
2. 快速使用Pandas_UDF
需要注意的是schema變量里的字段名稱為pandas_dfs() 返回的spark dataframe中的字段,字段對應的格式為符合spark的格式。
這里,由於pandas_dfs()功能只是選擇若干特征,所以沒有涉及到字段變化,具體的字段格式在進入pandas_dfs()之前已通過printSchema()打印。如果在pandas_dfs()中使用了pandas的reset_index()方法,且保存index,那么需要在schema變量中第一個字段處添加'index'字段及對應類型(下段代碼注釋內容)
import pandas as pd
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType
spark = SparkSession.builder.appName("demo3").config("spark.some.config.option", "some-value").getOrCreate()
df3 = spark.createDataFrame(
[(18862669710, '/未知類型', 'IM傳文件', 'QQ接收文件', 39.0, '2018-03-08 21:45:45', 178111558222, 1781115582),
(18862669710, '/未知類型', 'IM傳文件', 'QQ接收文件', 39.0, '2018-03-08 21:45:45', 178111558222, 1781115582),
(18862228190, '/移動終端', '移動終端應用', '移動騰訊視頻', 292.0, '2018-03-08 21:45:45', 178111558212, 1781115582),
(18862669710, '/未知類型', '訪問網站', '搜索引擎', 52.0, '2018-03-08 21:45:46', 178111558222, 1781115582)],
('online_account', 'terminal_type', 'action_type', 'app', 'access_seconds', 'datetime', 'outid', 'class'))
def compute(x):
result = x[
['online_account', 'terminal_type', 'action_type', 'app', 'access_seconds', 'datetime', 'outid', 'class', 'start_time', 'end_time']]
return result
schema = StructType([
# StructField("index", DoubleType()),
StructField("online_account", LongType()),
StructField("terminal_type", StringType()),
StructField("action_type", StringType()),
StructField("app", StringType()),
StructField("access_seconds", DoubleType()),
StructField("datetime", StringType()),
StructField("outid", LongType()),
StructField("class", LongType()),
StructField("end_time", TimestampType()),
StructField("start_time", TimestampType()),
])
@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def g(df):
print('ok')
mid = df.groupby(['online_account']).apply(lambda x: compute(x))
result = pd.DataFrame(mid)
# result.reset_index(inplace=True, drop=False)
# return result
df3 = df3.withColumn("end_time", df3['datetime'].cast(TimestampType()))
df3 = df3.withColumn('end_time_convert_seconds', df3['end_time'].cast('long').cast('int'))
time_diff = df3.end_time_convert_seconds - df3.access_seconds
df3 = df3.withColumn('start_time', time_diff.cast('int').cast(TimestampType()))
df3 = df3.drop('end_time_convert_seconds')
df3.printSchema()
aa = df3.groupby(['online_account']).apply(g)
aa.show()

3. 優化Pandas_UDF代碼
在上一小節中,我們是通過Spark方法進行特征的處理,然后對處理好的數據應用@pandas_udf裝飾器調用自定義函數。但這樣看起來有些凌亂,因此可以把這些Spark操作都寫入pandas_udf方法中。
注意:上小節中存在一個字段沒有正確對應的bug,而pandas_udf方法返回的特征順序要與schema中的字段順序保持一致!
import pandas as pd
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType
spark = SparkSession.builder.appName("demo3").config("spark.some.config.option", "some-value").getOrCreate()
df3 = spark.createDataFrame(
[(18862669710, '/未知類型', 'IM傳文件', 'QQ接收文件', 39.0, '2018-03-08 21:45:45', 178111558222, 1781115582),
(18862669710, '/未知類型', 'IM傳文件', 'QQ接收文件', 39.0, '2018-03-08 21:45:45', 178111558222, 1781115582),
(18862228190, '/移動終端', '移動終端應用', '移動騰訊視頻', 292.0, '2018-03-08 21:45:45', 178111558212, 1781115582),
(18862669710, '/未知類型', '訪問網站', '搜索引擎', 52.0, '2018-03-08 21:45:46', 178111558222, 1781115582)],
('online_account', 'terminal_type', 'action_type', 'app', 'access_seconds', 'datetime', 'outid', 'class'))
def compute(x):
x['end_time'] = pd.to_datetime(x['datetime'], errors='coerce', format='%Y-%m-%d')
x['end_time_convert_seconds'] = pd.to_timedelta(x['end_time']).dt.total_seconds().astype(int)
x['start_time'] = pd.to_datetime(x['end_time_convert_seconds'] - x['access_seconds'], unit='s')
x = x.sort_values(by=['start_time'], ascending=True)
result = x[['online_account', 'terminal_type', 'action_type', 'app', 'access_seconds', 'datetime', 'outid', 'class','start_time', 'end_time']]
return result
schema = StructType([
StructField("online_account", LongType()),
StructField("terminal_type", StringType()),
StructField("action_type", StringType()),
StructField("app", StringType()),
StructField("access_seconds", DoubleType()),
StructField("datetime", StringType()),
StructField("outid", LongType()),
StructField("class", LongType()),
StructField("start_time", TimestampType()),
StructField("end_time", TimestampType()),
])
@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def g(df):
print('ok')
mid = df.groupby(['online_account']).apply(lambda x: compute(x))
result = pd.DataFrame(mid)
return result
df3.printSchema()
aa = df3.groupby(['online_account']).apply(g)
aa.show()

4. Pandas_UDF與toPandas的區別
- @pandas_udf 創建一個向量化的用戶定義函數(UDF),利用了panda的矢量化特性,是udf的一種更快的替代方案,因此適用於分布式數據集。
- toPandas將分布式spark數據集轉換為pandas數據集,對pandas數據集進行本地化,並且所有數據都駐留在驅動程序內存中,因此此方法僅在預期生成的pandas DataFrame較小的情況下使用。
換句話說,@pandas_udf使用panda API來處理分布式數據集,而toPandas()將分布式數據集轉換為本地數據,然后使用pandas進行處理。
5. 參考文獻
[1] PySpark Usage Guide for Pandas with Apache Arrow
[2] pyspark.sql.functions.pandas_udf
