odps UDAF解析


UDAF

class odps.udf.BaseUDAF
继承此类实现Python UDAF。

BaseUDAF.new_buffer()
实现此方法返回聚合函数的中间值的buffer。buffer必须是mutable object(比如list, dict),并且buffer的大小不应该随数据量递增,在极限情况下,buffer marshal过后的大小不应该超过2Mb。

BaseUDAF.iterate(buffer[, args, ...])
实现此方法将args聚合到中间值buffer中。

BaseUDAF.merge(buffer, pbuffer)
实现此方法将两个中间值buffer聚合到一起,即将pbuffer merge到buffer中。

BaseUDAF.terminate(buffer)
实现此方法将中间值buffer转换为ODPS SQL基本类型。下面是一个UDAF求平均值的例子。

sample

from odps.udf import annotate
from odps.udf import BaseUDAF


@annotate('bigint->bigint')
class Average(BaseUDAF):

    def new_buffer(self):
        return [0, 0]

    def iterate(self, buffer, number):
        if number is not None:
            buffer[0] += number
            buffer[1] += 1

    def merge(self, buffer, pbuffer):
        buffer[0] += pbuffer[0]
        buffer[1] += pbuffer[1]

    def terminate(self, buffer):
        if buffer[1] == 0:
            return 0
        return buffer[0] / buffer[1]

比如计算1,2,3,4的平均值的执行过程如下图所示:

更多请参考http://www.singlex.net/3442.html?kozafo=i5er4


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM