By default, HBase creates a table with a single region, so at first every request lands on that one region; once it grows past the configured size, it gets split.
To keep the regionservers stable and efficient, region splits and hotspotting should therefore be avoided as much as possible.
When pre-splitting a table, people typically go about it in one of the following ways:
1):
Using the RegionSplitter utility that HBase ships with:

bin/hbase org.apache.hadoop.hbase.util.RegionSplitter demo1 HexStringSplit -c 10 -f info

where -c is the number of regions to pre-split and -f is the column family. A table created this way has neither the Bloom filter nor compression enabled; to improve read performance, it is recommended to turn on the Bloom filter and use SNAPPY compression. In hbase shell, first disable 'demo1', then run

alter 'demo1', {NAME => 'info', BLOOMFILTER => 'ROWCOL', COMPRESSION => 'SNAPPY', VERSIONS => '1'}

and enable the table again afterwards.
2):
Passing split points directly to the create command in hbase shell (see the sketch right after the code below)
3):
Plain client-API code, without any special rowkey handling:

package com.dongfeng.code.tools.writeDb

import com.dongfeng.code.tools.GlobalConfigUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.{Admin, Connection, ConnectionFactory}
import org.apache.hadoop.hbase.util.Bytes

/**
  * Created by angel
  */
object WriteToHbaseDB {
  private val config: Configuration = HBaseConfiguration.create()
  config.set("hbase.zookeeper.quorum", GlobalConfigUtils.hbaseQuorem)
  config.set("hbase.master", GlobalConfigUtils.hbaseMaster)
  config.set("hbase.zookeeper.property.clientPort", GlobalConfigUtils.clientPort)
  config.set("hbase.rpc.timeout", GlobalConfigUtils.rpcTimeout)
  // the HBase key is "hbase.client.operation.timeout"
  config.set("hbase.client.operation.timeout", GlobalConfigUtils.operatorTimeout)
  config.set("hbase.client.scanner.timeout.period", GlobalConfigUtils.scannTimeout)

  private val conn: Connection = ConnectionFactory.createConnection(config)
  private val admin: Admin = conn.getAdmin

  // Create the table with hand-picked split keys
  def createTable(tableName: TableName, columnFamily: String): Unit = {
    val hTableDescriptor = new HTableDescriptor(tableName)
    val hColumnDescriptor = new HColumnDescriptor(columnFamily)
    hTableDescriptor.addFamily(hColumnDescriptor)
    // Only create the table if it does not exist yet
    if (!admin.tableExists(tableName)) {
      // NOTE: split keys are compared as bytes, so "120000" actually
      // sorts before "40000" - not the numeric order this list suggests
      val splitKeys: List[Array[Byte]] = List(
        Bytes.toBytes("40000"),
        Bytes.toBytes("80000"),
        Bytes.toBytes("120000"),
        Bytes.toBytes("160000")
      )
      try {
        admin.createTable(hTableDescriptor, splitKeys.toArray)
      } finally {
        admin.close()
      }
    }
  }

  def main(args: Array[String]): Unit = {
    createTable(TableName.valueOf("demo3"), "info")
  }
}
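For approach 2), the split points go straight into the create command in hbase shell. A minimal sketch, assuming a table 'demo2' (the name and split points are illustrative, mirroring the code above):

create 'demo2', 'info', SPLITS => ['40000', '80000', '120000', '160000']
# or let HBase compute evenly spaced boundaries:
create 'demo2', 'info', {NUMREGIONS => 10, SPLITALGO => 'HexStringSplit'}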
In practice, the approaches above quietly constrain the initial rowkey design: the key has to sort efficiently in lexicographic order while also steering clear of hotspots. Get it even slightly wrong and most of the data crowds into a single region, which is clearly unreasonable.
So, at least for my workload, the rowkey needs to be salted or run through MD5 so that keys end up evenly scattered.
Here I go with MD5 hashing.
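To see the effect, a tiny standalone sketch (purely illustrative; it only needs hbase-common on the classpath): consecutive business keys, which would otherwise pile into one region, get unrelated MD5 prefixes and thus scatter across the pre-split regions.

import org.apache.hadoop.hbase.util.{Bytes, MD5Hash}

object Md5SpreadDemo {
  def main(args: Array[String]): Unit = {
    // Consecutive ids sort right next to each other as raw rowkeys...
    Seq("40001", "40002", "40003").foreach { id =>
      // ...but their MD5 prefixes are unrelated, so they land in different regions
      val md5 = MD5Hash.getMD5AsHex(Bytes.toBytes(id))
      println(s"$id -> ${md5.substring(0, 8)}...")
    }
  }
}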

package com.df.tools

import java.util.concurrent.atomic.AtomicInteger

import com.df.Contant.GlobalConfigUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes, MD5Hash}
import org.apache.hadoop.hbase.util.RegionSplitter.HexStringSplit

/**
  * Created by angel
  */
object HbaseTools {
  private val config: Configuration = HBaseConfiguration.create()
  config.set("hbase.zookeeper.quorum", GlobalConfigUtils.hbaseQuorem)
  config.set("hbase.master", GlobalConfigUtils.hbaseMaster)
  config.set("hbase.zookeeper.property.clientPort", GlobalConfigUtils.clientPort)
  config.set("hbase.rpc.timeout", GlobalConfigUtils.rpcTimeout)
  // the HBase key is "hbase.client.operation.timeout"
  config.set("hbase.client.operation.timeout", GlobalConfigUtils.operatorTimeout)
  config.set("hbase.client.scanner.timeout.period", GlobalConfigUtils.scannTimeout)

  private val conn: Connection = ConnectionFactory.createConnection(config)
  private val admin: Admin = conn.getAdmin
  val atomic = new AtomicInteger(0)
  var resultAtomic = 0

  /**
    * @return a Table handle, creating the pre-split table first if needed
    */
  def Init(tableName: String, columnFamily: String): Table = {
    if (!admin.tableExists(TableName.valueOf(tableName))) {
      createHTable(conn, tableName, 10, Array(columnFamily))
    }
    conn.getTable(TableName.valueOf(tableName))
  }

  // Build a rowkey for the given columns: hash prefix + business key
  def rowKeyWithHashPrefix(column: String*): Array[Byte] = {
    val rkString = column.mkString("")
    val hash_prefix = getHashCode(rkString)
    Bytes.add(Bytes.toBytes(hash_prefix), Bytes.toBytes(rkString))
  }

  // Build a rowkey as MD5 prefix + business key; the MD5 prefix is what the
  // pre-split regions (HexStringSplit) are keyed on
  def rowKeyWithMD5Prefix(separator: String, length: Int, column: String*): Array[Byte] = {
    val columns = column.mkString(separator)
    // Clamp the prefix length to [8, 32]: an MD5 hex string is 32 chars long
    val prefixLen = math.min(math.max(length, 8), 32)
    val md5_prefix = MD5Hash.getMD5AsHex(Bytes.toBytes(columns)).substring(0, prefixLen)
    Bytes.toBytes(Array(md5_prefix, columns).mkString(separator))
  }

  // Build a rowkey that is simply the MD5 of the business key
  def rowKeyByMD5(column: String*): Array[Byte] = {
    val rkString = column.mkString("")
    Bytes.toBytes(MD5Hash.getMD5AsHex(Bytes.toBytes(rkString)))
  }

  // Build a rowkey by concatenating the business key as-is
  def rowKey(column: String*): Array[Byte] = Bytes.toBytes(column.mkString(""))

  // Hash prefix: AND the hashCode of the concatenated columns with Short.MaxValue,
  // so that keys spread evenly across the pre-split regions
  private def getHashCode(field: String): Short =
    (field.hashCode() & 0x7FFF).toShort

  /**
    * @param tablename table name
    * @param regionNum number of pre-split regions
    * @param columns   column families
    */
  def createHTable(connection: Connection, tablename: String, regionNum: Int, columns: Array[String]): Unit = {
    // Pre-compute the region start keys with HexStringSplit
    val hexsplit: HexStringSplit = new HexStringSplit()
    val splitkeys: Array[Array[Byte]] = hexsplit.split(regionNum)
    val admin = connection.getAdmin
    val tableName = TableName.valueOf(tablename)
    if (!admin.tableExists(tableName)) {
      val tableDescriptor = new HTableDescriptor(tableName)
      if (columns != null) {
        columns.foreach { c =>
          val hcd = new HColumnDescriptor(c.getBytes())
          hcd.setMaxVersions(1)
          // Default is no compression (NONE); switch to SNAPPY
          hcd.setCompressionType(Algorithm.SNAPPY)
          tableDescriptor.addFamily(hcd)
        }
      }
      admin.createTable(tableDescriptor, splitkeys)
    }
  }

  /**
    * @param data the value to persist
    */
  def putData(tableName: String, key: String, columnFamily: String, column: String, data: String): Int = {
    val table: Table = Init(tableName, columnFamily)
    try {
      val rowkey = HbaseTools.rowKeyByMD5(key)
      val put: Put = new Put(rowkey)
      put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(data))
      table.put(put)
      resultAtomic = atomic.incrementAndGet()
    } catch {
      case e: Exception =>
        e.printStackTrace()
        resultAtomic = atomic.decrementAndGet()
    } finally {
      table.close()
    }
    resultAtomic
  }

  /**
    * @param mapData the data to insert as [column name -> value]
    */
  def putMapData(tableName: String, columnFamily: String, key: String, mapData: Map[String, String]): Int = {
    val table: Table = Init(tableName, columnFamily)
    try {
      // TODO: switch to rowKeyWithMD5Prefix so keys line up with the pre-split regions
      val rowkey = HbaseTools.rowKeyByMD5(key)
      val put: Put = new Put(rowkey)
      if (mapData.nonEmpty) {
        for ((k, v) <- mapData) {
          put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(k), Bytes.toBytes(v))
        }
      }
      table.put(put)
      resultAtomic = atomic.incrementAndGet()
    } catch {
      case e: Exception =>
        e.printStackTrace()
        resultAtomic = atomic.decrementAndGet()
    } finally {
      table.close()
    }
    resultAtomic
  }

  // NOTE: expects the already-hashed rowkey, unlike putData which hashes internally
  def deleteData(tableName: String, rowKey: String, columnFamily: String): Int = {
    val table: Table = Init(tableName, columnFamily)
    try {
      val delete = new Delete(Bytes.toBytes(rowKey))
      table.delete(delete)
      resultAtomic = atomic.decrementAndGet()
    } catch {
      case e: Exception =>
        e.printStackTrace()
        resultAtomic = atomic.decrementAndGet()
    } finally {
      table.close()
    }
    resultAtomic
  }

  // Serialize a Scan so it can be handed to TableInputFormat
  def convertScanToString(scan: Scan): String = {
    val proto = ProtobufUtil.toScan(scan)
    Base64.encodeBytes(proto.toByteArray)
  }
}
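To wrap up, a minimal usage sketch of the helper above; the table name 'demo4', the key and the values are made up for illustration. The first write goes through Init, which calls createHTable and creates the table with 10 pre-split regions, SNAPPY compression and one version per cell; the stored rowkey is the MD5 of the business key.

import com.df.tools.HbaseTools

object HbaseToolsDemo {
  def main(args: Array[String]): Unit = {
    // First put triggers Init -> createHTable for the missing table
    HbaseTools.putData("demo4", "poi_10001", "info", "name", "some poi")

    // Several columns for the same (MD5-hashed) business key in one Put
    val n = HbaseTools.putMapData("demo4", "info", "poi_10001",
      Map("lat" -> "39.90", "lng" -> "116.40"))
    println(s"operations counted so far: $n")
  }
}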