Summary: Writing to Multiple HBase Tables from MapReduce and Spark


Author: Syn良子  Source: http://www.cnblogs.com/cssdongl  Please credit the source when reposting.

As is well known, when writing to a known HBase table from MapReduce or Spark, you simply declare the following in the MapReduce or Spark driver class:

job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, tablename);

Then, in MapReduce, the mapper or reducer writes directly through the context, while in Spark you build a pair RDD of Put objects (a PairRDDFunctions) and call saveAsHadoopDataset.
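For the MapReduce side of this known-table case, the driver wiring might look roughly like the minimal sketch below (the class name SingleTableDriver and the table name "click_counts" are assumptions for illustration, not from the original post):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.mapreduce.Job;

public class SingleTableDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf, "write to one known hbase table");
        job.setJarByClass(SingleTableDriver.class);

        // The target table is known up front, so it is fixed in the job configuration
        job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "click_counts");
        job.setOutputFormatClass(TableOutputFormat.class);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Put.class);

        // ... set the mapper/reducer and the input format here ...

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}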

However, a common requirement is that, depending on the input data, the processed results need to go to several HBase tables, or the table name is not known in advance and has to be constructed from some field of the data.

Since the table name is unknown, TableOutputFormat.OUTPUT_TABLE cannot be set. This requirement is still easy to satisfy; below I summarize the MapReduce approach and the Spark approach separately (in the end you will see they converge on the same mechanism).

1. Writing to Multiple HBase Tables from MapReduce

Just add the following in the MR main method:

job.setOutputFormatClass(MultiTableOutputFormat.class);

Then, in the mapper or reducer, you can construct the table name and the Put from the relevant fields and write them to the context, so that records go to multiple HBase tables, as sketched below.
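For example, a mapper for this multi-table case might look roughly like the following minimal sketch (not from the original post; the input record layout and the table-naming rule are assumptions for illustration):

import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// With MultiTableOutputFormat the output key carries the target table name,
// so each record can be routed to a different table.
public class MultiTableMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Assumed input line layout: appId, uid, clickCount separated by tabs
        String[] fields = value.toString().split("\t");
        String appId = fields[0];
        String rowKey = appId + "_" + fields[1];

        // Build the table name from a field of the record, e.g. "app1_clickcounts"
        String tableName = appId + "_clickcounts";

        Put put = new Put(Bytes.toBytes(rowKey));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("count"), Bytes.toBytes(fields[2]));

        // The key tells MultiTableOutputFormat which table this Put goes to
        context.write(new ImmutableBytesWritable(Bytes.toBytes(tableName)), put);
    }
}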

2. Writing to Multiple HBase Tables from Spark

Here I will go straight to a Spark Streaming program I have tested that writes to multiple HBase tables. Code below:

import kafka.serializer.StringDecoder
import net.sf.json.JSONObject
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants}
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.MultiTableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object SparkStreamingWriteToHbase {
  def main(args: Array[String]): Unit = {
    var masterUrl = "yarn-client"
    if (args.length > 0) {
      masterUrl = args(0)
    }
    val conf = new SparkConf().setAppName("Write to several tables of Hbase").setMaster(masterUrl)

    val ssc = new StreamingContext(conf, Seconds(5))

    val topics = Set("app_events")

    // PropertiesUtil is a small configuration-reading helper from my own project (not shown here)
    val brokers = PropertiesUtil.getValue("BROKER_ADDRESS")

    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers, "serializer.class" -> "kafka.serializer.StringEncoder")

    val hbaseTableSuffix = "_clickcounts"

    val hConf = HBaseConfiguration.create()
    val zookeeper = PropertiesUtil.getValue("ZOOKEEPER_ADDRESS")
    hConf.set(HConstants.ZOOKEEPER_QUORUM, zookeeper)

    val jobConf = new JobConf(hConf, this.getClass)

    val kafkaDStreams = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

    // Parse each Kafka message (a JSON string) and extract the fields we care about
    val appUserClicks = kafkaDStreams.flatMap(record => {
      val data = JSONObject.fromObject(record._2)
      Some(data)
    }).map { jsonLine =>
      val key = jsonLine.getString("appId") + "_" + jsonLine.getString("uid")
      val value = jsonLine.getString("click_count")
      (key, value)
    }

    // Convert each record into an (ImmutableBytesWritable, Put) pair keyed by the target table name
    val result = appUserClicks.map { item =>
      val rowKey = item._1
      val value = item._2
      convertToHbasePut(rowKey, value, hbaseTableSuffix)
    }

    // MultiTableOutputFormat routes each Put to the table named by the key, so the path can be empty
    result.foreachRDD { rdd =>
      rdd.saveAsNewAPIHadoopFile("", classOf[ImmutableBytesWritable], classOf[Put], classOf[MultiTableOutputFormat], jobConf)
    }

    ssc.start()
    ssc.awaitTermination()
  }

  def convertToHbasePut(key: String, value: String, tableNameSuffix: String): (ImmutableBytesWritable, Put) = {
    val rowKey = key
    // The table name is derived from the appId prefix of the row key plus a fixed suffix
    val tableName = rowKey.split("_")(0) + tableNameSuffix
    val put = new Put(Bytes.toBytes(rowKey))
    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("count"), Bytes.toBytes(value))
    (new ImmutableBytesWritable(Bytes.toBytes(tableName)), put)
  }

}

A brief description: the Spark Streaming job here processes JSON data read from Kafka, and the appId field is used to construct the table name so that records are routed to different HBase tables. Finally the RDD is written out to the HBase tables with saveAsNewAPIHadoopFile.

Stepping into saveAsNewAPIHadoopFile, you will find it is really no different from the MapReduce configuration:

def saveAsNewAPIHadoopFile(
      path: String,
      keyClass: Class[_],
      valueClass: Class[_],
      outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
      conf: Configuration = self.context.hadoopConfiguration)
  {
    // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
    val hadoopConf = conf
    val job = new NewAPIHadoopJob(hadoopConf)
    job.setOutputKeyClass(keyClass)
    job.setOutputValueClass(valueClass)
    job.setOutputFormatClass(outputFormatClass)
    job.getConfiguration.set("mapred.output.dir", path)
    saveAsNewAPIHadoopDataset(job.getConfiguration)
  }

The parameters of this method are the output path (since we are writing to HBase, an empty string is fine here), followed by outputKeyClass, outputValueClass, outputFormatClass, and the jobConf.

Make sure the outputFormatClass here is MultiTableOutputFormat so that the job can write to multiple tables. One more note: make sure the HBase tables you are writing to have already been created; a sketch of that pre-creation step follows.
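As a minimal sketch (not from the original post), the target tables could be created ahead of time with the HBase Admin API, using the older HTableDescriptor API that matches the HBase 1.x era of this post; the app ids and the "info"/"count" column layout below simply mirror the Spark example above and are otherwise assumptions:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class CreateClickCountTables {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            // One table per appId, e.g. "app1" + "_clickcounts"; the appId list is assumed
            String[] appIds = {"app1", "app2"};
            for (String appId : appIds) {
                TableName tableName = TableName.valueOf(appId + "_clickcounts");
                if (!admin.tableExists(tableName)) {
                    HTableDescriptor desc = new HTableDescriptor(tableName);
                    desc.addFamily(new HColumnDescriptor("info")); // column family used by the Puts above
                    admin.createTable(desc);
                }
            }
        }
    }
}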

