使用spark DStream的foreachRDD時要注意哪些坑?


答案: 兩個坑, 性能坑和線程坑

DStream是抽象類,它把連續的數據流拆成很多的小RDD數據塊, 這叫做“微批次”, spark的流式處理, 都是“微批次處理”。 DStream內部實現上有批次處理時間間隔,滑動窗口等機制來保證每個微批次的時間間隔里, 數據流以RDD的形式發送給spark做進一步處理。因此, 在一個為批次的處理時間間隔里, DStream只產生一個RDD。 

可以利用dstream.foreachRDD把數據發送給外部系統。 但是想要正確地, 有效率的使用它, 必須理解一下背后的機制。通常向外部系統寫數據需要一個Connection對象(通過它與外部服務器交互)。程序員可能會想當然地在spark上創建一個connection對象, 然后在spark線程里用這個對象來存RDD。比如下面的程序:

dstream.foreachRDD { rdd => val connection = createNewConnection() // executed at the driver rdd.foreach { record => connection.send(record) // executed at the worker } }

這個代碼會產生執行錯誤, 因為rdd是分布式存儲的,它是一個數據結構,它是一組指向集群數據的指針, rdd.foreach會在集群里的不同機器上創建spark工作線程, 而connection對象則不會在集群里的各個機器之間傳遞, 所以有些spark工作線程就會產生connection對象沒有被初始化的執行錯誤。 解決的辦法可以是在spark worker里為每一個worker創建一個connection對象, 但是如果你這么做, 程序要為每一條record創建一次connection,顯然效率和性能都非常差。

另一種改進方法是為每個spark分區創建一個connection對象,同時維護一個全局的靜態的連接遲對象, 這樣就可以最好的復用connection。 另外需要注意: 雖然有多個connection對象, 但在同一時間只有一個connection.send(record)執行, 因為在同一個時間里, 只有 一個微批次的RDD產生出來。

dstream.foreachRDD { rdd => rdd.foreachPartition { partitionOfRecords => // ConnectionPool is a static, lazily initialized pool of connections val connection = ConnectionPool.getConnection() partitionOfRecords.foreach(record => connection.send(record)) ConnectionPool.returnConnection(connection) // return to the pool for future reuse } }

有人問了個問題,為什么foreachRDD里有兩層嵌套的foreach? 為什么dstream.foreachRDD里還要再套一層rdd.foreach

可以這么理解, DStream.foreachRDD 是一個輸出操作符,它返回的不是RDD里的一行數據, 而是輸出DStream后面的RDD,在一個時間間隔里, 只返回一個RDD的“微批次”, 為了訪問這個“微批次”RDD里的數據, 我們還需要在RDD數據對象上做進一步操作.。 參考下面的代碼實例, 更容易理解。

給頂一個 RDD [Security, Prices]數據結構

     dstream.foreachRDD { pricesRDD =>  // Loop over RDD

       val x= pricesRDD.count

       if (x > 0)  // RDD has data

       {

         for(line <- pricesRDD.collect.toArray) // Look for each record in the RDD

         {

           var index = line._2.split(',').view(0).toInt          // That is the index

           var timestamp = line._2.split(',').view(1).toString   // This is the timestamp from source

           var security =  line._2.split(',').view(12.toString   // This is the name of the security

           var price = line._2.split(',').view(3).toFloat        // This is the price of the security

           if (price.toFloat > 90.0)

           {

            // Do something here

            // Sent notification, write to HDFS etc

           }

         }

       }

     }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM