Add the dependencies to the POM:
<!-- Spring HBase dependencies -->
<!-- ================== Hadoop ================== -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>${hadoop.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
            <groupId>javax.servlet</groupId>
            <artifactId>servlet-api</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>${hadoop.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
            <groupId>javax.servlet</groupId>
            <artifactId>servlet-api</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>${hadoop.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
            <groupId>javax.servlet</groupId>
            <artifactId>servlet-api</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<!-- MapReduce core jar -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>${hadoop.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<!-- ================== HBase ================== -->
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>${hbase.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>${hbase.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-common</artifactId>
    <version>${hbase.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-mapreduce</artifactId>
    <version>${hbase.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-annotations -->
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-annotations</artifactId>
    <version>${hbase.version}</version>
</dependency>
<!-- Spark -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>${spark.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
        </exclusion>
        <exclusion>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>${spark.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
        </exclusion>
        <exclusion>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>${spark.version}</version>
</dependency>
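The ${hadoop.version}, ${hbase.version}, and ${spark.version} placeholders must be defined in the POM's <properties> section. The versions below are only an assumed example; align them with the versions actually running on your cluster:

<properties>
    <!-- example versions only; match your cluster -->
    <hadoop.version>3.1.3</hadoop.version>
    <hbase.version>2.2.4</hbase.version>
    <spark.version>3.0.0</spark.version>
</properties>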
Create an HBaseBean class:
public class HBaseBean {

    public HBaseBean() {
    }

    /** rowKey of the HBase row */
    private String rowKey;
    /** column family */
    private String columnFamily;
    /** column qualifier (column name) */
    private String columnQualifier;
    /** timestamp */
    private Long timeStamp;
    /** cell type */
    private String type;
    /** cell value */
    private String value;

    public String getRowKey() {
        return rowKey;
    }

    public void setRowKey(String rowKey) {
        this.rowKey = rowKey;
    }

    public String getColumnFamily() {
        return columnFamily;
    }

    public void setColumnFamily(String columnFamily) {
        this.columnFamily = columnFamily;
    }

    public String getColumnQualifier() {
        return columnQualifier;
    }

    public void setColumnQualifier(String columnQualifier) {
        this.columnQualifier = columnQualifier;
    }

    public Long getTimeStamp() {
        return timeStamp;
    }

    public void setTimeStamp(Long timeStamp) {
        this.timeStamp = timeStamp;
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public String getValue() {
        return value;
    }

    public void setValue(String value) {
        this.value = value;
    }
}
Create an HBaseConfig class:
import java.io.IOException;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * HBase configuration class.
 * HBaseConfiguration.create() looks for hbase-site.xml on the CLASSPATH.
 *
 * @author child
 * @date 2020-7-14 12:11:18
 * https://hbase.apache.org/book.html#faq  official FAQ
 * http://c.biancheng.net/view/6523.html   HBase usage reference
 */
@Configuration
public class HBaseConfig {

    /**
     * A single shared Connection: it is heavyweight and thread-safe,
     * so one instance serves both the template and the Admin.
     * Letting the IOException propagate fails fast on a bad configuration
     * instead of handing out beans that wrap a null connection.
     */
    @Bean
    public Connection hbaseConnection() throws IOException {
        org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
        return ConnectionFactory.createConnection(conf);
    }

    @Bean
    public HbaseTemplate hbaseTemplate(Connection hbaseConnection) {
        return new HbaseTemplate(hbaseConnection);
    }

    @Bean
    public Admin admin(Connection hbaseConnection) throws IOException {
        return hbaseConnection.getAdmin();
    }
}
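Since HBaseConfiguration.create() reads hbase-site.xml from the classpath, a minimal file might look like the sketch below; the ZooKeeper host names are placeholders for your own cluster:

<configuration>
    <property>
        <!-- placeholder hosts; use your cluster's ZooKeeper quorum -->
        <name>hbase.zookeeper.quorum</name>
        <value>node1,node2,node3</value>
    </property>
    <property>
        <name>hbase.zookeeper.property.clientPort</name>
        <value>2181</value>
    </property>
</configuration>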
Create an HbaseTemplate class that wraps some common operations:
import com.culturalCenter.dataCenter.Utils.HBaseUtils;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * DDL and DML operations against HBase; HBaseUtils can be used as well.
 *
 * @Author wulincheng
 * @Date 2020-7-13 15:14:08
 * @Version 1.0
 */
public class HbaseTemplate {

    private Logger log = LoggerFactory.getLogger(this.getClass());

    /** HBase connection */
    private Connection connection;

    public HbaseTemplate() {
    }

    public HbaseTemplate(Connection connection) {
        setConnection(connection);
    }

    public Connection getConnection() {
        return connection;
    }

    public void setConnection(Connection connection) {
        this.connection = connection;
    }

    private Admin getAdmin() throws IOException {
        return connection.getAdmin();
    }

    /**
     * Get a {@link Table}.
     *
     * @param tableName table name
     */
    public Table getTable(String tableName) throws IOException {
        return connection.getTable(TableName.valueOf(tableName));
    }

    /**
     * Create a namespace.
     *
     * @param nameSpace namespace name
     */
    public void createNameSpace(String nameSpace) {
        Assert.hasLength(nameSpace, "namespace must not be empty");
        NamespaceDescriptor namespaceDescriptor = NamespaceDescriptor.create(nameSpace).build();
        try (Admin admin = getAdmin()) {
            admin.createNamespace(namespaceDescriptor);
        } catch (IOException e) {
            log.error("Failed to create namespace [{}]", nameSpace, e);
        }
    }

    /**
     * Delete a namespace.
     *
     * @param nameSpace namespace name
     */
    public void deleteNameSpace(String nameSpace) {
        Assert.hasLength(nameSpace, "namespace must not be empty");
        try (Admin admin = getAdmin()) {
            admin.deleteNamespace(nameSpace);
        } catch (IOException e) {
            log.error("Failed to delete namespace [{}]", nameSpace, e);
        }
    }

    /**
     * Create a table.
     *
     * @param tableName table name
     * @param CF        column families
     */
    public void createTable(String tableName, String... CF) {
        Assert.hasLength(tableName, "table name must not be empty");
        Assert.notEmpty(CF, "column families must not be empty");
        TableDescriptorBuilder tableDescriptorBuilder =
                TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName));
        List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>();
        for (String columnFamily : CF) {
            columnFamilyDescriptors.add(
                    ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(columnFamily)).build());
        }
        TableDescriptor tableDescriptor =
                tableDescriptorBuilder.setColumnFamilies(columnFamilyDescriptors).build();
        try (Admin admin = getAdmin()) {
            admin.createTable(tableDescriptor);
        } catch (IOException e) {
            log.error("Failed to create table [{}]", tableName, e);
        }
    }

    /**
     * Disable a table.
     *
     * @param tableName table name
     */
    public void disableTable(String tableName) {
        Assert.hasLength(tableName, "table name must not be empty");
        try (Admin admin = getAdmin()) {
            admin.disableTable(TableName.valueOf(tableName));
        } catch (IOException e) {
            log.error("Failed to disable table [{}]", tableName, e);
        }
    }

    /**
     * Delete a table.
     *
     * @param tableName table name
     */
    public void deleteTable(String tableName) {
        Assert.hasLength(tableName, "table name must not be empty");
        try (Admin admin = getAdmin()) {
            // a table must be disabled before it can be deleted
            disableTable(tableName);
            admin.deleteTable(TableName.valueOf(tableName));
        } catch (IOException e) {
            log.error("Failed to delete table [{}]", tableName, e);
        }
    }

    /**
     * List all tables in the given namespace.
     *
     * @param nameSpace namespace name
     * @return {@link TableName} list
     */
    public List<TableName> listTable(String nameSpace) {
        Assert.hasLength(nameSpace, "namespace must not be empty");
        try (Admin admin = getAdmin()) {
            TableName[] tableNames = admin.listTableNamesByNamespace(nameSpace);
            return (List<TableName>) CollectionUtils.arrayToList(tableNames);
        } catch (IOException e) {
            log.error("Failed to list tables in namespace [{}]", nameSpace, e);
        }
        return null;
    }

    /**
     * List all tables in the default namespace.
     *
     * @return {@link TableName} list
     */
    public List<TableName> listTableByDefault() {
        return listTable("default");
    }

    /**
     * Scan a whole table.
     *
     * @param tableName table name
     */
    public List<Map<String, String>> scanTable(String tableName) {
        Assert.hasLength(tableName, "table name must not be empty");
        Scan scan = new Scan();
        return getResult(tableName, scan);
    }

    /**
     * Put data.
     *
     * @param tableName    table name
     * @param rowKey       rowKey
     * @param columnFamily column family
     * @param columns      column qualifiers
     * @param values       values, position-matched with columns
     * @return true/false
     */
    public boolean putData(String tableName, String rowKey, String columnFamily,
                           List<String> columns, List<String> values) {
        try (Table table = getTable(tableName)) {
            Put put = new Put(Bytes.toBytes(rowKey));
            for (int i = 0; i < columns.size(); i++) {
                put.addColumn(Bytes.toBytes(columnFamily),
                        Bytes.toBytes(columns.get(i)), Bytes.toBytes(values.get(i)));
            }
            table.put(put);
            return true;
        } catch (IOException e) {
            log.error("Failed to put data into table [{}]", tableName, e);
            return false;
        }
    }

    /**
     * Put rows into a table.
     *
     * @param tableName table name
     * @param puts      rows to put
     */
    public void putRowData(String tableName, Put... puts) {
        Assert.hasLength(tableName, "table name must not be empty");
        Assert.notEmpty(puts, "puts must not be empty");
        try (Table table = getTable(tableName)) {
            table.put((List<Put>) CollectionUtils.arrayToList(puts));
        } catch (IOException e) {
            log.error("Failed to put data into table [{}]", tableName, e);
        }
    }

    /**
     * Put rows into a table.
     *
     * @param tableName table name
     * @param puts      rows to put
     */
    public void putRowData(String tableName, List<Put> puts) {
        Assert.hasLength(tableName, "table name must not be empty");
        Assert.notEmpty(puts, "puts must not be empty");
        try (Table table = getTable(tableName)) {
            table.put(puts);
        } catch (IOException e) {
            log.error("Failed to put data into table [{}]", tableName, e);
        }
    }

    /**
     * Delete rows by rowKey.
     *
     * @param tableName table name
     * @param rowKeys   rowKeys to delete
     */
    public void deleteRowData(String tableName, String... rowKeys) {
        Assert.hasLength(tableName, "table name must not be empty");
        Assert.notEmpty(rowKeys, "rowKeys must not be empty");
        try (Table table = getTable(tableName)) {
            List<Delete> deleteList = new ArrayList<>(rowKeys.length);
            for (String rowKey : rowKeys) {
                deleteList.add(new Delete(Bytes.toBytes(rowKey)));
            }
            table.delete(deleteList);
        } catch (IOException e) {
            log.error("Failed to delete rows [{}]", (Object) rowKeys, e);
        }
    }

    /**
     * Get a single row as a map.
     *
     * @param tableName table name
     * @param rowKey    rowKey
     */
    public Map<String, String> getRowData(String tableName, String rowKey) {
        Assert.hasLength(tableName, "table name must not be empty");
        Assert.hasLength(rowKey, "rowKey must not be empty");
        Get get = new Get(Bytes.toBytes(rowKey));
        Map<String, String> rowData = null;
        try (Table table = getTable(tableName)) {
            Result result = table.get(get);
            rowData = HBaseUtils.getRowData(result);
        } catch (IOException e) {
            log.error("Failed to get row [{}]", rowKey, e);
        }
        return rowData;
    }

    /**
     * Get a single row as an HBaseBean.
     *
     * @param tableName table name
     * @param rowKey    rowKey
     */
    public HBaseBean getRowData_1(String tableName, String rowKey) {
        Assert.hasLength(tableName, "table name must not be empty");
        Assert.hasLength(rowKey, "rowKey must not be empty");
        HBaseBean hBaseBean = null;
        Get get = new Get(Bytes.toBytes(rowKey));
        try (Table table = getTable(tableName)) {
            Result result = table.get(get);
            hBaseBean = HBaseUtils.toHBaseBean(result);
        } catch (IOException e) {
            log.error("Failed to get row [{}]", rowKey, e);
        }
        return hBaseBean;
    }

    /**
     * Get all data of one column family for a given rowKey.
     *
     * @param tableName    table name
     * @param rowKey       rowKey
     * @param columnFamily column family
     */
    public HBaseBean getRowAndFamily(String tableName, String rowKey, String columnFamily) {
        Assert.hasLength(tableName, "table name must not be empty");
        Assert.hasLength(rowKey, "rowKey must not be empty");
        HBaseBean hBaseBean = null;
        Get get = new Get(Bytes.toBytes(rowKey));
        get.addFamily(Bytes.toBytes(columnFamily));
        try (Table table = getTable(tableName)) {
            Result result = table.get(get);
            hBaseBean = HBaseUtils.toHBaseBean(result);
        } catch (IOException e) {
            log.error("Failed to get row [{}]", rowKey, e);
        }
        return hBaseBean;
    }

    /**
     * Scan a rowKey range.
     *
     * @param tableName   table name
     * @param startRowKey start of the range (inclusive)
     * @param stopRowKey  end of the range (exclusive)
     */
    public List<HBaseBean> scanStartAndStopRow(String tableName, String startRowKey, String stopRowKey) {
        Assert.hasLength(tableName, "table name must not be empty");
        Assert.hasLength(startRowKey, "startRowKey must not be empty");
        Assert.hasLength(stopRowKey, "stopRowKey must not be empty");
        List<HBaseBean> hBaseBeans = null;
        try (Table table = getTable(tableName)) {
            Scan scan = new Scan();
            scan.withStartRow(Bytes.toBytes(startRowKey));
            scan.withStopRow(Bytes.toBytes(stopRowKey));
            try (ResultScanner scanner = table.getScanner(scan)) {
                hBaseBeans = HBaseUtils.toHBaseBeans(scanner.iterator());
            }
        } catch (IOException e) {
            log.error("Failed to scan table [{}]", tableName, e);
        }
        return hBaseBeans;
    }

    /**
     * Collect all rows matched by a scan.
     */
    private List<Map<String, String>> getResult(String tableName, Scan scan) {
        // initialize the list up front; adding to a null reference would throw a NullPointerException
        List<Map<String, String>> result = new ArrayList<>();
        try (Table table = getTable(tableName);
             ResultScanner scanner = table.getScanner(scan)) {
            for (Result rs : scanner) {
                result.add(HBaseUtils.getRowData(rs));
            }
        } catch (IOException e) {
            log.error("Failed to scan table [{}]", tableName, e);
        }
        return result;
    }
}
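A brief usage sketch of the template, assuming the HBaseConfig above is in effect; the table name, column family, qualifiers, and values are made-up examples:

import java.util.Arrays;
import java.util.Map;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class HBaseDemo {

    @Autowired
    private HbaseTemplate hbaseTemplate;

    public void demo() {
        // create a table named "user" with a single column family "info"
        hbaseTemplate.createTable("user", "info");
        // write two columns into one row
        hbaseTemplate.putData("user", "row-1", "info",
                Arrays.asList("name", "age"), Arrays.asList("tom", "18"));
        // read the row back as a qualifier -> value map (plus the rowKey entry)
        Map<String, String> row = hbaseTemplate.getRowData("user", "row-1");
        System.out.println(row);
        // delete the row again
        hbaseTemplate.deleteRowData("user", "row-1");
    }
}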
Create an HBaseUtils helper class (its methods could also be folded directly into HbaseTemplate):
import com.culturalCenter.dataCenter.globalconfig.hbasee.HBaseBean;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;

import java.io.IOException;
import java.util.*;

@Service
public class HBaseUtils {

    @Autowired
    private Admin hbaseAdmin;

    /**
     * Check whether a table exists.
     *
     * @param tableName table name
     * @return true/false
     */
    public boolean isExists(String tableName) {
        boolean tableExists = false;
        try {
            TableName table = TableName.valueOf(tableName);
            tableExists = hbaseAdmin.tableExists(table);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return tableExists;
    }

    /**
     * Split keys [10, 20, 30] -> regions ( ,10] (10,20] (20,30] (30, ).
     *
     * @param keys split keys, e.g. [10, 20, 30]
     * @return two-dimensional byte array, sorted by HBase's byte order
     */
    private byte[][] getSplitKeys(List<String> keys) {
        byte[][] splitKeys = new byte[keys.size()][];
        TreeSet<byte[]> rows = new TreeSet<>(Bytes.BYTES_COMPARATOR);
        for (String key : keys) {
            rows.add(Bytes.toBytes(key));
        }
        int i = 0;
        for (byte[] row : rows) {
            splitKeys[i] = row;
            i++;
        }
        return splitKeys;
    }

    /**
     * Map a Result to a single HBaseBean.
     * Note: if the row contains several cells, only the last cell's values are retained.
     */
    public static HBaseBean toHBaseBean(Result rs) {
        HBaseBean hBaseBean = new HBaseBean();
        List<Cell> cells = rs.listCells();
        if (cells == null) {
            // listCells() returns null for an empty Result
            return hBaseBean;
        }
        cells.forEach(cell -> fillBean(cell, hBaseBean));
        return hBaseBean;
    }

    /**
     * Map every cell of a Result to its own HBaseBean.
     */
    public static List<HBaseBean> toHBaseBeans(Result rs) {
        List<HBaseBean> hBaseBeans = new ArrayList<>();
        List<Cell> cells = rs.listCells();
        if (cells == null) {
            return hBaseBeans;
        }
        cells.forEach(cell -> {
            HBaseBean hBaseBean = new HBaseBean();
            fillBean(cell, hBaseBean);
            hBaseBeans.add(hBaseBean);
        });
        return hBaseBeans;
    }

    /**
     * Map a stream of Results to HBaseBeans, one bean per cell.
     */
    public static List<HBaseBean> toHBaseBeans(Iterator<Result> resultIterator) {
        List<HBaseBean> hBaseBeans = new ArrayList<>();
        while (resultIterator.hasNext()) {
            hBaseBeans.addAll(toHBaseBeans(resultIterator.next()));
        }
        return hBaseBeans;
    }

    /** Copy one cell into a bean. */
    private static void fillBean(Cell cell, HBaseBean hBaseBean) {
        hBaseBean.setRowKey(Bytes.toString(CellUtil.cloneRow(cell)));
        hBaseBean.setColumnFamily(Bytes.toString(CellUtil.cloneFamily(cell)));
        hBaseBean.setColumnQualifier(Bytes.toString(CellUtil.cloneQualifier(cell)));
        hBaseBean.setTimeStamp(cell.getTimestamp());
        hBaseBean.setType(cell.getType().toString());
        hBaseBean.setValue(Bytes.toString(CellUtil.cloneValue(cell)));
    }

    /**
     * Map a single Result to a qualifier -> value map (plus the rowKey).
     */
    public static Map<String, String> getRowData(Result rs) {
        Map<String, String> column = new HashMap<>();
        column.put("rowKey", Bytes.toString(rs.getRow()));
        List<Cell> cells = rs.listCells();
        if (cells != null) {
            for (Cell cell : cells) {
                String columnName = Bytes.toString(CellUtil.cloneQualifier(cell));
                String columnValue = Bytes.toString(CellUtil.cloneValue(cell));
                column.put(columnName, columnValue);
            }
        }
        return column;
    }

    /**
     * Apply optional start/stop rows to a Scan.
     */
    public static void setStartAndStop(String startRow, String stopRow, Scan scan) {
        if (!StringUtils.isEmpty(startRow)) {
            scan.withStartRow(Bytes.toBytes(startRow));
        }
        if (!StringUtils.isEmpty(stopRow)) {
            scan.withStopRow(Bytes.toBytes(stopRow));
        }
    }
}
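getSplitKeys is meant for pre-splitting a table into regions at creation time. A minimal sketch of how such keys are passed to Admin.createTable; the table and column-family names are placeholders:

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.util.Bytes;

public class PreSplitDemo {

    /** Creates a table pre-split into ( ,10] (10,20] (20,30] (30, ), as in the Javadoc above. */
    public static void createPreSplitTable(Admin admin) throws Exception {
        TableDescriptor desc = TableDescriptorBuilder
                .newBuilder(TableName.valueOf("user"))
                .setColumnFamily(ColumnFamilyDescriptorBuilder.of("info"))
                .build();
        // sorted split keys, equivalent to what getSplitKeys produces for [10, 20, 30]
        byte[][] splitKeys = {Bytes.toBytes("10"), Bytes.toBytes("20"), Bytes.toBytes("30")};
        admin.createTable(desc, splitKeys);
    }
}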
Create a SparkContextBean class to configure Spark:
import java.io.Serializable;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConfigurationProperties(prefix = "spark")
public class SparkContextBean implements Serializable {

    /** Spark installation directory */
    private String sparkHome = "";
    /** application name */
    private String appName = "";
    /** master URL */
    private String master = "";

    @Bean
    @ConditionalOnMissingBean(SparkConf.class)
    public SparkConf sparkConf() throws Exception {
        SparkConf conf = new SparkConf()
                .setSparkHome(sparkHome)
                .setAppName(appName)
                .setMaster(master);
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        return conf;
    }

    @Bean
    @ConditionalOnMissingBean(JavaSparkContext.class)
    public JavaSparkContext javaSparkContext() throws Exception {
        return new JavaSparkContext(sparkConf());
    }

    public String getSparkHome() {
        return sparkHome;
    }

    public void setSparkHome(String sparkHome) {
        this.sparkHome = sparkHome;
    }

    public String getAppName() {
        return appName;
    }

    public void setAppName(String appName) {
        this.appName = appName;
    }

    public String getMaster() {
        return master;
    }

    public void setMaster(String master) {
        this.master = master;
    }
}
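@ConfigurationProperties(prefix = "spark") binds the three fields from the Spring Boot configuration via relaxed binding. An example application.properties; all values are placeholders for your environment:

# placeholder values; point these at your own Spark installation and master
spark.spark-home=/opt/spark
spark.app-name=hbase-spark-demo
spark.master=local[*]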
Finally, copy the hbase-site.xml already configured on the server into the root of the resources directory, and everything is ready.
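With all of the above in place, a minimal sketch of reading an HBase table from Spark; the table name is a placeholder, and TableInputFormat comes from the hbase-mapreduce dependency added earlier:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class SparkHBaseDemo {

    /** Counts the rows of an HBase table through Spark. */
    public static long countRows(JavaSparkContext jsc, String tableName) {
        // picks up hbase-site.xml from the classpath, just like HBaseConfig above
        Configuration conf = HBaseConfiguration.create();
        conf.set(TableInputFormat.INPUT_TABLE, tableName);
        // each record is a (rowKey, Result) pair for one HBase row
        JavaPairRDD<ImmutableBytesWritable, Result> rdd = jsc.newAPIHadoopRDD(
                conf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);
        return rdd.count();
    }
}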