pyspark 自定義聚合函數 UDAF


自定義聚合函數 UDAF 目前有點麻煩,PandasUDFType.GROUPED_AGG 在2.3.2的版本中不知怎么回事,不能使用!

這樣的話只能曲線救國了!

 

 

PySpark有一組很好的聚合函數(例如,count,countDistinct,min,max,avg,sum),但這些並不適用於所有情況(特別是如果你試圖避免代價高昂的Shuffle操作)。

PySpark目前有pandas_udfs,它可以創建自定義聚合器,但是你一次只能“應用”一個pandas_udf。如果你想使用多個,你必須預先形成多個groupBys ......並且避免那些改組。

在這篇文章中,我描述了一個小黑客,它使您能夠創建簡單的python UDF,它們對聚合數據起作用(此功能只應存在於Scala中!)。

1 
2 3 4 5 6 7 8 
from pyspark.sql import functions as F from pyspark.sql import types as T  a = sc.parallelize([[1, 'a'],  [1, 'b'],  [1, 'b'],  [2, 'c']]).toDF(['id', 'value']) a.show() 
ID
1 '一個'
1 'B'
1 'B'
2 'C'

我使用collect_list將給定組中的所有數據放入一行。我打印下面這個操作的輸出。

1
a.groupBy('id').agg(F.collect_list('value').alias('value_list')).show() 
ID VALUE_LIST
1 ['a','b','b']
2 ['C']

然后我創建一個UDF,它將計算這些列表中字母'a'的所有出現(這可以很容易地在沒有UDF的情況下完成,但是你明白了)。此UDF包含collect_list,因此它作用於collect_list的輸出。

1 
2 3 4 5 6 7 8 9 10 11 
def find_a(x):  """Count 'a's in list."""  output_count = 0  for i in x:  if i == 'a':  output_count += 1  return output_count  find_a_udf = F.udf(find_a, T.IntegerType())  a.groupBy('id').agg(find_a_udf(F.collect_list('value')).alias('a_count')).show() 
ID A_COUNT
1 1
2 0

我們去!作用於聚合數據的UDF!接下來,我展示了這種方法的強大功能,結合何時讓我們控制哪些數據進入F.collect_list。

首先,讓我們創建一個帶有額外列的數據框。

1 
2 3 4 5 6 7 8 9 
from pyspark.sql import functions as F from pyspark.sql import types as T  a = sc.parallelize([[1, 1, 'a'],  [1, 2, 'a'],  [1, 1, 'b'],  [1, 2, 'b'],  [2, 1, 'c']]).toDF(['id', 'value1', 'value2']) a.show() 
ID 值1 值2
1 1 '一個'
1 2 '一個'
1 1 'B'
1 2 'B'
2 1 'C'

請注意,我如何在collect_list中包含一個when。請注意,UDF仍然包含collect_list。

1
a.groupBy('id').agg(find_a_udf( F.collect_list(F.when(F.col('value1') == 1, F.col('value2')))).alias('a_count')).show() 
ID A_COUNT
1 1
2 0

https://danvatterott.com/blog/2018/09/06/python-aggregate-udfs-in-pyspark/

 

 

 

還有一種做法就是用pandas_udf, series 添加一列分組變量然后去重。

 

還有就是使用輸入輸出都是dataframe 的 pandas_udf

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM