作者:Syn良子 出處:http://www.cnblogs.com/cssdongl/p/7449682.html 轉載請注明出處
最近在折騰pyspark的HbaseConverters,由於資料太少折騰了好一會兒才明白,特此分享給大家.
問題背景
最近在使用pyspark寫hbase的過程中發現,會報以下類似的錯誤
這是由於最終計算結果存入hbase的時候pyspark無法找到相關的converter造成的.啥?你問為啥要找converter,這是因為Java和Scala都可以支持組裝Hbase的Put然后存入Hbase,但是Python這塊兒spark API是后媽生的,暫時還不能直接支持,所以需要轉換.
問題詳解
這個HBaseConverters模塊位於spark-examples*.jar包下,據我挨個檢查,spark1.4和spark1.6都有這個模塊,而spark.2.x已經沒有了,但是spark2.x上我測試了可以使用1.6的spark-examples_2.10-1.6.3.jar中的HBaseConverters來讀寫Hbase,完全沒有問題.而Spark1.4和Spark1.6的HBaseConverters我推薦用后者,理由如下.
Spark1.4的HBaseConverters模塊
這是一個scala文件,里面有4個class,細心觀察的同學會發現,上面2個是讀取Hbase用,下面2個是寫入用,它們都繼承了
org.apache.spark.api.python.Converter類,這個是pyspark的API,最終會調用四個子類來進行相應的讀寫.
Spark1.6的HBaseConverters模塊
恩,仔細觀察2個截圖中的代碼,唯一的區別在於第一個HBaseResultToStringConverter這個讀取Hbase的Result轉換類.
對,我想強調的就是這個轉換類。Spark1.4的這個轉換類仔細看代碼
class HBaseResultToStringConverter extends Converter[Any, String] {
override def convert(obj: Any): String = {
val result = obj.asInstanceOf[Result]
Bytes.toStringBinary(result.value())
}
}
這個讀取hbase的轉換類得到Result后,最終返回的只是result.value()也就是列值.
然后觀察spark1.6的這個轉換類
class HBaseResultToStringConverter extends Converter[Any, String] {
override def convert(obj: Any): String = {
val result = obj.asInstanceOf[Result]
val output = result.listCells.asScala.map(cell =>
Map(
"row" -> Bytes.toStringBinary(CellUtil.cloneRow(cell)),
"columnFamily" -> Bytes.toStringBinary(CellUtil.cloneFamily(cell)),
"qualifier" -> Bytes.toStringBinary(CellUtil.cloneQualifier(cell)),
"timestamp" -> cell.getTimestamp.toString,
"type" -> Type.codeToType(cell.getTypeByte).toString,
"value" -> Bytes.toStringBinary(CellUtil.cloneValue(cell))
)
)
output.map(JSONObject(_).toString()).mkString("\n")
}
}
恩,注意中間的Map部,這個轉換類首先將Result的各個部分讀取出來(不止是value)封裝成map,然后轉換成Json字符串返回.
區別很明顯了,明顯1.6這個轉換類更詳細,我們能從中得到更多的內容.
問題總結
理解了上述HbaseConverters的本質以后,我們就能愉快的利用pyspark來快速讀寫hbase,要點如下
- pyspark讀取hbase時,定義好keyconverter和valueconverter及hbaseconf很關鍵,不會配置的可以參考我上篇文章
- pyspark寫入hbase時,同上定義好這幾個配置,而最終需要保存到hbase的rdd,需要構造為(tablename,[rowkey,column_fm,columnname,columnvalue])這種元組字符串的格式,然后才可以成功的保存到hbase.
- pyspark的HbaeConverters所在jar包需要加入spark的classpath中去(或者spark-submmit提交時跟參數--jars將具體jar包include進去),可以參考我上篇文章最后的spark classpath配置.
本來想直接貼調試過的代碼的,但是總覺得MarkDown格式的代碼引用格式不好用,還是直接截圖吧,大家感受下就好