SparkStream:4)foreachRDD詳解


轉載自:http://blog.csdn.net/jiangpeng59/article/details/53318761

 

foreachRDD通常用來把SparkStream運行得到的結果保存到外部系統比如HDFS、Mysql、Redis等等。了解下面的知識可以幫助我們避免很多誤區

 

誤區1:實例化外部連接對象的位置不正確,比如下面代碼

 
           
  1. dstream.foreachRDD { rdd =>  
  2.   val connection = createNewConnection()  // executed at the driver  
  3.   rdd.foreach { record =>  
  4.     connection.send(record) // executed at the worker  
  5.   }  
  6. }  
其實例化的連接對象在driver中,然后通過序列化的方式發送到各個Worker,但實際上Connection的序列化通常是無法正確序列化的

 

誤區2:為每條記錄都創建一個連接對象

 
           
  1. dstream.foreachRDD { rdd =>  
  2.   rdd.foreach { record =>  
  3.     val connection = createNewConnection()  
  4.     connection.send(record)  
  5.     connection.close()  
  6.   }  
  7. }  
雖然誤區1的問題得到了解決,但通常情況下,外部系統如mysql,其連接對象是非常可貴的,如果一條記錄就申請一個連接資源,系統性能會非常糟糕

 

然后,給出了一個比較好的方法,為每一個分區創建一個連接對象,其具體代碼如下

 
 
           
  1. dstream.foreachRDD { rdd =>  
  2.   rdd.foreachPartition { partitionOfRecords =>  
  3.     val connection = createNewConnection()  
  4.     partitionOfRecords.foreach(record => connection.send(record))  
  5.     connection.close()  
  6.   }  
  7. }  
最后給出一個較優的方案,使用一個連接池來維護連接對象
 
 
           
  1. dstream.foreachRDD { rdd =>  
  2.   rdd.foreachPartition { partitionOfRecords =>  
  3.     // ConnectionPool is a static, lazily initialized pool of connections  
  4.     val connection = ConnectionPool.getConnection()  
  5.     partitionOfRecords.foreach(record => connection.send(record))  
  6.     ConnectionPool.returnConnection(connection)  // return to the pool for future reuse  
  7.   }  
  8. }  
正如上面代碼闡述的,連接對象推薦是使用lazy關鍵字來修飾,用到的時候才去實例化

 

下面給出網上一段把SparkStream的結果保存到Mysql中的代碼示例

 
           
  1. package spark.examples.streaming  
  2.   
  3. import java.sql.{PreparedStatement, Connection, DriverManager}  
  4. import java.util.concurrent.atomic.AtomicInteger  
  5.   
  6. import org.apache.spark.SparkConf  
  7. import org.apache.spark.streaming.{Seconds, StreamingContext}  
  8. import org.apache.spark.streaming._  
  9. import org.apache.spark.streaming.StreamingContext._  
  10.   
  11. object SparkStreamingForPartition {  
  12.   def main(args: Array[String]) {  
  13.     val conf = new SparkConf().setAppName("NetCatWordCount")  
  14.     conf.setMaster("local[3]")  
  15.     val ssc = new StreamingContext(conf, Seconds(5))  
  16.     //The DStream is a collection of RDD, which makes the method foreachRDD reasonable  
  17.     val dstream = ssc.socketTextStream("192.168.26.140", 9999)  
  18.     dstream.foreachRDD(rdd => {  
  19.       //embedded function  
  20.       def func(records: Iterator[String]) {  
  21.         var conn: Connection = null  
  22.         var stmt: PreparedStatement = null  
  23.         try {  
  24.           val url = "jdbc:mysql://192.168.26.140:3306/person";  
  25.           val user = "root";  
  26.           val password = ""  
  27.           conn = DriverManager.getConnection(url, user, password)  
  28.           records.flatMap(_.split(" ")).foreach(word => {  
  29.             val sql = "insert into TBL_WORDS(word) values (?)";  
  30.             stmt = conn.prepareStatement(sql);  
  31.             stmt.setString(1, word)  
  32.             stmt.executeUpdate();  
  33.           })  
  34.         } catch {  
  35.           case e: Exception => e.printStackTrace()  
  36.         } finally {  
  37.           if (stmt != null) {  
  38.             stmt.close()  
  39.           }  
  40.           if (conn != null) {  
  41.             conn.close()  
  42.           }  
  43.         }  
  44.       }  
  45.       val repartitionedRDD = rdd.repartition(3)  
  46.       repartitionedRDD.foreachPartition(func)  
  47.     })  
  48.     ssc.start()  
  49.     ssc.awaitTermination()  
  50.   }  
  51. }  

注意的細節:

Dstream和RDD一樣是延遲執行,只有遇到action操作才會真正去計算。因此在Dstream的內部RDD必須包含Action操作才能是接受到的數據得到處理。即使代碼中包含foreachRDD,但在內部卻沒有action的RDD,SparkStream只會簡單地接受數據數據而不進行處理

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM