package com.jason.lala.pipe.dbinfo

import com.jason.lala.common.query.option.HbaseOptions
import org.apache.hadoop.hbase.{Cell, CellUtil, HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, HTable, Result, Scan}
import org.apache.hadoop.hbase.filter.PageFilter
import org.apache.hadoop.hbase.util.Bytes

import scala.collection.JavaConverters._

/**
 * Read-only inspector for HBase metadata and sample data.
 *
 * The target namespace, table and column family are read from `map` using the
 * keys `HbaseOptions.NAMESPACE`, `HbaseOptions.TBL` and `HbaseOptions.CF`;
 * each defaults to the empty string when absent.
 *
 * All instances share the single connection returned by `HbaseShow.getCon`.
 * Each instance obtains its own `Admin` handle; call [[close]] when the
 * instance is no longer needed.
 *
 * @param map option map holding the namespace / table / column-family settings
 */
class HbaseShow(private val map: Map[String, String]) {

  private val con = HbaseShow.getCon
  private val hBaseAdmin = con.getAdmin
  private val nameSpace = map.getOrElse(HbaseOptions.NAMESPACE, "")
  private val tbl = map.getOrElse(HbaseOptions.TBL, "")
  private val cf = map.getOrElse(HbaseOptions.CF, "")
  private val hbaseTable = s"${nameSpace}:${tbl}"

  /**
   * Lists the namespaces that contain at least one table.
   *
   * The namespace is taken from `TableName.getNamespaceAsString`, so a table
   * whose name carries no `namespace:` prefix is attributed to its real
   * namespace instead of being reported under its own table name.
   *
   * @return comma-separated distinct namespace names, in first-seen order
   */
  def getNamespaces: String =
    hBaseAdmin.listTableNames
      .map(_.getNamespaceAsString)
      .distinct
      .mkString(",")

  /**
   * Lists the tables that live in the configured namespace.
   *
   * @return comma-separated table qualifiers (names without the namespace prefix)
   */
  def getTableNames(): String =
    hBaseAdmin.listTableNames
      .filter(_.getNamespaceAsString == nameSpace)
      .map(_.getQualifierAsString)
      .mkString(",")

  /**
   * Samples the configured table and lists the columns seen as `family:qualifier`.
   *
   * @return comma-separated `family:qualifier` entries, preceded by `rowkey:key1`
   */
  def getColumnsWithCF(): String =
    getColumnsWithCF(HbaseShow.DefaultColumnScanLimit).mkString(",")

  /**
   * Samples the configured table and lists the qualifiers of the configured column family.
   *
   * @return comma-separated qualifier names
   */
  def getColumns(): String =
    getColumns(HbaseShow.DefaultColumnScanLimit).mkString(",")

  /**
   * Collects the distinct `family:qualifier` pairs found in the sampled rows.
   *
   * Format: `family:qualifier,family2:qualifier2`. The pseudo-column
   * `rowkey:key1` is always placed first; the discovered columns follow in
   * sorted order.
   *
   * @param limitScan maximum number of rows to sample
   * @return `rowkey:key1` followed by the sorted distinct columns
   */
  private def getColumnsWithCF(limitScan: Int): Array[String] = {
    val discovered = withScannedRows(limitScan.toLong) { rows =>
      rows
        .flatMap(cellsOf)
        .map { cell =>
          val family = Bytes.toString(CellUtil.cloneFamily(cell))
          val qualifier = Bytes.toString(CellUtil.cloneQualifier(cell))
          s"$family:$qualifier"
        }
        .toSet
    }
    Array("rowkey:key1") ++ discovered.toArray.sorted
  }

  /**
   * Collects the qualifiers that belong to the configured column family.
   *
   * Format: `colname,colname2...`
   *
   * @param limitScan maximum number of rows to sample
   * @return qualifier names whose family equals the configured column family
   */
  private def getColumns(limitScan: Int): Array[String] =
    getColumnsWithCF(limitScan)
      .map(_.split(":", 2))
      .collect { case Array(family, qualifier) if family == cf => qualifier }

  /**
   * Lists the column families declared on the configured table.
   *
   * @return comma-separated column family names
   */
  def getCfs(): String = {
    val descriptor = hBaseAdmin.getTableDescriptor(TableName.valueOf(hbaseTable))
    descriptor.getColumnFamilies.map(_.getNameAsString).mkString(",")
  }

  /**
   * Renders sampled cells of the configured table, one cell per line, e.g.
   * `stu1, column=course:english, timestamp=1544063446429, value=90`.
   *
   * @param limitScan maximum number of rows to sample
   * @return newline-separated cell descriptions
   */
  def getSampleData(limitScan: Long = DBShow.Num): String =
    withScannedRows(limitScan) { rows =>
      val lines = for {
        row <- rows
        cell <- cellsOf(row)
      } yield {
        // Row key
        val rowKey = Bytes.toString(CellUtil.cloneRow(cell))
        // Cell timestamp
        val timestamp = cell.getTimestamp()
        // Column family
        val family = Bytes.toString(CellUtil.cloneFamily(cell))
        // Column qualifier
        val qualifier = Bytes.toString(CellUtil.cloneQualifier(cell))
        // Cell value
        val value = Bytes.toString(CellUtil.cloneValue(cell))
        s"$rowKey, column=$family:$qualifier, timestamp=$timestamp, value=${value}"
      }
      // Materialise inside the callback: the scanner is closed as soon as it returns.
      lines.mkString("\n")
    }

  /**
   * Releases the `Admin` handle owned by this instance. The shared connection
   * is left open because other instances may still be using it.
   */
  def close(): Unit = hBaseAdmin.close()

  /**
   * Scans the configured table with a `PageFilter` and hands the rows to
   * `consume`, closing the scanner and the table afterwards even on failure.
   *
   * The iterator passed to `consume` is only valid during the call, so the
   * callback must fully materialise whatever it returns. The rows are also
   * capped on the client side because `PageFilter` is evaluated independently
   * per region server and may therefore let through more rows than requested.
   *
   * @param limit   maximum number of rows to hand to `consume`
   * @param consume callback that turns the row iterator into a result
   * @return whatever `consume` returns
   */
  private def withScannedRows[A](limit: Long)(consume: Iterator[Result] => A): A = {
    val table = con.getTable(TableName.valueOf(hbaseTable))
    try {
      val scan = new Scan
      scan.setFilter(new PageFilter(limit))
      val scanner = table.getScanner(scan)
      try {
        val cap = math.min(limit, Int.MaxValue.toLong).toInt
        consume(scanner.iterator().asScala.take(cap))
      } finally scanner.close()
    } finally table.close()
  }

  /** Returns the cells of `result`, or an empty list when the cell list is null. */
  private def cellsOf(result: Result): List[Cell] =
    Option(result.listCells()).fold(List.empty[Cell])(_.asScala.toList)
}

/** Holds the connection shared by every [[HbaseShow]] instance. */
object HbaseShow {

  /** Number of rows sampled when discovering columns. */
  private val DefaultColumnScanLimit = 50

  // Created on first access; `lazy val` makes the one-time initialisation thread-safe.
  private lazy val conn: Connection =
    ConnectionFactory.createConnection(HBaseConfiguration.create())

  /**
   * Returns the shared HBase connection, creating it on first use from the
   * configuration produced by `HBaseConfiguration.create()`.
   *
   * @return the shared connection
   */
  def getCon: Connection = conn
}