pyspark編程實踐(過濾、行運算、字符串操作、缺失處理)


過濾篩選

在pyspark中支持了用filter/where等方法進行數據篩選與過濾的操作(這樣的操作在習慣用pandas后未免會覺得有點冗余).

from pyspark.sql import SparkSession
spark = SparkSession.bulider.appName('test').master('local[2]').getOrCreate()
df = spark.read.csv('D:/tips.csv', header=True)
df = df.filter(df.tip > 1.01)
df = df.where(df.tip > 1.01)
OUT:
+----------+----+------+---+------+----+-------+
|total_bill| tip|smoker|day|  time|size|  tip_2|
+----------+----+------+---+------+----+-------+
|     21.01| 3.5|    No|Sun|Dinner| 3.0|  12.25|
|     23.68|3.31|    No|Sun|Dinner| 2.0|10.9561|
|     24.59|3.61|    No|Sun|Dinner| 4.0|13.0321|
|     25.29|4.71|    No|Sun|Dinner| 4.0|22.1841|
|     26.88|3.12|    No|Sun|Dinner| 4.0| 9.7344|
|     14.78|3.23|    No|Sun|Dinner| 2.0|10.4329|
|     35.26| 5.0|    No|Sun|Dinner| 4.0|   25.0|
|     18.43| 3.0|    No|Sun|Dinner| 4.0|    9.0|
|     14.83|3.02|    No|Sun|Dinner| 2.0| 9.1204|
|     21.58|3.92|    No|Sun|Dinner| 2.0|15.3664|
|     16.29|3.71|    No|Sun|Dinner| 3.0|13.7641|
|     16.97| 3.5|    No|Sun|Dinner| 3.0|  12.25|
|     20.65|3.35|    No|Sat|Dinner| 3.0|11.2225|
|     17.92|4.08|    No|Sat|Dinner| 2.0|16.6464|
|     20.29|2.75|    No|Sat|Dinner| 2.0| 7.5625|
|     15.77|2.23|    No|Sat|Dinner| 2.0| 4.9729|
|     39.42|7.58|    No|Sat|Dinner| 4.0|57.4564|
|     19.82|3.18|    No|Sat|Dinner| 2.0|10.1124|
|     17.81|2.34|    No|Sat|Dinner| 4.0| 5.4756|
|      21.7| 4.3|    No|Sat|Dinner| 2.0|  18.49|
+----------+----+------+---+------+----+-------+
only showing top 20 rows

意外發現pyspark的DataFrame數據格式還支持跟pandas.DataFrame一樣的過濾寫法與isin關鍵字

df = df[df.tip > 1.01]
df.show()
OUT:
+----------+----+------+---+------+----+-------+
|total_bill| tip|smoker|day|  time|size|  tip_2|
+----------+----+------+---+------+----+-------+
|     21.01| 3.5|    No|Sun|Dinner| 3.0|  12.25|
|     23.68|3.31|    No|Sun|Dinner| 2.0|10.9561|
|     24.59|3.61|    No|Sun|Dinner| 4.0|13.0321|
|     25.29|4.71|    No|Sun|Dinner| 4.0|22.1841|
|     26.88|3.12|    No|Sun|Dinner| 4.0| 9.7344|
|     14.78|3.23|    No|Sun|Dinner| 2.0|10.4329|
|     35.26| 5.0|    No|Sun|Dinner| 4.0|   25.0|
|     18.43| 3.0|    No|Sun|Dinner| 4.0|    9.0|
|     14.83|3.02|    No|Sun|Dinner| 2.0| 9.1204|
|     21.58|3.92|    No|Sun|Dinner| 2.0|15.3664|
|     16.29|3.71|    No|Sun|Dinner| 3.0|13.7641|
|     16.97| 3.5|    No|Sun|Dinner| 3.0|  12.25|
|     20.65|3.35|    No|Sat|Dinner| 3.0|11.2225|
|     17.92|4.08|    No|Sat|Dinner| 2.0|16.6464|
|     20.29|2.75|    No|Sat|Dinner| 2.0| 7.5625|
|     15.77|2.23|    No|Sat|Dinner| 2.0| 4.9729|
|     39.42|7.58|    No|Sat|Dinner| 4.0|57.4564|
|     19.82|3.18|    No|Sat|Dinner| 2.0|10.1124|
|     17.81|2.34|    No|Sat|Dinner| 4.0| 5.4756|
|      21.7| 4.3|    No|Sat|Dinner| 2.0|  18.49|
+----------+----+------+---+------+----+-------+
only showing top 20 rows

# pyspark也可以直接支持df[df.xxx]的過濾寫法
# 同時還支持isin關鍵字  df[df.day.isin(['Fri','Sat'])].show()
df[df.day.isin('Fri')].show()
OUT:
+----------+----+------+---+------+----+-------+
|total_bill| tip|smoker|day|  time|size|  tip_2|
+----------+----+------+---+------+----+-------+
|     28.97| 3.0|   Yes|Fri|Dinner| 2.0|    9.0|
|     22.49| 3.5|    No|Fri|Dinner| 2.0|  12.25|
|      5.75| 1.0|   Yes|Fri|Dinner| 2.0|    1.0|
|     16.32| 4.3|   Yes|Fri|Dinner| 2.0|  18.49|
|     22.75|3.25|    No|Fri|Dinner| 2.0|10.5625|
|     40.17|4.73|   Yes|Fri|Dinner| 4.0|22.3729|
|     27.28| 4.0|   Yes|Fri|Dinner| 2.0|   16.0|
|     12.03| 1.5|   Yes|Fri|Dinner| 2.0|   2.25|
|     21.01| 3.0|   Yes|Fri|Dinner| 2.0|    9.0|
|     12.46| 1.5|    No|Fri|Dinner| 2.0|   2.25|
|     11.35| 2.5|   Yes|Fri|Dinner| 2.0|   6.25|
|     15.38| 3.0|   Yes|Fri|Dinner| 2.0|    9.0|
|     12.16| 2.2|   Yes|Fri| Lunch| 2.0|   4.84|
|     13.42|3.48|   Yes|Fri| Lunch| 2.0|12.1104|
|      8.58|1.92|   Yes|Fri| Lunch| 1.0| 3.6864|
|     15.98| 3.0|    No|Fri| Lunch| 3.0|    9.0|
|     13.42|1.58|   Yes|Fri| Lunch| 2.0| 2.4964|
|     16.27| 2.5|   Yes|Fri| Lunch| 2.0|   6.25|
|     10.09| 2.0|   Yes|Fri| Lunch| 2.0|    4.0|
+----------+----+------+---+------+----+-------+

# 對於一些類似於isNotNull()的函數也是可以直接用的
df_tip[df_tip.tip.isNotNull()].show()

行運算

感覺在spark中對行進行的匯總運算總是不太方便的, 因此嘗試使用了udf以及reduce等方法進行實現, 此外還可以直接運用類似pandas里面的寫法來進行實現, 不過spark里面沒有broadcast機制, 因此不能用shape不匹配的數據進行計算

# 基於reduce的寫法就不再在這里贅述了

tips_.show(2)
+----------+----+----+
|total_bill| tip|size|
+----------+----+----+
|     16.99|1.01| 2.0|
|     10.34|1.66| 3.0|
+----------+----+----+
only showing top 2 rows

tips_.select((col('total_bill') + col('tip') + col('size')).alias('sum')).show(2)
+----+
| sum|
+----+
|20.0|
|15.0|
+----+
only showing top 2 rows

tips_.select(((col('total_bill') + col('tip') + col('size')) / 3).alias('sum')).show(2)
+-----------------+
|              sum|
+-----------------+
|6.666666666666667|
|              5.0|
+-----------------+
only showing top 2 rows

# 這里直接用lit()創建了一整列進行一一的映射計算
tips_.select(((col('total_bill') + col('tip') + col('size')) / lit(3)).alias('sum')).show(2)
+-----------------+
|              sum|
+-----------------+
|6.666666666666667|
|              5.0|
+-----------------+
only showing top 2 rows

字符串操作

字符串拼接

在算法操作中比較常見的做法就是把多個字符串拼接到一起作為一個單獨的列, 在spark中也有多種方法可以完成這一實現

data.show(5)
+--------+-------+--------+--------------------+-----+--------+
|glass_id|step_id|equip_id|             timekey|label| unit_id|
+--------+-------+--------+--------------------+-----+--------+
|Y95PR090|  14200|A2PDC100|20190601094814153863|    1|A2PDC100|
|Y95PR090|  14207|A2VTM100|20190601120431648744|    1|A2VTM100|
|Y95PR090|  1420V|A2PVD100|20190601120428511673|    1|A2PVD100|
|Y95PR090|  14300|A2RSM100|20190601125957981111|    1|A2RSM100|
|Y95PR090|  14315|A2PHT500|20190601150105054455|    1|A2PHT500|
+--------+-------+--------+--------------------+-----+--------+
only showing top 5 rows

# 直接使用SQL表達式完成
# ===SQL的寫法雖然是字符串, 但是借用format應該是可以將其變得更加靈活的, 有待改進====
datpath.selectExpr("step_id","equip_id","concat_ws('_',unit_id,equip_id,step_id) as col").show(5)
+-------+--------+--------------------+
|step_id|equip_id|                 col|
+-------+--------+--------------------+
|  14200|A2PDC100|A2PDC100_A2PDC100...|
|  14207|A2VTM100|A2VTM100_A2VTM100...|
|  1420V|A2PVD100|A2PVD100_A2PVD100...|
|  14300|A2RSM100|A2RSM100_A2RSM100...|
|  14315|A2PHT500|A2PHT500_A2PHT500...|
+-------+--------+--------------------+
only showing top 5 rows

# udf也是有力的工具, 不過效率可能有一定的問題
combine_udf = udf(lambda x, y : x + '_' + y)
for i in range(1, len(cols)):
    datpath = datpath.withColumn('path', combine_udf(col('path'), col(cols[i])))
    
# spark自帶拼接工具, 效率比udf高一點點
from pyspark.sql.functions import concat, concat_ws
df = spark.createDataFrame([('abcd','123')], ['s', 'd'])
 
# 直接拼接
df.select(concat(df.s, df.d).alias('s')).show()
# abcd123
 
# 指定拼接符
df.select(concat_ws('-', df.s, df.d).alias('s')).show()
# 'abcd-123'

缺失

缺失判斷&統計

# 簡單的缺失判斷
from pyspark.sql.functions import *
tips.select(when(isnull('tip'), 0).otherwise(1).alias('tip_isnull'))
+----------+
|tip_isnull|
+----------+
|         1|
|         1|
|         1|
|         1|
|         1|
+----------+
only showing top 5 rows

# 更為復雜的缺失統計, 涉及到一行上的操作, 在行上的統計需要用到python的內置reduce來進行實現
tips = tips.withColumn('null', lit(None))
tips = tips.withColumn('null_sum', lit(0))
reduce(lambda data,idx: data.withColumn('num_sum', data['num_sum'] + when(isnull(data[idx]),1).otherwise(0)),tips.columns ,tips).show(5)
+----------+----+------+---+------+----+-------+----+
|total_bill| tip|smoker|day|  time|size|num_sum|null|
+----------+----+------+---+------+----+-------+----+
|     16.99|1.01|    No|Sun|Dinner|   2|      1|null|
|     10.34|1.66|    No|Sun|Dinner|   3|      1|null|
|     21.01| 3.5|    No|Sun|Dinner|   3|      1|null|
|     16.97| 3.5|    No|Sun|Dinner|   3|      1|null|
|     20.65|3.35|    No|Sat|Dinner|   3|      1|null|
+----------+----+------+---+------+----+-------+----+
only showing top 5 rows

# 對一列進行判斷和聚合
tips.select(sum(when(isnull(tips['null']),1).otherwise(0)).alias('static')).show()
+------+
|static|
+------+
|   244|
+------+

缺失填充

DataFrame.na.fill 和 DataFrame.fillna都是可以對缺失進行填充的, 填充的方式可以直接對全部的缺失填入同樣的target, 也可以采用字典的形式指定每一列的填充的target.

  • 需要避開的一個大坑在於spark中的缺失填充target必須要和目標列的數據類型一致, 否則會被ignore
# 類型不一致, 填充失敗
df = df.withColumn('null_', lit(None))
df.na.fill(50).show(2)  # 填充失敗
+----------+----+------+---+------+----+-----+
|total_bill| tip|smoker|day|  time|size|null_|
+----------+----+------+---+------+----+-----+
|     16.99|1.01|    No|Sun|Dinner|   2| null|
|     10.34|1.66|    No|Sun|Dinner|   3| null|
+----------+----+------+---+------+----+-----+
only showing top 2 rows

# 類型相同填充成功
df = df.withColumn('null_', df.null_.cast('int'))
df.fillna(50).show(3)
+----------+----+------+---+------+----+-----+
|total_bill| tip|smoker|day|  time|size|null_|
+----------+----+------+---+------+----+-----+
|     16.99|1.01|    No|Sun|Dinner|   2|   50|
|     10.34|1.66|    No|Sun|Dinner|   3|   50|
|     21.01| 3.5|    No|Sun|Dinner|   3|   50|
+----------+----+------+---+------+----+-----+
only showing top 3 rows

# 以字典的形式進行填充
df4.na.fill({'age': 50, 'name': 'unknown'}).show()
+---+------+-------+
|age|height|   name|
+---+------+-------+
| 10|    80|  Alice|
|  5|  null|    Bob|
| 50|  null|    Tom|
| 50|  null|unknown|
+---+------+-------+


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM