Background
We need to sync HBase data to Elasticsearch, but we don't want to deploy the Flink program onto the Hadoop cluster. Put simply, the application side is doing the big-data side's job...
1. Environment
Without further ado, the environment used here is as follows (I'm not sure everything is strictly necessary, but this is at least what I ran on):
- zookeeper 3.6.2
- HBase 2.4.0
- Flink 1.12.1
2. HBase table
Table creation itself is not covered here, only the data layout (a sketch of creating it programmatically follows below):
- The table pmaunew sits under the namespace mes_orgdata, so its full name is mes_orgdata:pmaunew.
- Its rowkey is id, and it has one column family, datas, which contains a projectid field.
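If you do need to create the table, here is a minimal sketch using the hbase-client Admin API. The class name CreatePmaunew and the ZooKeeper quorum ds1,ds2 are assumptions (the quorum is borrowed from the Flink DDL later in this post):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;

public class CreatePmaunew {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // assumption: same ZooKeeper quorum as in the Flink DDL below
        conf.set("hbase.zookeeper.quorum", "ds1,ds2");
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin()) {
            // namespace mes_orgdata, table pmaunew, one column family: datas
            admin.createNamespace(NamespaceDescriptor.create("mes_orgdata").build());
            admin.createTable(TableDescriptorBuilder
                    .newBuilder(TableName.valueOf("mes_orgdata", "pmaunew"))
                    .setColumnFamily(ColumnFamilyDescriptorBuilder.of("datas"))
                    .build());
        }
    }
}
```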
3. pom dependencies
- JDK 1.8
- Flink 1.12.1, with the pom dependencies below (some of them are redundant):
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.9.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.ag</groupId>
    <artifactId>dtools</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>dtools</name>
    <description>dtools project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <flink.version>1.12.1</flink.version>
        <spring-cloud.version>Hoxton.SR6</spring-cloud.version>
        <spring-boot.version>2.3.9.RELEASE</spring-boot.version>
        <alibaba-cloud.version>2.2.1.RELEASE</alibaba-cloud.version>
        <nacos.version>0.2.2.RELEASE</nacos.version>
        <scala.binary.version>2.12</scala.binary.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
            <version>2.2.1.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.mybatis</groupId>
            <artifactId>mybatis</artifactId>
            <version>3.5.2</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.2.0</version>
        </dependency>
        <dependency>
            <groupId>xyz.downgoon</groupId>
            <artifactId>snowflake</artifactId>
            <version>1.0.0</version>
            <scope>compile</scope>
        </dependency>

        <!-- HBase client -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hbase-2.2_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- HBase coprocessor (endpoint) -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-endpoint</artifactId>
            <version>2.1.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-extension</artifactId>
            <version>3.2.0</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.1.12</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.37</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.4.6</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.70</version>
        </dependency>
        <dependency>
            <groupId>com.jayway.jsonpath</groupId>
            <artifactId>json-path</artifactId>
        </dependency>
        <dependency>
            <groupId>com.sun.jna</groupId>
            <artifactId>jna</artifactId>
            <version>3.0.9</version>
        </dependency>

        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>7.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>7.7.0</version>
        </dependency>

        <!-- Swagger -->
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.9.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/io.github.swagger2markup/swagger2markup -->
        <dependency>
            <groupId>io.github.swagger2markup</groupId>
            <artifactId>swagger2markup</artifactId>
            <version>1.3.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/io.swagger/swagger-core -->
        <dependency>
            <groupId>io.swagger</groupId>
            <artifactId>swagger-core</artifactId>
            <version>1.5.22</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/io.swagger/swagger-models -->
        <dependency>
            <groupId>io.swagger</groupId>
            <artifactId>swagger-models</artifactId>
            <version>1.5.22</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
```
4. Flink Java code
The actual test code:
```java
package com.ag.dtools.services.bigdata;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

public class FlinkService {

    public void testFlink() {
        // Table environment (Blink planner, batch mode)
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inBatchMode()
                .build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // Register the HBase table; note the namespace prefix in 'table-name'
        TableResult tableResult = tableEnv.executeSql(
                "CREATE TABLE pmaunew (" +
                "  id STRING," +
                "  datas ROW<projectid STRING>," +
                "  PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'hbase-2.2'," +
                "  'table-name' = 'mes_orgdata:pmaunew'," +
                "  'zookeeper.quorum' = 'ds1:2181,ds2:2181'" +
                ")");

        // Check that we can actually read data out of HBase;
        // this is effectively a scan
        Table table = tableEnv.sqlQuery(
                "SELECT * FROM pmaunew WHERE datas.projectid = 'IG20220113116Y'");

        // Execute the query and grab an iterator over the result
        TableResult executeResult = table.execute();
        CloseableIterator<Row> collect = executeResult.collect();

        // Output. Note: print() consumes the rows, so keep EITHER print()
        // OR the forEachRemaining loop below -- not both.
        executeResult.print();

        collect.forEachRemaining(row -> {
            String id = String.valueOf(row.getField(0));   // rowkey
            Row datas = (Row) row.getField(1);             // column family as ROW
            String projectid = datas == null ? null : String.valueOf(datas.getField(0));
            // ... do something with id / projectid here
        });
    }
}
```
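Since the end goal is pushing these rows into Elasticsearch, here is a hedged sketch of how the collected rows could be indexed using the elasticsearch-rest-high-level-client already declared in the pom. The ES host es1:9200 and the index name pmaunew_index are assumptions, not part of the original setup:

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;

public class EsSink {

    // Index every row from the Flink result iterator into Elasticsearch.
    public static void indexRows(CloseableIterator<Row> rows) throws IOException {
        try (RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("es1", 9200, "http")))) { // assumed ES host
            while (rows.hasNext()) {
                Row row = rows.next();
                Map<String, Object> doc = new HashMap<>();
                doc.put("id", String.valueOf(row.getField(0)));           // rowkey
                Row datas = (Row) row.getField(1);                        // column family ROW
                doc.put("projectid", datas == null ? null : String.valueOf(datas.getField(0)));
                client.index(new IndexRequest("pmaunew_index")            // assumed index name
                                .id((String) doc.get("id"))
                                .source(doc),
                        RequestOptions.DEFAULT);
            }
        }
    }
}
```

In a real job you would batch these with a BulkRequest rather than indexing one document per round trip.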
5. Output
```
| 1497087911286276096 | IG20220113116Y |
| 1497087911911227392 | IG20220113116Y |
| 1497087912536178688 | IG20220113116Y |
| 1497087913152741377 | IG20220113116Y |
| 1497087913735749632 | IG20220113116Y |
| 1497087914243260416 | IG20220113116Y |
| 1497087914973069312 | IG20220113116Y |
| 1497087915853873152 | IG20220113116Y |
| 1497087916478824448 | IG20220113116Y |
| 1497087917200244736 | IG20220113116Y |
| 1497087918143963136 | IG20220113116Y |
```
Note
When defining the HBase table in Flink SQL here, every field is declared as STRING. The projectid column really ought to be INT, but declaring it as INT threw an error at query time; switching the declaration to STRING made it work. A plausible explanation: the connector decodes each cell's bytes according to the declared Flink type, so values that were written into HBase as string bytes cannot be decoded back as INT.
Flink tables over HBase can also be defined the following way (this goes through the older HBaseTableSource API from earlier Flink versions):
```java
public class TableDefine {

    public static void defineUserHbaseTable(StreamTableEnvironment tEnv, Configuration conf) {
        HBaseTableSource hBaseTableSource = new HBaseTableSource(conf, "t_user");
        // Set the HBase table's rowkey and its type
        hBaseTableSource.setRowKey("rowKey", Integer.class);
        // Register columns: first argument is the column family, second the
        // qualifier (kept short to save storage and speed up access), third the type
        hBaseTableSource.addColumn("f", "uid", Integer.class);
        hBaseTableSource.addColumn("f", "uname", String.class);
        hBaseTableSource.addColumn("f", "sex", String.class);
        hBaseTableSource.addColumn("f", "address", String.class);
        // Register the lookup function with the table environment.
        // The first argument is the function name used in Flink SQL;
        // "t_user" in new HBaseTableSource(conf, "t_user") is the HBase table name.
        // The second argument is a TableFunction that returns the columns added
        // via addColumn, keyed by rowkey (here: the user ID).
        tEnv.registerFunction("t_user",
                hBaseTableSource.getLookupFunction(new String[]{"rowKey"}));
    }

    public static void defineProductHbaseTable(StreamTableEnvironment tEnv, Configuration conf) {
        HBaseTableSource hBaseTableSource = new HBaseTableSource(conf, "t_product");
        // Set the HBase table's rowkey and its type
        hBaseTableSource.setRowKey("rowKey", Integer.class);
        // Register columns: column family, qualifier, type (as above)
        hBaseTableSource.addColumn("f", "pid", Integer.class);
        hBaseTableSource.addColumn("f", "pname", String.class);
        hBaseTableSource.addColumn("f", "pt", String.class);
        // Register the lookup function, keyed by rowkey (here: the product ID)
        tEnv.registerFunction("t_product",
                hBaseTableSource.getLookupFunction(new String[]{"rowKey"}));
    }
}
```
(Source: https://blog.csdn.net/Baron_ND/article/details/107839599)
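For context, a hypothetical sketch of how these helpers would be wired up and used. The import paths shown are for Flink 1.12 and the quorum is an assumption; note that the legacy HBaseTableSource itself ships with older flink-hbase connector versions, so adjust packages to your version:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class TableDefineDemo {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "ds1,ds2");  // assumed quorum

        TableDefine.defineUserHbaseTable(tEnv, conf);
        TableDefine.defineProductHbaseTable(tEnv, conf);

        // The registered lookup function is a table function, so it is used
        // in SQL through LATERAL TABLE, e.g. enriching a stream of orders:
        //   SELECT o.*, u.uname
        //   FROM orders AS o,
        //        LATERAL TABLE(t_user(o.uid)) AS u(rowKey, uid, uname, sex, address)
    }
}
```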
Other references
https://www.cnblogs.com/Springmoon-venn/p/12544251.html
https://www.cnblogs.com/21airuirui1123/p/14644933.html