Pyspark 使用 Spark Udf 的一些經驗


 

起初開始寫一些 udf 的時候感覺有一些奇怪,在 spark 的計算中,一般通過轉換(Transformation) 在不觸發計算(Action) 的情況下就行一些預處理。udf 就是這樣一個好用的東西,他可以在我們進行 Transformation 的時候給我們帶來對復雜問題的處理能力。

這里有兩種最典型的方法。

應用於 spark 2.4

1. 直接在 SparkSession.sql 里面直接使用注冊好的 udf,類似於這種寫法

xx = SparkSession.catalog.registerFunction('fmt_buy_channel', lambda i, j, x, y: HdNewOrderRecord.fmt_buy_channel(i, j, x, y))

ss.sql("""
           SELECT t1.pay_id,
           t1.sku_mode,
           LEFT(t1.charge_time, 19) AS buy_time,
           fmt_buy_channel(t1.join_type, t1.special_card_type, t1.channel_type, t1.pay_channel) AS channel,
           t1.pay_money,
           t1.charge_user_id
           FROM analytics_db.hd_new_order_record t1 JOIN user_info t2
           ON (t1.charge_user_id = t2.user_id
           AND t1.charge_time < '{}') ORDER BY t1.charge_time ASC
       """.format(dump_time))

可以看到我們定義的 udf "fmt_buy_channel" 被直接用在了 sql 語句里面。這種 spark 是可以輕松處理的。不過這種寫法有個問題,在使用了 udf 之后,這個字段不能立即嵌套另外的 function 。否則可能會報錯,比如我寫一個這樣的函數

df = ss.sql("""
                SELECT t1.pay_id,
                t1.sku_mode,
                LEFT(t1.charge_time, 19) AS buy_time,
                fmt_buy_channel(t1.join_type, t1.special_card_type, t1.channel_type, t1.pay_channel) AS channel,
                t1.pay_money,
                t1.charge_user_id
                FROM analytics_db.hd_new_order_record t1 JOIN user_info t2
                ON (t1.charge_user_id = t2.user_id
                AND t1.charge_time < '{}') ORDER BY t1.charge_time ASC
            """.format(dump_time))

會無法正確執行。

 

2. 第二種方法是我們可以直接使用 pyspark 提供的函數進行 udf 調用,pyspark 或者本身的 scala spark 他們為我們封裝了非常多基於 SparkSession 和 DataFrame 的函數。

來看一個結合了兩者的一個完整的例子

df = ss.sql("""
                SELECT t1.pay_id,
                t1.sku_mode,
                LEFT(t1.charge_time, 19) AS buy_time,
                fmt_buy_channel(t1.join_type, t1.special_card_type, t1.channel_type, t1.pay_channel) AS channel,
                t1.pay_money,
                t1.charge_user_id
                FROM analytics_db.hd_new_order_record t1 JOIN user_info t2
                ON (t1.charge_user_id = t2.user_id
                AND t1.charge_time < '{}') ORDER BY t1.charge_time ASC
            """.format(dump_time))
df = df.select(df.charge_user_id, concat_ws('_', df.pay_id, df.channel, df.sku_mode, df.buy_time, df.pay_money).alias('sku_buys'))\
    .groupBy(df.charge_user_id)\
    .agg(collect_list('sku_buys').alias('sku_buys'))
df.createOrReplaceTempView(table_name)

上面我使用了常用的一些 SQL 函數,其實 spark 對這些函數都有包裝 。比如 left 之類的函數都可以在 pyspark.sql.functions import 中找到例如 ltrim。

第一條語句我們通過 ss.sql 獲得一個 df 。

第二條語句我們通過操縱 df 的函數生成我們自己需要的字段,並且對字符串進行拼接。最后分組展示。這里用到了幾個函數需要介紹一下。

concat_ws: concat_ws 用於拼接字符串,第一個參數接受一個拼接用的符號,后面依次跟上需要拼接的字段即可。

.groupBy().agg(collect_list): 在被基於某一項分組之后,可以使用 spark 提供的 agg 來接收一個聚合函數。 collect_list 這里可以將分組的多個字段基於被 group by 的字段拼接成一個 list 。他還有一個類似功能的函數是 collect_set,在拼接的時候會去重被 append 的數據。

新老版本 spark 在 udf 的使用上會有一些位置上的不一樣。特別是在 1.6 跨度到 2.0 的時候。之前還看到過另外一個注冊使用方法,放出來給大家看。

from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

def regex_filter(x):
    regexs = ['.*ALLYOURBASEBELONGTOUS.*']
    
    if x and x.strip():
        for r in regexs:
            if re.match(r, x, re.IGNORECASE):
                return True
    
    return False 
    
    
filter_udf = udf(regex_filter, BooleanType())

df_filtered = df.filter(filter_udf(df.field_to_filter_on))

這個跟上面的注冊方法最終都會走到 udf 的注冊和 udf._wrapped 這個方法並且返回一個函數。如果不接收這個函數返回值,那么可以直接在 ss.sql 中當 udf 進行使用。如果接收當函數值,可以放在 df 的函數里面方便的進行使用。

另外在 spark 2.4 版本以前的 2.2 版本,要想直接獲得一個注冊完畢的 udf 不能使用上面的 register 方法。那個方法在 2.3 追加 return 。如果我們需要 return 一個 udf 對象我們要這樣做

import pyspark.sql.functions as f
right_user = f.udf(lambda i, j, x, y, o, p: HdNewUserInfo.right_user(i, j, x, y, o, p))

 

使用 udf + sql 函數可以方便的幫助我們進行 transformation ,來完成更加復雜的的計算邏輯。

 

Reference:

https://stackoverflow.com/questions/31816975/how-to-pass-whole-row-to-udf-spark-dataframe-filter   How to pass whole Row to UDF - Spark DataFrame filter

https://stackoverflow.com/questions/52051985/filter-pyspark-dataframe-with-udf-on-entire-row/52055861   Filter Pyspark Dataframe with udf on entire row

https://gist.github.com/samuelsmal/feb86d4bdd9a658c122a706f26ba7e1e   pyspark_udf_filtering.py

https://stackoverflow.com/questions/36784000/how-to-filter-a-spark-dataframe-by-a-boolean-column   how to filter a spark dataframe by a boolean column

https://stackoverflow.com/questions/37580782/pyspark-collect-set-or-collect-list-with-groupby   pyspark collect_set or collect_list with groupby 

https://www.jianshu.com/p/bded081b5350

https://www.cnblogs.com/fudashi/p/7491039.html 

https://gist.github.com/samuelsmal/feb86d4bdd9a658c122a706f26ba7e1e

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM