UDAF
class odps.udf.BaseUDAF
繼承此類實現Python UDAF。
BaseUDAF.new_buffer()
實現此方法返回聚合函數的中間值的buffer。buffer必須是mutable object(比如list, dict),並且buffer的大小不應該隨數據量遞增,在極限情況下,buffer marshal過后的大小不應該超過2Mb。
BaseUDAF.iterate(buffer[, args, ...])
實現此方法將args聚合到中間值buffer中。
BaseUDAF.merge(buffer, pbuffer)
實現此方法將兩個中間值buffer聚合到一起,即將pbuffer merge到buffer中。
BaseUDAF.terminate(buffer)
實現此方法將中間值buffer轉換為ODPS SQL基本類型。下面是一個UDAF求平均值的例子。
sample
from odps.udf import annotate
from odps.udf import BaseUDAF
@annotate('bigint->bigint')
class Average(BaseUDAF):
def new_buffer(self):
return [0, 0]
def iterate(self, buffer, number):
if number is not None:
buffer[0] += number
buffer[1] += 1
def merge(self, buffer, pbuffer):
buffer[0] += pbuffer[0]
buffer[1] += pbuffer[1]
def terminate(self, buffer):
if buffer[1] == 0:
return 0
return buffer[0] / buffer[1]
比如計算1,2,3,4的平均值的執行過程如下圖所示:
更多請參考http://www.singlex.net/3442.html?kozafo=i5er4