Apache Phoenix Flume集成 -- JsonEventSerializer改進


背景

數據從kafka ingest到Phoenix。數據格式采取Json。數據鏈路:

api -> kafka -> Flume -> Phoenix

官方JsonEventSerializer的問題

  • 每個table column必須有json字段,如果某個字段json中沒有,那么這條記錄被丟棄;
    例如table中有cola和colb兩個列,但是json數據中只有{"cola":1},原生版本會將這個消息丟棄。

  • 數組的問題,json中的數組元素是沒有類型的,例如一個小數字會被解析為java的int,如果我們的表中數組元素定義為BITINT,此時會有integer轉long類型的exception。

  • timestamp不支持unix timestamp格式(long)

  • 如果數據格式出錯會有SqlException異常拋出,此時會導致Flume sink循環拉取錯誤的消息並不停地嘗試插入數據到Phoenix,但是一直失敗,新的數據也無法插入。

內部原理

  • 讀取flume的event body (String),利用JsonObject解析,這里的處理會將json的每個字段強轉為String類型;
  • 如果該json字段對應的table列類型是isArrayType則將json值(數組)創建成Phoenix Array;
  • 如果不是數組,則直接將json的值轉為對應的Object upsertValue = pDataType.toObject(jsonValue)
  • 最后調用Phoenix jdbc API (PrepareedStatement.excute())將數據插入到Phoenix,這里會有類型判斷。

解決:

  • 為null字段調用setNull
  • 把json數組元素逐個轉為table中元素的類型
  • 對timestamp類型特殊處理,判斷數據格式是不是\d+,如果是,則強轉
else if (pDataType == PTimestamp.INSTANCE) {
          if (value.matches("\\d+")) { // if it's  a Long value as time stamp
                  upsertValue = pDataType.toObject(Long.parseLong(value), PLong.INSTANCE);
          } else {
                  upsertValue = pDataType.toObject(value);
          }
  }
  • 避免拋出SqlException,只打印錯誤消息。

代碼修改參見:https://github.com/wlu-mstr/phoenix/tree/4.13-cdh5.11.2


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM