1 Introduction to Presto
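Presto is a distributed query engine open-sourced by Facebook, and it plays an important role in interactive querying. As more and more people analyze data with SQL on Presto, we found we needed to package some business logic as UDFs, similar to those in Hive. This makes SQL users more productive and keeps UDFs consistent between the Hive and Presto environments.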
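1.1 Presto functions
Presto functions fall into three broad categories: scalar, aggregation and window functions.
1) Scalar functions: put simply, a static Java method with no state of its own.
2) Aggregation functions, which accumulate state, such as count and avg. On a single machine the state could simply be kept in a variable, but Presto is a distributed engine and state is passed between nodes, so it must be serialized into Presto's internal format before it can be transferred.
3) Window functions, similar to window functions in Spark SQL.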
2 Implementing custom functions
Official documentation: https://prestodb.github.io/docs/current/develop/functions.html
2.1 Implementing a custom scalar function
2.1.1 Define a Java class
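1) Mark the static method that implements the business logic with the @ScalarFunction annotation.
2) Describe what the function does with @Description; this text is shown by SHOW FUNCTIONS.
3) Mark the function's return type with @SqlType; a function that returns a string, for example, uses StandardTypes.VARCHAR. Each parameter is annotated with @SqlType as well.
4) The Java return value must use Presto's internal representation, so a string-typed function must return a Slice. Slices.utf8Slice conveniently converts a String into a Slice.
Example code: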
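```java
// Package names follow older prestodb releases; newer releases moved some of
// these classes (for example StandardTypes), so adjust them to your version.
import com.facebook.presto.spi.function.Description;
import com.facebook.presto.spi.function.ScalarFunction;
import com.facebook.presto.spi.function.SqlType;
import com.facebook.presto.spi.type.StandardTypes;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;

public class LiulishuoFunctions {

    public static final String DATE_FORMAT = "yyyy-MM-dd";

    @ScalarFunction
    @Description("hive to_date function")
    @SqlType(StandardTypes.VARCHAR)
    public static Slice to_date(@SqlType(StandardTypes.TIMESTAMP) long input) {
        // SimpleDateFormat is not thread-safe, so a new instance is created per call.
        // It formats in the JVM's default time zone.
        final DateFormat format = new SimpleDateFormat(DATE_FORMAT);
        return Slices.utf8Slice(format.format(new Date(input)));
    }
}
```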
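The example above formats with SimpleDateFormat, which uses the JVM's default time zone, so nodes configured with different zones could return different dates for the same timestamp. A minimal sketch of just the conversion logic, in plain Java with no Presto dependencies, assuming (as the example does) that the TIMESTAMP argument arrives as epoch milliseconds. The class ToDateCheck is hypothetical and makes the zone an explicit parameter:

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not part of Presto: the same millis -> "yyyy-MM-dd"
// conversion as to_date above, but with an explicit time zone.
final class ToDateCheck {
    // DateTimeFormatter is immutable and thread-safe, so one shared instance is fine.
    private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd");

    static String toDate(long epochMillis, ZoneId zone) {
        return Instant.ofEpochMilli(epochMillis).atZone(zone).format(DATE_FORMAT);
    }

    public static void main(String[] args) {
        // 16:00 UTC on 1970-01-01 is already the next day at UTC+8.
        System.out.println(toDate(57_600_000L, ZoneId.of("UTC")));
        System.out.println(toDate(57_600_000L, ZoneId.of("+08:00")));
    }
}
```

The two printed dates differ, which is exactly the discrepancy an implicit default time zone can introduce between nodes.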
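2.1.2 Presto's plugin mechanism
Unlike Hive, Presto cannot register custom UDFs through configuration; they must be delivered through its plugin mechanism. Plugins are the core of how Presto integrates many data sources: by implementing different plugins, Presto lets users run JOINs and other computations across different kinds of data sources. Every data source in Presto, such as MySQL, Hive and HBase, is implemented as a plugin. Besides loading Connectors to access different data sources, the plugin mechanism also loads UDFs (older Presto versions did this through a FunctionFactory; the plugin below returns its function classes from getFunctions()). Presto plugins follow Java's ServiceLoader convention, so implementing one is straightforward.
Implement the Plugin interface, for example: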
```java
import com.facebook.presto.spi.Plugin;
import com.google.common.collect.ImmutableSet;

import java.util.Set;

public class PrestoFunctionsPlugin implements Plugin {
    // Every class returned here is scanned for annotated function definitions.
    @Override
    public Set<Class<?>> getFunctions() {
        return ImmutableSet.<Class<?>>builder()
                .add(LiulishuoFunctions.class) // scalar functions from 2.1.1
                .add(PvFlowStatsAggregation.class)
                .add(AvgAggregationDemo.class)
                .build();
    }
}
```
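Finally, package the classes into a jar and place it, together with its dependencies, in its own subdirectory of the plugin directory on every Presto node (coordinator and workers). Because plugins are discovered through ServiceLoader, the jar must also contain a META-INF/services/com.facebook.presto.spi.Plugin file containing the fully qualified name of the plugin class. Presto must be restarted to load the custom functions in the jar.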
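Presto's real loader does far more (type resolution, validation, a separate class loader per plugin), but the core idea, that a plugin hands back classes and the engine scans them for annotated static methods, can be modeled in a few lines. A simplified, hypothetical sketch: MiniScalar, MiniPlugin, DemoFunctions and MiniRegistry are invented stand-ins, not Presto classes:

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for @ScalarFunction; RUNTIME retention makes it visible to reflection.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface MiniScalar {
    String value();
}

// Hypothetical stand-in for the Plugin interface.
interface MiniPlugin {
    Set<Class<?>> getFunctions();
}

final class DemoFunctions {
    @MiniScalar("add_one")
    public static long addOne(long x) {
        return x + 1;
    }

    // Not annotated, so the registry ignores it.
    public static long helper(long x) {
        return x;
    }
}

final class DemoPlugin implements MiniPlugin {
    @Override
    public Set<Class<?>> getFunctions() {
        return Set.of(DemoFunctions.class);
    }
}

final class MiniRegistry {
    // Names of all static methods annotated with @MiniScalar in the plugin's classes.
    static List<String> register(MiniPlugin plugin) {
        List<String> names = new ArrayList<>();
        for (Class<?> clazz : plugin.getFunctions()) {
            for (Method m : clazz.getDeclaredMethods()) {
                MiniScalar ann = m.getAnnotation(MiniScalar.class);
                if (ann != null && Modifier.isStatic(m.getModifiers())) {
                    names.add(ann.value());
                }
            }
        }
        return names;
    }

    // Calls a registered function by its SQL name; static, so no receiver object is needed.
    static Object call(MiniPlugin plugin, String sqlName, Object... args) {
        for (Class<?> clazz : plugin.getFunctions()) {
            for (Method m : clazz.getDeclaredMethods()) {
                MiniScalar ann = m.getAnnotation(MiniScalar.class);
                if (ann != null && ann.value().equals(sqlName)) {
                    try {
                        return m.invoke(null, args);
                    } catch (ReflectiveOperationException e) {
                        throw new IllegalStateException(e);
                    }
                }
            }
        }
        throw new IllegalArgumentException("unknown function: " + sqlName);
    }
}
```

This is also why UDF methods must be static: the engine never constructs an instance of the function class.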
2.2 Custom aggregation functions
2.2.1 How it works
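Presto executes an aggregation function in three steps:
1. input(state, data): called once for each row. This step runs in parallel on every node that holds data, producing multiple partial states.
2. combine(state1, state2): merges two partial states. It is called repeatedly until all partial states have been merged into a single final state.
3. output(final_state, out): computes the aggregation function's result from the final state and writes it to a BlockBuilder.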
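The three phases can be modeled without Presto. A minimal sketch with hypothetical names, using avg as the example: the point is that the state must be something partial results can be merged from (a sum and a count), not the partial average itself, because averaging averages gives the wrong answer when nodes hold different numbers of rows:

```java
import java.util.List;

// Hypothetical, Presto-free model of the input/combine/output split.
final class AvgPhasesDemo {
    static final class AvgState {
        long sum;
        long count;
    }

    // input(state, data): runs once per row, on whichever node holds the row.
    static void input(AvgState state, long value) {
        state.sum += value;
        state.count++;
    }

    // combine(state1, state2): folds another partial state into this one.
    static void combine(AvgState state, AvgState other) {
        state.sum += other.sum;
        state.count += other.count;
    }

    // output(final_state): turns the merged state into the result (null when there were no rows).
    static Double output(AvgState state) {
        return state.count == 0 ? null : (double) state.sum / state.count;
    }

    // Simulates several nodes, each aggregating its own share of the rows.
    static Double run(List<List<Long>> rowsPerNode) {
        AvgState merged = new AvgState();
        for (List<Long> rows : rowsPerNode) {
            AvgState partial = new AvgState();
            for (long v : rows) {
                input(partial, v);
            }
            combine(merged, partial);
        }
        return output(merged);
    }
}
```

With rows {1, 2} on one node and {3} on another, this returns 2.0, whereas averaging the partial averages (1.5 and 3.0) would give 2.25.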
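2.2.2 Implementation steps
1. Define a Java class and mark it as an aggregation function with @AggregationFunction.
2. Mark the input, combine and output functions with @InputFunction, @CombineFunction and @OutputFunction respectively, and register the aggregation class in the Plugin.
3. Define a State interface that extends AccumulatorState and has get and set methods.
4. Annotate the State interface with @AccumulatorStateMetadata, supplying the serializer (stateSerializerClass) and the factory (stateFactoryClass); you write both classes yourself.
Core code example: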
```java
// Package names follow older prestodb releases and may differ in yours.
import com.facebook.presto.spi.block.BlockBuilder;
import com.facebook.presto.spi.function.AccumulatorState;
import com.facebook.presto.spi.function.AccumulatorStateMetadata;
import com.facebook.presto.spi.function.AggregationFunction;
import com.facebook.presto.spi.function.AggregationState;
import com.facebook.presto.spi.function.CombineFunction;
import com.facebook.presto.spi.function.InputFunction;
import com.facebook.presto.spi.function.OutputFunction;
import com.facebook.presto.spi.function.SqlType;
import com.facebook.presto.spi.type.StandardTypes;
import com.facebook.presto.spi.type.VarcharType;
import io.airlift.slice.Slice;

@AggregationFunction("pv_stats")
public class PvFlowStatsAggregation {
    private PvFlowStatsAggregation() {}

    // State interface; its serializer and factory classes are written by hand.
    @AccumulatorStateMetadata(stateSerializerClass = PvFlowStatsStateSerializer.class,
            stateFactoryClass = PvFlowStatsFactory.class)
    public interface State extends AccumulatorState {
        PvFlowStats get();

        void set(PvFlowStats value);
    }

    // Two input functions give pv_stats two signatures: with and without endUrl.
    @InputFunction
    public static void input(@AggregationState State state,
            @SqlType(StandardTypes.BIGINT) long id,
            @SqlType(StandardTypes.VARBINARY) Slice serialisedTree,
            @SqlType(StandardTypes.VARCHAR) Slice pvOrder) {
        handleDataInput(state, id, serialisedTree, pvOrder, null);
    }

    @InputFunction
    public static void input(@AggregationState State state,
            @SqlType(StandardTypes.BIGINT) long id,
            @SqlType(StandardTypes.VARBINARY) Slice serialisedTree,
            @SqlType(StandardTypes.VARCHAR) Slice pvOrder,
            @SqlType(StandardTypes.VARCHAR) Slice endUrl) {
        handleDataInput(state, id, serialisedTree, pvOrder, endUrl);
    }

    private static void handleDataInput(State state, long id, Slice serialisedTree, Slice pvOrder, Slice endUrl) {
        // Lazily create the accumulator on the first row.
        PvFlowStats stats = state.get();
        if (stats == null) {
            stats = new PvFlowStats();
            state.set(stats);
        }
        // ......
    }

    @CombineFunction
    public static void combine(@AggregationState State state, @AggregationState State other) {
        PvFlowStats input = other.get();
        if (input == null) {
            return; // the other partial state is empty, nothing to merge
        }
        PvFlowStats previous = state.get();
        if (previous == null) {
            state.set(input);
        } else {
            previous.mergeWith(input);
        }
    }

    @OutputFunction(StandardTypes.VARCHAR)
    public static void output(@AggregationState State state, BlockBuilder out) {
        PvFlowStats stats = state.get();
        if (stats == null) {
            out.appendNull();
            return;
        }
        // Compute the statistics
        Slice result = stats.statisticNextPage();
        if (result == null) {
            out.appendNull();
        } else {
            VarcharType.VARCHAR.writeSlice(out, result);
        }
    }
}
```
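2.2.3 Complex data types (lists, maps or custom classes)
For complex types you must write your own serializer and factory classes, and implement serialization and deserialization of the data class yourself.
Here is an example.
Main class: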
```java
// Package names follow older prestodb releases and may differ in yours.
import com.facebook.presto.spi.block.BlockBuilder;
import com.facebook.presto.spi.function.AccumulatorState;
import com.facebook.presto.spi.function.AccumulatorStateMetadata;
import com.facebook.presto.spi.function.AggregationFunction;
import com.facebook.presto.spi.function.AggregationState;
import com.facebook.presto.spi.function.CombineFunction;
import com.facebook.presto.spi.function.InputFunction;
import com.facebook.presto.spi.function.OutputFunction;
import com.facebook.presto.spi.function.SqlType;
import com.facebook.presto.spi.type.StandardTypes;
import com.facebook.presto.spi.type.VarcharType;
import io.airlift.slice.Slice;

/**
 * Input:
 *
 *  id | value
 * ----+-------
 *   2 | ddd
 *   2 | ddd
 *   1 | bbb
 *   1 | bbb
 *   1 | ccc
 *   1 | aaa
 *   1 | bbb
 *   2 | aaa
 *   2 | ccc
 *   1 | ccc
 *
 * Returns (JSON-like, not strict JSON; entry order may vary because HashMap is used):
 * [{id:1,{aaa:1,ccc:2,bbb:3},{id:2,{aaa:1,ccc:1,ddd:2}]
 */
@AggregationFunction("presto_collect")
public class CollectListAggregation {

    @AccumulatorStateMetadata(stateSerializerClass = CollectListStatsSerializer.class,
            stateFactoryClass = CollectListStatsFactory.class)
    public interface CollectState extends AccumulatorState {
        CollectListStats get();

        void set(CollectListStats value);
    }

    @InputFunction
    public static void input(@AggregationState CollectState state,
            @SqlType(StandardTypes.VARCHAR) Slice id,
            @SqlType(StandardTypes.VARCHAR) Slice key) {
        try {
            CollectListStats stats = state.get();
            if (stats == null) {
                stats = new CollectListStats();
                state.set(stats);
            }
            // id arrives as VARCHAR and must parse as an int
            int inputId = Integer.parseInt(id.toStringUtf8());
            String inputKey = key.toStringUtf8();
            stats.addCollectList(inputId, inputKey, 1);
        } catch (Exception e) {
            // pass the cause along so the original stack trace is kept
            throw new RuntimeException("presto_collect: input error", e);
        }
    }

    @CombineFunction
    public static void combine(@AggregationState CollectState state,
            @AggregationState CollectState otherState) {
        try {
            CollectListStats collectListStats = state.get();
            CollectListStats oCollectListStats = otherState.get();
            if (oCollectListStats == null) {
                return; // the other partial state is empty
            }
            if (collectListStats == null) {
                state.set(oCollectListStats);
            } else {
                collectListStats.mergeWith(oCollectListStats);
            }
        } catch (Exception e) {
            throw new RuntimeException("presto_collect: combine error", e);
        }
    }

    @OutputFunction(StandardTypes.VARCHAR)
    public static void output(@AggregationState CollectState state, BlockBuilder out) {
        try {
            CollectListStats stats = state.get();
            if (stats == null) {
                out.appendNull();
                return;
            }
            // Build the result string
            Slice result = stats.getCollectResult();
            if (result == null) {
                out.appendNull();
            } else {
                VarcharType.VARCHAR.writeSlice(out, result);
            }
        } catch (Exception e) {
            throw new RuntimeException("presto_collect: output error", e);
        }
    }
}
```
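The main class is simple: it only needs input, combine and output.
The class that holds the data must implement serialization and deserialization. This is the key part and also the most tedious one. As the example below shows, you control the storage size and the order in which data is written yourself, and you must read it back in exactly the same order. For a string, write its byte length first and then its bytes; when reading, read the length first, then that many bytes, and finally convert the bytes back into a string.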
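The layout just described (a count before each collection, a length before each string) can be checked in isolation. A minimal sketch using java.nio.ByteBuffer in place of Slice; CollectCodec is a hypothetical name. It computes the exact size in a first pass, writes the outer count once before the loop, and uses UTF-8 explicitly on both sides so that byte lengths and decoded strings agree:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, Presto-free sketch of the layout:
// [id count] then per id: [id][key count] then per key: [key length][key bytes][value]
final class CollectCodec {
    static byte[] serialize(Map<Integer, Map<String, Integer>> data) {
        // First pass: exact size, so the buffer can never overflow.
        int size = Integer.BYTES; // id count, written once
        for (Map<String, Integer> inner : data.values()) {
            size += 2 * Integer.BYTES; // id + key count
            for (String key : inner.keySet()) {
                size += Integer.BYTES + key.getBytes(StandardCharsets.UTF_8).length + Integer.BYTES;
            }
        }
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.putInt(data.size());
        for (Map.Entry<Integer, Map<String, Integer>> e : data.entrySet()) {
            buf.putInt(e.getKey());
            buf.putInt(e.getValue().size());
            for (Map.Entry<String, Integer> kv : e.getValue().entrySet()) {
                byte[] keyBytes = kv.getKey().getBytes(StandardCharsets.UTF_8);
                buf.putInt(keyBytes.length); // length first...
                buf.put(keyBytes);           // ...then the bytes
                buf.putInt(kv.getValue());
            }
        }
        return buf.array();
    }

    static Map<Integer, Map<String, Integer>> deserialize(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        Map<Integer, Map<String, Integer>> data = new HashMap<>();
        int idCount = buf.getInt();
        for (int i = 0; i < idCount; i++) {
            int id = buf.getInt();
            int keyCount = buf.getInt();
            Map<String, Integer> inner = new HashMap<>();
            for (int j = 0; j < keyCount; j++) {
                // read the length, then exactly that many bytes, in the order they were written
                byte[] keyBytes = new byte[buf.getInt()];
                buf.get(keyBytes);
                inner.put(new String(keyBytes, StandardCharsets.UTF_8), buf.getInt());
            }
            data.put(id, inner);
        }
        return data;
    }
}
```

For the sample data in the main class (two ids, three three-letter keys each) this produces 4 + 2 × (8 + 3 × 11) = 86 bytes.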
```java
import io.airlift.slice.SizeOf;
import io.airlift.slice.Slice;
import io.airlift.slice.SliceInput;
import io.airlift.slice.SliceOutput;
import io.airlift.slice.Slices;
import org.openjdk.jol.info.ClassLayout;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class CollectListStats {
    private static final int INSTANCE_SIZE = ClassLayout.parseClass(CollectListStats.class).instanceSize();
    // <id, <key, count>>
    private Map<Integer, Map<String, Integer>> collectContainer = new HashMap<>();
    private long contentEstimatedSize = 0;
    // Total UTF-8 bytes of all keys and total number of keys; used to size serialize()
    private int keyByteLen = 0;
    private int keyListLen = 0;

    CollectListStats() {
    }

    CollectListStats(Slice serialized) {
        deserialize(serialized);
    }

    void addCollectList(Integer id, String key, int value) {
        // Always measure keys as UTF-8 so the sizes match what serialize() writes
        int keyBytes = key.getBytes(StandardCharsets.UTF_8).length;
        if (collectContainer.containsKey(id)) {
            Map<String, Integer> tmpMap = collectContainer.get(id);
            if (tmpMap.containsKey(key)) {
                tmpMap.put(key, tmpMap.get(key) + value);
            } else {
                tmpMap.put(key, value);
                contentEstimatedSize += keyBytes + SizeOf.SIZE_OF_INT;
                keyByteLen += keyBytes;
                keyListLen++;
            }
        } else {
            Map<String, Integer> tmpMap = new HashMap<>();
            tmpMap.put(key, value);
            keyByteLen += keyBytes;
            keyListLen++;
            collectContainer.put(id, tmpMap);
            // the new id plus its first key and value
            contentEstimatedSize += SizeOf.SIZE_OF_INT + keyBytes + SizeOf.SIZE_OF_INT;
        }
    }

    // Output format (JSON-like, keys unquoted): [{id:1,{aaa:3,fadf:6},{id:2,{...}]
    Slice getCollectResult() {
        StringBuilder jsonStr = new StringBuilder();
        jsonStr.append("[");
        int collectLength = collectContainer.size();
        for (Map.Entry<Integer, Map<String, Integer>> mapEntry : collectContainer.entrySet()) {
            Integer id = mapEntry.getKey();
            Map<String, Integer> vMap = mapEntry.getValue();
            jsonStr.append("{id:").append(id).append(",{");
            int vLength = vMap.size();
            for (Map.Entry<String, Integer> vEntry : vMap.entrySet()) {
                jsonStr.append(vEntry.getKey()).append(":").append(vEntry.getValue());
                vLength--;
                if (vLength != 0) {
                    jsonStr.append(",");
                }
            }
            jsonStr.append("}");
            collectLength--;
            if (collectLength != 0) {
                jsonStr.append(",");
            }
        }
        jsonStr.append("]");
        return Slices.utf8Slice(jsonStr.toString());
    }

    public void deserialize(Slice serialized) {
        try {
            SliceInput input = serialized.getInput();
            // Number of ids (size of the outer map), written once at the start
            int collectStatsEntrySize = input.readInt();
            for (int collectCnt = 0; collectCnt < collectStatsEntrySize; collectCnt++) {
                int id = input.readInt();
                int keyEntrySize = input.readInt();
                for (int idCnt = 0; idCnt < keyEntrySize; idCnt++) {
                    // Length first, then exactly that many bytes
                    int keyBytesLen = input.readInt();
                    byte[] keyBytes = new byte[keyBytesLen];
                    for (int byteIdx = 0; byteIdx < keyBytesLen; byteIdx++) {
                        keyBytes[byteIdx] = input.readByte();
                    }
                    String key = new String(keyBytes, StandardCharsets.UTF_8);
                    int value = input.readInt();
                    addCollectList(id, key, value);
                }
            }
        } catch (Exception e) {
            throw new RuntimeException("deserialize error", e);
        }
    }

    public Slice serialize() {
        // Must equal exactly what is appended below
        int requiredBytes = SizeOf.SIZE_OF_INT                   // number of ids (written once)
                + collectContainer.size() * 2 * SizeOf.SIZE_OF_INT // per id: id value + key count
                + keyListLen * SizeOf.SIZE_OF_INT                  // per key: byte length
                + keyByteLen                                       // all key bytes
                + keyListLen * SizeOf.SIZE_OF_INT;                 // per key: value
        try {
            SliceOutput builder = Slices.allocate(requiredBytes).getOutput();
            // The id count is written once, before the loop, because deserialize reads it once
            builder.appendInt(collectContainer.size());
            for (Map.Entry<Integer, Map<String, Integer>> entry : collectContainer.entrySet()) {
                builder.appendInt(entry.getKey());
                Map<String, Integer> kMap = entry.getValue();
                builder.appendInt(kMap.size());
                for (Map.Entry<String, Integer> vEntry : kMap.entrySet()) {
                    byte[] keyBytes = vEntry.getKey().getBytes(StandardCharsets.UTF_8);
                    builder.appendInt(keyBytes.length);
                    builder.appendBytes(keyBytes);
                    builder.appendInt(vEntry.getValue());
                }
            }
            return builder.getUnderlyingSlice();
        } catch (Exception e) {
            throw new RuntimeException("serialize error: requiredBytes = " + requiredBytes
                    + " keyByteLen = " + keyByteLen + " keyListLen = " + keyListLen, e);
        }
    }

    long estimatedInMemorySize() {
        return INSTANCE_SIZE + contentEstimatedSize;
    }

    void mergeWith(CollectListStats other) {
        if (other == null) {
            return;
        }
        for (Map.Entry<Integer, Map<String, Integer>> cEntry : other.collectContainer.entrySet()) {
            Integer id = cEntry.getKey();
            Map<String, Integer> kMap = cEntry.getValue();
            for (Map.Entry<String, Integer> kEntry : kMap.entrySet()) {
                addCollectList(id, kEntry.getKey(), kEntry.getValue());
            }
        }
    }
}
```
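Serializer class:
```java
// Package names follow older prestodb releases and may differ in yours.
import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.block.BlockBuilder;
import com.facebook.presto.spi.function.AccumulatorStateSerializer;
import com.facebook.presto.spi.type.Type;

import static com.facebook.presto.spi.type.VarbinaryType.VARBINARY;

public class CollectListStatsSerializer implements AccumulatorStateSerializer<CollectListAggregation.CollectState> {
    // Intermediate states travel between nodes as VARBINARY
    @Override
    public Type getSerializedType() {
        return VARBINARY;
    }

    @Override
    public void serialize(CollectListAggregation.CollectState state, BlockBuilder out) {
        if (state.get() == null) {
            out.appendNull();
        } else {
            VARBINARY.writeSlice(out, state.get().serialize());
        }
    }

    @Override
    public void deserialize(Block block, int index, CollectListAggregation.CollectState state) {
        if (block.isNull(index)) {
            return; // an empty partial state was serialized as null
        }
        state.set(new CollectListStats(VARBINARY.getSlice(block, index)));
    }
}
```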
Factory class:
```java
// ObjectBigArray's package differs between Presto versions
// (e.g. com.facebook.presto.array or com.facebook.presto.util.array).
import com.facebook.presto.array.ObjectBigArray;
import com.facebook.presto.spi.function.AccumulatorStateFactory;
import com.facebook.presto.spi.function.GroupedAccumulatorState;

public class CollectListStatsFactory implements AccumulatorStateFactory<CollectListAggregation.CollectState> {
    @Override
    public CollectListAggregation.CollectState createSingleState() {
        return new SingleState();
    }

    @Override
    public Class<? extends CollectListAggregation.CollectState> getSingleStateClass() {
        return SingleState.class;
    }

    @Override
    public CollectListAggregation.CollectState createGroupedState() {
        return new GroupState();
    }

    @Override
    public Class<? extends CollectListAggregation.CollectState> getGroupedStateClass() {
        return GroupState.class;
    }

    // Used with GROUP BY: one slot per group, selected via setGroupId before each get/set
    public static class GroupState implements GroupedAccumulatorState, CollectListAggregation.CollectState {
        private final ObjectBigArray<CollectListStats> collectStatsList = new ObjectBigArray<>();
        private long size;
        private long groupId;

        @Override
        public void setGroupId(long groupId) {
            this.groupId = groupId;
        }

        @Override
        public void ensureCapacity(long size) {
            collectStatsList.ensureCapacity(size);
        }

        @Override
        public CollectListStats get() {
            return collectStatsList.get(groupId);
        }

        @Override
        public void set(CollectListStats value) {
            // Keep the memory estimate in step when a slot is overwritten
            CollectListStats previous = get();
            if (previous != null) {
                size -= previous.estimatedInMemorySize();
            }
            collectStatsList.set(groupId, value);
            if (value != null) {
                size += value.estimatedInMemorySize();
            }
        }

        @Override
        public long getEstimatedSize() {
            return size + collectStatsList.sizeOf();
        }
    }

    // Used without GROUP BY: a single accumulator
    public static class SingleState implements CollectListAggregation.CollectState {
        private CollectListStats stats;

        @Override
        public CollectListStats get() {
            return stats;
        }

        @Override
        public void set(CollectListStats value) {
            stats = value;
        }

        @Override
        public long getEstimatedSize() {
            if (stats == null) {
                return 0;
            }
            return stats.estimatedInMemorySize();
        }
    }
}
```
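Why the grouped variant needs setGroupId, ensureCapacity and careful size bookkeeping is easier to see in a stripped-down model. A hypothetical sketch: String values with String.length() as the size estimate, and an ArrayList standing in for ObjectBigArray. It assumes, as GroupState does, that the caller sets the group id before every get or set:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Presto-free model of GroupState: one slot per group id, a
// "current group" pointer, and a running size estimate that is corrected
// whenever a slot is overwritten.
final class GroupedStateModel {
    private final List<String> slots = new ArrayList<>();
    private long groupId;
    private long size;

    void setGroupId(long groupId) {
        this.groupId = groupId;
    }

    // Grow so that group ids 0 .. capacity-1 have a slot (empty slots are null).
    void ensureCapacity(long capacity) {
        while (slots.size() < capacity) {
            slots.add(null);
        }
    }

    String get() {
        return slots.get((int) groupId);
    }

    void set(String value) {
        String previous = get();
        if (previous != null) {
            size -= previous.length(); // forget the old value's size
        }
        slots.set((int) groupId, value);
        if (value != null) {
            size += value.length();
        }
    }

    long estimatedSize() {
        return size;
    }
}
```

Without the subtraction in set, overwriting a group's value would double-count it, and the engine's memory accounting would drift upward.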
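2.2.4 Using Slice directly to improve performance
Operating on memory through Slice is efficient, because Slice implements memory copies with Unsafe#copyMemory.
The price is that you manage the stored data by hand: recording capacity and length, growing the buffer, and so on. Note that SliceState below has no @AccumulatorStateMetadata; for state interfaces whose fields are simple types such as primitives and Slice, Presto can generate the serializer and factory itself.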
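The header-plus-body bookkeeping can be tried out without Presto. A hypothetical sketch with ByteBuffer standing in for Slice (set to little-endian to mirror Slice, an assumption that only matters if the bytes are shared with Slice code). The header holds the body capacity and the bytes used; each entry is [entry length][int time][event bytes]. When growing, it keeps doubling until the new entry fits, which a single `size * 2` does not guarantee if one entry is larger than the current capacity:

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of manual capacity management: header [body capacity][body used], then body.
final class GrowableEntryBuffer {
    static final int OFFSET_BODY_SIZE = 0;
    static final int OFFSET_BODY_USED = 4;
    static final int HEADER_LEN = 8;

    static ByteBuffer create(int initialBodySize) {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_LEN + initialBodySize).order(ByteOrder.LITTLE_ENDIAN);
        buf.putInt(OFFSET_BODY_SIZE, initialBodySize);
        buf.putInt(OFFSET_BODY_USED, 0);
        return buf;
    }

    // Appends one entry; returns the buffer to keep, which is a new one if it had to grow.
    static ByteBuffer append(ByteBuffer buf, int time, String event) {
        byte[] eventBytes = event.getBytes(StandardCharsets.UTF_8);
        int entryLen = 2 * Integer.BYTES + eventBytes.length;
        int bodySize = buf.getInt(OFFSET_BODY_SIZE);
        int used = buf.getInt(OFFSET_BODY_USED);
        if (used + entryLen > bodySize) {
            int newBodySize = Math.max(bodySize, 1);
            while (newBodySize < used + entryLen) {
                newBodySize *= 2; // keep doubling until the entry fits
            }
            ByteBuffer grown = ByteBuffer.allocate(HEADER_LEN + newBodySize).order(ByteOrder.LITTLE_ENDIAN);
            // copy header and used body, then record the new capacity
            System.arraycopy(buf.array(), 0, grown.array(), 0, HEADER_LEN + used);
            grown.putInt(OFFSET_BODY_SIZE, newBodySize);
            buf = grown;
        }
        int pos = HEADER_LEN + used;
        buf.putInt(pos, entryLen);
        buf.putInt(pos + Integer.BYTES, time);
        for (int i = 0; i < eventBytes.length; i++) {
            buf.put(pos + 2 * Integer.BYTES + i, eventBytes[i]);
        }
        buf.putInt(OFFSET_BODY_USED, used + entryLen);
        return buf;
    }

    static int bodyCapacity(ByteBuffer buf) {
        return buf.getInt(OFFSET_BODY_SIZE);
    }

    static int bodyUsed(ByteBuffer buf) {
        return buf.getInt(OFFSET_BODY_USED);
    }

    // Walks the body entry by entry using each entry's length prefix; returns "time:event".
    static List<String> events(ByteBuffer buf) {
        List<String> out = new ArrayList<>();
        int pos = HEADER_LEN;
        int end = HEADER_LEN + bodyUsed(buf);
        while (pos < end) {
            int entryLen = buf.getInt(pos);
            byte[] bytes = new byte[entryLen - 2 * Integer.BYTES];
            for (int i = 0; i < bytes.length; i++) {
                bytes[i] = buf.get(pos + 2 * Integer.BYTES + i);
            }
            out.add(buf.getInt(pos + Integer.BYTES) + ":" + new String(bytes, StandardCharsets.UTF_8));
            pos += entryLen;
        }
        return out;
    }
}
```

As with handleInput below, the caller must always keep the returned buffer, since growing replaces it.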
```java
// ---- RouteUserAggregationBase.java ----
// Package names follow older prestodb releases and may differ in yours.
import com.facebook.presto.spi.function.AccumulatorState;
import io.airlift.slice.Slice;

public class RouteUserAggregationBase {

    //...... other static fields (such as EVENT_CONCAT_GROUP_VALUE, used below)

    /**
     * Slice state: the buffer holding intermediate data
     */
    public interface SliceState extends AccumulatorState {
        Slice getSlice();

        void setSlice(Slice slice);
    }
}

// ---- RouteUserGroupAggregation.java ----
import com.facebook.presto.spi.block.BlockBuilder;
import com.facebook.presto.spi.function.AggregationFunction;
import com.facebook.presto.spi.function.AggregationState;
import com.facebook.presto.spi.function.CombineFunction;
import com.facebook.presto.spi.function.InputFunction;
import com.facebook.presto.spi.function.OutputFunction;
import com.facebook.presto.spi.function.SqlType;
import com.facebook.presto.spi.type.StandardTypes;
import com.facebook.presto.spi.type.VarcharType;
import io.airlift.slice.SizeOf;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

@AggregationFunction("function_name") // placeholder for the actual function name
public class RouteUserGroupAggregation extends RouteUserAggregationBase {

    /** Initial byte capacity of the buffer body **/
    private static final int STORED_DATA_BODY_INIT_BYTE_SIZE = 64;

    /** Buffer header layout: byte offset of each metadata field **/
    private static final int VALUES_OFFSET_HEADER_BYTE_LEN = 0;       // int: header length
    private static final int VALUES_OFFSET_BODY_BYTE_SIZE = 4;        // int: body capacity
    private static final int VALUES_OFFSET_BODY_BYTE_USED = 8;        // int: body bytes used
    private static final int VALUES_OFFSET_CONTAIN_TARGET_EVENT = 12; // byte: target event seen?
    private static final int VALUES_OFFSET_TARGET_EVENT_TYPE = 13;    // int: target event type
    private static final int VALUES_OFFSET_ROUTE_INTERVAL = 17;       // int: event interval
    private static final int VALUES_OFFSET_TARGET_EVENT_LEN = 21;     // int: target event length
    private static final int VALUES_OFFSET_TARGET_EVENT_BYTES = 25;   // target event bytes

    @InputFunction
    public static void input(@AggregationState SliceState state,
            // target event
            @SqlType(StandardTypes.VARCHAR) Slice targetEvent,
            // target event type
            @SqlType(StandardTypes.BIGINT) long targetType,
            // event interval
            @SqlType(StandardTypes.BIGINT) long eventInterval,
            // current event name
            @SqlType(StandardTypes.VARCHAR) Slice currEvent,
            // current event time; narrowed to int below, so it must fit in 32 bits
            @SqlType(StandardTypes.BIGINT) long eventTime) {
        handleInput(state, targetEvent, (int) targetType, (int) eventInterval, currEvent, (int) eventTime, null, null);
    }

    private static void handleInput(SliceState state, Slice targetEvent, int targetType, int eventInterval,
            Slice currEvent, int eventTime, Slice groupByEvent, Slice groupByProp) {
        // Fetch the buffered data
        Slice storedData = state.getSlice();

        // First row: initialize the header with values that never change
        // (target event, target type, event interval)
        if (storedData == null) {
            int headerByteLen = SizeOf.SIZE_OF_INT // header length
                    + SizeOf.SIZE_OF_INT           // body capacity
                    + SizeOf.SIZE_OF_INT           // body bytes used
                    + SizeOf.SIZE_OF_BYTE          // contains target event
                    + SizeOf.SIZE_OF_INT           // target event type
                    + SizeOf.SIZE_OF_INT;          // event interval
            int targetLength = SizeOf.SIZE_OF_INT + targetEvent.length();
            headerByteLen += targetLength;

            storedData = Slices.allocate(headerByteLen + STORED_DATA_BODY_INIT_BYTE_SIZE);
            storedData.setInt(VALUES_OFFSET_HEADER_BYTE_LEN, headerByteLen);
            storedData.setInt(VALUES_OFFSET_BODY_BYTE_SIZE, STORED_DATA_BODY_INIT_BYTE_SIZE);
            storedData.setInt(VALUES_OFFSET_BODY_BYTE_USED, 0);
            // target event not seen yet
            storedData.setByte(VALUES_OFFSET_CONTAIN_TARGET_EVENT, 0);
            // cache the fixed arguments
            storedData.setInt(VALUES_OFFSET_TARGET_EVENT_TYPE, targetType);
            storedData.setInt(VALUES_OFFSET_ROUTE_INTERVAL, eventInterval);
            storedData.setInt(VALUES_OFFSET_TARGET_EVENT_LEN, targetEvent.length());
            storedData.setBytes(VALUES_OFFSET_TARGET_EVENT_BYTES, targetEvent);
        }

        int headerByteLen = storedData.getInt(VALUES_OFFSET_HEADER_BYTE_LEN);
        int bodyByteSize = storedData.getInt(VALUES_OFFSET_BODY_BYTE_SIZE);
        int bodyByteUsed = storedData.getInt(VALUES_OFFSET_BODY_BYTE_USED);

        // Mark that the target event has been seen
        if (currEvent.toStringUtf8().equals(targetEvent.toStringUtf8())) {
            storedData.setByte(VALUES_OFFSET_CONTAIN_TARGET_EVENT, 1);
        }
        // If grouping is requested and the current event is the group-by event,
        // append the group value to the event name
        if (groupByEvent != null && groupByEvent.toStringUtf8().equals(currEvent.toStringUtf8())) {
            String newEventKey = currEvent.toStringUtf8() + EVENT_CONCAT_GROUP_VALUE + groupByProp.toStringUtf8();
            currEvent = Slices.utf8Slice(newEventKey);
        }

        // Entry layout: [entry length int][eventTime int][event bytes]
        int entryByteLen = SizeOf.SIZE_OF_INT * 2 + currEvent.length();
        if (bodyByteUsed + entryByteLen > bodyByteSize) {
            // Grow: keep doubling until the new entry fits
            int newBodyByteSize = bodyByteSize * 2;
            while (newBodyByteSize < bodyByteUsed + entryByteLen) {
                newBodyByteSize *= 2;
            }
            Slice newStoredData = Slices.allocate(headerByteLen + newBodyByteSize);
            // Copy header and body into the new Slice, then record the new capacity
            newStoredData.setBytes(0, storedData.getBytes());
            newStoredData.setInt(VALUES_OFFSET_BODY_BYTE_SIZE, newBodyByteSize);
            storedData = newStoredData;
        }
        // Locate the write position and append the entry
        int writePos = headerByteLen + bodyByteUsed;
        storedData.setInt(writePos, entryByteLen);
        writePos += SizeOf.SIZE_OF_INT;
        storedData.setInt(writePos, eventTime);
        writePos += SizeOf.SIZE_OF_INT;
        storedData.setBytes(writePos, currEvent);
        storedData.setInt(VALUES_OFFSET_BODY_BYTE_USED, bodyByteUsed + entryByteLen);
        // Store the (possibly reallocated) buffer back into the state
        state.setSlice(storedData);
    }

    @CombineFunction
    public static void combine(@AggregationState SliceState state, @AggregationState SliceState other) {
        // Fetch both buffers
        Slice storedData = state.getSlice();
        Slice otherStoredData = other.getSlice();

        if (otherStoredData == null) {
            return; // nothing to merge
        }
        if (storedData == null) {
            state.setSlice(otherStoredData);
            return;
        }
        int headerByteLen = storedData.getInt(VALUES_OFFSET_HEADER_BYTE_LEN);
        int bodyByteSize = storedData.getInt(VALUES_OFFSET_BODY_BYTE_SIZE);
        int bodyByteUsed = storedData.getInt(VALUES_OFFSET_BODY_BYTE_USED);
        int otherHeaderByteLen = otherStoredData.getInt(VALUES_OFFSET_HEADER_BYTE_LEN);
        int otherBodyByteSize = otherStoredData.getInt(VALUES_OFFSET_BODY_BYTE_SIZE);
        int otherBodyByteUsed = otherStoredData.getInt(VALUES_OFFSET_BODY_BYTE_USED);
        byte containTargetEvent = 0;
        if (storedData.getByte(VALUES_OFFSET_CONTAIN_TARGET_EVENT) == 1
                || otherStoredData.getByte(VALUES_OFFSET_CONTAIN_TARGET_EVENT) == 1) {
            containTargetEvent = 1;
        }
        Slice finalStoredData;
        int finalBodyByteUsed = bodyByteUsed + otherBodyByteUsed;
        if (bodyByteSize >= finalBodyByteUsed) {
            // Left buffer has room: copy only the other body (events and times), not its header
            storedData.setBytes(headerByteLen + bodyByteUsed, otherStoredData, otherHeaderByteLen, otherBodyByteUsed);
            storedData.setInt(VALUES_OFFSET_BODY_BYTE_USED, finalBodyByteUsed);
            finalStoredData = storedData;
        } else if (otherBodyByteSize >= finalBodyByteUsed) {
            // Right buffer has room
            otherStoredData.setBytes(otherHeaderByteLen + otherBodyByteUsed, storedData, headerByteLen, bodyByteUsed);
            otherStoredData.setInt(VALUES_OFFSET_BODY_BYTE_USED, finalBodyByteUsed);
            finalStoredData = otherStoredData;
        } else {
            // Neither has room: grow the left buffer
            int newBodyByteSize = bodyByteSize;
            while (newBodyByteSize < finalBodyByteUsed) {
                newBodyByteSize *= 2;
            }
            Slice newStoredData = Slices.allocate(headerByteLen + newBodyByteSize);
            newStoredData.setBytes(VALUES_OFFSET_HEADER_BYTE_LEN, storedData.getBytes());
            newStoredData.setInt(VALUES_OFFSET_BODY_BYTE_SIZE, newBodyByteSize);
            storedData = newStoredData;

            storedData.setBytes(headerByteLen + bodyByteUsed, otherStoredData, otherHeaderByteLen, otherBodyByteUsed);
            storedData.setInt(VALUES_OFFSET_BODY_BYTE_USED, finalBodyByteUsed);
            finalStoredData = storedData;
        }
        // Whether either side saw the target event
        finalStoredData.setByte(VALUES_OFFSET_CONTAIN_TARGET_EVENT, containTargetEvent);
        state.setSlice(finalStoredData);
    }

    @OutputFunction(StandardTypes.VARCHAR)
    public static void output(@AggregationState SliceState state, BlockBuilder out) {
        // Fetch the buffered data
        Slice storedData = state.getSlice();

        // No data, or the target (start) event never appeared
        if ((storedData == null) || (storedData.getByte(VALUES_OFFSET_CONTAIN_TARGET_EVENT) == 0)) {
            out.appendNull();
            return;
        }
        // Match routes
        Slice makeRoute = makeRoute(storedData);
        if (makeRoute == null) {
            out.appendNull();
        } else {
            VarcharType.VARCHAR.writeSlice(out, makeRoute);
        }
    }

    private static Slice makeRoute(Slice storedData) {
        // Read the header
        int interval = storedData.getInt(VALUES_OFFSET_ROUTE_INTERVAL);
        int targetType = storedData.getInt(VALUES_OFFSET_TARGET_EVENT_TYPE);
        int targetLength = storedData.getInt(VALUES_OFFSET_TARGET_EVENT_LEN);
        String targetEvent = new String(storedData.getBytes(VALUES_OFFSET_TARGET_EVENT_BYTES, targetLength),
                StandardCharsets.UTF_8);
        List<Slice> timeEventSeries = new ArrayList<>();
        int headerByteLen = storedData.getInt(VALUES_OFFSET_HEADER_BYTE_LEN);
        int bodyByteUsed = storedData.getInt(VALUES_OFFSET_BODY_BYTE_USED);
        int bound = headerByteLen + bodyByteUsed;
        int idx = headerByteLen;
        while (idx < bound) {
            // Each entry is [length][time][event name]; keep [time][event name]
            int entryByteLen = storedData.getInt(idx);
            Slice entry = storedData.slice(idx + SizeOf.SIZE_OF_INT, entryByteLen - SizeOf.SIZE_OF_INT);
            idx += entryByteLen;
            timeEventSeries.add(entry);
        }
        // Processing logic, elided in the original (it builds the `routes` list used below)
        // ......
        // Build the result: [length int][route bytes], repeated for each route
        Slice result = null;
        if (routes.size() > 0) {
            for (String route : routes) {
                Slice routeSlice = Slices.utf8Slice(route);
                Slice routeInfo = Slices.allocate(SizeOf.SIZE_OF_INT + routeSlice.length());
                routeInfo.setInt(0, routeSlice.length());
                routeInfo.setBytes(SizeOf.SIZE_OF_INT, routeSlice);
                if (result == null) {
                    result = routeInfo;
                } else {
                    Slice newSlice = Slices.allocate(result.length() + routeInfo.length());
                    newSlice.setBytes(0, result);
                    newSlice.setBytes(result.length(), routeInfo, 0, routeInfo.length());
                    result = newSlice;
                }
            }
        }
        return result;
    }
}
```
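makeRoute reallocates and copies the whole result for every route, which is quadratic in the output size. A hypothetical sketch of the same [length][UTF-8 bytes] repetition, sized once up front, with ByteBuffer in place of Slice and little-endian order assumed to mirror Slice. Inside Presto, io.airlift.slice.DynamicSliceOutput is another way to append without manual reallocation. One further observation: because the result contains raw 4-byte length prefixes, a route of 128 bytes or more produces a leading byte that is not valid UTF-8 on its own, so VARBINARY may be a safer declared output type than VARCHAR:

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: encode/decode a list of routes as [int length][utf-8 bytes]...
final class RouteEncoding {
    static byte[] encode(List<String> routes) {
        // First pass: encode each route once and total up the size
        List<byte[]> encoded = new ArrayList<>();
        int total = 0;
        for (String r : routes) {
            byte[] b = r.getBytes(StandardCharsets.UTF_8);
            encoded.add(b);
            total += Integer.BYTES + b.length;
        }
        // Second pass: a single allocation, no copying of earlier routes
        ByteBuffer buf = ByteBuffer.allocate(total).order(ByteOrder.LITTLE_ENDIAN);
        for (byte[] b : encoded) {
            buf.putInt(b.length);
            buf.put(b);
        }
        return buf.array();
    }

    static List<String> decode(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);
        List<String> routes = new ArrayList<>();
        while (buf.hasRemaining()) {
            byte[] b = new byte[buf.getInt()];
            buf.get(b);
            routes.add(new String(b, StandardCharsets.UTF_8));
        }
        return routes;
    }
}
```

Unlike makeRoute, which returns null when there are no routes, encode returns an empty array in that case; a caller mirroring makeRoute would check for an empty list first.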