背景
數據從kafka ingest到Phoenix。數據格式采取Json。數據鏈路:
api -> kafka -> Flume -> Phoenix
官方JsonEventSerializer的問題
-
每個table column必須有json字段,如果某個字段json中沒有,那么這條記錄被丟棄;
例如table中有cola和colb兩個列,但是json數據中只有{"cola":1},原生版本會將這個消息丟棄。 -
數組的問題,json中的數組元素是沒有類型的,例如一個小數字會被解析為java的int,如果我們的表中數組元素定義為BITINT,此時會有integer轉long類型的exception。
-
timestamp不支持unix timestamp格式(long)
-
如果數據格式出錯會有
SqlException
異常拋出,此時會導致Flume sink循環拉取錯誤的消息並不停地嘗試插入數據到Phoenix,但是一直失敗,新的數據也無法插入。
內部原理
- 讀取flume的event body (String),利用
JsonObject
解析,這里的處理會將json的每個字段強轉為String類型; - 如果該json字段對應的table列類型是
isArrayType
則將json值(數組)創建成Phoenix Array; - 如果不是數組,則直接將json的值轉為對應的
Object upsertValue = pDataType.toObject(jsonValue)
; - 最后調用Phoenix jdbc API (
PrepareedStatement.excute()
)將數據插入到Phoenix,這里會有類型判斷。
解決:
- 為null字段調用setNull
- 把json數組元素逐個轉為table中元素的類型
- 對timestamp類型特殊處理,判斷數據格式是不是\d+,如果是,則強轉
else if (pDataType == PTimestamp.INSTANCE) {
if (value.matches("\\d+")) { // if it's a Long value as time stamp
upsertValue = pDataType.toObject(Long.parseLong(value), PLong.INSTANCE);
} else {
upsertValue = pDataType.toObject(value);
}
}
- 避免拋出
SqlException
,只打印錯誤消息。
代碼修改參見:https://github.com/wlu-mstr/phoenix/tree/4.13-cdh5.11.2