常出現的使用誤區:
誤區一:在driver上創建連接對象(比如網絡連接或數據庫連接)
如果在driver上創建連接對象,然后在RDD的算子函數內使用連接對象,那么就意味着需要將連接對象序列化后從driver傳遞到worker上。而連接對象(比如Connection對象)通常來說是不支持序列化的,此時通常會報序列化的異常(serialization errors)。因此連接對象必須在worker上創建,不要在driver上創建。
dstream.foreachRDD { rdd =>
val connection = createNewConnection() // 數據庫連接在driver上執行
rdd.foreach { record => connection.send(record) // 在worker上執行 } }
誤區二:為每一條記錄都創建一個連接對象
通常來說,連接對象的創建和銷毀都是很消耗時間的。因此頻繁地創建和銷毀連接對象,可能會導致降低spark作業的整體性能和吞吐量。
dstream.foreachRDD { rdd =>
rdd.foreach { record => val connection = createNewConnection() //每插入一條數據,創建一個連接 connection.send(record) connection.close() } }
比較正確的做法是:對DStream中的RDD,調用foreachPartition,對RDD中每個分區創建一個連接對象,使用一個連接對象將一個分區內的數據都寫入數據庫中。這樣可以大大減少創建的連接對象的數量。
正確做法一:為每個RDD分區創建一個連接對象
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords => val connection = createNewConnection() partitionOfRecords.foreach(record => connection.send(record)) connection.close() } }
正確做法二:為每個RDD分區使用一個連接池中的連接對象
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
// 從數據庫連接池中獲取連接
val connection = ConnectionPool.getConnection() partitionOfRecords.foreach(record => connection.send(record)) ConnectionPool.returnConnection(connection) // 用完以后將連接返 回給連接池,進行復用 } }