在 Flink 1.10 的 Table API 和 SQL 中,表支持的格式有四種:
CSV Format
JSON Format
Apache Avro Format
Old CSV Format
官網地址如下:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#table-formats
我用 JSON Format 比較多,也有嵌套的JSON 數據需要解析,大概描述一下。
以下內容來下官網介紹:
JSON格式允許讀取和寫入與給定格式 schema 相對應的JSON數據。 格式 schema 可以定義為Flink類型,JSON schema 或從所需的表 schema 派生。 Flink類型啟用了更類似於SQL的定義並映射到相應的SQL數據類型。 JSON模式允許更復雜和嵌套的結構。
如果格式 schema 等於表 schema,則也可以自動派生該 schema。 這只允許定義一次 schema 信息。 格式的名稱,類型和字段的順序由表的 schema 確定。 如果時間屬性的來源不是字段,則將忽略它們。 表 schema 中的from定義被解釋為以該格式重命名的字段。
大概意思就是,flink 在解析json的時候,可以自己通過 schema(支持復雜的嵌套json),如果不提供 schema,默認使用 table schema 自動派生 json 的 schema(不支持復雜json)。
官網對應 json format 的表的樣例:
CREATE TABLE MyUserTable ( ... ) WITH ( 'format.type' = 'json', -- required: specify the format type 'format.fail-on-missing-field' = 'true' -- optional: flag whether to fail if a field is missing or not, false by default 'format.fields.0.name' = 'lon', -- optional: define the schema explicitly using type information. 'format.fields.0.data-type' = 'FLOAT', -- This overrides default behavior that uses table's schema as format schema. 'format.fields.1.name' = 'rideTime', 'format.fields.1.data-type' = 'TIMESTAMP(3)', 'format.json-schema' = -- or by using a JSON schema which parses to DECIMAL and TIMESTAMP. '{ -- This also overrides the default behavior. "type": "object", "properties": { "lon": { "type": "number" }, "rideTime": { "type": "string", "format": "date-time" } } }' )
注:flink 1.10 字段的名稱和類型可以從 table schema 中推斷,不用寫 format.fields.0.name 和 format.fields.0.data-type 了。
CREATE TABLE user_log( user_id VARCHAR, item_id VARCHAR, category_id VARCHAR, behavior VARCHAR, ts TIMESTAMP(3) ) WITH ( 'connector.type' = 'kafka', 'connector.version' = 'universal', 'connector.topic' = 'user_behavior', 'connector.properties.zookeeper.connect' = 'venn:2181', 'connector.properties.bootstrap.servers' = 'venn:9092', 'connector.startup-mode' = 'earliest-offset', 'format.type' = 'json' );
對應 json 數據如下:
{"user_id": "315321", "item_id":"942195", "category_id": "4339722", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
對應的字段,會映射到對應的類型上,可以直接使用,比1.9 方便了不少。
當然,這個並不是這里的主要內容。
先來個嵌套的json看下:
{"user_info":{"user_id":"0111","name":"xxx"},"timestam":1586670908699,"id":"10001"}
這樣的復雜sql該怎么解析呢?
回來看下官網那段實例:
'format.json-schema' = -- or by using a JSON schema which parses to DECIMAL and TIMESTAMP. '{ -- This also overrides the default behavior. "type": "object", "properties": { "lon": { "type": "number" }, "rideTime": { "type": "string", "format": "date-time" } } }'
SQL 的properties 中可以通過 屬性 "format.json-schema" 設置輸入的 json schema。
Flink 的 json-schema 中支持如下的數據類型:

再來看下剛剛的嵌套json:
{"user_info":{"user_id":"0111","name":"xxx"},"timestam":1586670908699,"id":"10001"}
第一層的 timestam、id 直接就映射到字段上,而 user_info 也是個json。
從上面的實例上,可以看到 object 類型數據有 properties,而properties 的內容,怎么看都想是json的內層數據。
所以上面的sql 對應的 json-schema 是這樣的:
'format.json-schema' = '{
"type": "object",
"properties": {
"id": {type: "string"},
"timestam": {type: "string"},
"user_info":{type: "object",
"properties" : {
"user_id" : {type:"string"},
"name":{type:"string"} } }
}
}'
從上面的 json schame 和 Flink SQL 的映射關系可以看出,user_info 對應的table 字段的類型是ROW,所以 table 的schema 是這樣的:
CREATE TABLE user_log(
id VARCHAR,
timestam VARCHAR,
user_info ROW(user_id string, name string )
)
ROW 類型的 user_info,有兩個字段:user_id 和 name
注:使用的時候,直接用 "." 就可以了:如 user_info.user_id
到此,嵌套json的 schame 就搞定了。
下面我們再來看下 嵌套 json 數組:
{"user_info":{"user_id":"0111","name":"xxx"},"timestam":1586670908699,"id":"10001","jsonArray":[{"name222":"xxx","user_id222":"0111"}]}
這個又該怎么寫 json schema 呢?
官網有個實例說 json format 直接解析這樣的復雜 json:
"optional_address": { "oneOf": [ { "type": "null" }, { "$ref": "#/definitions/address" } ] }
太長了,截取一段,官網明確說了支持這樣的實例,也就是支持 json 數組
json schema 和 Flink SQL 的映射關系中, json 的 array 對應 Flink SQL的 ARRAY[_]
按照 object 類型的寫法,寫了個這樣的:
"jsonArray":{"type": "array",
"properties": {
"type": "object",
"properties" : {
"user_id222" : {type:"string"},
"name222" : {type:"string"}
}
}
}
收獲了一個 exception:
Caused by: java.lang.IllegalArgumentException: Arrays must specify an 'items' property in node: <root>/jsonArray at org.apache.flink.formats.json.JsonRowSchemaConverter.convertArray(JsonRowSchemaConverter.java:264) at org.apache.flink.formats.json.JsonRowSchemaConverter.convertType(JsonRowSchemaConverter.java:176) at org.apache.flink.formats.json.JsonRowSchemaConverter.convertObject(JsonRowSchemaConverter.java:246)
然后,當然是 debug 代碼了: org.apache.flink.formats.json.JsonRowSchemaConverter 就是解析 json-schema 的代碼了
JsonRowSchemaConverter 類有3個主要的方法分別對應解析不同類型的數據:
// 解析 type private static TypeInformation<?> convertType(String location, JsonNode node, JsonNode root) // 解析 object private static TypeInformation<Row> convertObject(String location, JsonNode node, JsonNode root) // 解析 array private static TypeInformation<?> convertArray(String location, JsonNode node, JsonNode root)
convertType 方法在這里解析具體字段和類型:
for (String type : types) { // set field type switch (type) { case TYPE_NULL: typeSet.add(Types.VOID); break; case TYPE_BOOLEAN: typeSet.add(Types.BOOLEAN); break; case TYPE_STRING: if (node.has(FORMAT)) { typeSet.add(convertStringFormat(location, node.get(FORMAT))); } else if (node.has(CONTENT_ENCODING)) { typeSet.add(convertStringEncoding(location, node.get(CONTENT_ENCODING))); } else { typeSet.add(Types.STRING); } break; case TYPE_NUMBER: typeSet.add(Types.BIG_DEC); break; case TYPE_INTEGER: // use BigDecimal for easier interoperability // without affecting the correctness of the result typeSet.add(Types.BIG_DEC); break; case TYPE_OBJECT: typeSet.add(convertObject(location, node, root)); break; case TYPE_ARRAY: typeSet.add(convertArray(location, node, root)); break; default: throw new IllegalArgumentException( "Unsupported type '" + node.get(TYPE).asText() + "' in node: " + location); } }
簡單類型,就直接添加對應的 Flink SQL 類型, 復雜類型的 object、array 由單獨的方法解析,這里我們看下 covertArray:
private static TypeInformation<?> convertArray(String location, JsonNode node, JsonNode root) { // validate items if (!node.has(ITEMS)) { throw new IllegalArgumentException( "Arrays must specify an '" + ITEMS + "' property in node: " + location); } final JsonNode items = node.get(ITEMS); // list (translated to object array) if (items.isObject()) { final TypeInformation<?> elementType = convertType( location + '/' + ITEMS, items, root); // result type might either be ObjectArrayTypeInfo or BasicArrayTypeInfo for Strings return Types.OBJECT_ARRAY(elementType); } // tuple (translated to row) else if (items.isArray()) { final TypeInformation<?>[] types = convertTypes(location + '/' + ITEMS, items, root); // validate that array does not contain additional items if (node.has(ADDITIONAL_ITEMS) && node.get(ADDITIONAL_ITEMS).isBoolean() && node.get(ADDITIONAL_ITEMS).asBoolean()) { throw new IllegalArgumentException( "An array tuple must not allow additional items in node: " + location); } return Types.ROW(types); } throw new IllegalArgumentException( "Invalid type for '" + ITEMS + "' property in node: " + location); }
注:更多信息請查看源碼(org.apache.flink.formats.json.JsonRowSchemaConverter)
從上面的代碼可以看出,從 convertTypes 中解析到是 array 類型的,就調用 convertArray 方法,而 convertArray 方法中第一步就是判斷是否有個 ITEMS 字段,沒有直接就報錯:
Arrays must specify an 'items' property in node: <root>/jsonArray
有就 final JsonNode items = node.get(ITEMS) get 出來繼續解析,判斷 items 是個 object 或 array (然后繼續遞歸),都不是就拋出異常
從源碼可以看出 json 數組類型的 json schema 就是這樣的:
CREATE TABLE user_log( id VARCHAR, timestam VARCHAR, user_info ROW(user_id string, name string ), jsonArray ARRAY<ROW(user_id222 STRING, name222 STRING)> ) WITH ( 'connector.type' = 'kafka', 'connector.version' = 'universal', 'connector.topic' = 'complex_string', 'connector.properties.zookeeper.connect' = 'venn:2181', 'connector.properties.bootstrap.servers' = 'venn:9092', 'connector.startup-mode' = 'earliest-offset', 'format.type' = 'json', 'format.json-schema' = '{ "type": "object", "properties": { "id": {type: "string"}, "timestam": {type: "string"}, "user_info":{type: "object", "properties" : { "user_id" : {type:"string"}, "name":{type:"string"} } }, "jsonArray":{"type": "array", "items": { "type": "object", "properties" : { "user_id222" : {type:"string"}, "name222" : {type:"string"} } } } } }' );
看過源碼之后,對於上面的json schema 就沒有難度了
這里還要說下 json array 中有多個元素的案例:
{"user_info":{"user_id":"0111","name":"xxx"},"timestam":1586676835655,"id":"10001","jsonArray":[{"name222":"xxx","user_id222":"0022"},{"name333":"name3333","user_id222":"user3333"},{"cc":"xxx333","user_id444":"user4444","name444":"name4444"}]}
對應的 schema 也是這樣的:
"jsonArray":{"type": "array",
"items": {
"type": "object",
"properties" : {
"user_id222" : {type:"string"},
"name222" : {type:"string"}
}
}
}
}
因為在解析 json array 的時候,只能獲取到一個 items 字段(多加也沒用),會拿這個schema 去解析 json array 里面的所有元素,有對應字段就賦值,沒用就為空
表的列也是這樣的:
jsonArray ARRAY<ROW(user_id222 STRING, name222 STRING)>
在查詢中直接使用 jsonArray 會將所有數據直接查出來:
INSERT INTO user_log_sink SELECT * FROM user_log;
輸出的數據如下:
{"id":"10001","timestam":"1586676835655","user_info":{"user_id":"0111","name":"xxx"},"jsonArray":[{"user_id222":"0022","name222":"xxx"},{"user_id222":"user3333","name222":null},{"user_id222":null,"name222":null}]}
json array 中的第一個元素 全部解出來了,第二個元素只有 user_id222 有值,第三個元素都沒解析出來
注:json array 是這樣使用的:jsonArray[1].user_id222 # 代表 jsonArray 中的第一個元素的 user_id222 字段,數組下標從 1 開始,0 或 大於實際 json array 中的 長度會報 : java.lang.ArrayIndexOutOfBoundsException: 1
歡迎關注Flink菜鳥公眾號,會不定期更新Flink(開發技術)相關的推文

