Spark中foreachRDD的正確使用


常出現的使用誤區:


誤區一:在driver上創建連接對象(比如網絡連接或數據庫連接)
    如果在driver上創建連接對象,然后在RDD的算子函數內使用連接對象,那么就意味着需要將連接對象序列化后從driver傳遞到worker上。而連接對象(比如Connection對象)通常來說是不支持序列化的,此時通常會報序列化的異常(serialization errors)。因此連接對象必須在worker上創建,不要在driver上創建。

dstream.foreachRDD { rdd =>
  val connection = createNewConnection() // 數據庫連接在driver上執行
  rdd.foreach { record =>   connection.send(record) // 在worker上執行   } }

 

誤區二:為每一條記錄都創建一個連接對象
    通常來說,連接對象的創建和銷毀都是很消耗時間的。因此頻繁地創建和銷毀連接對象,可能會導致降低spark作業的整體性能和吞吐量。

dstream.foreachRDD { rdd =>
rdd.foreach { record => val connection = createNewConnection() //每插入一條數據,創建一個連接  connection.send(record) connection.close() } }

  比較正確的做法是:對DStream中的RDD,調用foreachPartition,對RDD中每個分區創建一個連接對象,使用一個連接對象將一個分區內的數據都寫入數據庫中。這樣可以大大減少創建的連接對象的數量。

正確做法一:為每個RDD分區創建一個連接對象

dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords => val connection = createNewConnection() partitionOfRecords.foreach(record => connection.send(record)) connection.close() } }

 

正確做法二:為每個RDD分區使用一個連接池中的連接對象

dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
    // 從數據庫連接池中獲取連接
    val connection = ConnectionPool.getConnection() partitionOfRecords.foreach(record => connection.send(record)) ConnectionPool.returnConnection(connection) // 用完以后將連接返 回給連接池,進行復用 } }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM