pandas_udf使用說明


摘要

Spark2.0 推出了一個新功能pandas_udf,本文結合spark 官方文檔和自己的使用情況,講解pandas udf的基本知識,並添加實例,方便初學的同學快速上手和理解。

 

Apche Arrow

ApacheArrow 是一種內存中的列式數據格式,用於在 Spark 中 JVM 和 Python 進程之間數據的高效傳輸。這對於使用 pandas/numpy 數據的 python 用戶來說是最有利的。它的使用不是自動的,可能需要對配置或代碼進行一些細微的更改,以充分利用並確保兼容性。

Apche Arrow 的安裝

pyspark安裝的時候,Apche Arrow就已經安裝了,可能安裝的版本比較低,在你使用pandas udf的時候會報如下的錯誤,

1
2
3
"it was not found." % minimum_pyarrow_version)
ImportError: PyArrow >= 0.8.0 must be installed; however, it was not found.
解決:pip install pyspark[sql]

可以從報錯信息中發現是Arrow的版本過低了,可以通過pip install pyspark進行安裝或更新。

 

使用 Arrow 對 spark df 與 pandas df 的轉換

Arrow能夠優化spark dfpandas df的相互轉換,在調用Arrow之前,需要將 spark 配置spark.sql.execution.arrow.enabled設置為`true。這在默認情況下是禁用的。

此外,如果在 spark 實際計算之前發生錯誤,spark.sql.execution.arrow.enabled啟用的優化會自動回退到非 Arrow 優化實現。這可以有spark.sql.execution.arrow.fallback.enabled來控制。

對 arrow 使用優化將產生與未啟用 arrow 時相同的結果。但是,即使使用 arrow,toPandas()也會將數據中的所有記錄收集到驅動程序中,所以應該在數據的一小部分中使用。

目前,並非所有 spark 數據類型都受支持,如果列的類型不受支持,則可能會引發錯誤,請參閱受支持的 SQL 類型。如果在create dataframe()期間發生錯誤,spark 將返回非 Arrow 優化實現的數據。

設置與轉換

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import numpy as np
import pandas as pd

app_name = 'Temp'
spark = SparkSession
.builder.appName(app_name)
.config('spark.sql.execution.arrow.enabled', 'true')
.getOrCreate()

# spark.conf.set("spark.sql.execution.arrow.enabled", "true")

# Generate a Pandas DataFrame
pdf = pd.DataFrame(np.random.rand(100, 3))

# Create a Spark DataFrame from a Pandas DataFrame using Arrow
df = spark.createDataFrame(pdf)

# Convert the Spark DataFrame back to a Pandas DataFrame using Arrow
result_pdf = df.select("*").toPandas()

 

Pandas UDFs (a.k.a Vectorized UDFs)

pandas udf是用戶定義的函數,是由 spark 用arrow傳輸數據,pandas去處理數據。我們可以使用pandas_udf作為decorator或者registor來定義一個pandas udf函數,不需要額外的配置。目前,pandas udf有三種類型:標量映射(Scalar)和分組映射(Grouped Map)和分組聚合(Grouped Aggregate)。

  • Scalar

    其用於向量化標量操作。它們可以與selectwithColumn等函數一起使用。python 函數應該以pandas.series作為輸入,並返回一個長度相同的pandas.series。在內部,spark 將通過將列拆分為batch,並將每個batch的函數作為數據的子集調用,然后將結果連接在一起,來執行 padas UDF。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    import pandas as pd

    from pyspark.sql.functions import col, pandas_udf
    from pyspark.sql.types import LongType

    # Declare the function and create the UDF

    def (a, b):
    return a * b
    # or multiply = pandas_udf(multiply_func, returnType=LongType())

    # Create a Spark DataFrame, 'spark' is an existing SparkSession
    df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))

    # Execute function as a Spark vectorized UDF
    df.select(multiply_func(col("x"), col("x"))).show()
    # +-------------------+
    # |multiply_func(x, x)|
    # +-------------------+
    # | 1|
    # | 4|
    # | 9|
    # +-------------------+
  • Grouped Map

    Grouped Map pandas_udfgroupBy().apply()一起使用,后者實現了split-apply-combine模式。拆分應用組合包括三個步驟:

    • df.groupBy()對數據分組
    • apply()對每個組進行操作,輸入和輸出都是 dataframe 格式
    • 匯總所有結果到一個 dataframe 中

    使用groupBy().apply(),用戶需要定義以下內容:

    • 一個函數,放在apply()
    • 一個輸入輸出的schema,兩者必須相同

    請注意,在應用函數之前,組的所有數據都將加載到內存中。這可能導致內存不足異常,尤其是當組的大小skwed的時候。maxRecordsPerBatch不適用於這里。所以,用戶需要來確保分組的數據適合可用內存。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    from pyspark.sql.functions import pandas_udf, PandasUDFType

    df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

    # or df.schema
    @pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
    def subtract_mean(pdf):
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v = v - v.mean())

    df.groupby("id").apply(subtract_mean).show()
    # +---+----+
    # | id| v|
    # +---+----+
    # | 1|-0.5|
    # | 1| 0.5|
    # | 2|-3.0|
    # | 2|-1.0|
    # | 2| 4.0|
    # +---+----+

     

  • Grouped Aggregate

    其類似於 Spark 聚合函數。使用groupby().agg()pyspark.sql.Window一起使用。它定義從一個或多個pandas.series到一個標量值的聚合,其中每個pandas.series表示組中的一列或窗口。

    請注意,這種類型的 UDF 不支持部分聚合,組或窗口的所有數據都將加載到內存中。此外,這種類型只接受unbounded window,也就是說,我們不能定義window size

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    from pyspark.sql.functions import pandas_udf, PandasUDFType
    from pyspark.sql import Window

    df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

    @pandas_udf("double", PandasUDFType.GROUPED_AGG)
    def mean_udf(v):
    return v.mean()

    df.groupby("id").agg(mean_udf(df['v'])).show()
    # +---+-----------+
    # | id|mean_udf(v)|
    # +---+-----------+
    # | 1| 1.5|
    # | 2| 6.0|
    # +---+-----------+

    w = Window
    .partitionBy('id')
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
    df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
    # +---+----+------+
    # | id| v|mean_v|
    # +---+----+------+
    # | 1| 1.0| 1.5|
    # | 1| 2.0| 1.5|
    # | 2| 3.0| 6.0|
    # | 2| 5.0| 6.0|
    # | 2|10.0| 6.0|
    # +---+----+------+
  • 結合使用

    如果想用agg()的思想,又想定義window size,我們可以用 Group Map,並在pandas udf function中使用 pandas 的rolling()來實現。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    from pyspark.sql.functions import lit, pandas_udf, PandasUDFType

    df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))
    df = df.withColumn("mv", f.lit(0.))

    @pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
    def moving_mean(pdf):
    v = pdf.v
    pdf['mv'] = v.rolling(3,1).mean()
    return pdf

    df.groupby("id").apply(moving_mean).show()
    # +---+----+---+
    # | id| v| mv|
    # +---+----+---+
    # | 1| 1.0|1.0|
    # | 1| 2.0|1.5|
    # | 2| 3.0|3.0|
    # | 2| 5.0|4.0|
    # | 2|10.0|6.0|
    # +---+----+---+

 

其他使用說明

  • 支持的 SQL 類型

    目前,Arrow-base 的轉換支持所有的 spark sql 數據類型,除了MapTypeArrayTpye中的TimestampTypenested StructTypeBinaryType僅在 Arrow 版本大於等於0.10.0時被支持。

     

  • 設置Arrow Batch Size

    spark 中的數據分區被轉換成 arrow 記錄批處理,這會暫時導致 JVM 中的高內存使用率。為了避免可能的內存不足異常,可以通過 conf 的 spark.sql.execution.arrow.maxRecordsPerBatch設置為一個整數來調整 Arrow 記錄 batch 的大小,該整數將確定每個 batch 的最大行數。默認值為 10000 條記錄。如果列數較大,則應相應調整該值。使用這個限制,每個數據分區將被制成一個或多個記錄 batch 處理。

     

  • Timestamp 的時區問題

    Spark 內部將Timestamp存儲為 UTC 值,在沒有指定時區的情況下引入的Timestamp數據將轉換為具有微秒分辨率的本地時間到 UTC。在 spark 中導出或顯示Timestamp數據時,會話時區用於本地化Timestamp值。會話時區是使用配置spark.sql.session.time zone設置的,如果不設置,則默認為 JVM 系統本地時區。pandas 使用具有納秒分辨率的datetime64類型,datetime64[ns],每個列上都有可選的時區。

    Timestamp數據從 spark 傳輸到 pandas 時,它將被轉換為納秒,並且每一列將被轉換為 spark 會話時區,然后本地化到該時區,該時區將刪除時區並將值顯示為本地時間。當使用Timestamp列調用toPandas()pandas_udf時,會發生這種情況。

    Timestamp數據從 pandas 傳輸到 spark 時,它將轉換為 UTC 微秒。當使用 pandas dataframe 調用CreateDataFrame或從 pandas dataframe 返回Timestamp時,會發生這種情況。這些轉換是自動完成的,以確保 Spark 具有預期格式的數據,因此不需要自己進行這些轉換。任何納秒值都將被截斷。

    請注意,標准 UDF(非 PANDAS)將以 python 日期時間對象的形式加載Timestamp數據,這與 PANDAS 的Timestamp不同。建議使用 pandas 的Timestamp時使用 pandas 的時間序列功能,以獲得最佳性能,有關詳細信息,請參閱此處

     

  • Pandas udf 其他使用案例

    使用案例


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM