業務介紹:
最近在做將線上的日志數據 通過flink的形式消費過來 ,然后灌入hbase 表當中。然后最近要解析這個日志想通過Python的方式讀取hbase的日志進行解析 然后入庫的方式做。
我們想一個小時做一次 但是那個rowkey 設計的真是卧槽。一點業務理念都沒有 就是簡單的字符串拼接起來的。只能通過fliter的形式過濾rowkey 然后把數據拿到。真是蛋疼。
所以只能重新設計rowkey,然后將實時的數據接入到我們的系統。
圖中的rowkey是之前別人設計的,我想一個小時拿一次 這可咋整啊。真是無語了。
所以就想着能不能改一下rowkey的形式。然后為了保證不出現數據熱點和數據的唯一性。重新設計rowkey
我重新設計通過每天的小時數,然后還有年月日時分秒的形式,再加上這條數據的hash值。然后作為rowkey
具體的python代碼如下:
#! /root/anaconda3/bin/python3 # coding:utf-8 import happybase import datetime,time connection = happybase.Connection('自己集群的ip') connection = happybase.Connection('自己集群的ip', autoconnect=False) connection.open() print(connection.tables()) table = connection.table('app') print(table) #因為這里是美國時間,所以要獲取的時間減8個小時,同時因為時獲取的是前一個小時的數據 ,所以這里需要減去9個小時 today_str = (datetime.datetime.now() + datetime.timedelta(hours=-9)).strftime('%Y-%m-%d %H:%M:%S') #這里獲取的是小時的標識符2019080506 這里就截取06作為要處理的小時數據 today_time=today_str[11:13] today = today_str[0:4] + today_str[5:7] + today_str[8:10] print(today_time) print(today) today_time_pingjie=today_time+"|"+today #這里需要對rowkey的前綴進行編碼轉化 today_time_byte = bytes(today_time_pingjie, encoding = "utf8") for key, data in table.scan(row_prefix=today_time_byte): print(key, data)
注意:
這里通過 row_prefix 的形式獲取到rowkey中我們想要的行數據。這里一定要注意將其轉化為字節。字符串會出現下面的報錯
Traceback (most recent call last):
File "F:/softinstall/Python/pythonworkplace/readhbasedata/hbase.py", line 27, in <module>
for key, data in table.scan(row_prefix='05'):
File "F:\softinstall\Python\pythonworkplace\venv\lib\site-packages\happybase\table.py", line 341, in scan
row_stop = bytes_increment(row_prefix)
File "F:\softinstall\Python\pythonworkplace\venv\lib\site-packages\happybase\util.py", line 82, in bytes_increment
assert isinstance(b, six.binary_type)
AssertionError
官網的代碼如下:
https://happybase.readthedocs.io/en/latest/
這個代碼當中有一個b 百度上漫天飛的都沒加這個 死活查詢不到數據。