How to implement a connection pool in Spark
The problem
```scala
dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection) // return to the pool for future reuse
  }
}
```
But how do we actually implement such a pool?
Implementing a connection pool with Scala + MongoDB
A connection pool, in the usual sense, lets callers acquire a resource and later release it back. With MongoDB, however, the Java driver already implements this logic for us.
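Because the client itself is the pool, sizing it is a matter of client options rather than hand-rolled bookkeeping. A minimal sketch, assuming the MongoDB Java driver 3.x; the host, port, and pool size below are placeholder values:

```scala
import com.mongodb.{MongoClient, MongoClientOptions, ServerAddress}

// The MongoClient maintains the connection pool internally; connectionsPerHost
// caps the pooled connections per host (50 is an arbitrary example value).
val options = MongoClientOptions.builder()
  .connectionsPerHost(50)
  .build()
val client = new MongoClient(new ServerAddress("localhost", 27017), options)
```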
Note: The Mongo object instance actually represents a pool of connections to the database; you will only need one object of class Mongo even with multiple threads. See the concurrency doc page for more information.
The Mongo class is designed to be thread safe and shared among threads. Typically you create only 1 instance for a given DB cluster and use it across your app. If for some reason you decide to create many Mongo instances, note that:

- all resource usage limits (max connections, etc.) apply per Mongo instance
- to dispose of an instance, make sure you call mongo.close() to clean up resources
In other words, our pool only needs to hand back a Mongo object: each time a task on the executor side asks for one, it should simply get an already-created MongoClient.
```scala
import com.mongodb.{MongoClient, ServerAddress}

object MongoPool {
  // One MongoClient per distinct node list, cached for the lifetime of the JVM
  private var instances = Map[String, MongoClient]()

  // "node1:port1,node2:port2" -> list of ServerAddress
  def nodes2ServerList(nodes: String): java.util.List[ServerAddress] = {
    val serverList = new java.util.ArrayList[ServerAddress]()
    nodes.split(",")
      .map(portNode => portNode.split(":"))
      .flatMap { ar =>
        if (ar.length == 2) Some((ar(0), ar(1).toInt)) else None
      }
      .foreach { case (node, port) => serverList.add(new ServerAddress(node, port)) }
    serverList
  }

  // synchronized: tasks run as multiple threads inside one executor JVM
  def apply(nodes: String): MongoClient = synchronized {
    instances.getOrElse(nodes, {
      val client = new MongoClient(nodes2ServerList(nodes))
      instances += nodes -> client
      client
    })
  }
}
```
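The driver docs quoted earlier ask that mongo.close() be called when an instance is disposed of. Since these cached clients live as long as the executor JVM, one option (an addition of mine, not from the original post) is to register a shutdown hook inside MongoPool:

```scala
// Hypothetical addition inside the MongoPool object:
// close every cached client when the executor JVM exits.
sys.addShutdownHook {
  instances.values.foreach(_.close())
}
```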
With that, a static MongoPool object is in place. A Scala object is initialized once per JVM, so each executor ends up with exactly one pool.
```scala
import scala.collection.JavaConverters._

rdd.foreachPartition { partitionOfRecords =>
  val nodes = "node:port,node2:port2"
  lazy val client = MongoPool(nodes)
  lazy val coll2 = client.getDatabase("dm").getCollection("profiletags")
  partitionOfRecords
    .grouped(500) // write in batches of 500
    .foreach(batch => coll2.insertMany(batch.asJava)) // assumes each record is an org.bson.Document
}
```
Note that client is declared lazy: it is only initialized when the executor actually uses it. Otherwise the client would be constructed while Spark serializes the closure, and you would hit a `client` not serializable error.
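A related idiom worth knowing (my addition, not from the original post) is to wrap the non-serializable client in a `@transient lazy val` on a holder class: `@transient` keeps the field out of the serialized closure, and `lazy` rebuilds it on each executor after deserialization.

```scala
import com.mongodb.MongoClient

// Hypothetical holder class for driver-side code that must capture a client;
// reuses the MongoPool object defined above.
class MongoSink(nodes: String) extends Serializable {
  // @transient: excluded from serialization; lazy: re-created on first use per JVM.
  @transient lazy val client: MongoClient = MongoPool(nodes)
}
```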
Advantages
1. Connections to the database are not repeatedly created and torn down, which is efficient. Spark allocates one JVM process per executor, and tasks run as multiple threads inside that executor (the number of tasks equals the number of partitions). The object is initialized only once per JVM.
2. The code follows a clean design pattern: a lazily initialized, per-JVM singleton that wraps the driver's own pooling.