PySpark 行列轉換


Spark實現行列轉換pivot和unpivot

背景

做過數據清洗ETL工作的都知道,行列轉換是一個常見的數據整理需求。

首先明確一下啥叫行列轉換,因為這個叫法也不是很統一,有的地方叫轉置,有的地方叫透視,不一而足。我們就以下圖為例,定義如下:

  • 從左邊這種變成右邊這種,叫透視(pivot)
  • 反之叫逆透視(unpivot)

 

image-20180611160900344

 

Spark實現

構造樣本數據

首先我們構造一個以行格式保存數據的數據集

from pyspark.sql import SparkSession spark = SparkSession.builder.appName('JupyterPySpark').enableHiveSupport().getOrCreate() import pyspark.sql.functions as F # 原始數據 df = spark.createDataFrame([('2018-01','項目1',100), ('2018-01','項目2',200), ('2018-01','項目3',300), ('2018-02','項目1',1000), ('2018-02','項目2',2000), ('2018-03','項目x',999) ], ['年月','項目','收入'])

樣本數據如下,我們可以看到,每一個項目在指定月份都只有一行記錄,並且項目是稀疏的。即,不是每個項目都會出現在每一個月份中,如項目2僅出現在2018-01當中。

+-------+---+----+
|  年月| 項目|  收入|
+-------+---+----+
|2018-01|項目1| 100|
|2018-01|項目2| 200|
|2018-01|項目3| 300|
|2018-02|項目1|1000|
|2018-02|項目2|2000|
|2018-03|項目x| 999|
+-------+---+----+

透視Pivot

透視操作簡單直接,邏輯如下

  • 按照不需要轉換的字段分組,本例中是年月;
  • 使用pivot函數進行透視,透視過程中可以提供第二個參數來明確指定使用哪些數據項;
  • 匯總數字字段,本例中是收入;

代碼如下

df_pivot = df.groupBy('年月')\ .pivot('項目', ['項目1','項目2','項目3','項目x'])\ .agg(F.sum('收入'))\ .fillna(0)

結果如下

+-------+----+----+---+---+
| 年月| 項目1| 項目2|項目3|項目x|
+-------+----+----+---+---+
|2018-03|   0|   0|  0|999|
|2018-02|1000|2000|  0|  0|
|2018-01| 100| 200|300|  0|
+-------+----+----+---+---+

逆透視Unpivot

Spark沒有提供內置函數來實現unpivot操作,不過我們可以使用Spark SQL提供的stack函數來間接實現需求。有幾點需要特別注意:

  • 使用selectExpr在Spark中執行SQL片段;
  • 如果字段名稱有中文,要使用反引號**`** 把字段包起來;

代碼如下

df_pivot.selectExpr("`年月`", "stack(4, '項目1', `項目1`,'項目2', `項目2`, '項目3', `項目3`, '項目x', `項目x`) as (`項目`,`收入`)")\ .filter("`收入` > 0 ")\ .orderBy(["`年月`", "`項目`"])\ .show()

結果如下

+-------+---+----+
|     年月| 項目|  收入|
+-------+---+----+
|2018-01|項目1| 100|
|2018-01|項目2| 200|
|2018-01|項目3| 300|
|2018-02|項目1|1000|
|2018-02|項目2|2000|
|2018-03|項目x| 999|
+-------+---+----+


參考 :https://juejin.im/post/5b1e343f518825137c1c6a27


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM