Java 调用 HBase API —— 简单的增删改查操作


运行前请确保集群各节点的主机名与 IP 的映射已写入本机的 C:\Windows\System32\drivers\etc\hosts 文件。

 

 

方案Ⅰ 远程连接需要有配置文件

代码

 package tmhbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;

import java.io.IOException;
import java.text.DecimalFormat;
import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;


/**
 * JUnit-driven samples of basic HBase client operations: put, get, delete,
 * scan, and namespace/table administration.
 *
 * <p>Every test opens its own connection from the client configuration
 * returned by {@code HBaseConfiguration.create()} and releases the
 * connection, table, admin and scanner handles with try-with-resources.
 */
public class TestCRUD {

    /**
     * Opens a new connection using the default HBase client configuration.
     *
     * @return a connection the caller is responsible for closing
     * @throws IOException if the connection cannot be created
     */
    private static Connection openConnection() throws IOException {
        Configuration conf = HBaseConfiguration.create();
        return ConnectionFactory.createConnection(conf);
    }

    /**
     * Prints the name of every namespace visible through {@code admin}.
     *
     * @param admin an open admin handle
     * @throws IOException if the namespaces cannot be listed
     */
    private static void printNamespaces(Admin admin) throws IOException {
        for (NamespaceDescriptor descriptor : admin.listNamespaceDescriptors()) {
            System.out.println(descriptor.getName());
        }
    }

    /**
     * Inserts a single cell: row "row3", column f1:id, int value 102,
     * into table ns1:t1. The table itself is not created here.
     */
    @Test
    public void put() throws Exception {
        try (Connection conn = openConnection();
             Table table = conn.getTable(TableName.valueOf("ns1:t1"))) {
            // Bytes turns strings / ints into the byte arrays the client API expects.
            Put put = new Put(Bytes.toBytes("row3"));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("id"), Bytes.toBytes(102));
            table.put(put);
        }
    }

    /**
     * Bulk insert into ns1:t1: rows "row0001" .. "row9999", each with
     * f1:id, f1:name and f1:age, and prints the elapsed milliseconds.
     *
     * <p>Auto-flush is switched off and the write-ahead log is skipped per
     * put; the buffer is flushed explicitly every 2000 rows and once more at
     * the end.
     */
    @Test
    public void bigInsert() throws Exception {
        DecimalFormat format = new DecimalFormat();
        format.applyPattern("0000");

        // Column coordinates never change inside the loop, so encode them once.
        byte[] family = Bytes.toBytes("f1");
        byte[] idCol = Bytes.toBytes("id");
        byte[] nameCol = Bytes.toBytes("name");
        byte[] ageCol = Bytes.toBytes("age");

        long start = System.currentTimeMillis();
        try (Connection conn = openConnection();
             HTable table = (HTable) conn.getTable(TableName.valueOf("ns1:t1"))) {
            // Do not flush the client-side write buffer automatically.
            table.setAutoFlush(false);

            for (int i = 1; i < 10000; i++) {
                Put put = new Put(Bytes.toBytes("row" + format.format(i)));
                // Skip the write-ahead log for this put.
                put.setWriteToWAL(false);
                put.addColumn(family, idCol, Bytes.toBytes(i));
                put.addColumn(family, nameCol, Bytes.toBytes("tom" + i));
                put.addColumn(family, ageCol, Bytes.toBytes(i % 100));
                table.put(put);

                if (i % 2000 == 0) {
                    table.flushCommits();
                }
            }
            // Push whatever is still buffered after the last full batch.
            table.flushCommits();
            System.out.println(System.currentTimeMillis() - start);
        }
    }

    /**
     * Reads column f:LINE_NO of one row of TM_BUS_SITE_LOG_A by row key and
     * prints it decoded as an int. Prints a notice instead when the cell is
     * absent, because decoding a null value would fail.
     */
    @Test
    public void get() throws Exception {
        try (Connection conn = openConnection();
             Table table = conn.getTable(TableName.valueOf("TM_BUS_SITE_LOG_A"))) {
            Get get = new Get(Bytes.toBytes("001K00091220180522055001")); // row key
            Result r = table.get(get);
            // Fetch the value of the requested column only.
            byte[] idvalue = r.getValue(Bytes.toBytes("f"), Bytes.toBytes("LINE_NO"));
            if (idvalue == null) {
                System.out.println("f:LINE_NO not found for row 001K00091220180522055001");
                return;
            }
            // NOTE(review): assumes the cell was written with Bytes.toBytes(int) - confirm.
            System.out.println(Bytes.toInt(idvalue));
        }
    }

    /**
     * Demonstrates left-padding a number with zeros; prints 0000008.
     */
    @Test
    public void formatNum() {
        DecimalFormat format = new DecimalFormat();
        format.applyPattern("0000000");
        System.out.println(format.format(8));
    }

    /**
     * Creates namespace "ns2" and then prints all namespace names.
     */
    @Test
    public void createNameSpace() throws Exception {
        try (Connection conn = openConnection();
             Admin admin = conn.getAdmin()) {
            // Build the namespace descriptor and submit it.
            NamespaceDescriptor nsd = NamespaceDescriptor.create("ns2").build();
            admin.createNamespace(nsd);

            printNamespaces(admin);
        }
    }

    /**
     * Prints the names of all namespaces.
     */
    @Test
    public void listNameSpaces() throws Exception {
        try (Connection conn = openConnection();
             Admin admin = conn.getAdmin()) {
            printNamespaces(admin);
        }
    }

    /**
     * Creates table ns2:t2 with a single column family "f1".
     */
    @Test
    public void createTable() throws Exception {
        try (Connection conn = openConnection();
             Admin admin = conn.getAdmin()) {
            // Table descriptor for the fully qualified table name.
            HTableDescriptor tbl = new HTableDescriptor(TableName.valueOf("ns2:t2"));
            // One column family descriptor.
            tbl.addFamily(new HColumnDescriptor("f1"));

            admin.createTable(tbl);
            System.out.println("over");
        }
    }

    /**
     * Disables table ns2:t2 and then deletes it.
     *
     * <p>The table is disabled first (when it is still enabled) because a
     * table has to be disabled before it can be dropped.
     */
    @Test
    public void disableTable() throws Exception {
        TableName tableName = TableName.valueOf("ns2:t2");
        try (Connection conn = openConnection();
             Admin admin = conn.getAdmin()) {
            if (admin.isTableEnabled(tableName)) {
                admin.disableTable(tableName);
            }
            admin.deleteTable(tableName);
        }
    }

    /**
     * Deletes column f:REG_TIME from one row of TM_BUS_SITE_LOG_A.
     */
    @Test
    public void deleteData() throws IOException {
        try (Connection conn = openConnection();
             Table table = conn.getTable(TableName.valueOf("TM_BUS_SITE_LOG_A"))) {
            Delete del = new Delete(Bytes.toBytes("001K00091220180522054722")); // row key
            del.addColumn(Bytes.toBytes("f"), Bytes.toBytes("REG_TIME"));
            table.delete(del);
            System.out.println("over");
        }
    }

    /**
     * Scans ns1:t1 from start row "row5000" to stop row "row8000" and prints
     * the f1:name value of every returned row. Nothing is deleted.
     */
    @Test
    public void scan() throws IOException {
        Scan scan = new Scan();
        scan.setStartRow(Bytes.toBytes("row5000"));
        scan.setStopRow(Bytes.toBytes("row8000"));

        byte[] family = Bytes.toBytes("f1");
        byte[] nameCol = Bytes.toBytes("name");

        try (Connection conn = openConnection();
             Table table = conn.getTable(TableName.valueOf("ns1:t1"));
             ResultScanner rs = table.getScanner(scan)) {
            for (Result r : rs) {
                byte[] name = r.getValue(family, nameCol);
                System.out.println(Bytes.toString(name));
            }
        }
    }

    /**
     * Scans a row-key range of TM_BUS_SITE_LOG_A and prints, one line per
     * row, every qualifier/value pair found in column family "f".
     */
    @Test
    public void scan2() throws IOException {
        Scan scan = new Scan();
        scan.setStartRow(Bytes.toBytes("001K00091220180522055001"));
        scan.setStopRow(Bytes.toBytes("001K00091220180522060218"));

        byte[] family = Bytes.toBytes("f");

        try (Connection conn = openConnection();
             Table table = conn.getTable(TableName.valueOf("TM_BUS_SITE_LOG_A"));
             ResultScanner rs = table.getScanner(scan)) {
            for (Result r : rs) {
                Map<byte[], byte[]> map = r.getFamilyMap(family);
                for (Map.Entry<byte[], byte[]> entrySet : map.entrySet()) {
                    String col = Bytes.toString(entrySet.getKey());
                    String val = Bytes.toString(entrySet.getValue());
                    System.out.print(col + ":" + val + ",");
                }

                System.out.println();
            }
        }
    }

    /**
     * Scans a row-key range of TM_BUS_SITE_LOG_A and prints, one line per
     * row, every cell as family:qualifier:timestamp=value by walking the
     * nested map returned by {@code Result.getMap()}.
     */
    @Test
    public void scan3() throws IOException {
        Scan scan = new Scan();
        scan.setStartRow(Bytes.toBytes("001K00091220180522055001"));
        scan.setStopRow(Bytes.toBytes("001K00091220180522060218"));

        try (Connection conn = openConnection();
             Table table = conn.getTable(TableName.valueOf("TM_BUS_SITE_LOG_A"));
             ResultScanner rs = table.getScanner(scan)) {
            for (Result r : rs) {
                // Whole row: family -> (qualifier -> (timestamp -> value)).
                NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> map = r.getMap();
                for (Map.Entry<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> entry : map.entrySet()) {
                    // Column family name.
                    String f = Bytes.toString(entry.getKey());
                    Map<byte[], NavigableMap<Long, byte[]>> colDataMap = entry.getValue();
                    for (Map.Entry<byte[], NavigableMap<Long, byte[]>> ets : colDataMap.entrySet()) {
                        String c = Bytes.toString(ets.getKey());
                        Map<Long, byte[]> tsValueMap = ets.getValue();
                        for (Map.Entry<Long, byte[]> e : tsValueMap.entrySet()) {
                            Long ts = e.getKey();
                            String value = Bytes.toString(e.getValue());
                            System.out.print(f + ":" + c + ":" + ts + "=" + value + ",");
                        }
                    }
                }
                System.out.println();
            }
        }
    }
}

 上面这个方案需要把集群的配置文件(例如 hbase-site.xml)放到项目的资源目录(classpath)下,HBaseConfiguration.create() 才能读到集群地址。

 

 

方案Ⅱ

 下面是示例代码。这段代码在程序里直接设置 ZooKeeper 地址,不依赖上面提到的两个配置文件即可运行;如果同时把这些配置文件放进项目,反而会报错。

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.
*; import java.io.IOException; import java.util.ArrayList; import java.util.List; public class HbaseTest { private HBaseAdmin admin = null; // 定义配置对象HBaseConfiguration private HBaseConfiguration cfg = null; public HbaseTest() throws Exception { Configuration HBASE_CONFIG = new Configuration(); HBASE_CONFIG.set("hbase.zookeeper.quorum", "192.168.1.116"); HBASE_CONFIG.set("hbase.zookeeper.property.clientPort", "2181"); cfg = new HBaseConfiguration(HBASE_CONFIG); admin = new HBaseAdmin(cfg); } // 创建一张表,指定表名,列族 public void createTable(String tableName, String columnFarily) throws Exception { if (admin.tableExists(tableName)) { System.out.println(tableName + "存在!"); System.exit(0); } else { HTableDescriptor tableDesc = new HTableDescriptor(tableName); tableDesc.addFamily(new HColumnDescriptor(columnFarily)); admin.createTable(tableDesc); System.out.println("创建表成功!"); } } // Hbase获取所有的表信息 public List getAllTables() { List<String> tables = null; if (admin != null) { try { HTableDescriptor[] allTable = admin.listTables(); if (allTable.length > 0) tables = new ArrayList<String>(); for (HTableDescriptor hTableDescriptor : allTable) { tables.add(hTableDescriptor.getNameAsString()); System.out.println(hTableDescriptor.getNameAsString()); } } catch (IOException e) { e.printStackTrace(); } } return tables; } // Hbase中往某个表中添加一条记录 public boolean addOneRecord(String table, String key, String family, String col, byte[] dataIn) { HTablePool tp = new HTablePool(cfg, 1000); HTable tb = (HTable) tp.getTable(table); Put put = new Put(key.getBytes()); put.add(family.getBytes(), col.getBytes(), dataIn); try { tb.put(put); System.out.println("插入数据条" + key + "成功!!!"); return true; } catch (IOException e) { System.out.println("插入数据条" + key + "失败!!!"); return false; } } // Hbase表中记录信息的查询 public void getValueFromKey(String table, String key) { HTablePool tp = new HTablePool(cfg, 1000); HTable tb = (HTable) tp.getTable(table); Get get = new Get(key.getBytes()); try { Result rs = tb.get(get); if 
(rs.raw().length == 0) { System.out.println("不存在关键字为" + key + "的行!!"); } else { for (KeyValue kv : rs.raw()) { System.out.println(new String(kv.getKey()) + " " + new String(kv.getValue())); } } } catch (IOException e) { e.printStackTrace(); } } // 显示所有数据,通过HTable Scan类获取已有表的信息 public void getAllData(String tableName) throws Exception { HTable table = new HTable(cfg, tableName); Scan scan = new Scan(); ResultScanner rs = table.getScanner(scan); for (Result r : rs) { for (KeyValue kv : r.raw()) { System.out.println(new String(kv.getKey()) + new String(kv.getValue())); } } } // Hbase表中记录信息的删除 public boolean deleteRecord(String table, String key) { HTablePool tp = new HTablePool(cfg, 1000); HTable tb = (HTable) tp.getTable(table); Delete de = new Delete(key.getBytes()); try { tb.delete(de); return true; } catch (IOException e) { System.out.println("删除记录" + key + "异常!!!"); return false; } } // Hbase中表的删除 public boolean deleteTable(String table) { try { if (admin.tableExists(table)) { admin.disableTable(table); admin.deleteTable(table); System.out.println("删除表" + table + "!!!"); } return true; } catch (IOException e) { System.out.println("删除表" + table + "异常!!!"); return false; } } // 测试函数 public static void main(String[] args) { try { HbaseTest hbase = new HbaseTest(); hbase.createTable("student", "fam1"); // hbase.getAllTables(); // hbase.addOneRecord("student","id1","fam1","name","Jack".getBytes()); // hbase.addOneRecord("student","id1","fam1","address","HZ".getBytes()); // hbase.getValueFromKey("student","id1"); // hbase.getAllData("student"); //hbase.deleteRecord("student", "id1"); //hbase.deleteTable("student"); } catch (Exception e) { e.printStackTrace(); } } }

3. 大批量数据插入并优化(标题原为“百万数据”,下面的示例代码实际循环写入 9999 行)

 @Test
    // Bulk-insert benchmark: writes rows "row0001" .. "row9999" into ns1:t1
    // (columns f1:id, f1:name, f1:age) and prints the elapsed milliseconds.
    // The connection and the table are closed when the test finishes.
    public void bigInsert() throws Exception {
        // Zero-pad the row number to four digits.
        DecimalFormat format = new DecimalFormat();
        format.applyPattern("0000");

        // Column coordinates never change inside the loop, so encode them once.
        byte[] family = Bytes.toBytes("f1");
        byte[] idCol = Bytes.toBytes("id");
        byte[] nameCol = Bytes.toBytes("name");
        byte[] ageCol = Bytes.toBytes("age");

        long start = System.currentTimeMillis();
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.1.116");

        try (Connection conn = ConnectionFactory.createConnection(conf);
             HTable table = (HTable) conn.getTable(TableName.valueOf("ns1:t1"))) {
            // Optimisation 1: do not flush the client-side write buffer automatically.
            table.setAutoFlush(false);

            for (int i = 1; i < 10000; i++) {
                Put put = new Put(Bytes.toBytes("row" + format.format(i)));
                // Optimisation 2: skip the write-ahead log for this put.
                put.setWriteToWAL(false);
                put.addColumn(family, idCol, Bytes.toBytes(i));
                put.addColumn(family, nameCol, Bytes.toBytes("tom" + i));
                put.addColumn(family, ageCol, Bytes.toBytes(i % 100));
                table.put(put);

                if (i % 2000 == 0) {
                    table.flushCommits();
                }
            }
            // Push whatever is still buffered after the last full batch.
            table.flushCommits();
            System.out.println(System.currentTimeMillis() - start);
        }
    }

4.数字格式化

 @Test
    public void formatNum() {
        // Seven '0' placeholders: the number is left-padded with zeros to seven digits.
        // Other patterns tried here earlier: "000,000,00" and "###,###,00".
        final DecimalFormat zeroPadded = new DecimalFormat("0000000");
        final String padded = zeroPadded.format(8);
        System.out.println(padded);
    }

 

 --------------------------------------------------------------------------------------------------------------

HBase 的表不能直接删除,必须先禁用再删除:在 hbase shell 中先执行 disable '表名' 禁用表,然后执行 drop '表名' 删除表。

 

 

 ========================================================================================

如果 Java 读取 HBase 时报“找不到主机”一类的错误(例如 java.net.UnknownHostException: ns1),把项目资源目录下的 hbase-site.xml 删除即可。

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM