sparkStreaming向hbase寫數據


在SparkStreaming中統計了數據之后,我們需要將結果寫入外部文件系統。

本文,以向Hbase中寫數據,為例,說一下,SparkStreaming怎么向Hbase中寫數據。

首先,需要說一下,下面的這個方法。

foreachRDD(func)

最通用的輸出操作,把func作用於從stream生成的每一個RDD。

注意:這個函數是在 運行streaming程序的driver進程 中執行的。

下面跟着思路,看一下,怎么優雅的向Hbase中寫入數據

向外部寫數據 常見的錯誤:

向外部數據庫寫數據,通常會建立連接,使用連接發送數據(也就是保存數據)。

開發者可能 在driver中創建連接,而在spark worker 中保存數據

例如:

dstream.foreachRDD { rdd =>
  val connection = createNewConnection()  // 這個會在driver中執行
  rdd.foreach { record =>
    connection.send(record) //這個會在 worker中執行
  }
}

上面這種寫法是錯誤的!上面的寫法,需要connection 對象被序列化,然后從driver發送到worker。

這樣的connection是很少在機器之間傳輸的。知道這個問題后,我們可以寫出以下的,修改后的代碼:

dstream.foreachRDD { rdd =>
  rdd.foreach { record =>
    val connection = createNewConnection()
    connection.send(record)
    connection.close()
  }
}

這種寫法也是不對的。這會導致,對於每條數據,都創建一個connection(創建connection是消耗資源的)。

下面的方法會好一些:

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}

上面的方法,使用 rdd.foreachPartition 創建一個connection 對象, 一個RDD分區中的所有數據,都使用這一個connection。

更優的方法

在多個RDD之間,connection對象是可以重用的,所以可以創建一個連接池。如下

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool是一個靜態的,延遲初始化的連接池
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // 返回到池中 以便別人使用  }
}

連接池中的連接應該是,應需求而延遲創建,並且,如果一段時間沒用,就超時了(也就是關閉該連接)

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM