确保自己的集群注册过 C:\Windows\System32\drivers\etc\hosts
方案Ⅰ 远程连接需要有配置文件
代码
package tmhbase; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.util.Bytes; import org.junit.Test; import java.io.IOException; import java.text.DecimalFormat; import java.util.Iterator; import java.util.Map; import java.util.NavigableMap; public class TestCRUD { //创建表并插入数据 @Test public void put() throws Exception { //创建conf对象 Configuration conf = HBaseConfiguration.create(); //通过连接工厂创建连接对象 Connection conn = ConnectionFactory.createConnection(conf); //通过连接查询tableName对象 TableName tname = TableName.valueOf("ns1:t1"); //获得table Table table = conn.getTable(tname); //通过bytes工具类创建字节数组(将字符串) byte[] rowid = Bytes.toBytes("row3"); //创建put对象 Put put = new Put(rowid); byte[] f1 = Bytes.toBytes("f1"); byte[] id = Bytes.toBytes("id"); byte[] value = Bytes.toBytes(102); put.addColumn(f1, id, value); //执行插入 table.put(put); } //大数据插入 @Test public void bigInsert() throws Exception { DecimalFormat format = new DecimalFormat(); format.applyPattern("0000"); long start = System.currentTimeMillis(); Configuration conf = HBaseConfiguration.create(); Connection conn = ConnectionFactory.createConnection(conf); TableName tname = TableName.valueOf("ns1:t1"); HTable table = (HTable) conn.getTable(tname); //不要自动清理缓冲区 table.setAutoFlush(false); for (int i = 1; i < 10000; i++) { Put put = new Put(Bytes.toBytes("row" + format.format(i))); //关闭写前日志 put.setWriteToWAL(false); put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("id"), Bytes.toBytes(i)); put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("name"), Bytes.toBytes("tom" + i)); put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("age"), Bytes.toBytes(i % 100)); table.put(put); if (i % 2000 == 0) { table.flushCommits(); } } // table.flushCommits(); System.out.println(System.currentTimeMillis() - start); } //根据rowkey或取值 @Test public void get() throws Exception { //创建conf对象 Configuration conf = HBaseConfiguration.create(); //通过连接工厂创建连接对象 Connection conn = ConnectionFactory.createConnection(conf); //通过连接查询tableName对象 TableName tname = TableName.valueOf("TM_BUS_SITE_LOG_A"); //获得table Table table = conn.getTable(tname); //通过bytes工具类创建字节数组(将字符串) // byte[] rowid = Bytes.toBytes("001K00091220180522055001"); Get get = new Get(Bytes.toBytes("001K00091220180522055001"));//rowkey Result r = table.get(get); byte[] idvalue = r.getValue(Bytes.toBytes("f"), Bytes.toBytes("LINE_NO"));//获取指定的列值 System.out.println(Bytes.toInt(idvalue)); } //测试在数据前补零 ---打印结果: 0000008 @Test public void formatNum() { DecimalFormat format = new DecimalFormat(); format.applyPattern("0000000"); System.out.println(format.format(8)); } //创建表空间 @Test public void createNameSpace() throws Exception { Configuration conf = HBaseConfiguration.create(); Connection conn = ConnectionFactory.createConnection(conf); Admin admin = conn.getAdmin(); //创建名字空间描述符 NamespaceDescriptor nsd = NamespaceDescriptor.create("ns2").build(); admin.createNamespace(nsd); NamespaceDescriptor[] ns = admin.listNamespaceDescriptors(); for (NamespaceDescriptor n : ns) { System.out.println(n.getName()); } } // 列出表空间 @Test public void listNameSpaces() throws Exception { Configuration conf = HBaseConfiguration.create(); Connection conn = ConnectionFactory.createConnection(conf); Admin admin = conn.getAdmin(); NamespaceDescriptor[] ns = admin.listNamespaceDescriptors(); for (NamespaceDescriptor n : ns) { System.out.println(n.getName()); } } //创建表 @Test public void createTable() throws Exception { Configuration conf = HBaseConfiguration.create(); Connection conn = ConnectionFactory.createConnection(conf); Admin admin = conn.getAdmin(); //创建表名对象 TableName tableName = TableName.valueOf("ns2:t2"); //创建表描述符对象 HTableDescriptor tbl = new HTableDescriptor(tableName); //创建列族描述符 HColumnDescriptor col = new HColumnDescriptor("f1"); tbl.addFamily(col); admin.createTable(tbl); System.out.println("over"); } //禁用表 @Test public void disableTable() throws Exception { Configuration conf = HBaseConfiguration.create(); Connection conn = ConnectionFactory.createConnection(conf); Admin admin = conn.getAdmin(); //禁用表 enable(...) disableTable(...) admin.deleteTable(TableName.valueOf("ns2:t2")); } //删除指定表的某个列值 @Test public void deleteData() throws IOException { Configuration conf = HBaseConfiguration.create(); Connection conn = ConnectionFactory.createConnection(conf); TableName tname = TableName.valueOf("TM_BUS_SITE_LOG_A"); Table table = conn.getTable(tname); Delete del = new Delete(Bytes.toBytes("001K00091220180522054722"));//rowkey del.addColumn(Bytes.toBytes("f"), Bytes.toBytes("REG_TIME")); // del.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("name")); table.delete(del); System.out.println("over"); } ///删除在rowkey范围内的数据 @Test public void scan() throws IOException { Configuration conf = HBaseConfiguration.create(); Connection conn = ConnectionFactory.createConnection(conf); TableName tname = TableName.valueOf("ns1:t1"); Table table = conn.getTable(tname); Scan scan = new Scan(); scan.setStartRow(Bytes.toBytes("row5000")); scan.setStopRow(Bytes.toBytes("row8000")); ResultScanner rs = table.getScanner(scan); Iterator<Result> it = rs.iterator(); while (it.hasNext()) { Result r = it.next(); byte[] name = r.getValue(Bytes.toBytes("f1"), Bytes.toBytes("name")); System.out.println(Bytes.toString(name)); } } /** * 动态遍历 */ @Test public void scan2() throws IOException { Configuration conf = HBaseConfiguration.create(); Connection conn = ConnectionFactory.createConnection(conf); TableName tname = TableName.valueOf("TM_BUS_SITE_LOG_A"); Table table = conn.getTable(tname); Scan scan = new Scan(); scan.setStartRow(Bytes.toBytes("001K00091220180522055001")); scan.setStopRow(Bytes.toBytes("001K00091220180522060218")); ResultScanner rs = table.getScanner(scan); Iterator<Result> it = rs.iterator(); while (it.hasNext()) { Result r = it.next(); Map<byte[], byte[]> map = r.getFamilyMap(Bytes.toBytes("f")); for (Map.Entry<byte[], byte[]> entrySet : map.entrySet()) { String col = Bytes.toString(entrySet.getKey()); String val = Bytes.toString(entrySet.getValue()); System.out.print(col + ":" + val + ","); } System.out.println(); } } /** * 动态遍历 */ @Test public void scan3() throws IOException { Configuration conf = HBaseConfiguration.create(); Connection conn = ConnectionFactory.createConnection(conf); TableName tname = TableName.valueOf("TM_BUS_SITE_LOG_A"); Table table = conn.getTable(tname); Scan scan = new Scan(); scan.setStartRow(Bytes.toBytes("001K00091220180522055001")); scan.setStopRow(Bytes.toBytes("001K00091220180522060218")); ResultScanner rs = table.getScanner(scan); Iterator<Result> it = rs.iterator(); while (it.hasNext()) { Result r = it.next(); //得到一行的所有map,key=f1,value=Map<Col,Map<Timestamp,value>> NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> map = r.getMap(); // for (Map.Entry<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> entry : map.entrySet()) { //得到列族 String f = Bytes.toString(entry.getKey()); Map<byte[], NavigableMap<Long, byte[]>> colDataMap = entry.getValue(); for (Map.Entry<byte[], NavigableMap<Long, byte[]>> ets : colDataMap.entrySet()) { String c = Bytes.toString(ets.getKey()); Map<Long, byte[]> tsValueMap = ets.getValue(); for (Map.Entry<Long, byte[]> e : tsValueMap.entrySet()) { Long ts = e.getKey(); String value = Bytes.toString(e.getValue()); System.out.print(f + ":" + c + ":" + ts + "=" + value + ","); } } } System.out.println(); } } }
上面这个方案需要添加配置文件
方案Ⅱ
下面是示例代码。这个代码不需要依赖上面两个配置文件直接运行,如果放上面的配置文件会报错
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.*; import java.io.IOException; import java.util.ArrayList; import java.util.List; public class HbaseTest { private HBaseAdmin admin = null; // 定义配置对象HBaseConfiguration private HBaseConfiguration cfg = null; public HbaseTest() throws Exception { Configuration HBASE_CONFIG = new Configuration(); HBASE_CONFIG.set("hbase.zookeeper.quorum", "192.168.1.116"); HBASE_CONFIG.set("hbase.zookeeper.property.clientPort", "2181"); cfg = new HBaseConfiguration(HBASE_CONFIG); admin = new HBaseAdmin(cfg); } // 创建一张表,指定表名,列族 public void createTable(String tableName, String columnFarily) throws Exception { if (admin.tableExists(tableName)) { System.out.println(tableName + "存在!"); System.exit(0); } else { HTableDescriptor tableDesc = new HTableDescriptor(tableName); tableDesc.addFamily(new HColumnDescriptor(columnFarily)); admin.createTable(tableDesc); System.out.println("创建表成功!"); } } // Hbase获取所有的表信息 public List getAllTables() { List<String> tables = null; if (admin != null) { try { HTableDescriptor[] allTable = admin.listTables(); if (allTable.length > 0) tables = new ArrayList<String>(); for (HTableDescriptor hTableDescriptor : allTable) { tables.add(hTableDescriptor.getNameAsString()); System.out.println(hTableDescriptor.getNameAsString()); } } catch (IOException e) { e.printStackTrace(); } } return tables; } // Hbase中往某个表中添加一条记录 public boolean addOneRecord(String table, String key, String family, String col, byte[] dataIn) { HTablePool tp = new HTablePool(cfg, 1000); HTable tb = (HTable) tp.getTable(table); Put put = new Put(key.getBytes()); put.add(family.getBytes(), col.getBytes(), dataIn); try { tb.put(put); System.out.println("插入数据条" + key + "成功!!!"); return true; } catch (IOException e) { System.out.println("插入数据条" + key + "失败!!!"); return false; } } // Hbase表中记录信息的查询 public void getValueFromKey(String table, String key) { HTablePool tp = new HTablePool(cfg, 1000); HTable tb = (HTable) tp.getTable(table); Get get = new Get(key.getBytes()); try { Result rs = tb.get(get); if (rs.raw().length == 0) { System.out.println("不存在关键字为" + key + "的行!!"); } else { for (KeyValue kv : rs.raw()) { System.out.println(new String(kv.getKey()) + " " + new String(kv.getValue())); } } } catch (IOException e) { e.printStackTrace(); } } // 显示所有数据,通过HTable Scan类获取已有表的信息 public void getAllData(String tableName) throws Exception { HTable table = new HTable(cfg, tableName); Scan scan = new Scan(); ResultScanner rs = table.getScanner(scan); for (Result r : rs) { for (KeyValue kv : r.raw()) { System.out.println(new String(kv.getKey()) + new String(kv.getValue())); } } } // Hbase表中记录信息的删除 public boolean deleteRecord(String table, String key) { HTablePool tp = new HTablePool(cfg, 1000); HTable tb = (HTable) tp.getTable(table); Delete de = new Delete(key.getBytes()); try { tb.delete(de); return true; } catch (IOException e) { System.out.println("删除记录" + key + "异常!!!"); return false; } } // Hbase中表的删除 public boolean deleteTable(String table) { try { if (admin.tableExists(table)) { admin.disableTable(table); admin.deleteTable(table); System.out.println("删除表" + table + "!!!"); } return true; } catch (IOException e) { System.out.println("删除表" + table + "异常!!!"); return false; } } // 测试函数 public static void main(String[] args) { try { HbaseTest hbase = new HbaseTest(); hbase.createTable("student", "fam1"); // hbase.getAllTables(); // hbase.addOneRecord("student","id1","fam1","name","Jack".getBytes()); // hbase.addOneRecord("student","id1","fam1","address","HZ".getBytes()); // hbase.getValueFromKey("student","id1"); // hbase.getAllData("student"); //hbase.deleteRecord("student", "id1"); //hbase.deleteTable("student"); } catch (Exception e) { e.printStackTrace(); } } }
3.百万数据插入并优化
@Test //百万数据插入优化测试 public void bigInsert() throws Exception { //数字格式化 DecimalFormat format = new DecimalFormat(); format.applyPattern("0000"); long start = System.currentTimeMillis(); Configuration conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum", "192.168.1.116"); Connection conn = ConnectionFactory.createConnection(conf); TableName tname = TableName.valueOf("ns1:t1"); HTable table = (HTable) conn.getTable(tname); //不要自动清理缓冲区 ---- table.setAutoFlush(false); for (int i = 1; i < 10000; i++) { Put put = new Put(Bytes.toBytes("row" + format.format(i))); //关闭写前日志 ---- put.setWriteToWAL(false); put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("id"), Bytes.toBytes(i)); put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("name"), Bytes.toBytes("tom" + i)); put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("age"), Bytes.toBytes(i % 100)); table.put(put); if (i % 2000 == 0) { table.flushCommits(); } } // table.flushCommits(); System.out.println(System.currentTimeMillis() - start); }
4.数字格式化
@Test public void formatNum(){ DecimalFormat format = new DecimalFormat(); format.applyPattern("0000000"); //format.applyPattern("000,000,00"); //format.applyPattern("###,###,00"); System.out.println(format.format(8)); }
--------------------------------------------------------------------------------------------------------------
hbase 的表不能直接删除。删除方法:disable '表名' 禁用表。然后删除表:drop '表名'
========================================================================================
如果报类似主机找不到,直接删去那个资源文件的hbase-site.xml即可。java读hbase报错 java.net.UnknownHostException: ns1