Most readers are probably already familiar with the two common ways Spark loads data into an RDD: (1) calling the parallelize function to build an RDD directly from an in-memory collection. The Java version looks like this:
JavaRDD<Integer> myRDD = sc.parallelize(Arrays.asList(1, 2, 3));
The Scala version:
val myRDD = sc.parallelize(List(1, 2, 3))
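If you need control over how the data is split up, parallelize also accepts a second argument giving the number of partitions for the resulting RDD. A small Scala sketch (the numbers are arbitrary):

// Spread 1000 integers across 10 partitions instead of the default parallelism
val partitioned = sc.parallelize(1 to 1000, 10)
println(partitioned.partitions.size)   // prints 10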
This approach is simple and makes it easy to turn a collection into the initial contents of an RDD. More common, however, is (2): reading data from files into an RDD. The file can be a plain text file or a SequenceFile, and it can live on the local filesystem (file://), on HDFS (hdfs://), or on S3. In fact, for file input Spark supports every file format and storage location that Hadoop supports. The Java version:
import org.apache.spark.SparkConf;
import org.apache.spark.SparkFiles;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

SparkConf conf = new SparkConf().setAppName("Simple Application");
JavaSparkContext sc = new JavaSparkContext(conf);
sc.addFile("wyp.data");
JavaRDD<String> lines = sc.textFile(SparkFiles.get("wyp.data"));
The Scala version:
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.SparkFiles

val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)
sc.addFile("spam.data")
val inFile = sc.textFile(SparkFiles.get("spam.data"))
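As noted above, textFile accepts any storage location Hadoop understands, and SequenceFiles can be read with sequenceFile. A minimal Scala sketch; the paths and the key/value types here are placeholders for illustration:

import org.apache.hadoop.io.{IntWritable, Text}

// Plain text stored on HDFS (the namenode address and path are hypothetical)
val hdfsLines = sc.textFile("hdfs://namenode:8020/user/iteblog/wyp.data")

// A SequenceFile whose keys are Text and values are IntWritable (types assumed here)
val seq = sc.sequenceFile("hdfs://namenode:8020/user/iteblog/data.seq",
  classOf[Text], classOf[IntWritable])
seq.map { case (k, v) => (k.toString, v.get) }.take(5).foreach(println)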
In practice, though, the data we need may not simply sit in text files on HDFS; it may well live in HBase. So how do we use Spark to read data from HBase? All the tests in this article were run against Hadoop 2.2.0, HBase 0.98.2 and Spark 0.9.1; with other versions the code may need minor changes. This article only shows how to read data from HBase with Spark in a simple way; if you need to perform more advanced operations on HBase, it probably won't be enough. Without further ado, the Java version of the Spark-reads-HBase code is as follows:
package com.iteblog.spark;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Serializable;
import scala.Tuple2;

import java.io.IOException;
import java.util.List;

/**
 * User: iteblog
 * Date: 14-6-27
 * Time: 5:18 PM
 * blog: http://www.iteblog.com
 *
 * Usage: bin/spark-submit --master yarn-cluster --class com.iteblog.spark.SparkFromHbase
 *     --jars /home/q/hbase/hbase-0.96.0-hadoop2/lib/htrace-core-2.01.jar,
 *     /home/q/hbase/hbase-0.96.0-hadoop2/lib/hbase-common-0.96.0-hadoop2.jar,
 *     /home/q/hbase/hbase-0.96.0-hadoop2/lib/hbase-client-0.96.0-hadoop2.jar,
 *     /home/q/hbase/hbase-0.96.0-hadoop2/lib/hbase-protocol-0.96.0-hadoop2.jar,
 *     /home/q/hbase/hbase-0.96.0-hadoop2/lib/hbase-server-0.96.0-hadoop2.jar
 *     ./spark_2.10-1.0.jar
 */
public class SparkFromHbase implements Serializable {

    /**
     * copy from org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil
     *
     * @param scan
     * @return
     * @throws IOException
     */
    String convertScanToString(Scan scan) throws IOException {
        ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
        return Base64.encodeBytes(proto.toByteArray());
    }

    public void start() {
        SparkConf sparkConf = new SparkConf();
        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        Configuration conf = HBaseConfiguration.create();

        Scan scan = new Scan();
        //scan.setStartRow(Bytes.toBytes("195861-1035177490"));
        //scan.setStopRow(Bytes.toBytes("195861-1072173147"));
        scan.addFamily(Bytes.toBytes("cf"));
        scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("col_1"));

        try {
            String tableName = "wyp";
            conf.set(TableInputFormat.INPUT_TABLE, tableName);
            conf.set(TableInputFormat.SCAN, convertScanToString(scan));

            JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD = sc.newAPIHadoopRDD(conf,
                    TableInputFormat.class, ImmutableBytesWritable.class,
                    Result.class);

            JavaPairRDD<String, Integer> levels = hBaseRDD.mapToPair(
                    new PairFunction<Tuple2<ImmutableBytesWritable, Result>, String, Integer>() {
                        @Override
                        public Tuple2<String, Integer> call(Tuple2<ImmutableBytesWritable, Result> immutableBytesWritableResultTuple2) throws Exception {
                            byte[] o = immutableBytesWritableResultTuple2._2().getValue(Bytes.toBytes("cf"), Bytes.toBytes("col_1"));
                            if (o != null) {
                                return new Tuple2<String, Integer>(new String(o), 1);
                            }
                            return null;
                        }
                    });

            JavaPairRDD<String, Integer> counts = levels.reduceByKey(
                    new Function2<Integer, Integer, Integer>() {
                        @Override
                        public Integer call(Integer i1, Integer i2) {
                            return i1 + i2;
                        }
                    });

            List<Tuple2<String, Integer>> output = counts.collect();
            for (Tuple2 tuple : output) {
                System.out.println(tuple._1() + ": " + tuple._2());
            }

            sc.stop();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new SparkFromHbase().start();
        System.exit(0);
    }
}
This code reads the col_1 column of the cf column family from the HBase table named wyp and counts how many times each value occurs. Once the data is loaded into hBaseRDD, we can apply any RDD operation to it, for example:
System.out.println(hBaseRDD.count());
This code requires the following dependencies to be added to the pom.xml file:
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>0.9.1</version>
</dependency>
<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase</artifactId>
  <version>0.98.2-hadoop2</version>
</dependency>
<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-client</artifactId>
  <version>0.98.2-hadoop2</version>
</dependency>
<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-common</artifactId>
  <version>0.98.2-hadoop2</version>
</dependency>
<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-server</artifactId>
  <version>0.98.2-hadoop2</version>
</dependency>
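The spark-submit command in the class comment above also ships hbase-protocol to the cluster. If your build runs into protobuf-related ClassNotFoundExceptions, you may need to declare that artifact as well; the version below is simply chosen to match the other HBase dependencies:

<dependency>
  <groupId>org.apache.hbase</groupId>
  <artifactId>hbase-protocol</artifactId>
  <version>0.98.2-hadoop2</version>
</dependency>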
The Scala version:
import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
object HBaseTest {
  def main(args: Array[String]) {
    val sc = new SparkContext(args(0), "HBaseTest",
      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
    val conf = HBaseConfiguration.create()
    conf.set(TableInputFormat.INPUT_TABLE, args(1))
    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])
    hBaseRDD.count()
    System.exit(0)
  }
}
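The Scala version above only counts the rows it scans. If you also want to pull individual column values out of each Result, as the Java version does, a sketch along the following lines should work; it assumes the same cf column family and col_1 column used earlier:

import org.apache.hadoop.hbase.util.Bytes

// Extract the cf:col_1 value from every row, skipping rows where the column is missing
val values = hBaseRDD.flatMap { case (_, result) =>
  Option(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("col_1"))).map(b => Bytes.toString(b))
}
values.take(10).foreach(println)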
For the Scala version we need to add the following dependencies (for example, in build.sbt):
libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.10" % "0.9.1",
  "org.apache.hbase" % "hbase" % "0.98.2-hadoop2",
  "org.apache.hbase" % "hbase-client" % "0.98.2-hadoop2",
  "org.apache.hbase" % "hbase-common" % "0.98.2-hadoop2",
  "org.apache.hbase" % "hbase-server" % "0.98.2-hadoop2"
)
When testing, make sure the HBase and Hadoop environments are configured properly, otherwise the program will run into problems; in particular, make sure the program can find the hbase-site.xml configuration file.
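If placing hbase-site.xml on the driver and executor classpaths is difficult, one common fallback (a sketch only; the ZooKeeper hosts below are placeholders) is to set the connection properties directly on the HBaseConfiguration before creating the RDD:

val conf = HBaseConfiguration.create()
// These values normally come from hbase-site.xml; hard-coding them is only a stop-gap
conf.set("hbase.zookeeper.quorum", "zk1.example.com,zk2.example.com,zk3.example.com")
conf.set("hbase.zookeeper.property.clientPort", "2181")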