HBase version: 1.3.1
Purpose: show how to use the newer HBase client API.
The following Java API usage patterns were tried and verified:
1. Create a table
2. Create a table (pre-split regions)
3. Single put
4. Batch put
5. Batch put (client-side write buffer)
6. Single get
7. Batch get
8. Simple scan
9. Mixed operations
■Source code
https://github.com/quchunhui/hbase_sample
■pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <artifactId>hbase_sample</artifactId>
    <groupId>hbase_sample</groupId>
    <version>1.0</version>
    <modelVersion>4.0.0</modelVersion>
    <dependencies>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>1.3.1</version>
        </dependency>
    </dependencies>
    <build>
        <sourceDirectory>src/main/java</sourceDirectory>
        <outputDirectory>target/classes</outputDirectory>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.4</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
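Every sample below refers to a small constants helper class that is part of the repository linked above. A minimal sketch with assumed values (the real definitions live in the repo) would be:

public class constants {
    //assumed values for illustration; see the repository for the real definitions
    public static final String TABLE_NAME = "TEST1";
    public static final String COLUMN_FAMILY_DF = "df";
    public static final String COLUMN_FAMILY_EX = "ex";
}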
===1. Create a table===
package api;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.regionserver.BloomType;

public class create_table_sample1 {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.1.80,192.168.1.81,192.168.1.82");
        Connection connection = ConnectionFactory.createConnection(conf);
        Admin admin = connection.getAdmin();

        HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("TEST1"));
        //MemStore flush size. Default 128M, must not be smaller than 1M.
        desc.setMemStoreFlushSize(2097152L);
        //Maximum HFile size. Default 10G, must not be smaller than 2M.
        desc.setMaxFileSize(10485760L);
        //Whether the WAL is written synchronously or asynchronously.
        desc.setDurability(Durability.SYNC_WAL);

        HColumnDescriptor family1 = new HColumnDescriptor(constants.COLUMN_FAMILY_DF.getBytes());
        family1.setTimeToLive(2 * 60 * 60 * 24);    //TTL
        family1.setMaxVersions(2);                  //number of versions
        family1.setBlockCacheEnabled(true);
        desc.addFamily(family1);

        HColumnDescriptor family2 = new HColumnDescriptor(constants.COLUMN_FAMILY_EX.getBytes());
        //time to live for the data
        family2.setTimeToLive(3 * 60 * 60 * 24);
        //minimum number of versions. Default 0.
        family2.setMinVersions(2);
        //maximum number of versions. Default 1.
        family2.setMaxVersions(3);
        //Bloom filter: ROW or ROWCOL; ROWCOL also filters on the column in addition to the row. Default ROW.
        family2.setBloomFilterType(BloomType.ROW);
        //data block size in bytes. Default 65536.
        family2.setBlocksize(65536);
        //block cache for this family's HFile data blocks. Default true.
        family2.setBlockCacheEnabled(true);
//        //cache bloom filters on write. Default false.
//        family2.setCacheBloomsOnWrite(false);
//        //cache indexes on write. Default false.
//        family2.setCacheIndexesOnWrite(false);
//        //compression algorithm used for storage. Default NONE.
//        family2.setCompressionType(Compression.Algorithm.NONE);
//        //compression algorithm used during compaction. Default NONE.
//        family2.setCompactionCompressionType(Compression.Algorithm.NONE);
//        //data block encoding, applied in memory and on disk (distinct from compression such as Snappy). Default NONE.
//        family2.setDataBlockEncoding(DataBlockEncoding.NONE);
//        //evict cached blocks when the file is closed. Default false.
//        family2.setEvictBlocksOnClose(false);
//        //give this family's blocks higher priority in the LRU block cache. Default false.
//        family2.setInMemory(false);
//        //replication scope: REPLICATION_SCOPE_LOCAL means the family is not replicated between clusters. Default 0 (local).
//        family2.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
        desc.addFamily(family2);

        admin.createTable(desc);
        admin.close();
        connection.close();
    }
}
===2. Create a table (pre-split regions)===
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.util.Bytes;

public class create_table_sample2 {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.1.80,192.168.1.81,192.168.1.82");
        Connection connection = ConnectionFactory.createConnection(conf);
        Admin admin = connection.getAdmin();

        TableName table_name = TableName.valueOf("TEST1");
        if (admin.tableExists(table_name)) {
            admin.disableTable(table_name);
            admin.deleteTable(table_name);
        }

        HTableDescriptor desc = new HTableDescriptor(table_name);

        HColumnDescriptor family1 = new HColumnDescriptor(constants.COLUMN_FAMILY_DF.getBytes());
        family1.setTimeToLive(3 * 60 * 60 * 24);    //TTL
        family1.setBloomFilterType(BloomType.ROW);  //row-level bloom filter
        family1.setMaxVersions(3);                  //number of versions
        desc.addFamily(family1);

        HColumnDescriptor family2 = new HColumnDescriptor(constants.COLUMN_FAMILY_EX.getBytes());
        family2.setTimeToLive(2 * 60 * 60 * 24);    //TTL
        family2.setBloomFilterType(BloomType.ROW);  //row-level bloom filter
        family2.setMaxVersions(2);                  //number of versions
        desc.addFamily(family2);

        byte[][] splitKeys = {
                Bytes.toBytes("row01"),
                Bytes.toBytes("row02"),
                Bytes.toBytes("row04"),
                Bytes.toBytes("row06"),
                Bytes.toBytes("row08"),
        };
        admin.createTable(desc, splitKeys);

        admin.close();
        connection.close();
    }
}
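Instead of listing the split keys explicitly, Admin also offers a createTable overload that derives evenly spaced split points from a start key, an end key, and a region count. A minimal sketch (key range and region count are assumptions) that could replace the admin.createTable(desc, splitKeys) call above:

//pre-split into 6 regions across the assumed key range ["row00", "row99")
admin.createTable(desc, Bytes.toBytes("row00"), Bytes.toBytes("row99"), 6);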
===3. Single put===
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import java.util.Random;

public class table_put_sample1 {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.1.80,192.168.1.81,192.168.1.82");
        Connection connection = ConnectionFactory.createConnection(conf);
        Table table = connection.getTable(TableName.valueOf(constants.TABLE_NAME));

        Random random = new Random();
        String[] rows = new String[] {"01", "02", "03"};
        String[] names = new String[] {"zhang san", "li si", "wang wu", "wei liu"};
        String[] sexs = new String[] {"men", "women"};
        String[] heights = new String[] {"165cm", "170cm", "175cm", "180cm"};
        String[] weights = new String[] {"50kg", "55kg", "60kg", "65kg", "70kg", "75kg", "80kg"};

        Put put = new Put(Bytes.toBytes("row" + rows[random.nextInt(rows.length)]));
        String name = names[random.nextInt(names.length)];
        put.addColumn(constants.COLUMN_FAMILY_DF.getBytes(), "name".getBytes(), name.getBytes());
        String sex = sexs[random.nextInt(sexs.length)];
        put.addColumn(constants.COLUMN_FAMILY_DF.getBytes(), "sex".getBytes(), sex.getBytes());
        String height = heights[random.nextInt(heights.length)];
        put.addColumn(constants.COLUMN_FAMILY_EX.getBytes(), "height".getBytes(), height.getBytes());
        String weight = weights[random.nextInt(weights.length)];
        put.addColumn(constants.COLUMN_FAMILY_EX.getBytes(), "weight".getBytes(), weight.getBytes());
        table.put(put);

        table.close();
        connection.close();
    }
}
===4. Batch put===
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class table_put_sample2 {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.1.80,192.168.1.81,192.168.1.82");
        Connection connection = ConnectionFactory.createConnection(conf);
        Table table = connection.getTable(TableName.valueOf(constants.TABLE_NAME));

        Random random = new Random();
        String[] rows = new String[] {"01", "02", "03"};
        String[] names = new String[] {"zhang san", "li si", "wang wu", "wei liu"};
        String[] sexs = new String[] {"men", "women"};
        String[] heights = new String[] {"165cm", "170cm", "175cm", "180cm"};
        String[] weights = new String[] {"50kg", "55kg", "60kg", "65kg", "70kg", "75kg", "80kg"};

        List<Put> puts = new ArrayList<>();
        for (String row : rows) {
            Put put = new Put(Bytes.toBytes("row" + row));
            String name = names[random.nextInt(names.length)];
            put.addColumn(constants.COLUMN_FAMILY_DF.getBytes(), "name".getBytes(), name.getBytes());
            String sex = sexs[random.nextInt(sexs.length)];
            put.addColumn(constants.COLUMN_FAMILY_DF.getBytes(), "sex".getBytes(), sex.getBytes());
            String height = heights[random.nextInt(heights.length)];
            put.addColumn(constants.COLUMN_FAMILY_EX.getBytes(), "height".getBytes(), height.getBytes());
            String weight = weights[random.nextInt(weights.length)];
            put.addColumn(constants.COLUMN_FAMILY_EX.getBytes(), "weight".getBytes(), weight.getBytes());
            puts.add(put);
        }
        table.put(puts);

        table.close();
        connection.close();
    }
}
===5. Batch put (client-side write buffer)===
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class table_put_sample4 {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.1.80,192.168.1.81,192.168.1.82");
        conf.set("hbase.client.write.buffer", "1048576");    //1M
        Connection connection = ConnectionFactory.createConnection(conf);
        BufferedMutator table = connection.getBufferedMutator(TableName.valueOf(constants.TABLE_NAME));
        System.out.print("[--------]write buffer size = " + table.getWriteBufferSize());

        Random random = new Random();
        String[] rows = new String[] {"01", "02", "03", "04", "05"};
        String[] names = new String[] {"zhang san", "li si", "wang wu", "wei liu"};
        String[] sexs = new String[] {"men", "women"};
        String[] heights = new String[] {"165cm", "170cm", "175cm", "180cm"};
        String[] weights = new String[] {"50kg", "55kg", "60kg", "65kg", "70kg", "75kg", "80kg"};

        List<Mutation> batch = new ArrayList<>();
        for (String row : rows) {
            Put put = new Put(Bytes.toBytes("row" + row));
            String name = names[random.nextInt(names.length)];
            put.addColumn(constants.COLUMN_FAMILY_DF.getBytes(), "name".getBytes(), name.getBytes());
            String sex = sexs[random.nextInt(sexs.length)];
            put.addColumn(constants.COLUMN_FAMILY_DF.getBytes(), "sex".getBytes(), sex.getBytes());
            String height = heights[random.nextInt(heights.length)];
            put.addColumn(constants.COLUMN_FAMILY_EX.getBytes(), "height".getBytes(), height.getBytes());
            String weight = weights[random.nextInt(weights.length)];
            put.addColumn(constants.COLUMN_FAMILY_EX.getBytes(), "weight".getBytes(), weight.getBytes());
            batch.add(put);
        }
        table.mutate(batch);
        table.flush();

        table.close();
        connection.close();
    }
}
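The write buffer can also be sized per mutator, instead of globally through hbase.client.write.buffer, by passing BufferedMutatorParams to getBufferedMutator; the params object additionally accepts a listener that is notified when buffered mutations fail asynchronously. A minimal sketch (the 1M buffer size is an arbitrary example; the wildcard client import in the sample above already covers these classes):

BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(constants.TABLE_NAME))
        .writeBufferSize(1024 * 1024)    //1M buffer for this mutator only
        .listener(new BufferedMutator.ExceptionListener() {
            @Override
            public void onException(RetriesExhaustedWithDetailsException e, BufferedMutator mutator) {
                //invoked asynchronously when buffered mutations fail after all retries
                for (int i = 0; i < e.getNumExceptions(); i++) {
                    System.out.println("[--------]failed to write row " + e.getRow(i));
                }
            }
        });
BufferedMutator table = connection.getBufferedMutator(params);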
===6. Single get===
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;

public class table_get_sample3 {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.1.80,192.168.1.81,192.168.1.82");
        Connection connection = ConnectionFactory.createConnection(conf);
        Table table = connection.getTable(TableName.valueOf(constants.TABLE_NAME));

        Get get = new Get(("row01").getBytes());
        get.addColumn(constants.COLUMN_FAMILY_DF.getBytes(), "name".getBytes());
        get.addColumn(constants.COLUMN_FAMILY_DF.getBytes(), "sex".getBytes());
        get.addColumn(constants.COLUMN_FAMILY_EX.getBytes(), "height".getBytes());
        get.addColumn(constants.COLUMN_FAMILY_EX.getBytes(), "weight".getBytes());

        Result result = table.get(get);
        byte[] name = result.getValue(constants.COLUMN_FAMILY_DF.getBytes(), "name".getBytes());
        byte[] sex = result.getValue(constants.COLUMN_FAMILY_DF.getBytes(), "sex".getBytes());
        byte[] height = result.getValue(constants.COLUMN_FAMILY_EX.getBytes(), "height".getBytes());
        byte[] weight = result.getValue(constants.COLUMN_FAMILY_EX.getBytes(), "weight".getBytes());
        System.out.print("[------]name=" + new String(name) + "\n");
        System.out.print("[------]sex=" + new String(sex) + "\n");
        System.out.print("[------]height=" + new String(height) + "\n");
        System.out.print("[------]weight=" + new String(weight) + "\n");

        table.close();
        connection.close();
    }
}
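One caveat with the sample above: Result.getValue() returns null when the row or column does not exist, so new String(name) would throw a NullPointerException for missing data. A more defensive read might look like this sketch (Bytes is org.apache.hadoop.hbase.util.Bytes, an extra import):

Result result = table.get(get);
if (result.isEmpty()) {
    System.out.print("[------]row01 not found\n");
} else {
    //guard each column individually before converting it to a String
    byte[] name = result.getValue(constants.COLUMN_FAMILY_DF.getBytes(), "name".getBytes());
    System.out.print("[------]name=" + (name == null ? "" : Bytes.toString(name)) + "\n");
}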
===7. Batch get===
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import java.util.ArrayList;
import java.util.List;

public class table_get_sample4 {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.1.80,192.168.1.81,192.168.1.82");
        Connection connection = ConnectionFactory.createConnection(conf);
        Table table = connection.getTable(TableName.valueOf(constants.TABLE_NAME));

        List<Get> gets = new ArrayList<>();
        Get get1 = new Get(("row01").getBytes());
        get1.addFamily(constants.COLUMN_FAMILY_DF.getBytes());
        get1.addFamily(constants.COLUMN_FAMILY_EX.getBytes());
        gets.add(get1);
        Get get2 = new Get(("row02").getBytes());
        get2.addColumn(constants.COLUMN_FAMILY_DF.getBytes(), "name".getBytes());
        get2.addColumn(constants.COLUMN_FAMILY_DF.getBytes(), "sex".getBytes());
        get2.addColumn(constants.COLUMN_FAMILY_EX.getBytes(), "height".getBytes());
        get2.addColumn(constants.COLUMN_FAMILY_EX.getBytes(), "weight".getBytes());
        gets.add(get2);

        Result[] results = table.get(gets);
        for (Result result : results) {
            byte[] name = result.getValue(constants.COLUMN_FAMILY_DF.getBytes(), "name".getBytes());
            byte[] sex = result.getValue(constants.COLUMN_FAMILY_DF.getBytes(), "sex".getBytes());
            byte[] height = result.getValue(constants.COLUMN_FAMILY_EX.getBytes(), "height".getBytes());
            byte[] weight = result.getValue(constants.COLUMN_FAMILY_EX.getBytes(), "weight".getBytes());
            System.out.print("[------]name=" + new String(name) + "\n");
            System.out.print("[------]sex=" + new String(sex) + "\n");
            System.out.print("[------]height=" + new String(height) + "\n");
            System.out.print("[------]weight=" + new String(weight) + "\n");
        }

        table.close();
        connection.close();
    }
}
===8. Simple scan===
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;

public class table_scan_sample3 {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.1.80,192.168.1.81,192.168.1.82");
        Connection connection = ConnectionFactory.createConnection(conf);
        Table table = connection.getTable(TableName.valueOf(constants.TABLE_NAME));

        Scan scan = new Scan();
        scan.addColumn(constants.COLUMN_FAMILY_DF.getBytes(), "name".getBytes());
        scan.addFamily(constants.COLUMN_FAMILY_EX.getBytes());

        ResultScanner rs = table.getScanner(scan);
        for (Result r = rs.next(); r != null; r = rs.next()) {
            byte[] row_key = r.getRow();
            System.out.print("[------]row_key=" + new String(row_key) + "\n");
            byte[] name = r.getValue(constants.COLUMN_FAMILY_DF.getBytes(), "name".getBytes());
            System.out.print("[------]name=" + new String(name) + "\n");
            byte[] weight = r.getValue(constants.COLUMN_FAMILY_EX.getBytes(), "weight".getBytes());
            System.out.print("[------]weight=" + new String(weight) + "\n");
        }
        rs.close();    //always close the scanner to release server-side resources

        table.close();
        connection.close();
    }
}
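A scan over a larger table is usually bounded and tuned: the start/stop row limits the key range, and setCaching controls how many rows each RPC fetches. A minimal sketch (row range and caching value are arbitrary examples; it needs org.apache.hadoop.hbase.util.Bytes as an extra import):

Scan scan = new Scan();
scan.setStartRow(Bytes.toBytes("row01"));    //inclusive
scan.setStopRow(Bytes.toBytes("row05"));     //exclusive
scan.setCaching(100);                        //rows fetched per RPC
scan.addFamily(constants.COLUMN_FAMILY_DF.getBytes());
try (ResultScanner rs = table.getScanner(scan)) {
    for (Result r : rs) {
        System.out.print("[------]row_key=" + Bytes.toString(r.getRow()) + "\n");
    }
}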
===9. Mixed operations===
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import java.util.ArrayList;
import java.util.List;

public class table_batch_sample2 {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.1.80,192.168.1.81,192.168.1.82");
        conf.set("hbase.client.write.buffer", "1048576");    //1M
        Connection connection = ConnectionFactory.createConnection(conf);
        BufferedMutator mutator = connection.getBufferedMutator(TableName.valueOf(constants.TABLE_NAME));

        List<Mutation> batch = new ArrayList<>();
        byte[] row_key = random.getRowKey();
        Put put = new Put(row_key);
        put.addColumn(constants.COLUMN_FAMILY_DF.getBytes(), "name".getBytes(), random.getName());
        put.addColumn(constants.COLUMN_FAMILY_DF.getBytes(), "sex".getBytes(), random.getSex());
        put.addColumn(constants.COLUMN_FAMILY_EX.getBytes(), "height".getBytes(), random.getHeight());
        put.addColumn(constants.COLUMN_FAMILY_EX.getBytes(), "weight".getBytes(), random.getWeight());
        batch.add(put);
        Delete delete = new Delete(row_key);
        delete.addFamily(constants.COLUMN_FAMILY_DF.getBytes());
        delete.addColumn(constants.COLUMN_FAMILY_EX.getBytes(), "weight".getBytes());
        batch.add(delete);
        mutator.mutate(batch);

        Table table = connection.getTable(TableName.valueOf(constants.TABLE_NAME));
        Get get = new Get(row_key);
        Result result1 = table.get(get);
        System.out.print("[------]name=" + getValue(result1, constants.COLUMN_FAMILY_DF, "name") + "\n");
        System.out.print("[------]sex=" + getValue(result1, constants.COLUMN_FAMILY_DF, "sex") + "\n");
        System.out.print("[------]height=" + getValue(result1, constants.COLUMN_FAMILY_EX, "height") + "\n");
        System.out.print("[------]weight=" + getValue(result1, constants.COLUMN_FAMILY_EX, "weight") + "\n");

        mutator.flush();

        Result result2 = table.get(get);
        System.out.print("[------]name=" + getValue(result2, constants.COLUMN_FAMILY_DF, "name") + "\n");
        System.out.print("[------]sex=" + getValue(result2, constants.COLUMN_FAMILY_DF, "sex") + "\n");
        System.out.print("[------]height=" + getValue(result2, constants.COLUMN_FAMILY_EX, "height") + "\n");
        System.out.print("[------]weight=" + getValue(result2, constants.COLUMN_FAMILY_EX, "weight") + "\n");

        table.close();
        mutator.close();
        connection.close();
    }

    private static String getValue(Result rs, String family, String column) {
        byte[] value = rs.getValue(family.getBytes(), column.getBytes());
        if (value == null) {
            return "";
        } else {
            return new String(value);
        }
    }
}
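An alternative to BufferedMutator for mixed workloads is Table.batch(), which sends heterogeneous actions (Put, Delete, Get, ...) in a single call and fills a result array synchronously, one slot per action. A minimal sketch, reusing the constants and imports of the sample above (row keys and values are made up for illustration):

List<Row> actions = new ArrayList<>();
Put put = new Put("row01".getBytes());
put.addColumn(constants.COLUMN_FAMILY_DF.getBytes(), "name".getBytes(), "zhang san".getBytes());
actions.add(put);
actions.add(new Delete("row02".getBytes()));
actions.add(new Get("row03".getBytes()));

Object[] results = new Object[actions.size()];
table.batch(actions, results);    //blocks until every action has succeeded or exhausted its retries
for (Object r : results) {
    if (r instanceof Result) {
        System.out.print("[------]get returned " + ((Result) r).size() + " cells\n");
    }
}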
===Supplementary notes===
1) HTableDescriptor properties
Table-level properties can be set through the HTableDescriptor object, for example:
//whether the WAL is flushed synchronously or asynchronously
tb.setDurability(Durability.SYNC_WAL);
//region size: when the largest store file in a region reaches this size, the region starts to split
tb.setMaxFileSize(1024*1024*1024);
//MemStore size: when the memstore reaches this value, data is flushed to disk
tb.setMemStoreFlushSize(256*1024*1024);
When a MemStore is configured, HBase writes data to memory first and only flushes it to disk once the accumulated data reaches the memory threshold,
so if the RegionServer goes down before the data has been flushed, whatever is still in memory is lost.
This scenario is addressed by setting the WAL durability level, i.e. tb.setDurability(Durability.SYNC_WAL);
The setDurability(Durability d) method is available on three related classes: HTableDescriptor, Delete, and Put.
For Delete and Put, the method is inherited from the parent class org.apache.hadoop.hbase.client.Mutation.
They set the WAL write level for the table, for put operations, and for delete operations respectively.
Note that Delete and Put do not inherit the table's Durability level (verified by testing); a per-operation sketch follows the list of levels below.
Durability is an enum; if no WAL level is specified through this method, the default USE_DEFAULT level applies.
USE_DEFAULT    //use the global default WAL write level, i.e. SYNC_WAL
ASYNC_WAL      //write the WAL asynchronously when data changes
SYNC_WAL       //write the WAL synchronously when data changes
FSYNC_WAL      //write the WAL synchronously when data changes and force it to disk
SKIP_WAL       //do not write the WAL
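For example, a minimal sketch (reusing the constants and table from the samples above) of setting the WAL level on individual operations:

Put put = new Put("row01".getBytes());
put.addColumn(constants.COLUMN_FAMILY_DF.getBytes(), "name".getBytes(), "zhang san".getBytes());
put.setDurability(Durability.ASYNC_WAL);    //WAL is written asynchronously for this Put only

Delete delete = new Delete("row02".getBytes());
delete.setDurability(Durability.SKIP_WAL);  //no WAL for this Delete; data may be lost on failure

table.put(put);
table.delete(delete);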
2) HColumnDescriptor properties
Column-family properties can be set through the HColumnDescriptor object, for example:
//data block encoding, applied in memory and in the store files. Default NONE (no encoding)
tb.setDataBlockEncoding(DataBlockEncoding.PREFIX);
//bloom filter: NONE, ROW (default) or ROWCOL; ROWCOL also filters on the column in addition to the row
tb.setBloomFilterType(BloomType.ROW);
//replication scope: REPLICATION_SCOPE_LOCAL (default) means the family is not replicated between clusters
tb.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
//maximum number of versions to keep. Default 1
tb.setMaxVersions(3);
//minimum number of versions to keep. Default 0; used together with TTL
tb.setMinVersions(1);
//how long data is kept (TTL), in seconds
tb.setTimeToLive(18000);
//compression type for store files. Default NONE (no compression)
tb.setCompressionType(Algorithm.SNAPPY);
//whether to keep cells that have already been deleted
tb.setKeepDeletedCells(false);
//keep this family's data in memory (higher block-cache priority) for faster reads
tb.setInMemory(true);
//enable the block cache for this family's HFile data blocks
tb.setBlockCacheEnabled(true);
//block size. Default 65536 bytes
tb.setBlocksize(64*1024);
--END--