ODPS_ele—UDF Python API


自定義函數(UDF)

UDF全稱User Defined Function,即用戶自定義函數。ODPS提供了很多內建函數來滿足用戶的計算需求,同時用戶還可以通過創建自定義函數來滿足不同的計算需求。UDF在使用上與普通的 SQL內建函數 類似。

在ODPS中,用戶可以擴展的UDF有三種,分別是:

UDF 分類 |  描述

User Defined Scalar Function 通常也稱之為UDF
自定義函數,准確的說是用戶自定義標量函數 (User Defined Scalar Function)。UDF的輸入與輸 出是一對一的關系,即讀入一行數據, 寫出一條輸出值。

UDAF(User Defined Aggregation Function)
自定義聚合函數,其輸入與輸出是多對一的關系, 即將多條輸入記錄聚合成一條輸出值。可以與 SQL中的Group By語句聯用。具體語法請參考聚合函數

UDTF(User Defined Table Valued Function)
自定義表函數,是用來解決一次函數調用輸出 多行數據場景的,也是唯一能返回多個字段的自定 義函數。而UDF及UDAF只能一次計算輸出一條 返回值。

注解

UDF廣義的說法代表了自定義標量函數,自定義聚合函數及自定義表函數三種類型的自定義函數的集合。狹義來說,僅代表用戶自定義標量函數。文檔會經常使用這一名詞,請讀者根據文檔上下文判斷具體含義。

受限環境

ODPS UDF的Python版本為2.7,並以沙箱模式執行用戶代碼,即代碼是在一個受限的運行環境中執行的,在這個環境中,被禁止的行為包括:

  • 讀寫本地文件
  • 啟動子進程
  • 啟動線程
  • 使用socket通信
  • 其他系統調用

基於上述原因,用戶上傳的代碼必須都是純Python實現,C擴展模塊是被禁止的。

此外,Python的標准庫中也不是所有模塊都可用,涉及到上述功能的模塊都會被禁止。具體標准庫可用模塊說明如下:

  1. 所有純Python實現(不依賴擴展模塊)的模塊都可用
  2. C實現的擴展模塊中下列模塊可用
  • array ;audioop ;
  • binascii ;_bisect ;
  • cmath ;_codecs_cn ;_codecs_hk ;_codecs_iso2022 ;_codecs_jp ;_codecs_kr
  • _codecs_tw ;_collections ;cStringIO ;
  • datetime ;
  • _functools ;future_builtins ;
  • _hashlib ;_heapq ;
  • itertools ;
  • _json ;
  • _locale ;_lsprof ;
  • math ;_md5 ;_multibytecodec
  • operator ;
  • _random ;
  • _sha256 ;_sha512 ;_sha ;_struct ;strop
  • time ;
  • unicodedata ;
  • _weakref ;
  • cPickle;
  1. 部分模塊功能受限。比如沙箱限制了用戶代碼最多能往標准輸出和標准錯誤輸出寫出數據的大小,即``sys.stdout/sys.stderr``最多能寫20Kb,多余的字符會被忽略。

第三方庫

運行環境中還安裝了除標准庫以外比較常用的三方庫,做為標准庫的補充。支持的三方庫列表如下:

  • numpy

警告

三方庫的使用同樣受到禁止本地、網絡IO或其他在受限環境下的限制,因此三方庫中涉及到相關功能的API也是被禁止的。

參數與返回值類型

@odps.udf.annotate(signature)

Python UDF目前支持ODPS SQL數據類型有:bigint, string, double, boolean和datetime。SQL語句在執行之前,所有函數的參數類型和返回值類型必須確定。因此對於Python這一動態類型語言,需要通過對UDF類加decorator的方式指定函數簽名。

函數簽名signature通過字符串指定,語法如下:

arg_type_list '->' type_list
arg_type_list: type_list | '*' | ''
type_list: [type_list ','] type
type: 'bigint' | 'string' | 'double' | 'boolean' | 'datetime'
  1. 箭頭左邊表示參數類型,右邊表示返回值類型。
  2. 只有UDTF的返回值可以是多列, UDF和UDAF只能返回一列。
  3. ‘*’代表變長參數,使用變長參數,UDF/UDTF/UDAF可以匹配任意輸入參數。

下面是合法的signature的例子:

'bigint,double->string'            # 參數為bigint、double,返回值為string
'bigint,boolean->string,datetime'  
# UDTF參數為bigint、boolean,返回值為string,datetime
'*->string'                        # 變長參數,
輸入參數任意
,返回值為string
'->double'                         # 參數為空,返回值為double

Query語義解析階段會將檢查到不符合函數簽名的用法,拋出錯誤禁止執行。執行期,UDF函數的參數會以函數簽名指定的類型傳給用戶。用戶的返回值類型也要與函數簽名指定的類型一致,否則檢查到類型不匹配時也會報錯。ODPS SQL數據類型對應Python類型如下:

image

注解

  • Datetime類型是以int的形式傳給用戶代碼的,值為epoch utc time起始至今的毫秒數。用戶可以通過Python標准庫中的datetime模塊處理日期時間類型。
  • NULL值對應Python里的None。

odps.udf.int(value[, silent=True])

Python builtin函數 int 的修改。增加了參數 silent 。當 silentTrue 時,如果 value 無法轉為 int ,不會拋出異常,而是返回 None

UDF

實現Python UDF非常簡單,只需要定義一個new-style class,並實現 evaluate 方法。下面是一個例子:

from odps.udf import annotate
@annotate("bigint,bigint->bigint")
class MyPlus(object):

   def evaluate(self, arg0, arg1):
       if None in (arg0, arg1):
           return None
       return arg0 + arg1

注解:Python UDF必須通過annotate指定函數簽名

UDAF
class odps.udf. BaseUDAF

繼承此類實現Python UDAF。

BaseUDAF. new_buffer ( )

實現此方法返回聚合函數的中間值的buffer。buffer必須是mutable object(比如list, dict),並且buffer的大小不應該隨數據量遞增,在極限情況下,buffer marshal過后的大小不應該超過2Mb。

BaseUDAF. iterate ( buffer[, args, ...] )

實現此方法將args聚合到中間值buffer中。

BaseUDAF. merge ( buffer, pbuffer )

實現此方法將兩個中間值buffer聚合到一起,即將pbuffer merge到buffer中。

BaseUDAF. terminate ( buffer )

實現此方法將中間值buffer轉換為ODPS SQL基本類型。

下面是一個UDAF求平均值的例子。

#coding:utf-8
from odps.udf import annotate
from odps.udf import BaseUDAF

@annotate('double->double')
class Average(BaseUDAF):

    def new_buffer(self):
        return [0, 0]

    def iterate(self, buffer, number):
        if number is not None:
            buffer[0] += number
            buffer[1] += 1

    def merge(self, buffer, pbuffer):
        buffer[0] += pbuffer[0]
        buffer[1] += pbuffer[1]

    def terminate(self, buffer):
        if buffer[1] == 0:
            return 0
        return buffer[0] / buffer[1]
UDTF
class odps.udf. BaseUDTF

Python UDTF的基類,用戶繼承此類,並實現 process, close 等方法。

BaseUDTF. __init__ ( )

初始化方法,繼承類如果實現這個方法,則必須在一開始調用基類的初始化方法 super(BaseUDTF,self).__init__()

__init__ 方法在整個UDTF生命周期中只會被調用一次,即在處理第一條記錄之前。當UDTF需要保存內部狀態時,可以在這個方法中初始化所有狀態。

BaseUDTF. process ([ args, ...] )

這個方法由ODPS SQL框架調用,SQL中每一條記錄都會對應調用一次 processprocess 的參數為SQL語句中指定的UDTF輸入參數。

BaseUDTF. forward ([ args, ...] )

UDTF的輸出方法,此方法由用戶代碼調用。每調用一次 forward ,就會輸出一條記錄。 forward 的參數為SQL語句中指定的UDTF的輸出參數。

BaseUDTF. close ( )

UDTF的結束方法,此方法由ODPS SQL框架調用,並且只會被調用一次,即在處理完最后一條記錄之后。

下面是一個UDTF的例子。

#coding:utf-8
# explode.py
from odps.udf import annotate
from odps.udf import BaseUDTF

@annotate('string -> string')
class Explode(BaseUDTF):
   """將string按逗號分隔輸出成多條記錄"""
   def process(self, arg):
       props = arg.split(',')
       for p in props:
           self.forward(p)

注解

Python UDTF也可以不加annotate指定參數類型和返回值類型。這樣,函數在SQL中使用時可以匹配任意輸入參數,但返回值類型無法推導,所有輸出參數都將認為是string類型。因此在調用 forward 時,就必須將所有輸出值轉成 str 類型。

引用資源

Python UDF可以通過 odps.distcache 模塊引用資源文件,目前支持引用文件資源和表資源。

odps.distcache. get_cache_file ( resource_name )

release-2012.09.03 新版功能.

返回指定名字的資源內容。 resource_namestr 類型,對應當前Project中已存在的資源名。如果資源名非法或者沒有相應的資源,會拋出異常。

返回值為 file-like object ,在使用完這個object后,調用者有義務調用 close 方法釋放打開的資源文件。

下面是使用 get_cache_file 的例子:

from odps.udf import annotate
from odps.distcache import get_cache_file

@annotate('bigint->string')
class DistCacheExample(object):

def __init__(self):
    cache_file = get_cache_file('test_distcache.txt')
    kv = {}
    for line in cache_file:
        line = line.strip()
        if not line:
            continue
        k, v = line.split()
        kv[int(k)] = v
    cache_file.close()
    self.kv = kv

def evaluate(self, arg):
    return self.kv.get(arg)

odps.distcache.get_cache_table(resource_name)

release-2012.11.14 新版功能.

返回指定資源表的內容。 resource_namestr 類型,對應當前Project中已存在的資源表名。如果資源名非法或者沒有相應的資源,會拋出異常。

返回值為 generator 類型,調用者通過遍歷獲取表的內容,每次遍歷得到的是以 tuple 形式存在的表中的一條記錄。

下面是使用 get_cache_table 的例子:

from odps.udf import annotate
from odps.distcache import get_cache_table

@annotate('->string')
class DistCacheTableExample(object):
    def __init__(self):
        self.records = list(get_cache_table('udf_test'))
        self.counter = 0
        self.ln = len(self.records)

    def evaluate(self):
        if self.counter > self.ln - 1:
            return None
        ret = self.records[self.counter]
        self.counter += 1
        return str(ret)

注意事項

表達式優化

當一個Query中有多個相同UDF,並且他們的參數也都一致時,這些UDF在執行時會被優化成只執行一次。例如:

random.seed(12345)
@annotate('bigint->bigint')
class MyRand(object):
    def evaluate(self, a):
        return random.randint(0, 10)

實現一個Rand函數,希望每次調用Rand時返回一個隨機值。

> select MyRand(c_int_a), MyRand(c_int_a) from udf_test;
+------------+------------+
| _c0        | _c1        |
+------------+------------+
| 4          | 4          |
| 0          | 0          |
| 9          | 9          |
| 3          | 3          |
+------------+------------+

可以看到默認情況下,同一行的兩次Rand調用返回值結果一樣,這是因為被優化后只執行一次導致的。如果不想要這個優化,可以通過設置配置項odps.sql.udf.optimize.reuse 取消這個優化:

> set odps.sql.udf.optimize.reuse=false;
> select MyRand(c_int_a), MyRand(c_int_a) from udf_test;
+------------+------------+
| _c0        | _c1        |
+------------+------------+
| 4          | 0          |
| 9          | 3          |
| 4          | 2          |
| 6          | 1          |
+------------+------------+

總結

ODPS為Python提供的類有

1. 參數與返回值類型

@odps.udf.annotate(signature),ODPS SQL數據類型對應Python類型如下:

image

odps.udf.int(value[, silent=True])

2. UDF

# 定義一個new-style class,並實現 evaluate 方法

from odps.udf import annotate
@annotate("bigint,bigint->bigint")
class MyPlus(object):
   def evaluate(self, arg0, arg1):
       if None in (arg0, arg1):
           return None
       return arg0 + arg1

3. UDAF

class odps.udf.BaseUDAF—繼承此類實現Python UDAF。

BaseUDAF類擁有的四個方法如下:

BaseUDAF. new_buffer ( )
BaseUDAF. iterate ( buffer[, args, ...] )
BaseUDAF. merge ( buffer, pbuffer )
BaseUDAF. terminate ( buffer )

下面是一個UDAF求平均值的例子。

#coding:utf-8
from odps.udf import annotate
from odps.udf import BaseUDAF

@annotate('double->double')
class Average(BaseUDAF):

    def new_buffer(self):
        return [0, 0]

    def iterate(self, buffer, number):
        if number is not None:
            buffer[0] += number
            buffer[1] += 1

    def merge(self, buffer, pbuffer):
        buffer[0] += pbuffer[0]
        buffer[1] += pbuffer[1]

    def terminate(self, buffer):
        if buffer[1] == 0:
            return 0
        return buffer[0] / buffer[1]

4.UDTF

class odps.udf.BaseUDTF—Python UDTF的基類,用戶繼承此類,並實現 process , close 等方法。

BaseUDTF類擁有的四個方法

BaseUDTF. __init__ ( )
BaseUDTF. process ([ args, ...] )
BaseUDTF. forward ([ args, ...] )
BaseUDTF. close ( )
下面是一個UDTF的例子。
#coding:utf-8
# explode.py
from odps.udf import annotate
from odps.udf import BaseUDTF

@annotate('string -> string')
class Explode(BaseUDTF):
   """將string按逗號分隔輸出成多條記錄
   """

   def process(self, arg):
       props = arg.split(',')
       for p in props:
           self.forward(p)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM