Spark With Mongodb 實現方法及error code -5, 6, 13127解決方案


1.spark mongo 讀取

val rdd = MongoSpark.builder().sparkSession(spark).pipeline(Seq(`match`(regex("path", java.util.regex.Pattern compile r.toString)))).build.toRDD()

  

2.error code 6

在spark讀數據時容易遇到,mongos連接池已滿,操作被拒絕,需要修改spark中的connectionperhost
lazy val mongo = new MongoClient("192.168.12.161", MongoClientOptions.builder().connectionsPerHost(8).build())

  

 然后找管理員查看Mongos當前已連接數,在過多時需要進行重啟mongos ./bin/mongostat --host 192.168.12.161
PS: 修改MongoDB機器的打開文件數會明顯改善此問題出現的頻次,甚至不需要修改connectionsPerHost即可解決問題。修改/etc/security/limits.conf中的nofile即可,mongoDB3.4之后的版本連接數默認是65536,不用修改連接數限制。

3.error code -5

driver出現錯誤,任務終止
Caused by: com.mongodb.MongoCursorNotFoundException: Query failed with error code -5 and error message 'Cursor 2639909050433532364 not found on server 192.168.12.161:27017' on server 192.168.12.161:27017
at com.mongodb.operation.QueryHelper.translateCommandException(QueryHelper.java:27)
at com.mongodb.operation.QueryBatchCursor.getMore(QueryBatchCursor.java:213)
at com.mongodb.operation.QueryBatchCursor.hasNext(QueryBatchCursor.java:103)
at com.mongodb.MongoBatchCursorAdapter.hasNext(MongoBatchCursorAdapter.java:46)
at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
View Code
 cursor超時,按照官方說法為cursor10分鍾未使用,查看spark日志發現是第433個分片出現錯誤
17/07/17 19:37:31 ERROR Executor: Exception in task 433.0 in stage 0.0 (TID 433)
com.mongodb.MongoCursorNotFoundException: Query failed with error code -5 and error message 'Cursor 2639909048849185072 not found on server 192.168.12.161:27017' on server 192.168.12.161:27017
View Code
 可以看到是19:37出現的錯誤,這個task啟動時間為: 17/07/17 19:14:23 INFO CoarseGrainedExecutorBackend: Got assigned task 433 17/07/17 19:14:23 INFO Executor: Running task 433.0 in stage 0.0 (TID 433) 可以確定的是,確實超過10分鍾了,申請完cursor之后並沒有開始執行,而是等到10分鍾之后才開始進行操作,目前未發現原因。 查看日志發現中間有一些文件訪問被拒絕的錯誤,:ulimit -a 看只有1024, 於是修改/etc/security/limits.conf: * soft nofile 40960 * hard nofile 40960 修改之后不設置connectionsPerHost也不會出現訪問被拒絕的錯誤或者error code 6,但仍舊會出現error code -5 在最新一次運行中,第452、1940、2005等分片出現錯誤,而且分片處於不同的executor上,也就是說此錯誤和計算節點無關。 
在stackoverflow上發現java driver的解決方案,java里可以使用 db.find().nocursorTimeout()來解決,但需要記得關閉cursor,不然mongos會一直占用額外內存。 去github上查看mongo-spark-connector的源代碼發現: MongoRDD的compute方法:
override def compute(split: Partition, context: TaskContext): Iterator[D] = {
val client = connector.value.acquireClient()
val cursor = getCursor(client, split.asInstanceOf[MongoPartition])
context.addTaskCompletionListener((ctx: TaskContext) => {
  log.debug("Task completed closing the MongoDB cursor")
  Try(cursor.close())
  connector.value.releaseClient(client)
})
cursor.asScala
}

 

 getCursor的函數:
private def getCursor(client: MongoClient, partition: MongoPartition)(implicit ct: ClassTag[D]): MongoCursor[D] = {
val partitionPipeline: Seq[BsonDocument] = readConfig.partitioner match {
  case MongoSinglePartitioner => pipeline
  case _                      => new BsonDocument("$match", partition.queryBounds) +: pipeline
}
client.getDatabase(readConfig.databaseName)
  .getCollection[D](readConfig.collectionName, classTagToClassOf(ct))
  .withReadConcern(readConfig.readConcern)
  .withReadPreference(readConfig.readPreference)
  .aggregate(partitionPipeline.asJava)
  .allowDiskUse(true)
  .iterator
}
 對於connector來講,每個分片創建一個Mongoclient,獲取database,添加聚合數據,由於我程序中執行完toRDD操作之后直接進行了foreach,按理說不會出現獲取了cursor但是未使用的狀況。考慮到mongos的執行過程:一次操作獲取每個shard上的一個cursor,最后把數據匯總起來返回結果。
開始懷疑是不是因為某一個節點上pipeline執行equal的操作過慢導致cursor被拒絕,后來發現即使不加pipeline也會出現問題。
后來排查是不是Mongodb並發讀數據有問題,后來發現執行MongoSpark.load.toRDD.count並沒有出錯,而且訪問速度也較有處理過程的快得多,於是決定先進行cache,然后count獲取全量數據cache在本地,再對此rdd進行操作。解決問題的原理就是通過一個簡單的count程序將所需要的數據全部讀到分片本地,使用cache方法緩存起來,這樣后面處理此RDD時就用的本地緩存數據,而不會因為處理時間過長出現curser超時的問題。

因此推薦解決方案如下:

1)單機條件下Java driver 使用
db.find().nocursorTimeout()來解決,但需要記得關閉cursor。
2) Spark環境下在代碼真正的處理邏輯之前加上如下兩句:
       rdd.cache()
       println(rdd.count())

先把讀取數據cache一下,然后使用一個簡單的Action操作把數據真正緩存起來

另一種可以解決但是不用每次都修改Spark代碼邏輯的方法是:
先修改MongoDB的代碼,把AggregateIterable加入noCursorTimeout方法,然后修改mongo-spark-connector,使用此方法。是不是很6?
因為AggregateIterable雖然和FindIterable都是獲取數據的方式,但是noCursorTimeout是FindIterable的特有方法,但是又不能把connector的Aggregate方法改成Find方法,因為Find不能加Pipeline,畢竟還得加查詢條件不是~
不要看沒用的
 
        

4. error code 13127

Query failed with error code 13127 and error message 'cursor id 206776738953 didn't exist on server.' on server 192.168.12.161:27017 和-5錯誤原因是一樣的,同樣的解決方案。

5.spark resource引用

試了好幾種方法,最穩的還是把resource拷貝到每台機器並指定絕對路徑。。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM