Flink 使用(一)——從kafka中讀取數據寫入到HBASE中


1、前言

  本文是在《如何計算實時熱門商品》[1]一文上做的擴展,僅在功能上驗證了利用Flink消費Kafka數據,把處理后的數據寫入到HBase的流程,其具體性能未做調優。此外,文中並未就Flink處理邏輯做過多的分析,只因引文(若不特殊說明,文中引文皆指《如何計算實時熱門商品》一文)中寫的很詳細了,故僅給出博主調試犯下的錯。文中若有錯誤,歡迎大伙留言指出,謝謝

  源碼在GitHub上,地址:https://github.com/L-Wg/flinkExample

  環境:Flink 1.6+Kafka 1.1+HBase 1.2

       OpenJDK 1.8+Maven 3.5.2

2、獲取數據

  本文是將Kafka作為數據源(目前業界比較流行的做法),數據的格式和引文的格式一致,數據類型為POJO。為添加源,一般是實現接口SourceFunction<T>,但是Flink與Kafka的鏈接器(connector),Flink社區已經做好了,我們只需在pom文件中加入相應的依賴即可。這里有值得注意的一點是:flink-connector-kafka-*.jar是有版本要求的,其具體的要求可以參加Flink官網connector一節[2]。代碼如下:

DataStream<UserBehaviorSchema> dataStream=env.addSource(new FlinkKafkaConsumer010<UserBehaviorSchema>(
                topic,
                new UserBehaviorSerial(),
                properties
        ).setStartFromEarliest());

 其中,在代碼中需指定的有:要消費的topic、數據序列化的對象以及配置,其中,配置可指定bootstrap.servers即可,其他配置按需設置。調用setStarFromEarliest()是為讓Flink從頭消費指定topic中數據,這樣寫的好處是:只要你Kafka topic中存在數據,測試時就不用重新往kafka里寫數據了。當然調用該方法不僅僅是這個作用,其在業務上的使用需根據需求。此外,Flink中還有諸多指定消費kafka的方法,詳情請見官網[2]

這里值得說的一點是獲取數據后,dataStream的值是不變的,不會因為做過flatmap等操作后就會改變。

3、數據轉換

  對Flink 代碼的分析過程見引文,此處僅有以下幾點需說明的:

  1.  若是kafka中的數據是自己按照因為數據格式隨機生成的,請不要按照博主代碼中customWaterExtractor()類的寫法去定義watermark和timestamp,因為代碼中的currentTimeStamp的值可能也是隨機的,所以就會造成程序不報錯但是卡死等待的情況。

  2.  timestamp的值要和數據源中數據保持相同的數據級。

public static class customWaterExtractor implements AssignerWithPeriodicWatermarks<UserBehaviorSchema>{

        private static final long serialVersionUID = 298015256202705122L;

        private final long maxOutOrderness=3500;
        private long currentTimeStamp=Long.MIN_VALUE;

        @Nullable
        @Override
        public Watermark getCurrentWatermark() {
            return new Watermark(currentTimeStamp-maxOutOrderness);
        }

        @Override
        public long extractTimestamp(UserBehaviorSchema element, long previousElementTimestamp) {
//          此處需要注意的點:timestamp得到的值是否乘以1000轉換為毫秒,需要根據消息中被指定為timestamp字段的單位。
            long timeStamp=element.timestamp*1000;
            currentTimeStamp=Math.max(timeStamp,currentTimeStamp);
            return timeStamp;
        }
    }

  3.  在返回的結果類ResultEvent中,使用sinking字段去保存HotTopN的名次,其默認值為0。  

 4、數據存儲

  本文中是通過extends RichSinkFunction來實現將數據寫入HBase中,其中,@Override的invoke()方法是針對每條數據都會調用的,其余的open()、close()方法,從日志上看是不是針對每條數據都會調用。對open()方法用於打開鏈接,最好實現連接池避免鏈接過多,此處HBase的connection已自身實現不用單獨實現。

  數據寫入HBase時,有兩點建議:

  1.  將數據寫入HBase的表中時,最好先做好表的預分區工作,避免后期因為表的split造成性能下降以及維護上的困難;

  2.  為加快HBase的查詢速度,可以將制定字段作為HBase表的rowkey,文中是指定時間戳和排名作為表的rowkey,至於二級索引等暫不在此處討論。

5、參考文獻鏈接:

  [1]http://wuchong.me/blog/2018/11/07/use-flink-calculate-hot-items/

  [2]https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM