1、前言
本文是在《如何計算實時熱門商品》[1]一文上做的擴展,僅在功能上驗證了利用Flink消費Kafka數據,把處理后的數據寫入到HBase的流程,其具體性能未做調優。此外,文中並未就Flink處理邏輯做過多的分析,只因引文(若不特殊說明,文中引文皆指《如何計算實時熱門商品》一文)中寫的很詳細了,故僅給出博主調試犯下的錯。文中若有錯誤,歡迎大伙留言指出,謝謝!
源碼在GitHub上,地址:https://github.com/L-Wg/flinkExample;
環境:Flink 1.6+Kafka 1.1+HBase 1.2
OpenJDK 1.8+Maven 3.5.2
2、獲取數據
本文是將Kafka作為數據源(目前業界比較流行的做法),數據的格式和引文的格式一致,數據類型為POJO。為添加源,一般是實現接口SourceFunction<T>,但是Flink與Kafka的鏈接器(connector),Flink社區已經做好了,我們只需在pom文件中加入相應的依賴即可。這里有值得注意的一點是:flink-connector-kafka-*.jar是有版本要求的,其具體的要求可以參加Flink官網connector一節[2]。代碼如下:
DataStream<UserBehaviorSchema> dataStream=env.addSource(new FlinkKafkaConsumer010<UserBehaviorSchema>( topic, new UserBehaviorSerial(), properties ).setStartFromEarliest());
其中,在代碼中需指定的有:要消費的topic、數據序列化的對象以及配置,其中,配置可指定bootstrap.servers即可,其他配置按需設置。調用setStarFromEarliest()是為讓Flink從頭消費指定topic中數據,這樣寫的好處是:只要你Kafka topic中存在數據,測試時就不用重新往kafka里寫數據了。當然調用該方法不僅僅是這個作用,其在業務上的使用需根據需求。此外,Flink中還有諸多指定消費kafka的方法,詳情請見官網[2]。
這里值得說的一點是獲取數據后,dataStream的值是不變的,不會因為做過flatmap等操作后就會改變。
3、數據轉換
對Flink 代碼的分析過程見引文,此處僅有以下幾點需說明的:
1. 若是kafka中的數據是自己按照因為數據格式隨機生成的,請不要按照博主代碼中customWaterExtractor()類的寫法去定義watermark和timestamp,因為代碼中的currentTimeStamp的值可能也是隨機的,所以就會造成程序不報錯但是卡死等待的情況。
2. timestamp的值要和數據源中數據保持相同的數據級。
public static class customWaterExtractor implements AssignerWithPeriodicWatermarks<UserBehaviorSchema>{ private static final long serialVersionUID = 298015256202705122L; private final long maxOutOrderness=3500; private long currentTimeStamp=Long.MIN_VALUE; @Nullable @Override public Watermark getCurrentWatermark() { return new Watermark(currentTimeStamp-maxOutOrderness); } @Override public long extractTimestamp(UserBehaviorSchema element, long previousElementTimestamp) { // 此處需要注意的點:timestamp得到的值是否乘以1000轉換為毫秒,需要根據消息中被指定為timestamp字段的單位。 long timeStamp=element.timestamp*1000; currentTimeStamp=Math.max(timeStamp,currentTimeStamp); return timeStamp; } }
3. 在返回的結果類ResultEvent中,使用sinking字段去保存HotTopN的名次,其默認值為0。
4、數據存儲
本文中是通過extends RichSinkFunction來實現將數據寫入HBase中,其中,@Override的invoke()方法是針對每條數據都會調用的,其余的open()、close()方法,從日志上看是不是針對每條數據都會調用。對open()方法用於打開鏈接,最好實現連接池避免鏈接過多,此處HBase的connection已自身實現不用單獨實現。
數據寫入HBase時,有兩點建議:
1. 將數據寫入HBase的表中時,最好先做好表的預分區工作,避免后期因為表的split造成性能下降以及維護上的困難;
2. 為加快HBase的查詢速度,可以將制定字段作為HBase表的rowkey,文中是指定時間戳和排名作為表的rowkey,至於二級索引等暫不在此處討論。
5、參考文獻鏈接:
[1]http://wuchong.me/blog/2018/11/07/use-flink-calculate-hot-items/
[2]https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html