现有以下关系型数据库中的表(见表4-20表4-21和表4-22),要求将具转换为适合Hbase存储的表并插入数据。


① createTable(String tableName, String[] fields)

创建表,参数tableName为表的名称,字符串数组fields为存储记录各个域名称的数组。要 求当HBase已经存在名为tableName的表的时候,先删除原有的表,再创建新的表。

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;


public class F_createTable {
    public static Configuration configuration;
    public static Connection connection;
    public static Admin admin;
    
    /**
     * @param args
     * @throws IOException 
     */
    public static void main(String[] args) throws IOException {
        // TODO Auto-generated method stub
        createTable("Score",new String[]{"sname","course"});
        
    }
    //建立连接
    public static void init(){
        configuration  = HBaseConfiguration.create();
        configuration.set("hbase.rootdir","hdfs://localhost:9000/hbase");
        try{
            connection = ConnectionFactory.createConnection(configuration);
            admin = connection.getAdmin();
        }catch (IOException e){
            e.printStackTrace();
        }
    }
    //关闭连接
    public static void close(){
        try{
            if(admin != null){
                admin.close();
            }
            if(null != connection){
                connection.close();
            }
        }catch (IOException e){
            e.printStackTrace();
        }
    }
    /**
     * F_
     */
    public static void createTable(String myTableName,String[] colFamily) throws IOException {
 
        init();
        TableName tableName = TableName.valueOf(myTableName);
 
        if(admin.tableExists(tableName)){
            deleteTable(tableName.getNameAsString());
            System.out.println("talbe is exists,it will be deleted");
        }else {
            HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName);
            for(String str:colFamily){
                HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(str);
                hTableDescriptor.addFamily(hColumnDescriptor);
            }
            admin.createTable(hTableDescriptor);
            System.out.println("create table success");
        }
        close();
    }
    /**
     * F_
     */
    public static void deleteTable(String tableName) throws IOException {
        init();
        TableName tn = TableName.valueOf(tableName);
        if (admin.tableExists(tn)) {
            admin.disableTable(tn);
            admin.deleteTable(tn);
        }
        close();
    }
}
View Code

② addRecord(String tableName, String row, String[] fields, String。values)

向表tableName、行 row (用 S_Name表示)和字符串数组fields指定的单元格中添加对应的 valueso fields中每个元素如果对应的列族下还有相应的列限定符的话,用"colurnnFamilyicolumn"表示。例如,同 "Math" "Computer Science" "English” 3 列添加成 绩时,字符串数组 fields { "Score:Math” , “Score Computer Science" , **Score:English" } , 数组 values存储这3 门课的成绩。

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;


public class G_addRecord {
    public static Configuration configuration;
    public static Connection connection;
    public static Admin admin;
    
    /**
     * @param args
     * @throws IOException 
     */
    public static void main(String[] args) throws IOException {
        // TODO Auto-generated method stub
        addRecord("student", "2015003", new String[]{"info:S_age"}, "99");
        B_getAllData show = new B_getAllData();
        show.getTableData("student");
    }
    //建立连接
    public static void init(){
        configuration  = HBaseConfiguration.create();
        configuration.set("hbase.rootdir","hdfs://localhost:9000/hbase");
        try{
            connection = ConnectionFactory.createConnection(configuration);
            admin = connection.getAdmin();
        }catch (IOException e){
            e.printStackTrace();
        }
    }
    //关闭连接
    public static void close(){
        try{
            if(admin != null){
                admin.close();
            }
            if(null != connection){
                connection.close();
            }
        }catch (IOException e){
            e.printStackTrace();
        }
    }
    /**
     * G_
     */
    public static void addRecord(String tableName,String rowKey,String cols[],String val) throws IOException {
        init();
        Table table = connection.getTable(TableName.valueOf(tableName));
        for(String str:cols)
        {
            String[] cols_split = str.split(":");
            Put put = new Put(rowKey.getBytes());
            put.addColumn(cols_split[0].getBytes(), cols_split[1].getBytes(), val.getBytes());
            table.put(put);
        }
        table.close();
        close();
    }

}
View Code

③ scanColumn(String tableName, String column)

浏览表tableName某一列的数据,如果某一行记录中该列数据不存在,则返回null 要求当 参数column为某一列族名称时,如果底下有若干个列限定符,则要列出每个列限定符代表的列的 数据;当参数column为某一列具体名称(如 "Score:Math” )时,只需要列出该列的数据。

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;


public class H_scanColumn {
    public static Configuration configuration;
    public static Connection connection;
    public static Admin admin;
    
    /**
     * @param args
     * @throws IOException 
     */
    public static void main(String[] args) throws IOException {
        // TODO Auto-generated method stub
        scanColumn("SC","SC_score");
    }
    //建立连接
    public static void init(){
        configuration  = HBaseConfiguration.create();
        configuration.set("hbase.rootdir","hdfs://localhost:9000/hbase");
        try{
            connection = ConnectionFactory.createConnection(configuration);
            admin = connection.getAdmin();
        }catch (IOException e){
            e.printStackTrace();
        }
    }
    //关闭连接
    public static void close(){
        try{
            if(admin != null){
                admin.close();
            }
            if(null != connection){
                connection.close();
            }
        }catch (IOException e){
            e.printStackTrace();
        }
    }
    /*
     * H_
     */
    public static void scanColumn(String tablename,String col) throws IOException {
        init();
        HTableDescriptor hTableDescriptors[] = admin.listTables();
        String[] cols_split = col.split(":");
        System.out.println(tablename);
        Table table = connection.getTable(TableName.valueOf(tablename));
        //创建一个空的Scan实例
        Scan scan1 = new Scan();
        //在行上获取遍历器
        ResultScanner scanner1 = table.getScanner(scan1);
        //打印行的值
        for (Result res : scanner1) {
            Cell[] cells = res.rawCells();
            for(Cell cell:cells){
                 String colF = new String(CellUtil.cloneFamily(cell));
                 String col_son = new String(CellUtil.cloneQualifier(cell));
                 //System.out.println(colF+" "+col_son+" "+cols_split[0]+" "+cols_split[1]);
                if((colF.equals(cols_split[0])&&cols_split.length==1)||(colF.equals(cols_split[0])&&col_son.equals(cols_split[1])))
                    System.out.println(new String(CellUtil.cloneRow(cell))+" "+new String(CellUtil.cloneFamily(cell))+" "+new String(CellUtil.cloneQualifier(cell))+" "+new String(CellUtil.cloneValue(cell))+" ");
            }
        }
        //关闭释放资源
        scanner1.close();
        table.close();
        close();
    }

}
View Code

④ modifyData(String tableName, String row, String column)。

修改表tableName, row(可以用学生姓名S_Naine表示),column指定的单元格的数据。

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;


public class I_modifyData {
    public static Configuration configuration;
    public static Connection connection;
    public static Admin admin;
    
    /**
     * @param args
     * @throws IOException 
     */
    public static void main(String[] args) throws IOException {
        // TODO Auto-generated method stub
        modifyData("student","2015003","info:S_name","Sunxiaochuan");
        B_getAllData show = new B_getAllData();
        show.getTableData("student");
    }
    //建立连接
    public static void init(){
        configuration  = HBaseConfiguration.create();
        configuration.set("hbase.rootdir","hdfs://localhost:9000/hbase");
        try{
            connection = ConnectionFactory.createConnection(configuration);
            admin = connection.getAdmin();
        }catch (IOException e){
            e.printStackTrace();
        }
    }
    //关闭连接
    public static void close(){
        try{
            if(admin != null){
                admin.close();
            }
            if(null != connection){
                connection.close();
            }
        }catch (IOException e){
            e.printStackTrace();
        }
    }
    //I_
    public static void modifyData(String tableName,String rowKey,String col,String val) throws IOException {
        init();
        Table table = connection.getTable(TableName.valueOf(tableName));
        Put put = new Put(rowKey.getBytes());
        String[] cols_split = col.split(":");
        if(cols_split.length==1)
        {
            cols_split = insert(cols_split,"");
        }
        put.addColumn(cols_split[0].getBytes(), cols_split[1].getBytes(), val.getBytes());
        table.put(put);
        table.close();
        close();
    }
    //I_
    private static String[] insert(String[] arr, String str) {
        int size = arr.length;  //获取数组长度
        String[] tmp = new String[size + 1];  //新建临时字符串数组,在原来基础上长度加一
        for (int i = 0; i < size; i++){  //先遍历将原来的字符串数组数据添加到临时字符串数组
            tmp[i] = arr[i];
        }
        tmp[size] = str;  //在最后添加上需要追加的数据
        return tmp;  //返回拼接完成的字符串数组
    }
}
View Code

⑤ deleteRow(Slring tableName, String row)。

删除表tableName中row指定的行的记录。

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
 
public class J_deleteRow {
    public static Configuration configuration;
    public static Connection connection;
    public static Admin admin;
    
    /**
     * @param args
     * @throws IOException 
     */
    public static void main(String[] args) throws IOException {
        B_getAllData show = new B_getAllData();
        show.getTableData("student");
        deleteRow("student", "2015003");
        show.getTableData("student");
        // TODO Auto-generated method stub

    }
    //建立连接
    public static void init(){
        configuration  = HBaseConfiguration.create();
        configuration.set("hbase.rootdir","hdfs://localhost:9000/hbase");
        try{
            connection = ConnectionFactory.createConnection(configuration);
            admin = connection.getAdmin();
        }catch (IOException e){
            e.printStackTrace();
        }
    }
    //关闭连接
    public static void close(){
        try{
            if(admin != null){
                admin.close();
            }
            if(null != connection){
                connection.close();
            }
        }catch (IOException e){
            e.printStackTrace();
        }
    }
    /**
     * J_
     */
    public static void deleteRow(String tableName,String rowKey) throws IOException {
        init();
        Table table = connection.getTable(TableName.valueOf(tableName));
        Delete delete = new Delete(rowKey.getBytes());
        table.delete(delete);
        table.close();
        close();
    }
}
View Code

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM