Operating on HBase with MapReduce


You will often run into the following error when starting HBase; I have hit it myself.

ERROR: org.apache.hadoop.hbase.MasterNotRunningException: Retried 7 times

Checking the log shows: org.apache.hadoop.ipc.RPC$VersionMismatch: Protocol org.apache.hadoop.hdfs.protocol.ClientProtocol version mismatch. (client = 42, server = 41)

If this is the error you see, the RPC protocol versions of client and server do not match. The fix: delete the hadoop-core jar under hbase/lib, copy hadoop-0.20.2-core.jar from the hadoop directory into hbase/lib, and then restart HBase. A second cause of the same symptom is that Hadoop has not been started: start Hadoop first, then start HBase.

When developing in Eclipse, you need to add all of the Hadoop jars plus the two HBase-related jars (hbase and zookeeper) to the build path.

For HBase basics, see this post: http://www.cnblogs.com/liqizhou/archive/2012/05/14/2499112.html

  1. Create the table with the createTable method of the HBaseAdmin class.
  2. The HTable class is used to operate on a table: its put method inserts data (a Put object is constructed with a row key), and getScanner() returns every cell of a column family as Result objects. Result provides getFamilyMap(), which returns a map whose keys are the column qualifiers and whose values are the cell contents, exactly the <key, value> shape produced by a Hadoop map.
  3. The sample below creates a table, inserts a few rows, scans a column family, and finally drops the table:
    package test;
    import java.io.IOException;
    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.client.HBaseAdmin;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;
    
    public class Htable {
    
        /**
         * @param args
         */
        public static void main(String[] args) throws IOException {
            Configuration hbaseConf = HBaseConfiguration.create();
            HBaseAdmin admin = new HBaseAdmin(hbaseConf);
            HTableDescriptor htableDescriptor = new HTableDescriptor("table".getBytes()); // set the table name
            htableDescriptor.addFamily(new HColumnDescriptor("fam1")); // add a column family
            admin.createTable(htableDescriptor); // create the table
            HTable table = new HTable(hbaseConf, "table"); // get an instance of the table
            for (int i = 0; i < 3; i++) {   // insert three rows
                Put putRow = new Put(("row" + i).getBytes()); // row key of the ith row
                putRow.add("fam1".getBytes(), "col1".getBytes(), "value1".getBytes()); // family, qualifier, value
                putRow.add("fam1".getBytes(), "col2".getBytes(), "value2".getBytes());
                putRow.add("fam1".getBytes(), "col3".getBytes(), "value3".getBytes());
                table.put(putRow);
            }
            for (Result result : table.getScanner("fam1".getBytes())) { // scan the whole column family
                for (Map.Entry<byte[], byte[]> entry : result.getFamilyMap("fam1".getBytes()).entrySet()) { // qualifier -> value
                    String column = new String(entry.getKey());
                    String value = new String(entry.getValue());
                    System.out.println(column + "," + value);
                }
            }
            admin.disableTable("table".getBytes()); // disable the table
            admin.deleteTable("table".getBytes());  // drop the table
        }
    }
    The code above should be easy to follow.

Next, let's look at how to operate on HBase from MapReduce: first loading data into HBase, and then reading data back out of it.

Suppose we have some large files that need to be stored in HBase. The idea is to upload the files to HDFS first, read them as <key, value> pairs in the map phase, and then write those pairs into HBase in the reduce phase (for example, an input line "row0 hello" is split on the space into key "row0" and value "hello").

package test;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MapperClass extends Mapper<LongWritable, Text, Text, Text> {
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Each input line looks like "<rowkey> <value>", separated by a single space.
        String[] items = value.toString().split(" ");
        String k = items[0];
        String v = items[1];
        context.write(new Text(k), new Text(v));
    }

}

The Reducer class writes the key/value pairs into the HBase table.

package test;

import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.Text;

public class ReducerClass extends TableReducer<Text, Text, ImmutableBytesWritable> {
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        String k = key.toString();
        // Concatenate all values that arrive under the same key.
        StringBuilder str = new StringBuilder();
        for (Text value : values) {
            str.append(value.toString());
        }
        String v = str.toString();
        // Row key = k; store the concatenated value in column fam1:name.
        Put putrow = new Put(k.getBytes());
        putrow.add("fam1".getBytes(), "name".getBytes(), v.getBytes());
        // Emit the Put so that the HBase output format writes it to the table.
        context.write(new ImmutableBytesWritable(k.getBytes()), putrow);
    }
}

As you can see, ReducerClass extends TableReducer, whereas a plain Hadoop reducer extends Reducer. Its prototype is TableReducer<KeyIn, ValueIn, KeyOut>: the key written out to HBase is of type ImmutableBytesWritable (the row key), and the value written out is the mutation itself (the Put).
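
For reference, TableReducer in org.apache.hadoop.hbase.mapreduce is declared roughly as in the simplified sketch below (based on the 0.90-era API assumed in this post; later versions fix the output value type to Mutation). It shows why only KeyIn, ValueIn and KeyOut are chosen by you, while the output value is always a Writable mutation such as Put.

// Simplified sketch of org.apache.hadoop.hbase.mapreduce.TableReducer (0.90-era API),
// shown for illustration only; the real class lives in the HBase jar.
public abstract class TableReducer<KEYIN, VALUEIN, KEYOUT>
        extends org.apache.hadoop.mapreduce.Reducer<KEYIN, VALUEIN, KEYOUT, org.apache.hadoop.io.Writable> {
}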

Keeping the Mapper, the Reducer, and the Job configuration in separate classes makes the structure clearer; Mahout uses the same layout.

package test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;

public class Driver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Note: the property name has no trailing dot.
        conf.set("hbase.zookeeper.quorum", "localhost");

        Job job = new Job(conf, "Hbase");
        job.setJarByClass(TxtHbase.class);

        // The text files to load are read from the path given on the command line.
        Path in = new Path(args[0]);
        job.setInputFormatClass(TextInputFormat.class);
        FileInputFormat.addInputPath(job, in);

        job.setMapperClass(MapperClass.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // Registers ReducerClass and the HBase output format for the target table.
        TableMapReduceUtil.initTableReducerJob("table", ReducerClass.class, job);

        return job.waitForCompletion(true) ? 0 : 1;
    }
    
}

Note that the Driver never calls job.setReducerClass(); instead, TableMapReduceUtil.initTableReducerJob("table", ReducerClass.class, job) registers the reducer class along with the HBase output format.
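
If you are curious what that helper sets up, the following is a rough hand-written equivalent based on the 0.90-era HBase API (the class name InitReducerSketch and its configure method are only illustrative, not part of HBase):

package test;

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;

public class InitReducerSketch {
    // Roughly what TableMapReduceUtil.initTableReducerJob("table", ReducerClass.class, job)
    // configures on the job (illustrative only).
    public static void configure(Job job) {
        job.setOutputFormatClass(TableOutputFormat.class);                   // write to HBase, not to HDFS files
        job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "table"); // target table name
        job.setReducerClass(ReducerClass.class);                             // the reducer that emits Puts
        job.setOutputKeyClass(ImmutableBytesWritable.class);                 // row key
        job.setOutputValueClass(Writable.class);                             // the Put/Delete mutation
    }
}

In practice, call the utility method rather than wiring these settings by hand.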

The main function:

package test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;

public class TxtHbase {
    public static void main(String[] args) throws Exception {
        // Run the Driver through ToolRunner so that generic Hadoop options are parsed.
        int exitCode = ToolRunner.run(new Configuration(), new Driver(), args);
        System.exit(exitCode);
    }
}

 

Reading data back out is simpler: write a Mapper whose input is the <row key, Result> pairs scanned from the table.

package test;

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.Text;

public class MapperClass extends TableMapper<Text, Text> {
    static final String NAME = "GetDataFromHbaseTest";

    public void map(ImmutableBytesWritable row, Result values, Context context)
            throws IOException, InterruptedException {
        StringBuilder sb = new StringBuilder();
        // Concatenate every qualifier/value pair of the fam1 family for this row.
        for (Map.Entry<byte[], byte[]> entry : values.getFamilyMap("fam1".getBytes()).entrySet()) {
            sb.append(new String(entry.getKey())).append(new String(entry.getValue()));
        }
        context.write(new Text(row.get()), new Text(sb.toString()));
    }
}

The Mapper is hooked into the job with TableMapReduceUtil.initTableMapperJob(String table, Scan scan, Class<? extends TableMapper> mapper, Class<?> outputKeyClass, Class<?> outputValueClass, Job job) from the org.apache.hadoop.hbase.mapreduce package; the older org.apache.hadoop.hbase.mapred API offers the equivalent initTableMapJob(String table, String columns, Class<? extends TableMap> mapper, Class<? extends org.apache.hadoop.io.WritableComparable> outputKeyClass, Class<? extends org.apache.hadoop.io.Writable> outputValueClass, org.apache.hadoop.mapred.JobConf job, boolean addDependencyJars).

package test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;

public class Driver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "localhost");

        Job job = new Job(conf, "Hbase");
        job.setJarByClass(TxtHbase.class);

        // Scan only the fam1 column family.
        Scan scan = new Scan();
        scan.addFamily("fam1".getBytes());

        // Sets the HBase table input format, the mapper and its output types on the job.
        TableMapReduceUtil.initTableMapperJob("table", scan, MapperClass.class,
                Text.class, Text.class, job);

        // Map-only job: dump the scanned rows as text files under the output path in args[0].
        job.setNumReduceTasks(0);
        FileOutputFormat.setOutputPath(job, new Path(args[0]));

        return job.waitForCompletion(true) ? 0 : 1;
    }
}

The main function:

package test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;

public class TxtHbase {
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(), new Driver(), args);
        System.exit(exitCode);
    }
}

 

Author: BIGBIGBOAT/Liqizhou

