Spark中foreachRDD的正确使用


常出现的使用误区:


误区一:在driver上创建连接对象(比如网络连接或数据库连接)
    如果在driver上创建连接对象,然后在RDD的算子函数内使用连接对象,那么就意味着需要将连接对象序列化后从driver传递到worker上。而连接对象(比如Connection对象)通常来说是不支持序列化的,此时通常会报序列化的异常(serialization errors)。因此连接对象必须在worker上创建,不要在driver上创建。

dstream.foreachRDD { rdd =>
  val connection = createNewConnection() // 数据库连接在driver上执行
  rdd.foreach { record =>   connection.send(record) // 在worker上执行   } }

 

误区二:为每一条记录都创建一个连接对象
    通常来说,连接对象的创建和销毁都是很消耗时间的。因此频繁地创建和销毁连接对象,可能会导致降低spark作业的整体性能和吞吐量。

dstream.foreachRDD { rdd =>
rdd.foreach { record => val connection = createNewConnection() //每插入一条数据,创建一个连接  connection.send(record) connection.close() } }

  比较正确的做法是:对DStream中的RDD,调用foreachPartition,对RDD中每个分区创建一个连接对象,使用一个连接对象将一个分区内的数据都写入数据库中。这样可以大大减少创建的连接对象的数量。

正确做法一:为每个RDD分区创建一个连接对象

dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords => val connection = createNewConnection() partitionOfRecords.foreach(record => connection.send(record)) connection.close() } }

 

正确做法二:为每个RDD分区使用一个连接池中的连接对象

dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
    // 从数据库连接池中获取连接
    val connection = ConnectionPool.getConnection() partitionOfRecords.foreach(record => connection.send(record)) ConnectionPool.returnConnection(connection) // 用完以后将连接返 回给连接池,进行复用 } }

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM