0. 寫在前面
Presto Functions 並不能像 Hive UDF 一樣動態加載,需要根據 Function 的類型,實現 Presto 內部定義的不同接口,在 Presto 服務啟動時進行注冊,然后才能在 SQL 執行時進行調用。
1. 函數定義
Presto 內部將 Functions 分為以下三大類:
- Scalar Function,即標量函數。將傳遞給它的一個或者多個參數值,進行計算后,返回一個確定類型的標量值。
- Aggregation Function,即聚合函數。計算從列中取得的值,返回一個單一的值。
- Window Function,即開窗函數。計算從分組列中取得的值,並返回多個值。
對於不同類型的函數,需要遵循不同的規則進行實現。
1.1 標量函數
Presto 使用注解框架來實現標量函數,標量函數分別需要定義函數名稱、輸入參數類型和返回結果類型。下面介紹幾種開發標量函數常用的注解:
@ScalarFunction
:用於聲明標量函數的名稱和別名@Description
:用於生成函數的功能描述@SqlType
:用於聲明函數的返回類型和參數類型@TypeParameter
:用於聲明類型變量,它所聲明的類型變量可以用於函數的返回類型和參數類型,框架在運行時會自動將變量與具體的類型進行綁定@SqlNullable
:用於表示函數參數或返回結果可能為NULL
。如果方法的參數不使用此注解,當函數參數包含NULL
時,則該函數不會被調用,框架自動返回結果NULL
。當 Java 代碼中用於實現函數的方法的返回值為包裝類型時,必須要在實現方法上加上該注解,且該注解無法用於 Java 基礎類型
下面用一個簡單的is_null
函數來具體說明如何使用以上注解進行標量函數開發。
public class ExampleIsNullFunction
{
@ScalarFunction(value = "is_null", alias = "isnull")
@Description("Returns TRUE if the argument is NULL")
@SqlType(StandardTypes.BOOLEAN)
public static boolean isNull(@SqlNullable @SqlType(StandardTypes.VARCHAR) Slice string)
{
return (string == null);
}
}
以上代碼實現的is_null
函數功能為:判斷傳入的VARCHAR
類型參數是否為NULL
,如果為NULL
則返回true
,否則返回false
。其中:
@ScalarFunction(value = "is_null", alias = "isnull")
聲明了函數名為is_null
,函數別名為isnull
,即在 SQL 中使用is_null
和isnull
都可以調用該函數@Description("Returns TRUE if the argument is NULL")
聲明了函數描述,使用show functions
命令可以看到函數的描述@SqlType(StandardTypes.BOOLEAN)
聲明了函數的返回類型為BOOLEAN
- 因為當函數參數為
NULL
時,我們不能直接返回NULL
,而是要進行判斷,所以要加上@SqlNullable
避免框架自動返回NULL
@SqlType(StandardTypes.VARCHAR)
聲明了函數的參數類型為VARCHAR
注意到,這里使用了 Java 類型Slice
來接收 SQL 中VARCHAR
類型的值。框架會自動將 SQL 中的數據類型與“原生容器類型”(Native container type)進行綁定,目前“原生容器類型”只包括:boolean
、long
、double
、Slice
和Block
。VARCHAR
對應的原生容器類型是Slice
而不是String
,Slice
的本質是對byte[]
進行了封裝,為的是更加高效、自由地對內存進行操作。Block
可以簡單的理解為對應 SQL 中的數組類型。具體的對應關系和綁定過程涉及 Presto 的類型系統和函數調用過程,不是本文講解的重點,故在此不作展開。
進一步地,我們想對 is_null
函數進行升級,使它能夠處理任意類型的參數,這時@TypeParameter
注解就派上用場了,函數的實現可以改寫為:
@ScalarFunction(value = "is_null", alias = "isnull")
@Description("Returns TRUE if the argument is NULL")
public class ExampleIsNullFunction
{
private IsNullFunctions()
{
}
@TypeParameter("T")
@SqlType(StandardTypes.BOOLEAN)
public static boolean isNullSlice(@SqlNullable @SqlType("T") Slice value)
{
return (value == null);
}
@TypeParameter("T")
@SqlType(StandardTypes.BOOLEAN)
public static boolean isNullLong(@SqlNullable @SqlType("T") Long value)
{
return (value == null);
}
@TypeParameter("T")
@SqlType(StandardTypes.BOOLEAN)
public static boolean isNullDouble(@SqlNullable @SqlType("T") Double value)
{
return (value == null);
}
@TypeParameter("T")
@SqlType(StandardTypes.BOOLEAN)
public static boolean isNullBoolean(@SqlNullable @SqlType("T") Boolean value)
{
return (value == null);
}
@TypeParameter("T")
@SqlType(StandardTypes.BOOLEAN)
public static boolean isNullBlock(@SqlNullable @SqlType("T") Block value)
{
return (value == null);
}
}
可以看到,@TypeParameter
的使用有點類似 Java 中泛型的用法,類型變量T
在聲明完之后就可以在@SqlType
注解中使用。在實際的調用過程中,框架會將T
與實際 SQL 類型進行綁定,然后再去調用以對應的原生容器類型為參數的實際方法。
1.2 聚合函數
聚合的過程一般涉及多行,有一個累積計算的過程,又由於 Presto 是一個分布式的計算引擎,數據分布在多個節點,所以需要用狀態對象來維護和記錄中間計算結果。
引入狀態之后,Presto 將聚合的過程抽象為三個步驟:
input(state, value)
combine(state1, state2)
output(state, out)
首先,input
階段分別在不同的 worker 中進行,將行值進行累積計算到state
中;combine
階段將上一步得到的state
進行兩兩結合;經過前兩步,最終會得到一個state
,在output
階段對最終的state
進行處理輸出。
在實現方面,聚合函數的開發使用了和標量函數類似的注解框架,但是由於狀態概念的引入,需要定義一個繼承於AccumulatorState
接口的狀態接口,對於簡單的聚合,該接口只需要新增聚合所需的getter
和setter
,框架會自動生成相關的實現和序列化代碼;如果聚合過程中需要記錄復雜類型(LIST
、MAP
或自定義的類)的狀態,則需要額外實現AccumulatorStateFactory
接口和AccumulatorStateSerializer
接口,並在狀態接口上使用@AccumulatorStateMetadata
注解,在注解中指定stateFactoryClass
和stateSerializerClass
。
下面以實現求DOUBLE
類型的列均值的聚合函數avg_double
為例來說明如何進行簡單聚合函數的開發。
avg_double
的聚合狀態只需要記錄累積和與加數個數,所以狀態接口的定義如下:
public interface LongAndDoubleState
extends AccumulatorState
{
long getLong();
void setLong(long value);
double getDouble();
void setDouble(double value);
}
使用定義好的狀態接口進行聚合函數實現:
@AggregationFunction("avg_double")
public class AverageAggregation
{
@InputFunction
public static void input(LongAndDoubleState state, @SqlType(StandardTypes.DOUBLE) double value)
{
state.setLong(state.getLong() + 1);
state.setDouble(state.getDouble() + value);
}
@CombineFunction
public static void combine(LongAndDoubleState state, LongAndDoubleState otherState)
{
state.setLong(state.getLong() + otherState.getLong());
state.setDouble(state.getDouble() + otherState.getDouble());
}
@OutputFunction(StandardTypes.DOUBLE)
public static void output(LongAndDoubleState state, BlockBuilder out)
{
long count = state.getLong();
if (count == 0) {
out.appendNull();
}
else {
double value = state.getDouble();
DOUBLE.writeDouble(out, value / count);
}
}
}
可以看到聚合函數的實現使用了以下注解:
@AggregationFunction
聲明了聚合函數的名稱,也可以指定函數的別名@InputFunction
、@CombineFunction
和@OutputFunction
分別用來標記聚合的三個步驟,其中@OutputFunction
注解需要聲明聚合函數返回結果的數據類型BlockBuilder
類為結果輸出類,聚合計算出的最終結果值將通過BlockBuilder
進行輸出
1.3 窗口函數
窗口函數在查詢結果的行上進行計算,執行順序在HAVING
子句之后,ORDER BY
子句之前。在 Presto SQL 中,窗口函數的語法形式如下:
windowFunction(arg1,....argn) OVER([PARTITION BY<...>] [ORDER BY<...>] [RANGE|ROWS BETWEEN AND])
由此可見,窗口函數語法由關鍵字OVER
觸發,且包含三個子句:
PARTITION BY
: 指定輸入行分區的規則,類似於聚合函數的GROUP BY
子句,不同分區里的計算互不干擾(窗口函數的計算是並發進行的,並發數和partition
數量一致),缺省時將所有數據行視為一個分區ORDER BY
: 決定了窗口函數處理輸入行的順序RANGE|ROWS BETWEEN AND
: 指定窗口邊界,不常用,缺省時的窗口為當前行所在的分區第一行到當前行
窗口函數的開發需要實現WindowFunction
接口,WindowFunction
接口中聲明了兩個方法:
void reset(WindowIndex windowIndex)
: 處理新分區時,都會調用該方法進行初始化,WindowIndex
包含了已排序的分區的所有行void processRow(BlockBuilder output, int peerGroupStart, int peerGroupEnd, int frameStart, int frameEnd)
: 窗口函數的實現方法,BlockBuilder
為結果輸出類,計算出來的值將通過BlockBuilder
進行輸出;peerGroupStart
和peerGroupEnd
為當前處理的行所在的分區的開始和結束的位置;frameStart
和frameEnd
為當前處理行所在的窗口的開始和結束位置。
實現一個返回窗口中第一個值的窗口函數first_value(x)
的代碼如下:
@WindowFunctionSignature(name = "first_value", typeVariable = "T", returnType = "T", argumentTypes = "T")
public class FirstValueFunction
extends WindowFunction
{
private final int argumentChannel;
private WindowIndex windowIndex;
public FirstValueFunction(List<Integer> argumentChannels)
{
this.argumentChannel = getOnlyElement(argumentChannels);
}
@Override
public void reset(WindowIndex windowIndex)
{
this.windowIndex = windowIndex;
}
@Override
public void processRow(BlockBuilder output, int peerGroupStart, int peerGroupEnd, int frameStart, int frameEnd)
{
if (frameStart < 0) {
output.appendNull();
return;
}
//Outputs a value from the index
windowIndex.appendTo(argumentChannel, frameStart, output);
}
}
其中:
@WindowFunctionSignature
注解聲明了窗口函數的名稱,為了處理任意數據類型的字段,這里還聲明了類型變量T
,並將返回類型和參數類型都指定為T
- 構造函數中的
argumentChannels
為參數字段所在列的索引值 processRow
方法中,每次只需要通過列索引argumentChannel
和當前行所在的窗口起始索引frameStart
,就能確定窗口中的第一個值
2. 函數注冊
Presto 函數由MetadataManager
中的FunctionRegistry
進行管理,開發的函數要生效必須要先注冊到FunctionRegistry
中。函數注冊是在 Presto 服務啟動過程中進行的,有以下兩種方式進行函數注冊。
2.1 內置函數注冊
內置函數指的是 Presto 自帶的函數庫中的函數,函數的實現位於presto-main
模塊中,在FunctionRegistry
初始化時進行注冊。具體的注冊過程使用了建造者模式,不同類型的函數注冊只需要調用FunctionListBuilder
對象對應的方法進行注冊,關鍵代碼如下:
FunctionListBuilder builder = new FunctionListBuilder()
.window(RowNumberFunction.class)
.aggregate(ApproximateCountDistinctAggregation.class)
.scalar(RepeatFunction.class)
.function(MAP_HASH_CODE)
......
2.2 插件函數注冊
內置函數滿足不了使用需求時,就需要自行開發函數來拓展函數庫。開發者自行編寫的拓展函數一般通過插件的方式進行注冊。PluginManager
在安裝插件時會調用插件的getFunctions()
方法,將獲取到的函數集合通過MetadataManager
的addFunctions
方法進行注冊:
public void installPlugin(Plugin plugin)
{
......
for (Class<?> functionClass : plugin.getFunctions()) {
log.info("Registering functions from %s", functionClass.getName());
metadata.addFunctions(extractFunctions(functionClass));
}
......
}
所以用做拓展函數庫的插件,需要實現getFunctions()
方法,來返回拓展的函數集合,例:
public class ExampleFunctionsPlugin
implements Plugin
{
@Override
public Set<Class<?>> getFunctions()
{
return ImmutableSet.<Class<?>>builder()
.add(ExampleNullFunction.class)
.add(IsNullFunction.class)
.add(IsEqualOrNullFunction.class)
.add(ExampleStringFunction.class)
.add(ExampleAverageFunction.class)
.build();
}
}
3. 多說幾句
以上介紹的 Presto 函數開發方式可以滿足日常大部分函數開發需求, Presto 函數的注冊機制,新增和修改函數后,必須要重啟服務才能生效,所以目前還不支持真正的用戶自定義函數。
其他較為復雜的函數實現,比如變長參數函數的實現涉及調用過程中的函數簽名匹配和類型參數綁定,需要用到codeGen
進行實現,具體原理由於篇幅有限,在文中沒有進行展開講解,感興趣的讀者可以在評論區留言。