Spark Core from Beginner to Practice (12): Reading Data from HBase with Spark


Most readers are already familiar with the two common ways of reading data into an RDD in Spark: (1) calling the parallelize function to take data directly from an in-memory collection and store it in an RDD. The Java version is as follows:

JavaRDD<Integer> myRDD = sc.parallelize(Arrays.asList(1, 2, 3));

The Scala version is as follows:

val myRDD = sc.parallelize(List(1, 2, 3))

This approach is simple and makes it easy to turn the contents of a collection into the initial values of an RDD. More common is (2) reading data into an RDD from files. The file can be a plain text file or a sequence file, and it can live locally (file://), on HDFS (hdfs://), or on S3. In fact, Spark supports every file type and storage location that Hadoop supports (a short sketch after the Scala snippet below illustrates this). The Java version is as follows:

import org.apache.spark.SparkConf;
import org.apache.spark.SparkFiles;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

SparkConf conf = new SparkConf().setAppName("Simple Application");
JavaSparkContext sc = new JavaSparkContext(conf);
sc.addFile("wyp.data");
JavaRDD<String> lines = sc.textFile(SparkFiles.get("wyp.data"));

The Scala version is as follows:

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.SparkFiles

val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)
sc.addFile("spam.data")
val inFile = sc.textFile(SparkFiles.get("spam.data"))
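
The addFile/SparkFiles pattern above is only needed when Spark has to ship a local file to every node. If the data already lives on a shared file system, textFile can be pointed at it directly. The following Java sketch (using the JavaSparkContext sc from the Java snippet above) illustrates this; the paths, host name, and key/value classes are hypothetical and not taken from the original article:

// textFile accepts any Hadoop-supported URI scheme (paths below are made up)
JavaRDD<String> local = sc.textFile("file:///tmp/wyp.data");
JavaRDD<String> onHdfs = sc.textFile("hdfs://namenode:8020/user/wyp/wyp.data");

// Sequence files are read with sequenceFile(path, keyClass, valueClass)
long seqCount = sc.sequenceFile("hdfs://namenode:8020/user/wyp/data.seq",
        org.apache.hadoop.io.Text.class, org.apache.hadoop.io.IntWritable.class).count();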

In practice, however, the data we need may not be sitting in plain text files on HDFS; it may well be stored in HBase. So how do we use Spark to read data from HBase? All tests in this article were run against Hadoop 2.2.0, HBase 0.98.2, and Spark 0.9.1; other versions may require slightly different code. This article only shows a simple way to read HBase data with Spark; if you need more advanced operations on HBase, it may not help you. Without further ado, here is the Java version of the Spark code that reads from HBase:

package com.iteblog.spark;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Serializable;
import scala.Tuple2;

import java.io.IOException;
import java.util.List;

/**
 * User: iteblog
 * Date: 14-6-27
 * Time: 17:18
 *blog: http://www.iteblog.com
 *
 * Usage: bin/spark-submit --master yarn-cluster --class com.iteblog.spark.SparkFromHbase
 * --jars /home/q/hbase/hbase-0.96.0-hadoop2/lib/htrace-core-2.01.jar,
 * /home/q/hbase/hbase-0.96.0-hadoop2/lib/hbase-common-0.96.0-hadoop2.jar,
 * /home/q/hbase/hbase-0.96.0-hadoop2/lib/hbase-client-0.96.0-hadoop2.jar,
 * /home/q/hbase/hbase-0.96.0-hadoop2/lib/hbase-protocol-0.96.0-hadoop2.jar,
 * /home/q/hbase/hbase-0.96.0-hadoop2/lib/hbase-server-0.96.0-hadoop2.jar
 * ./spark_2.10-1.0.jar
 */
public class SparkFromHbase implements Serializable {

    /**
     * copy from org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil
     *
     * @param scan
     * @return
     * @throws IOException
     */
    String convertScanToString(Scan scan) throws IOException {
        ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
        return Base64.encodeBytes(proto.toByteArray());
    }

    public void start() {
        SparkConf sparkConf = new SparkConf();
        JavaSparkContext sc = new JavaSparkContext(sparkConf);


        // hbase-site.xml must be on the classpath so that the ZooKeeper quorum
        // and other cluster settings can be picked up here.
        Configuration conf = HBaseConfiguration.create();

        // Restrict the scan to the cf:col_1 column; the commented start/stop
        // row lines show how to limit the scan to a row-key range if needed.
        Scan scan = new Scan();
        //scan.setStartRow(Bytes.toBytes("195861-1035177490"));
        //scan.setStopRow(Bytes.toBytes("195861-1072173147"));
        scan.addFamily(Bytes.toBytes("cf"));
        scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("col_1"));

        try {

            // Tell TableInputFormat which table to read and which scan to apply
            String tableName = "wyp";
            conf.set(TableInputFormat.INPUT_TABLE, tableName);
            conf.set(TableInputFormat.SCAN, convertScanToString(scan));


            // Build an RDD of (rowKey, Result) pairs directly from TableInputFormat
            JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD = sc.newAPIHadoopRDD(conf,
                    TableInputFormat.class, ImmutableBytesWritable.class,
                    Result.class);

            // Map each row to a (columnValue, 1) pair; rows that do not contain
            // cf:col_1 are mapped to null and filtered out before reducing.
            JavaPairRDD<String, Integer> levels = hBaseRDD.mapToPair(
                    new PairFunction<Tuple2<ImmutableBytesWritable, Result>, String, Integer>() {
                        @Override
                        public Tuple2<String, Integer> call(Tuple2<ImmutableBytesWritable, Result> row) throws Exception {
                            byte[] o = row._2().getValue(Bytes.toBytes("cf"), Bytes.toBytes("col_1"));
                            if (o != null) {
                                return new Tuple2<String, Integer>(new String(o), 1);
                            }
                            return null;
                        }
                    }).filter(
                    new Function<Tuple2<String, Integer>, Boolean>() {
                        @Override
                        public Boolean call(Tuple2<String, Integer> tuple) {
                            return tuple != null;
                        }
                    });

            // Count how many rows carry each distinct value of cf:col_1
            JavaPairRDD<String, Integer> counts = levels.reduceByKey(
                    new Function2<Integer, Integer, Integer>() {
                        @Override
                        public Integer call(Integer i1, Integer i2) {
                            return i1 + i2;
                        }
                    });

            List<Tuple2<String, Integer>> output = counts.collect();
            for (Tuple2 tuple : output) {
                System.out.println(tuple._1() + ": " + tuple._2());
            }

            sc.stop();

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new SparkFromHbase().start();
        System.exit(0);
    }
}

 

This code reads one column of a column family from an HBase table (in the listing above the table is wyp and the column is cf:col_1; in the author's original test the table was flight_wap_order_log and the column was cf:airName). Once the RDD has been built, we can perform the usual RDD operations on it, for example:

System.out.println(hBaseRDD.count());
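
If more than a simple count is needed, the same pattern extracts whatever we want from each Result. The snippet below is a minimal sketch, not part of the original article, that turns every row into a (rowKey, columnValue) pair; it reuses the cf/col_1 names from the listing above:

// Build (rowKey, columnValue) pairs from the (ImmutableBytesWritable, Result) RDD
JavaPairRDD<String, String> rows = hBaseRDD.mapToPair(
        new PairFunction<Tuple2<ImmutableBytesWritable, Result>, String, String>() {
            @Override
            public Tuple2<String, String> call(Tuple2<ImmutableBytesWritable, Result> entry) {
                String rowKey = Bytes.toString(entry._1().get());
                byte[] value = entry._2().getValue(Bytes.toBytes("cf"), Bytes.toBytes("col_1"));
                // An empty string marks rows that do not carry the column
                return new Tuple2<String, String>(rowKey, value == null ? "" : Bytes.toString(value));
            }
        });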

This code requires the following dependencies to be added to the pom.xml file:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>0.9.1</version>
</dependency>

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase</artifactId>
    <version>0.98.2-hadoop2</version>
</dependency>

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>0.98.2-hadoop2</version>
</dependency>

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-common</artifactId>
    <version>0.98.2-hadoop2</version>
</dependency>

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>0.98.2-hadoop2</version>
</dependency>

The Scala version is as follows:

import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
 
 
object HBaseTest {
  def main(args: Array[String]) {
    val sc = new SparkContext(args(0), "HBaseTest",
      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))

    val conf = HBaseConfiguration.create()
    conf.set(TableInputFormat.INPUT_TABLE, args(1))

    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])

    hBaseRDD.count()

    System.exit(0)
  }
}

For the Scala version we need to add the following dependencies (for example in build.sbt):

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.10" % "0.9.1",
  "org.apache.hbase" % "hbase" % "0.98.2-hadoop2",
  "org.apache.hbase" % "hbase-client" % "0.98.2-hadoop2",
  "org.apache.hbase" % "hbase-common" % "0.98.2-hadoop2",
  "org.apache.hbase" % "hbase-server" % "0.98.2-hadoop2"
)

When testing, the HBase and Hadoop environments must be configured correctly, otherwise the program will fail; in particular, make sure the program can find the hbase-site.xml configuration file.
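
If hbase-site.xml is not already on the driver and executor classpath, one option is to load it (or set the essential properties) explicitly when the HBase configuration is created. The snippet below is a minimal sketch; the file path and ZooKeeper host names are assumptions, not values from the original article:

Configuration conf = HBaseConfiguration.create();
// Load the cluster settings from an explicit location (hypothetical path)
conf.addResource(new org.apache.hadoop.fs.Path("/etc/hbase/conf/hbase-site.xml"));
// Or set the essentials directly (hypothetical hosts):
// conf.set("hbase.zookeeper.quorum", "zk1,zk2,zk3");
// conf.set("hbase.zookeeper.property.clientPort", "2181");

With the configuration in place, the examples above can be packaged and submitted with spark-submit together with the HBase jars, as shown in the Usage comment at the top of the Java class.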

