Flink 1.10 SQL 寫HBase


Hbase 也是我們很常用的數據存儲組件,所以提前嘗試下用SQL 寫Hbase,中間也遇到一些坑,跟大家分享一下。

官網地址:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#hbase-connector

--------------------------20200508--------------

新增: Flink 1.10 SQL 寫 Hbase 數據無法寫入hbase問題

---------------------------------------------------

HBase Connector 支持這些操作: Source: Batch Sink: Batch Sink: Streaming Append Mode Sink: Streaming Upsert Mode Temporal Join: Sync Mode

(選擇性忽略Batch 的操作了,上次跟一個朋友說,HBase connector 只支持 sink 操作。)

HBase connector  可以在upsert模式下運行,以使用查詢定義的密鑰與外部系統交換UPSERT / DELETE消息。

對於 append-only 查詢,connector 還可以在 append  模式下操作,僅與外部系統交換INSERT消息。

官網示例如下:

CREATE TABLE MyUserTable (
  hbase_rowkey_name rowkey_type,
  hbase_column_family_name1 ROW<...>,
  hbase_column_family_name2 ROW<...>
) WITH (
  'connector.type' = 'hbase', -- required: specify this table type is hbase
  
  'connector.version' = '1.4.3',          -- required: valid connector versions are "1.4.3"
  
  'connector.table-name' = 'hbase_table_name',  -- required: hbase table name
  
  'connector.zookeeper.quorum' = 'localhost:2181', -- required: HBase Zookeeper quorum configuration
  'connector.zookeeper.znode.parent' = '/test',    -- optional: the root dir in Zookeeper for HBase cluster.
                                                   -- The default value is "/hbase".

  'connector.write.buffer-flush.max-size' = '10mb', -- optional: writing option, determines how many size in memory of buffered
                                                    -- rows to insert per round trip. This can help performance on writing to JDBC
                                                    -- database. The default value is "2mb".

  'connector.write.buffer-flush.max-rows' = '1000', -- optional: writing option, determines how many rows to insert per round trip.
                                                    -- This can help performance on writing to JDBC database. No default value,
                                                    -- i.e. the default flushing is not depends on the number of buffered rows.

  'connector.write.buffer-flush.interval' = '2s',   -- optional: writing option, sets a flush interval flushing buffered requesting
                                                    -- if the interval passes, in milliseconds. Default value is "0s", which means
                                                    -- no asynchronous flush thread will be scheduled.
)

Columns: HBase表中的 column families 必須聲明為ROW類型字段名稱映射到column families 名稱而嵌套的字段名稱映射到 column qualifier 名稱。 無需在架構中聲明所有 families 和 qualifiers ,用戶可以聲明必要的內容。 除ROW type字段外,原子類型的唯一 一個字段(例如STRING,BIGINT)將被識別為表的 rowkey。 row key 字段的名稱沒有任何限制。

Temporary join: 針對HBase的 Lookup join 不使用任何緩存; 始終總是通過HBase客戶端直接查詢數據。

之前一直看英文,上面的描述看得似是而非的,沒能理解到,Flink 中 建HBase 表的 DDL 的規則,簡單列下:

  1、Flink HBase 表只能有一個原子類型的字段,就是 rowkey(習慣是放在第一個字段,名字隨意)

  2、Flink HBase 表的其他字段都是ROW 類型的,並且字段名與 HBase 表中 的 column family 名一樣(如果只有一個列族,除了rowkey 就只有一個字段)

  3、ROW 類型的字段嵌套的字段名稱就是該列族下的列名

下面看個實例:

首先需要添加flink-hbase connector 對應的依賴:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hbase_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

選擇對應版本,Flink 版本是 1.10

必須要說下,現在 HBase 版本只支持 1.4.3,我的HBase 是2.1.4 的,懶得換了,直接修改Flink 代碼,繞過版本驗證(可以正常寫數,沒有經過版本匹配和嚴格的測試,可能會有未知的問題)

sql 文件如下:

-- 讀 kafka write to json
--sourceTable
CREATE TABLE user_log(
    user_id VARCHAR,
    item_id VARCHAR,
    category_id VARCHAR,
    behavior VARCHAR,
    ts TIMESTAMP(3)
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'user_behavior',
    'connector.properties.zookeeper.connect' = 'venn:2181',
    'connector.properties.bootstrap.servers' = 'venn:9092',
    'connector.startup-mode' = 'earliest-offset',
    'format.type' = 'json'
);

--sinkTable
CREATE TABLE user_log_sink ( rowkey string, cf ROW(item_id VARCHAR, category_id VARCHAR, behavior VARCHAR, ts TIMESTAMP(3)) ) WITH (
    'connector.type' = 'hbase',
    -- 目前只支持 1.4.3 ,   HBaseValidator.CONNECTOR_VERSION_VALUE_143 寫死了 1.4.3, 改成 2.1.4 可以正常寫數到 hbase
    -- 生產慎用
    'connector.version' = '2.1.4',                    -- hbase vesion
    'connector.table-name' = 'venn',                  -- hbase table name
    'connector.zookeeper.quorum' = 'venn:2181',       -- zookeeper quorum
    'connector.zookeeper.znode.parent' = '/hbase',    -- hbase znode in zookeeper
    'connector.write.buffer-flush.max-size' = '10mb', -- max flush size
    'connector.write.buffer-flush.max-rows' = '1000', -- max flush rows
    'connector.write.buffer-flush.interval' = '2s'    -- max flush interval
);
--insert
INSERT INTO user_log_sink
SELECT user_id,
  ROW(item_id, category_id, behavior, ts ) as cf
FROM user_log;

簡單描述下 sink 表:

有個string類型的rowkey, 還有一個 列 cf (HBase 表的列族),cf 下面有 item_id/category_id/behavior/ts 4個列

執行的sql 就很簡單了,從Kafka 的 source 表讀數據,寫到 sink 表。 

 查看寫入到 HBase 中的數據:

hbase(main):001:0> count 'venn'
Current count: 1000, row: 561558                                                                                                                           
1893 row(s)
Took 1.8662 seconds                                                                                                                                        
=> 1893
hbase(main):002:0> scan 'venn',{LIMIT=>1}
ROW                                     COLUMN+CELL                                                                                                        
 1000034                                column=cf:behavior, timestamp=1584615437261, value=pv                                                              
 1000034                                column=cf:category_id, timestamp=1584615437261, value=982926                                                       
 1000034                                column=cf:item_id, timestamp=1584615437261, value=800784                                                           
 1000034                                column=cf:ts, timestamp=1584615437261, value=\x00\x00\x01_\xF4\x1F`\xD0                                            
1 row(s)
Took 0.0834 seconds   

 

------------20200427 改-----------

根本不用改源碼,直接將 sql  properties 寫成 '1.4.3' 就可以了,執行的時候,不會去校驗hbase 的版本

----------------------------------------

搞定 

歡迎關注Flink菜鳥公眾號,會不定期更新Flink(開發技術)相關的推文


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM