Flink Stream Processing - Sink to HBase


TripDriveToHBaseSink

package pers.aishuang.flink.streaming.sink.hbase;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pers.aishuang.flink.streaming.entity.TripModel;
import pers.aishuang.flink.streaming.utils.DateUtil;

import java.io.IOException;

public class TripDriveToHBaseSink extends RichSinkFunction<TripModel> {
    private final static Logger logger = LoggerFactory.getLogger(TripDriveToHBaseSink.class);


    private String tableName;
    private Connection conn = null;
    private BufferedMutator mutator = null;

    public TripDriveToHBaseSink(String _tableName) {
        this.tableName = _tableName;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // Get the global job parameters from the runtime context
        ParameterTool globalJobParameters = (ParameterTool) getRuntimeContext()
                .getExecutionConfig().getGlobalJobParameters();
        // Read the HBase-related connection parameters
        String zkQuorum = globalJobParameters.getRequired("zookeeper.quorum");
        String port = globalJobParameters.getRequired("zookeeper.clientPort");
        org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
        conf.set(HConstants.ZOOKEEPER_QUORUM, zkQuorum);
        conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, port);
        conf.set(TableInputFormat.INPUT_TABLE, tableName);

        org.apache.hadoop.conf.Configuration hbaseConf = HBaseConfiguration.create(conf);
        // Create the connection via the connection factory
        conn = ConnectionFactory.createConnection(hbaseConf);
        // Configure how large the write buffer may grow and how long data may sit
        // before being flushed to HBase; this buffered write path is similar in
        // spirit to the batching a Kafka producer does before sending.
        BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(tableName));
        // Flush once the buffer reaches 10 MB
        params.writeBufferSize(10 * 1024 * 1024L);
        // Flush buffered data at least every 5 seconds
        params.setWriteBufferPeriodicFlushTimeoutMs(5 * 1000L);
        // Obtain the buffered table writer from the connection
        try {
            mutator = conn.getBufferedMutator(params);
        } catch (IOException e) {
            logger.error("Failed to obtain BufferedMutator: " + e.getMessage());
        }
    }

    // 5. Override invoke to write each incoming record to HBase
    @Override
    public void invoke(TripModel value, Context context) throws Exception {
        // 5.1 Build a Put from the incoming value via setDataSourcePut
        try {
            Put put = setDataSourcePut(value);
            mutator.mutate(put);
            // 5.2 Force an immediate flush so the record reaches HBase right away
            //     (note: flushing per record bypasses the size/time buffering set up in open)
            mutator.flush();
        } catch (Exception ex) {
            logger.error("Failed to write to HBase: " + ex.getMessage());
        }
    }

    // 4. Override close
    @Override
    public void close() throws Exception {
        // 4.1 Release the HBase table and connection resources
        if (mutator != null) mutator.close();
        if (conn != null && !conn.isClosed()) conn.close();
    }

    // 6. Implement setDataSourcePut

    /**
     * Builds one Put per record:
     * 1. table name  2. rowkey  3. column family  4. column names and values
     *
     * @param tripModel the finished trip record
     * @return the Put to write to HBase
     */
    private Put setDataSourcePut(TripModel tripModel) {
        String rowKey = tripModel.getVin() + "_" + DateUtil.convertStringToDate(tripModel.getTripStartTime()).getTime();
        String cf = "cf";
        Put put = new Put(Bytes.toBytes(rowKey));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("vin"), Bytes.toBytes(tripModel.getVin()));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("lastSoc"), Bytes.toBytes(String.valueOf(tripModel.getLastSoc())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("lastMileage"), Bytes.toBytes(String.valueOf(tripModel.getLastMileage())));

        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("tripStartTime"), Bytes.toBytes(tripModel.getTripStartTime()));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("start_BMS_SOC"), Bytes.toBytes(String.valueOf(tripModel.getStart_BMS_SOC())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("start_longitude"), Bytes.toBytes(String.valueOf(tripModel.getStart_longitude())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("start_latitude"), Bytes.toBytes(String.valueOf(tripModel.getStart_latitude())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("start_mileage"), Bytes.toBytes(String.valueOf(tripModel.getStart_mileage())));

        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("end_BMS_SOC"), Bytes.toBytes(String.valueOf(tripModel.getEnd_BMS_SOC())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("end_longitude"), Bytes.toBytes(String.valueOf(tripModel.getEnd_longitude())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("end_latitude"), Bytes.toBytes(String.valueOf(tripModel.getEnd_latitude())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("end_mileage"), Bytes.toBytes(String.valueOf(tripModel.getEnd_mileage())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("tripEndTime"), Bytes.toBytes(tripModel.getTripEndTime()));

        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("mileage"), Bytes.toBytes(String.valueOf(tripModel.getMileage())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("max_speed"), Bytes.toBytes(String.valueOf(tripModel.getMax_speed())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("soc_comsuption"), Bytes.toBytes(String.valueOf(tripModel.getSoc_comsuption())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("time_comsuption"), Bytes.toBytes(String.valueOf(tripModel.getTime_comsuption())));

        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("total_low_speed_nums"), Bytes.toBytes(String.valueOf(tripModel.getTotal_low_speed_nums())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("total_medium_speed_nums"), Bytes.toBytes(String.valueOf(tripModel.getTotal_medium_speed_nums())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("total_high_speed_nums"), Bytes.toBytes(String.valueOf(tripModel.getTotal_high_speed_nums())));

        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("Low_BMS_SOC"), Bytes.toBytes(String.valueOf(tripModel.getLow_BMS_SOC())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("Medium_BMS_SOC"), Bytes.toBytes(String.valueOf(tripModel.getMedium_BMS_SOC())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("High_BMS_SOC"), Bytes.toBytes(String.valueOf(tripModel.getHigh_BMS_SOC())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("Low_BMS_Mileage"), Bytes.toBytes(String.valueOf(tripModel.getLow_BMS_Mileage())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("Medium_BMS_Mileage"), Bytes.toBytes(String.valueOf(tripModel.getMedium_BMS_Mileage())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("High_BMS_Mileage"), Bytes.toBytes(String.valueOf(tripModel.getHigh_BMS_Mileage())));

        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("tripStatus"), Bytes.toBytes(String.valueOf(tripModel.getTripStatus())));
        put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("processTime"), Bytes.toBytes(DateUtil.getCurrentDateTime()));
        return put;
    }
}
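
How the sink is wired into a job: below is a minimal sketch, not taken from the source project. The job class, the buildTripStream helper, and the table name "trip_division_table" are placeholders; only the config keys zookeeper.quorum and zookeeper.clientPort and the sink constructor come from the code above.

package pers.aishuang.flink.streaming.job;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import pers.aishuang.flink.streaming.entity.TripModel;
import pers.aishuang.flink.streaming.sink.hbase.TripDriveToHBaseSink;

public class TripDriveToHBaseDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // The sink reads "zookeeper.quorum" and "zookeeper.clientPort" from the global
        // job parameters inside open(), so they must be registered on the execution config,
        // e.g. --zookeeper.quorum node1:2181,node2:2181 --zookeeper.clientPort 2181
        ParameterTool params = ParameterTool.fromArgs(args);
        env.getConfig().setGlobalJobParameters(params);

        // Stand-in for the real upstream trip aggregation; see buildTripStream below.
        DataStream<TripModel> tripStream = buildTripStream(env);

        // "trip_division_table" is a placeholder; use the HBase table created for trip records.
        tripStream.addSink(new TripDriveToHBaseSink("trip_division_table"));

        env.execute("TripDriveToHBaseDemo");
    }

    // Hypothetical helper: in the real job this would be the windowed trip-division stream.
    private static DataStream<TripModel> buildTripStream(StreamExecutionEnvironment env) {
        // Assumes TripModel has a public no-arg constructor; adjust to the actual entity.
        return env.fromElements(new TripModel());
    }
}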

TripSampleToHBaseSink

package pers.aishuang.flink.streaming.sink.hbase;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pers.aishuang.flink.streaming.utils.DateUtil;
import pers.aishuang.flink.streaming.utils.StringUtil;

import java.io.IOException;

public class TripSampleToHBaseSink extends RichSinkFunction<String[]> {
    // Logger for this class
    private final static Logger logger = LoggerFactory.getLogger(TripSampleToHBaseSink.class);

    // Target HBase table name
    private String tableName;
    // HBase connection
    private Connection conn = null;
    // Buffered writer for the target table
    private BufferedMutator mutator = null;

    // Constructor taking the target table name
    public TripSampleToHBaseSink(String _tableName) {
        this.tableName = _tableName;
    }

    // Override open
    @Override
    public void open(Configuration parameters) throws Exception {
        // 1. Get the global job parameters from the runtime context
        ParameterTool globalJobParameters = (ParameterTool) getRuntimeContext()
                .getExecutionConfig()
                .getGlobalJobParameters();
        // 2. Read the HBase-related connection parameters
        // -- ZooKeeper quorum (the cluster's server addresses)
        String zkQuorum = globalJobParameters.getRequired("zookeeper.quorum");
        // -- ZooKeeper client port
        String port = globalJobParameters.getRequired("zookeeper.clientPort");
        // -- Create the configuration
        org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
        // -- Apply the parameters (same quorum key as TripDriveToHBaseSink)
        conf.set(HConstants.ZOOKEEPER_QUORUM, zkQuorum);
        conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, port);
        conf.set(TableInputFormat.INPUT_TABLE, tableName);

        org.apache.hadoop.conf.Configuration hbaseConf = HBaseConfiguration.create(conf);
        // 3. Create the connection via the connection factory
        conn = ConnectionFactory.createConnection(hbaseConf);
        // -- Configure how large the write buffer may grow and how long data may sit
        //    before being flushed to HBase
        BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(tableName));
        // -- Write buffer size: 10 MB
        params.writeBufferSize(10 * 1024 * 1024L);
        // -- Periodic flush timeout: 5 s
        params.setWriteBufferPeriodicFlushTimeoutMs(5 * 1000L);
        // 4. Obtain the buffered table writer from the connection
        try {
            mutator = conn.getBufferedMutator(params);
        } catch (IOException e) {
            logger.error("Failed to obtain BufferedMutator: " + e.getMessage());
        }
    }

    // 5. Override invoke to write each incoming record to HBase
    @Override
    public void invoke(String[] value, Context context) throws Exception {
        // -- Build a Put from the incoming value via setDataSourcePut
        try {
            Put put = setDataSourcePut(value);
            mutator.mutate(put);
            // -- Force an immediate flush so the record reaches HBase right away
            mutator.flush();
        } catch (IOException e) {
            logger.error("Failed to write to HBase: " + e.getMessage());
        }
    }

    // Override close
    @Override
    public void close() throws Exception {
        // Release the HBase table and connection resources
        if(mutator != null) mutator.close();
        if( conn != null ) conn.close();
    }


    /**
     * Implements setDataSourcePut.
     * Builds one Put per record:
     * 1. table name  2. rowkey  3. column family  4. column names and values
     * @param tripDriveArr one sampled trip record as a String array
     * @return the Put to write to HBase
     */
    private Put setDataSourcePut(String[] tripDriveArr) {
        // 1. Rowkey design: VIN + reversed timestamp, so writes for a VIN are not
        //    appended in strict time order (helps avoid region hot-spotting)
        String rowkey = tripDriveArr[0] + StringUtil.reverse(tripDriveArr[1]);
        // 2. Instantiate the Put with the rowkey
        Put put = new Put(Bytes.toBytes(rowkey));
        // 3. Column family name
        String cf = "cf";
        put.addColumn(Bytes.toBytes(cf),Bytes.toBytes("vin"),Bytes.toBytes(tripDriveArr[0]));
        put.addColumn(Bytes.toBytes(cf),Bytes.toBytes("terminalTimeStamp"),Bytes.toBytes(tripDriveArr[1]));
        put.addColumn(Bytes.toBytes(cf),Bytes.toBytes("soc"),Bytes.toBytes(tripDriveArr[2]));
        put.addColumn(Bytes.toBytes(cf),Bytes.toBytes("mileage"),Bytes.toBytes(tripDriveArr[3]));
        put.addColumn(Bytes.toBytes(cf),Bytes.toBytes("speed"),Bytes.toBytes(tripDriveArr[4]));
        put.addColumn(Bytes.toBytes(cf),Bytes.toBytes("gps"),Bytes.toBytes(tripDriveArr[5]));
        put.addColumn(Bytes.toBytes(cf),Bytes.toBytes("terminalTime"),Bytes.toBytes(tripDriveArr[6]));
        put.addColumn(Bytes.toBytes(cf),Bytes.toBytes("processTime"),Bytes.toBytes(DateUtil.getCurrentDateTime()));

        return put;
    }
}
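
A quick, standalone illustration of the rowkey scheme above (not part of the project): StringUtil.reverse is assumed to simply reverse the string, so StringBuilder.reverse() stands in for it here, and the VIN and timestamp values are made up.

public class RowKeyDemo {
    public static void main(String[] args) {
        String vin = "LSVNV2182E2100001"; // sample VIN, made up for illustration
        String ts = "1629274612000";      // terminal timestamp in milliseconds, also made up

        // Stand-in for StringUtil.reverse(ts)
        String reversedTs = new StringBuilder(ts).reverse().toString();
        String rowkey = vin + reversedTs;

        // Consecutive timestamps differ mostly in their low-order digits; after the
        // reversal those digits lead the suffix, so rows written for the same VIN do
        // not land on the table in strictly increasing key order.
        System.out.println(rowkey); // LSVNV2182E21000010002164729261
    }
}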

