在SparkStreaming中統計了數據之后,我們需要將結果寫入外部文件系統。
本文,以向Hbase中寫數據,為例,說一下,SparkStreaming怎么向Hbase中寫數據。
首先,需要說一下,下面的這個方法。
foreachRDD(func)
最通用的輸出操作,把func作用於從stream生成的每一個RDD。
注意:這個函數是在 運行streaming程序的driver進程 中執行的。
下面跟着思路,看一下,怎么優雅的向Hbase中寫入數據
向外部寫數據 常見的錯誤:
向外部數據庫寫數據,通常會建立連接,使用連接發送數據(也就是保存數據)。
開發者可能 在driver中創建連接,而在spark worker 中保存數據
例如:
dstream.foreachRDD { rdd => val connection = createNewConnection() // 這個會在driver中執行 rdd.foreach { record => connection.send(record) //這個會在 worker中執行 } }
上面這種寫法是錯誤的!上面的寫法,需要connection 對象被序列化,然后從driver發送到worker。
這樣的connection是很少在機器之間傳輸的。知道這個問題后,我們可以寫出以下的,修改后的代碼:
dstream.foreachRDD { rdd => rdd.foreach { record => val connection = createNewConnection() connection.send(record) connection.close() } }
這種寫法也是不對的。這會導致,對於每條數據,都創建一個connection(創建connection是消耗資源的)。
下面的方法會好一些:
dstream.foreachRDD { rdd => rdd.foreachPartition { partitionOfRecords => val connection = createNewConnection() partitionOfRecords.foreach(record => connection.send(record)) connection.close() } }
上面的方法,使用 rdd.foreachPartition 創建一個connection 對象, 一個RDD分區中的所有數據,都使用這一個connection。
更優的方法
在多個RDD之間,connection對象是可以重用的,所以可以創建一個連接池。如下
dstream.foreachRDD { rdd => rdd.foreachPartition { partitionOfRecords => // ConnectionPool是一個靜態的,延遲初始化的連接池 val connection = ConnectionPool.getConnection() partitionOfRecords.foreach(record => connection.send(record)) ConnectionPool.returnConnection(connection) // 返回到池中 以便別人使用 } }
連接池中的連接應該是,應需求而延遲創建,並且,如果一段時間沒用,就超時了(也就是關閉該連接)