Pyspark 最近使用的一些有趣姿勢的梳理


之前對 SQL 還是不是非常熟悉的,但是現在或多或少還是會寫一些計算任務。比如最近在推送將所有天級的耗時任務都從傳統關系型數據庫遷移至 Spark 集群當中進行計算,中間遇到一些有趣的小問題在這里記錄一下。

 

Q: 我想按照某個字段分組並且把一組查詢字段連起來得到一個 json 然后把結果作為一個字段應該怎么弄?

A: 這里我的思路是將我們需要 dumps 的字段給拼接起來,然后使用列表將同一個分組里面的是數據組合起來。然后過一個 udf 把列表中的記錄處理成數組最后 json.dumps 一下即可。來看個栗子

# 先查詢出要操作的目標信息 這一步可以和下面的操作合並,我這里為了方便看拆開了
df = ss.sql("""
                        SELECT 
                            t1.pay_id, 
                            t1.pay_money, 
                            t1.user_id
                        FROM
                            analytics_db.hd_day_order_record t1 
                    """)

# 拼接目標字符串並且組合
df = df.select(
               df.pay_id,
               df.pay_money,
               df.pay_user_id,
               f.concat_ws('\001', df.pay_id,  df.pay_user_id, df.pay_money).alias('sku_buys'))
)

# 拼接一個重復 user_id 的 list
df = df.groupBy(df.pay_user_id).agg(f.collect_list('sku_buys').alias('sku_buys'))

# 將 sku_buys 丟給一個 jsondump 的 udf 就可以得到結果了
df = df.select(df.pay_user_id, sb_json(df.sku_buys).alias('sku_buys'))

 

Q: 如果我想對目標進行分組,並且讓他在組內有序應該怎么做?

A: 這通常被稱為進行組內排序。其實我之前一直嘗試用類似的語法來達到這種效果

df = ss.sql("""
        SELECT
            first(t1.sku_mode) AS sku_mode,
            first(t1.exchange_type_t01) AS exchange_type_t01,
            first(t1.user_id) AS user_id,
            first(t1.pay_id) AS pay_id,
            first(t1.charge_time) AS charge_time,
            first(t2.has_yxs_payment) AS has_yxs_payment,
            first(t2.has_sxy_payment) AS has_sxy_payment,
            first(t2.has_cxy_payment) AS has_cxy_payment,
            first(t2.has_sxy19_payment) AS has_sxy19_payment,
            first(t2.sxy19_join_time) AS sxy19_join_time,
            first(t2.yxs_join_time) AS yxs_join_time
        FROM
            d_exchange_info t1
        JOIN
            analytics_db.md_day_dump_users t2
        ON
            t2.the_day = '{}'
            AND t1.user_id = t2.user_id
        GROUP BY
            t1.user_id
        ORDER BY
            charge_time
""".format(st))

其實這是錯的,這里的 order by 並不能達到一個組內排序的效果,而是一個外部排序。所以這里取 first 是一個不穩定的結果。有時候你拿到的是一個結果,也許有時候你拿到的是另外一個結果。要進行組內排序,我們可以先用這樣的思路,先對需要 order by 字段的表進行組內排序,然后再讓他與其他表 join 獲得更多的信息,這樣就能保證是有序的。

這里我引用一個窗口函數來達到這樣的效果。

        _df = ss.sql("""
                        SELECT 
                            t1.user_id,
                            t1.pay_id,
                            t1.sku_mode,
                            t1.charge_time,
                            t1.exchange_type_t01,
                            ROW_NUMBER() OVER(PARTITION BY t1.user_id ORDER BY t1.charge_time) as rid
                        FROM 
                            {} t1 
                        WHERE 
                            t1.refund_state = 0
                    """.format(exchange_info_table))
    _df = _df.filter(_df.rid==1)

我先使用窗口函數 ROW_NUMBER 以 user_id 分組並且根據 charge_time 對表一進行組內排序。得到結果之后,使用 filter 過濾一下 rid =1 的結果。再與另外一張表 join 得到補充信息就能達到想要的效果。

 

Q: 我想對結果進行轉列應該怎么做?

A: 行轉列 列轉行可能是 SQL 計算里面會經常使用到的方法,但是對於 SQL 並不熟悉的同學(比如我)就不知道該怎么整來看個例子

df = ss.sql("""
    SELECT
        user_id,
        sku_mode,
        credit_score
    FROM
        analytics_db.hd_day_user_credit
    WHERE
        gain_time >= '{}'
        AND gain_time < '{}'
        AND the_day = '{}'
""".format(start_time, end_time, st))
# df.show(10)

展示的數據類似於

+--------------------+--------+------------+
|             user_id|sku_mode|credit_score|
+--------------------+--------+------------+
|d394899919216bc10...|     yxs|           3|
|625002ad625bc9a69...|     yxs|           3|
|8dd11e29bf50cb8c8...|     cxy|           3|
|0f0b88ff589cb46cd...|     yxs|           3|
|eeb8e839139876971...|     yxs|           1|
|f63b2b9c8340d3c80...|     cxy|           1|
|806c9f0349e7e8389...|     cxy|           1|
|bf312181eaaa0ec9e...|     yxs|           1|
|ee4a7984dc40cabbf...|     yxs|           3|
|7a6b15f16c1f782de...|   sxy19|           3|
+--------------------+--------+------------+
only showing top 10 rows

我們可以基於此將 sku_mode 一樣的類型合並進行行轉列變換

df = df.groupby('user_id').pivot(
    'sku_mode', ['yxs', 'cxy', 'sxy', 'sxy19']
).agg(
    f.sum('credit_score')
).fillna(0)

這句話的意思是根據 user_id 進行分組,並且將 sku_mode 的行轉列,需要轉列的類型需要在后面的 list 中添加,並且列里記錄 各sku_mode credit_score 匯總的量。

+--------------------+---+---+---+-----+
|             user_id|yxs|cxy|sxy|sxy19|
+--------------------+---+---+---+-----+
|5ec336994e7b5d73f...|  0|  0|  0|    2|
|06b1120a4544b1b8a...|  0|  0|  0|    2|
|6fe19e193ad43bafc...|  0|  0|  0|    3|
|3e5f9fc4550ae7cba...|  1|  0|  0|    0|
|b1d1d856e28908f5a...|  1|  0|  0|    3|
|7a065e02ed1693cf4...|  2|  0|  0|    0|
|651f9f0b11de08003...|  0|  0|  0|    3|
|d02491502946aba2f...|  0|  0|  0|    2|
|e24b58cb87762b2da...|  0|  6|  0|   15|
|925f6a832b1a95c45...|  1|  0|  0|    0|
+--------------------+---+---+---+-----+
only showing top 10 rows

 

Q: 我想對結果進行列轉行應該怎么做?

A: 我們接着上面的例子來講 unpivot 行轉列的逆操作。還是接着剛才那個栗子。

df2 = df
df2 = df2.selectExpr("user_id", 
                     "stack(4, 'yxs', yxs, 'cxy', cxy, 'sxy', sxy, 'sxy19', sxy19) AS (sku_mode, credit_score)")

df.where(df.user_id=='e24b58cb87762b2da9fa118316e9c91a').show(10, False)
df2.filter(df2.user_id=='e24b58cb87762b2da9fa118316e9c91a').show(10, False)


+--------------------------------+---+---+---+-----+
|user_id                         |yxs|cxy|sxy|sxy19|
+--------------------------------+---+---+---+-----+
|e24b58cb87762b2da9fa118316e9c91a|0  |6  |0  |15   
+--------------------------------+---+---+---+-----+

+--------------------------------+--------+------------+
|user_id                         |sku_mode|credit_score|
+--------------------------------+--------+------------+
|e24b58cb87762b2da9fa118316e9c91a|yxs     |0           |
|e24b58cb87762b2da9fa118316e9c91a|cxy     |6           |
|e24b58cb87762b2da9fa118316e9c91a|sxy     |0           |
|e24b58cb87762b2da9fa118316e9c91a|sxy19   |15          |
+--------------------------------+--------+------------+

可以看到我們通過這種辦法將列重新組合成行記錄。這里需要多延伸一下,這里使用的 selectExpr 語句的語意是將里面的參數直接解析成 select 里面的內容。

stack 函數是 spark 中的 func.他接收無數個參數,第一個參數 n 的意義是轉換的行數,對二個開始到后面的參數都是內容。

stack 的作用是將第二個開始的到后面的參數 塞進 n 行中。

舉個栗子來說哦,就是上文使用的

stack(4, 'yxs', yxs, 'cxy', cxy, 'sxy', sxy, 'sxy19', sxy19) AS (sku_mode, credit_score)

這里的語意就是切分成 4 行。從第二個字段開始 字符串部分表達的是匹配的 sku_mode 分辨是('yxs', 'cxy', 'sxy', 'sxy19')然后跟在他們后面的分別是credit_score 的值  然后展現成兩列兩個字段。有點繞需要多理解一下。最好是在 spark 終端中試一試比較有感覺。

 

之后還有有意思的姿勢會繼續補充在這里。

 

 

Reference:

https://sparkbyexamples.com/how-to-pivot-table-and-unpivot-a-spark-dataframe/   How to Pivot and Unpivot a Spark SQL DataFrame

https://stackoverflow.com/questions/56371391/in-group-sort-table-join-a-another-table-use-first-func/56371513#56371513


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM