Spark讀HBase多表組成一個RDD


環境:Spark-1.5.0 HBase-1.0.0。

場景:HBase中按天分表存數據,要求將任意時間段的數據合並成一個RDD以做后續計算。

嘗試1: 尋找一次讀取多個表的API,找到最接近的是一個叫MultiTableInputFormat的東西,它在MapReduce中使用良好,

  但沒有找到用於RDD讀HBase的方法。

嘗試2: 每個表生成一個RDD,再用union合並,代碼邏輯如下:

var totalRDD = xxx  // 讀取第一張表
for {  // 循環讀表並合並到totalRDD
  val sRDD = xxx
  totalRDD.union(sRDD) }

代碼放到集群上執行,totalRDD並不是正確的union結果,用var還真是不行。

嘗試3: 思路類似2,但使用SparkContext.union來一次合並多個RDD,代碼邏輯如下:

var rddSet: xxx = Set()  // 創建RDD列表
dateSet.foreach(date => {  // 將所有表的RDD放入列表中
    val sRDD = xxx
    rddSet += sRDD
}
val totalRDD = sc.union(rddSet.toSeq)  // 合並列表中的所有RDD

完整代碼如下:

import java.text.SimpleDateFormat
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import scala.collection.mutable.Set

/**
  * 時間處理類
  */
object Htime {
  /**
    * 根據起止日期獲取日期列表
    * 例如起止時間為20160118,20160120,那么日期列表為(20160118,20160119,20160120)
    *
    * @param sDate 開始日期
    * @param eDate 結束日期
    * @return 日期列表
    */
  def getDateSet(sDate:String, eDate:String): Set[String] = {
    // 定義要生成的日期列表
    var dateSet: Set[String] = Set()

    // 定義日期格式
    val sdf = new SimpleDateFormat("yyyyMMdd")

    // 按照上邊定義的日期格式將起止時間轉化成毫秒數
    val sDate_ms = sdf.parse(sDate).getTime
    val eDate_ms = sdf.parse(eDate).getTime

    // 計算一天的毫秒數用於后續迭代
    val day_ms = 24*60*60*1000

    // 循環生成日期列表
    var tm = sDate_ms
    while (tm <= eDate_ms) {
      val dateStr = sdf.format(tm)
      dateSet += dateStr
      tm = tm + day_ms
    }

    // 日期列表作為返回
    dateSet
  }
}

/**
  * 從HBase中讀取行為數據計算人群分類
  */
object Classify {
  /**
    * @param args 命令行參數,第一個參數為行為數據開始日期,第二個為結束日期,例如20160118
    */
  def main(args: Array[String]) {
    // 命令行參數個數必須為2
    if (args.length != 2) {
      System.err.println("參數個數錯誤")
      System.err.println("Usage: Classify <開始日期> <結束日期>")
      System.exit(1)
    }

    // 獲取命令行參數中的行為數據起止日期
    val startDate = args(0)
    val endDate   = args(1)

    // 根據起止日志獲取日期列表
    // 例如起止時間為20160118,20160120,那么日期列表為(20160118,20160119,20160120)
    val dateSet = Htime.getDateSet(startDate, endDate)

    // Spark上下文
    val sparkConf = new SparkConf().setAppName("Classify")
    val sc = new SparkContext(sparkConf)

    // 初始化HBase配置
    val conf = HBaseConfiguration.create()

    // 按照日期列表讀出多個RDD存在一個Set中,再用SparkContext.union()合並成一個RDD
    var rddSet: Set[RDD[(ImmutableBytesWritable, Result)] ] = Set()
    dateSet.foreach(date => {
      conf.set(TableInputFormat.INPUT_TABLE, "behaviour_test_" + date) // 設置表名
      val bRdd: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
        classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
        classOf[org.apache.hadoop.hbase.client.Result])
      rddSet += bRdd
    })
    
    val behavRdd = sc.union(rddSet.toSeq)
    
    behavRdd.collect().foreach(println)
  }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM