Springboot+Flink1.12.1通過Table API / Flink SQL讀取HBase2.4.0


 

背景

需要將Hbase的數據同步到es,但是又不想把flink程序放到hadoop集群,簡單講 就是開發側,把大數據側的工作干了。。。

1. 環境

廢話不多說,這里用到的環境如下(不確定是否都必要,但是至少我是這個環境)

  • zookeeper 3.6.2
  • Hbase 2.4.0
  • Flink 1.12.1

2. HBase表

建表就不說了,只說下數據結構

表 pmaunew ,建了個名稱空間mes_orgdata,所以pmaunew 是在mes_orgdata下

表 pmaunew   下 rowkey是id,有個列族datas 下面有個projectid字段

 

3. pom依賴

  • jdk1.8
  • Flink1.12.1 使用的pom依賴如下(有些是多余的)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.9.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.ag</groupId>
    <artifactId>dtools</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>dtools</name>
    <description>dtools project for Spring Boot</description>



    <properties>
        <java.version>1.8</java.version>
        <flink.version>1.12.1</flink.version>
        <spring-cloud.version>Hoxton.SR6</spring-cloud.version>
        <spring-boot.version>2.3.9.RELEASE</spring-boot.version>
        <alibaba-cloud.version>2.2.1.RELEASE</alibaba-cloud.version>
        <nacos.version>0.2.2.RELEASE</nacos.version>
        <scala.binary.version>2.12</scala.binary.version>

    </properties>



    <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>


        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>


        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
            <version>2.2.1.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.mybatis</groupId>
            <artifactId>mybatis</artifactId>
            <version>3.5.2</version>
            <scope>compile</scope>
        </dependency>

        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.2.0</version>
        </dependency>


        <dependency>
            <groupId>xyz.downgoon</groupId>
            <artifactId>snowflake</artifactId>
            <version>1.0.0</version>
            <scope>compile</scope>
        </dependency>


        <!-- HBase client.
             NOTE(review): the article states the server is HBase 2.4.0, but this client is
             2.1.0 — consider aligning the client version with the server to avoid
             wire/behavior incompatibilities. Verify before changing. -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>2.1.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hbase-2.2_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>


        <!-- HBase coprocessor (endpoint) support -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-endpoint</artifactId>
            <version>2.1.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>compile</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Blink planner — this is the planner actually selected in the code
             via EnvironmentSettings.useBlinkPlanner(). -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- NOTE(review): having BOTH the legacy planner and the blink planner on the
             classpath is a known source of conflicts in Flink 1.12; only one is needed.
             Kept here to preserve the original build, but consider removing this one. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-extension</artifactId>
            <version>3.2.0</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.1.12</version>
            <scope>compile</scope>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.37</version>
            <scope>compile</scope>
        </dependency>

        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.4.6</version>
        </dependency>


        <!-- FIX: removed stray leading space inside <version> (" 1.2.70"), which can
             break artifact resolution. -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.70</version>
        </dependency>



        <dependency>
            <groupId>com.jayway.jsonpath</groupId>
            <artifactId>json-path</artifactId>
        </dependency>


        <dependency>
            <groupId>com.sun.jna</groupId>
            <artifactId>jna</artifactId>
            <version>3.0.9</version>
        </dependency>



        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>7.7.0</version>
        </dependency>

        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>7.7.0</version>
        </dependency>



    <!-- Swagger -->
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.9.2</version>

        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.9.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/io.github.swagger2markup/swagger2markup -->
        <dependency>
            <groupId>io.github.swagger2markup</groupId>
            <artifactId>swagger2markup</artifactId>
            <version>1.3.1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/io.swagger/swagger-core -->
        <dependency>
            <groupId>io.swagger</groupId>
            <artifactId>swagger-core</artifactId>
            <version>1.5.22</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/io.swagger/swagger-models -->
        <dependency>
            <groupId>io.swagger</groupId>
            <artifactId>swagger-models</artifactId>
            <version>1.5.22</version>
        </dependency>




    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

 

 

 
           

實際測試代碼

package com.ag.dtools.services.bigdata;


import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

import java.util.function.Consumer;


public class FlinkService {

