Implementing a Connection Pool in Spark with Scala + MongoDB


How to implement connection pool in Spark

https://github.com/YulinGUO/BigDataTips/blob/master/spark/How%20to%20implement%20connection%20pool%20in%20Spark.md

The Problem

The Spark Streaming Guide suggests the following pattern:

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}

 

But how do we actually implement such a pool?

 

Implementing the Connection Pool with Scala + MongoDB

A connection pool in the usual sense lets you acquire a resource on request and release it when you are done. Fortunately, the MongoDB Java driver already implements this logic for us:

 

Note: The Mongo object instance actually represents a pool of connections to the database; you will only need one object of class Mongo even with multiple threads. See the concurrency doc page for more information.

The Mongo class is designed to be thread safe and shared among threads. Typically you create only 1 instance for a given DB cluster and use it across your app. If for some reason you decide to create many Mongo instances, note that:

- all resource usage limits (max connections, etc) apply per Mongo instance
- to dispose of an instance, make sure you call mongo.close() to clean up resources
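
Since the pooling is handled inside the MongoDB Java driver itself, its size can also be tuned when the client is constructed. A minimal sketch, assuming the legacy 3.x driver's MongoClientOptions API (the host and the limits below are placeholder values):

import com.mongodb.{MongoClient, MongoClientOptions, ServerAddress}
import java.util.Collections

// Assumes the legacy 3.x Java driver: connectionsPerHost caps the driver's
// internal pool for each mongod host the client talks to.
val options = MongoClientOptions.builder()
  .connectionsPerHost(50)      // max pooled connections per host (placeholder value)
  .minConnectionsPerHost(5)    // keep a few connections warm (placeholder value)
  .build()

val client = new MongoClient(
  Collections.singletonList(new ServerAddress("node1", 27017)),
  options)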

In other words, our pool only needs to hand back a Mongo client. On the executor side, each request simply needs to get hold of a MongoClient that has already been created.

import com.mongodb.{MongoClient, ServerAddress}

object MongoPool {

  // one MongoClient per distinct node list, shared by all tasks in this JVM
  var instances = Map[String, MongoClient]()

  // "node1:port1,node2:port2" -> list of ServerAddress
  def nodes2ServerList(nodes: String): java.util.List[ServerAddress] = {
    val serverList = new java.util.ArrayList[ServerAddress]()
    nodes.split(",")
      .map(portNode => portNode.split(":"))
      .flatMap { ar =>
        if (ar.length == 2) Some((ar(0), ar(1).toInt)) else None
      }
      .foreach { case (node, port) => serverList.add(new ServerAddress(node, port)) }

    serverList
  }

  def apply(nodes: String): MongoClient = this.synchronized {
    // tasks are threads inside one executor JVM, so guard the map
    instances.getOrElse(nodes, {
      val servers = nodes2ServerList(nodes)
      val client = new MongoClient(servers)
      instances += nodes -> client
      println("new client added")
      client
    })
  }
}
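
The driver documentation quoted above also notes that mongo.close() should be called when an instance is disposed of. Since these clients live for the lifetime of the executor JVM, one option (a sketch, not part of the original code) is to register a shutdown hook so they are closed when the executor exits:

// Sketch: close every pooled client when the executor JVM shuts down,
// as the driver docs ask. sys.addShutdownHook registers a JVM shutdown hook.
sys.addShutdownHook {
  MongoPool.instances.values.foreach(_.close())
}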

 

With this, a static MongoPool object is in place. A Scala object is initialized only once per JVM, so every executor ends up with a single shared instance.

import scala.collection.JavaConverters._

rdd.foreachPartition(partitionOfRecords => {

  val nodes = "node1:port1,node2:port2"
  lazy val client = MongoPool(nodes)
  lazy val coll2  = client.getDatabase("dm").getCollection("profiletags")

  // write in batches of 500; assumes each record is already an org.bson.Document
  partitionOfRecords.grouped(500).foreach(batch => coll2.insertMany(batch.toList.asJava))
})

 

Note that client is declared lazy, so it is only evaluated when the executor actually uses it. Otherwise the MongoClient would have to be shipped inside the task closure, and since MongoClient is not serializable Spark would fail with a "Task not serializable" error.
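
Putting the pieces together, the skeleton from the Spark Streaming Guide at the top can now be filled in. This is only a sketch: it assumes the incoming records are already org.bson.Document values, that dstream is the input DStream, and that the host list and batch size are placeholders.

import com.mongodb.client.model.InsertManyOptions
import scala.collection.JavaConverters._

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val nodes = "node1:27017,node2:27017"     // placeholder hosts
    lazy val client = MongoPool(nodes)        // per-JVM pooled client
    lazy val coll   = client.getDatabase("dm").getCollection("profiletags")

    partitionOfRecords.grouped(500).foreach { batch =>
      // unordered inserts let the server keep going past individual failures
      coll.insertMany(batch.toList.asJava, new InsertManyOptions().ordered(false))
    }
  }
}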

Advantages

1. Connections to the database are not repeatedly created and torn down, so writes are efficient. Each Spark executor is a single JVM process; tasks are threads running inside it (one task per partition), and the pool object is initialized only once per JVM.
2. The code follows a clean design pattern: a lazily initialized, per-JVM singleton that hands out shared clients.

References

Spark Streaming Guide

