上面讀完了HDFS,當然還有寫了。
先上代碼:
WriteHBase
public class WriteHBase { public static void writeHbase(String content){ // HDFS 數據是一行一條記錄 String[] lines = content.split("\n"); int userSize = 0; List<Put> puts = new ArrayList<Put>(); Put put; for(String line : lines){
//只有兩列,以#號分割,一列rowkey,一列value,一個value是很多列數據拼接起來的。 if(line.contains("#")){ String[] arr = line.split("#"); // 添加一行, put = new Put(Bytes.toBytes(arr[0]));
// 給行添加列 cf column value put.add(Bytes.toBytes(Constant.CF), Bytes.toBytes(Constant.COLUMN), Bytes.toBytes(arr[1])); puts.add(put); }else{ continue; } lines[userSize] = null; ++userSize; // write when list have 1000 沒1000 條提交一次,已經改的 5000 if (userSize % Constant.BATCH ==0){ writeDate(userSize, puts); } } writeDate(userSize, puts); HDFSReadLog.writeLog("analysis " +userSize +" users"); } private static void writeDate(int userSize, List<Put> puts) { try { table.put(puts); HDFSReadLog.writeLog("write "+userSize + " item."); } catch (IOException e) { e.printStackTrace(); HDFSReadLog.writeLog("write "+userSize + " error."); HDFSReadLog.writeLog(e.getMessage()); } } static HTable table = null; // static HTablePool pool = null; static{ try {
// 創建HTable對象,對應hbase 的table table = new HTable(HBaseConf.getConf(),Constant.HBASE_TABLE);
// 如果表不存在就創建一個 fitTable(Constant.HBASE_TABLE); } catch (IOException e) { e.printStackTrace(); HDFSReadLog.writeLog("create table error."); HDFSReadLog.writeLog(e.getMessage()); } } /** * if table is not exists, create it * @param tab * @throws IOException */ private static void fitTable(String tab) throws IOException { HBaseAdmin admin = new HBaseAdmin(HBaseConf.getConf()); if (admin.tableExists(tab)) { HDFSReadLog.writeLog(tab + " exists"); } else {
HTableDescriptor tableDesc = new HTableDescriptor(tab);
// 建表的使用要指定 column family tableDesc.addFamily(new HColumnDescriptor("cf")); admin.createTable(tableDesc); HDFSReadLog.writeLog(tab + " create success"); } } }
HBaseConfig(z這個必須,不然會卡在table.put 上面,沒有報錯,就是卡)
public class HBaseConf { public static Configuration conf = null; public static Configuration getConf(){ if (conf == null){ conf = new Configuration(); String path = Constant.getSysEnv("HBASE_HOME") +"/conf/"; HDFSReadLog.writeLog("Get HBase home : " + path); // hbase conf conf.setClassLoader(HBaseConf.class.getClassLoader()); conf.addResource(path + "hbase-default.xml"); conf.addResource(path + "hbase-site.xml"); conf = HBaseConfiguration.create(conf); HDFSReadLog.writeLog("hbase.zookeeper.quorum : " + conf.get("hbase.zookeeper.quorum")); } // 如果配置文件讀不到,set這兩個參數,也可以讀 /*conf.set("hbase.zookeeper.quorum", "ip,ip,ip"); conf.set("hbase.zookeeper.property.clientPort", "port");*/ return conf; } }
注: hbase的配置文件很重要,如果讀不到 “hbase.zookeeper.quorum” 會默認到 localhost,然后在table.put 的時候,卡住。
table.put(),不止可以put 一個Put,也可以put 一個Put的list,這樣算是到底批量提交了。
一個一個寫,太慢了。這邊的結果:334403 條數據,寫了112秒