    /**
     * Reads rows from the HBase table {@code mes_orgdata:pmaunew} through the
     * Flink Table API (blink planner, batch mode) and prints each matching row.
     *
     * <p>Fixes over the original:
     * <ul>
     *   <li>The original consumed the same {@link TableResult} twice — once via
     *       {@code print()} and once via the {@code collect()} iterator. A
     *       TableResult can only be consumed once (the original comment itself
     *       said "only one of the two can be kept"); now only the iterator is used.</li>
     *   <li>The iterator callback parsed field 1 with {@code Double.valueOf} —
     *       leftover from a ratings example; {@code projectid} values here are
     *       strings like "IG20220113116Y", so that would have thrown
     *       {@link NumberFormatException}.</li>
     *   <li>Removed the unused batch {@code ExecutionEnvironment}.</li>
     *   <li>The {@link CloseableIterator} is now closed via try-with-resources.</li>
     * </ul>
     */
    public void testFink() {

        // Table environment: blink planner in batch mode (Flink 1.12 style).
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inBatchMode()
                .build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // Map the HBase table: rowkey -> id, column family "datas" exposed as a
        // ROW with the single qualifier "projectid". All fields are declared
        // STRING — the raw HBase bytes are not INT-encoded.
        tableEnv.executeSql(
                "CREATE TABLE pmaunew (" +
                        " id STRING," +
                        " datas ROW<projectid STRING>," +
                        " PRIMARY KEY (id) NOT ENFORCED" +
                        " ) WITH (" +
                        " 'connector' = 'hbase-2.2' ," +
                        " 'table-name' = 'mes_orgdata:pmaunew' ," +
                        " 'zookeeper.quorum' = 'ds1:2181,ds2:2181'" +
                        " )");

        // Equivalent to an HBase scan filtered on datas:projectid.
        Table table = tableEnv.sqlQuery(
                "SELECT * FROM pmaunew where datas.projectid='IG20220113116Y'");

        TableResult executeResult = table.execute();

        // Consume the result exactly once, through the iterator.
        try (CloseableIterator<Row> rows = executeResult.collect()) {
            rows.forEachRemaining(row -> {
                String id = String.valueOf(row.getField(0));
                String projectId = String.valueOf(row.getField(1));
                System.out.println("| " + id + " | " + projectId + " |");
            });
        } catch (Exception e) {
            // CloseableIterator.close() declares Exception; preserve the cause.
            throw new RuntimeException("Failed to consume Flink query result", e);
        }
    }

}

 

5. 輸出

 

| 1497087911286276096 | IG20220113116Y |
| 1497087911911227392 | IG20220113116Y |
| 1497087912536178688 | IG20220113116Y |
| 1497087913152741377 | IG20220113116Y |
| 1497087913735749632 | IG20220113116Y |
| 1497087914243260416 | IG20220113116Y |
| 1497087914973069312 | IG20220113116Y |
| 1497087915853873152 | IG20220113116Y |
| 1497087916478824448 | IG20220113116Y |
| 1497087917200244736 | IG20220113116Y |
| 1497087918143963136 | IG20220113116Y |

 
           

注意

這里我們在Flink的SQL里面定義HBase的Table時,指定的字段都是用的STRING類型。雖然id本來應該是INT,但是用INT的時候報錯了,改成STRING就ok了。

 

關於flink創建表還可以使用這種方式

public class TableDefine {

    /**
     * Registers the HBase table {@code t_user} as a lookup function named
     * "t_user" on the given table environment. Columns: rowKey (Integer),
     * f:uid (Integer), f:uname, f:sex, f:address (String).
     */
    public static void defineUserHbaseTable(StreamTableEnvironment tEnv, Configuration conf) {
        registerLookup(tEnv, conf, "t_user", "uid", "uname", "sex", "address");
    }

    /**
     * Registers the HBase table {@code t_product} as a lookup function named
     * "t_product" on the given table environment. Columns: rowKey (Integer),
     * f:pid (Integer), f:pname, f:pt (String).
     */
    public static void defineProductHbaseTable(StreamTableEnvironment tEnv, Configuration conf) {
        registerLookup(tEnv, conf, "t_product", "pid", "pname", "pt");
    }

    /**
     * Shared registration logic (the two public methods were near-duplicates).
     *
     * @param tEnv          table environment to register the lookup function on
     * @param conf          HBase configuration
     * @param tableName     HBase table name; also used as the registered function
     *                      name, i.e. the table name usable in Flink SQL
     * @param intColumn     first qualifier in family "f", typed Integer
     * @param stringColumns remaining qualifiers in family "f", typed String
     */
    private static void registerLookup(StreamTableEnvironment tEnv, Configuration conf,
                                       String tableName, String intColumn, String... stringColumns) {
        HBaseTableSource hBaseTableSource = new HBaseTableSource(conf, tableName);
        // Row key name and its Java type.
        hBaseTableSource.setRowKey("rowKey", Integer.class);
        // Columns: (family, qualifier, type). Short qualifier names keep HBase
        // storage and scan cost down.
        hBaseTableSource.addColumn("f", intColumn, Integer.class);
        for (String column : stringColumns) {
            hBaseTableSource.addColumn("f", column, String.class);
        }
        // Register the lookup function: the function name is the SQL-side table
        // name; lookups are keyed by "rowKey".
        tEnv.registerFunction(tableName, hBaseTableSource.getLookupFunction(new String[]{"rowKey"}));
    }
}

 

https://blog.csdn.net/Baron_ND/article/details/107839599

 

 

其他參考

https://www.cnblogs.com/Springmoon-venn/p/12544251.html

https://www.cnblogs.com/21airuirui1123/p/14644933.html

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM