Flink SQL 自定義 UDF 解析復雜 JSON


2021-06-07 修改

白干了,flink 1.13 json format 可以直接解析復雜的sql,以如下格式

CREATE TABLE user_log (
  user_id STRING
  ,item_id STRING
  ,category_id STRING
  ,sub_json ROW(sub_name STRING, password STRING, sub_json ROW(sub_name STRING, sub_pass STRING))
) WITH (
   'connector' = 'kafka'
  ,'topic' = 'user_b'
  ,'properties.bootstrap.servers' = '10.201.1.132:9092'
  ,'properties.group.id' = 'user_log_1'
  ,'scan.startup.mode' = 'latest-offset'
  ,'format' = 'json'
  ,'json.ignore-parse-errors' = 'false'
);

insert into mysql_table_venn_user_log_sink
SELECT user_id, item_id, category_id, sub_json.sub_name, sub_json.password, sub_json.sub_json.sub_name, sub_json.sub_json.sub_pass
FROM user_log;

-------------分割線-------------------

最近用 Flink  處理一些 json 格式數據的時候,突然發現 1.13 的 json format 沒有解析復雜 SQL 的屬性了

在 Flink 1.10 的時候,還寫了一篇博客來介紹 自定義 json 格式的寫法:

https://www.cnblogs.com/Springmoon-venn/p/12664547.html

這就尷尬了

沒發,只能寫個自定義的 udf 先湊合着用了,這兩天突然有點想法,寫個通用的 udf,來解析復雜 json

處理json 的udf 的需求是輸入多個字段,返回多個字段,但是只有一行,只能使用 UDTF(flink 也就是 table functions)


官網地址: https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/functions/udfs/#table-functions

類型推導 
Table(類似於 SQL 標准)是一種強類型的 API。因此,函數的參數和返回類型都必須映射到[數據類型]({%link dev/table/types.zh.md %})。

從邏輯角度看,Planner 需要知道數據類型、精度和小數位數;從 JVM 角度來看,Planner 在調用自定義函數時需要知道如何將內部數據結構表示為 JVM 對象。

術語 類型推導 概括了意在驗證輸入值、派生出參數/返回值數據類型的邏輯。

Flink 自定義函數實現了自動的類型推導提取,通過反射從函數的類及其求值方法中派生數據類型。如果這種隱式的反射提取方法不成功,則可以通過使用 @DataTypeHint 和 @FunctionHint 注解相關參數、類或方法來支持提取過程,下面展示了有關如何注解函數的例子。

如果需要更高級的類型推導邏輯,實現者可以在每個自定義函數中顯式重寫 getTypeInference() 方法。但是,建議使用注解方式,因為它可使自定義類型推導邏輯保持在受影響位置附近,而在其他位置則保持默認狀態。

自動類型推導 
自動類型推導會檢查函數的類和求值方法,派生出函數參數和結果的數據類型, @DataTypeHint 和 @FunctionHint 注解支持自動類型推導。

有關可以隱式映射到數據類型的類的完整列表,請參閱[數據類型]({%link dev/table/types.zh.md %}#數據類型注解)。

官網示例:

@FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>"))
public static class SplitFunction extends TableFunction<Row> {

  public void eval(String str) {
    for (String s : str.split(" ")) {
      // use collect(...) to emit a row
      collect(Row.of(s, s.length()));
    }
  }
}

發現scala 的支持有點尷尬,因為想做一個解析復雜 json 的通用UDF,json 里面的字段是不固定的,如果將返回值放到數組中,如: collect(Row.of(arr(0), arr(1))) 這樣返回會報錯: Scala collections are not supported. See the documentation for supported classes or treat them as RAW types.

改成 java 略好一點

改完的代碼,寫成這樣:

public class ParseJson extends TableFunction<Row> {

    public void eval(String... json) {
        if (json == null || json.length == 0) {
            return;
        }
        // get column from json
        String[] arr = getStrings(json);
        
        RowKind rowKind = RowKind.fromByteValue((byte) 0);
        Row row = new Row(rowKind, json.length - 1);
        for (int i = 0; i < arr.length; ++i) {
            row.setField(json[i + 1], arr[i]);
        }

        collect(row);
    }

    /**
     * parse user columns from json and provider column name
     */
    private String[] getStrings(String[] json) {
        JsonObject jsonObject = new JsonParser().parse(json[0]).getAsJsonObject();
        int len = json.length - 1;
        String[] arr = new String[len];
        for (int i = 0; i < len - 1; ++i) {
            JsonElement tm = jsonObject.get(json[i + 1]);
            if (tm != null) {
                arr[i] = tm.getAsString();
            } else {
                arr[i] = null;
            }
        }
        return arr;
    }
}

代碼比較簡單,輸入參數是數組的,第一個字段是需要處理的 json 字符串,后面的字段是需要解析的字段
解析出來的結果放在 String 字符數組中,再轉為 Row

可是不能推導出 Row 的類型和名稱: Cannot extract a data type from a pure 'org.apache.flink.types.Row' class. Please use annotations to define field names and field types.

吐血啊,如果這樣不能識別,需要在 annotations 上寫配置對應的列名和列類型,就不通用了

退而求其次,先配置個 annotations 看下情況再說

@FunctionHint(output = @DataTypeHint("ROW<col1 STRING, col2 STRING, col3 STRING>"))
public void eval(String json, String col1, String col2, String col3) {
    if (json == null || json.trim().length() == 0) {
        return;
    }
    String[] inputArr = new String[3];
    inputArr[0] = col1;
    inputArr[1] = col2;
    inputArr[2] = col3;
    String[] arr = getStrings(json, inputArr);
    collect(Row.of(arr[0], arr[1], arr[2]));
}

@FunctionHint(output = @DataTypeHint("ROW<col1 STRING, col2 STRING, col3 STRING, col4 STRING>"))
public void eval(String json, String col1, String col2, String col3, String col4) {
    if (json == null || json.trim().length() == 0) {
        return;
    }
    String[] inputArr = new String[4];
    inputArr[0] = col1;
    inputArr[1] = col2;
    inputArr[2] = col3;
    inputArr[3] = col4;
    String[] arr = getStrings(json, inputArr);
    collect(Row.of(arr[0], arr[1], arr[2], arr[3]));
}

測試 json 如下:

{"subJson":"{\"password\":\"pass_4\",\"doub\":\"3.1250\",\"username\":\"venn_4\"}","category_id":"category_id_4","user_id":"user_id_702","item_id":"item_id_4","behavior":"4","sort_col":"25","sales":"35","ts":"2021-05-31 14:29:53.680"}

測試 SQL 如下:

insert into t_json_sink
select col1, col2, item_id, username, password, cast(doub as double) doub
from t_json a
    LEFT JOIN LATERAL TABLE(udf_parse_json(json, 'category_id', 'user_id', 'item_id', 'subJson')) AS T(col1, col2, item_id,subJson) ON TRUE
    LEFT JOIN LATERAL TABLE(udf_parse_json(subJson, 'username', 'password', 'doub')) AS T1(username, password, doub) ON TRUE
    ;

這樣是可以解析復雜的嵌套 json 的,但是需要窮舉字段數量,因為是通用的 UDF,需要支持所有的字段

測試了一下三層的 json,是可支持的

測試 SQL 如下:

insert into t_json_sink
select category_id, user_id, item_id, cast(sort_col as int) sort_col, username, password, cast(doub as double) doub,sub_name,sub_pass
from t_json a
    LEFT JOIN LATERAL TABLE(udf_parse_json(json, 'category_id', 'user_id', 'item_id', 'sort_col', 'sub_json')) AS T(category_id, user_id, item_id, sort_col, sub_json) ON TRUE
    LEFT JOIN LATERAL TABLE(udf_parse_json(sub_json, 'username', 'password', 'doub', 'sub_json_1')) AS T1(username, password, doub, sub_json_1) ON TRUE
    LEFT JOIN LATERAL TABLE(udf_parse_json(sub_json_1, 'sub_name', 'sub_pass')) AS T2(sub_name, sub_pass) ON TRUE
    ;

看起來,多嵌套幾層也是可以執行的,唯一的問題就是需要窮舉字段數量

直接代碼生成100個以內的所有字段的 eval 重載方法,發現程序不能啟動了,一直卡在 sql 檢查上

減少 eval 的重載方法數量,發現隨着重載方法增多,啟動越慢,10 個以上重載方法的啟動時間(15秒)就不能接受了

挑戰失敗,繼續翻官網,在數據類型章節最前面看到:

ROW<myField ARRAY<BOOLEAN>, myOtherField TIMESTAMP(3)>

ROW 類型,里面的字段有數組,如果把 FunctionHint 寫成這樣:

@FunctionHint(output = @DataTypeHint("ROW<arr ARRAY<STRING>>"))

直接返回一個數組,剛好處理過程都是用的數組作為流轉的類型,不需要構造結果的格式了

最后的代碼寫成這樣

@FunctionHint(output = @DataTypeHint("ROW<arr ARRAY<STRING>>"))
public void eval(String... json) {
    if (json == null || json.length == 0) {
        return;
    }
    String[] arr = getStrings(json);
    RowKind rowKind = RowKind.fromByteValue((byte) 0);
    Row row = new Row(rowKind, json.length - 1);
    row.setField(0, arr);
    collect(row);
}

對應的SQL 如下:

insert into t_json_sink
select arr, arr[1],arr[2]
from t_json a
    LEFT JOIN LATERAL TABLE(udf_parse_json(json, 'category_id', 'user_id')) AS T(arr) ON TRUE

這樣也能很好的實現通用的復雜 json 解析

測試了下嵌套6層的 json:

{
    "sub_json": "{
        "sub_json":"    {
            "sub_json":"{
                "sub_json":"{
                    "sub_json":"{"sub_name":"sub_5_venn_3","sub_pass":"sub_5_pass_3"}",
                    "sub_name":"sub_4_venn_3","sub_pass":"sub_4_pass_3"}",
                "sub_name":"sub_3_venn_3","sub_pass":"sub_3_pass_3"}",
            "sub_name":"sub_2_sub_venn_3","sub_pass":"sub_2_sub_pass_3"}",
        "password":"sub_1_pass_3","sub_name":"sub_1_venn_3","doub":"3.1250"}",
    "category_id": "category_id_3",
    "user_id": "user_id_73",
    "item_id": "item_id_3",
    "behavior": "3",
    "sort_col": "45",
    "sales": "45",
    "ts": "2021-06-01 13:52:44.708"
}

對應 SQL 如下:

insert into t_json_sink
select T.arr[1], T1.arr[1], T2.arr[1], T3.arr[1], T4.arr[1], T5.arr[1]
from t_json a
    LEFT JOIN LATERAL TABLE(udf_parse_json(json, 'user_id', 'sub_json')) AS T(arr) ON TRUE
    LEFT JOIN LATERAL TABLE(udf_parse_json(T.arr[2], 'sub_name', 'sub_json')) AS T1(arr) ON TRUE
    LEFT JOIN LATERAL TABLE(udf_parse_json(T1.arr[2], 'sub_name', 'sub_json')) AS T2(arr) ON TRUE
    LEFT JOIN LATERAL TABLE(udf_parse_json(T2.arr[2], 'sub_name', 'sub_json')) AS T3(arr) ON TRUE
    LEFT JOIN LATERAL TABLE(udf_parse_json(T3.arr[2], 'sub_name', 'sub_json')) AS T4(arr) ON TRUE
    LEFT JOIN LATERAL TABLE(udf_parse_json(T4.arr[2], 'sub_name', 'sub_json')) AS T5(arr) ON TRUE
    ;

解析的結果如下:

+I[user_id_261, sub_1_venn_6, sub_2_sub_venn_6, sub_3_venn_6, sub_4_venn_6, sub_5_venn_6]
+I[user_id_262, sub_1_venn_3, sub_2_sub_venn_3, sub_3_venn_3, sub_4_venn_3, sub_5_venn_3]
+I[user_id_263, sub_1_venn_7, sub_2_sub_venn_7, sub_3_venn_7, sub_4_venn_7, sub_5_venn_7]
+I[user_id_264, sub_1_venn_5, sub_2_sub_venn_5, sub_3_venn_5, sub_4_venn_5, sub_5_venn_5]
+I[user_id_265, sub_1_venn_8, sub_2_sub_venn_8, sub_3_venn_8, sub_4_venn_8, sub_5_venn_8]
+I[user_id_266, sub_1_venn_0, sub_2_sub_venn_0, sub_3_venn_0, sub_4_venn_0, sub_5_venn_0]

執行沒有任何問題

完整代碼參見: https://github.com/springMoon/sqlSubmit ,ParseJson.java 和 parse_complex_json.sql

注:突然好久沒寫博客,水一篇湊個數

歡迎關注Flink菜鳥公眾號,會不定期更新Flink(開發技術)相關的推文

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM