Pyspark的HBaseConverters詳解


作者:Syn良子 出處:http://www.cnblogs.com/cssdongl/p/7449682.html 轉載請注明出處
最近在折騰pyspark的HbaseConverters,由於資料太少折騰了好一會兒才明白,特此分享給大家.


問題背景

最近在使用pyspark寫hbase的過程中發現,會報以下類似的錯誤

這是由於最終計算結果存入hbase的時候pyspark無法找到相關的converter造成的.啥?你問為啥要找converter,這是因為Java和Scala都可以支持組裝Hbase的Put然后存入Hbase,但是Python這塊兒spark API是后媽生的,暫時還不能直接支持,所以需要轉換.

問題詳解

這個HBaseConverters模塊位於spark-examples*.jar包下,據我挨個檢查,spark1.4和spark1.6都有這個模塊,而spark.2.x已經沒有了,但是spark2.x上我測試了可以使用1.6的spark-examples_2.10-1.6.3.jar中的HBaseConverters來讀寫Hbase,完全沒有問題.而Spark1.4和Spark1.6的HBaseConverters我推薦用后者,理由如下.

Spark1.4的HBaseConverters模塊

這是一個scala文件,里面有4個class,細心觀察的同學會發現,上面2個是讀取Hbase用,下面2個是寫入用,它們都繼承了
org.apache.spark.api.python.Converter類,這個是pyspark的API,最終會調用四個子類來進行相應的讀寫.

Spark1.6的HBaseConverters模塊

恩,仔細觀察2個截圖中的代碼,唯一的區別在於第一個HBaseResultToStringConverter這個讀取Hbase的Result轉換類.
對,我想強調的就是這個轉換類。Spark1.4的這個轉換類仔細看代碼

class HBaseResultToStringConverter extends Converter[Any, String] {
override def convert(obj: Any): String = {
val result = obj.asInstanceOf[Result]
Bytes.toStringBinary(result.value())
}
}

這個讀取hbase的轉換類得到Result后,最終返回的只是result.value()也就是列值.

然后觀察spark1.6的這個轉換類

class HBaseResultToStringConverter extends Converter[Any, String] {
override def convert(obj: Any): String = {
val result = obj.asInstanceOf[Result]
val output = result.listCells.asScala.map(cell =>
Map(
"row" -> Bytes.toStringBinary(CellUtil.cloneRow(cell)),
"columnFamily" -> Bytes.toStringBinary(CellUtil.cloneFamily(cell)),
"qualifier" -> Bytes.toStringBinary(CellUtil.cloneQualifier(cell)),
"timestamp" -> cell.getTimestamp.toString,
"type" -> Type.codeToType(cell.getTypeByte).toString,
"value" -> Bytes.toStringBinary(CellUtil.cloneValue(cell))
)
)
output.map(JSONObject(_).toString()).mkString("\n")
}
}

恩,注意中間的Map部,這個轉換類首先將Result的各個部分讀取出來(不止是value)封裝成map,然后轉換成Json字符串返回.
區別很明顯了,明顯1.6這個轉換類更詳細,我們能從中得到更多的內容.

問題總結

理解了上述HbaseConverters的本質以后,我們就能愉快的利用pyspark來快速讀寫hbase,要點如下

  • pyspark讀取hbase時,定義好keyconverter和valueconverter及hbaseconf很關鍵,不會配置的可以參考我上篇文章
  • pyspark寫入hbase時,同上定義好這幾個配置,而最終需要保存到hbase的rdd,需要構造為(tablename,[rowkey,column_fm,columnname,columnvalue])這種元組字符串的格式,然后才可以成功的保存到hbase.
  • pyspark的HbaeConverters所在jar包需要加入spark的classpath中去(或者spark-submmit提交時跟參數--jars將具體jar包include進去),可以參考我上篇文章最后的spark classpath配置.

本來想直接貼調試過的代碼的,但是總覺得MarkDown格式的代碼引用格式不好用,還是直接截圖吧,大家感受下就好


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM