spark-mongo(1 讀寫數據)


參考鏈接

場景:適用於數據清洗,例如只需要讀取文檔中部分字段的情況。

 

依賴:

    <dependency>
        <groupId>org.mongodb.spark</groupId>
        <artifactId>mongo-spark-connector_2.11</artifactId>
        <version>2.0.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.0.2</version>
    </dependency>

 

 

代碼:

package com.edurt.ssi
import com.mongodb.spark._
import org.apache.spark.{SparkConf, SparkContext}
import org.bson._
/**
 * Minimal example of reading from and writing to MongoDB with the
 * MongoDB Spark Connector (RDD API).
 *
 * Flow: build a SparkConf whose `spark.mongodb.input.uri` /
 * `spark.mongodb.output.uri` point at concrete MongoDB collections,
 * load the input collection as an RDD, run an aggregation pipeline
 * (`$match` then `$project`) server-side, key the result by business
 * type, print each element, and save the projected RDD back to the
 * output collection.
 */
object MongoSparkTest {

  def main(args: Array[String]): Unit =
  {


    // Local-mode Spark with 4 worker threads. The input URI selects the
    // `swift.booking` collection; the output URI selects
    // `outputDB.collectionName` on the same host.
    val conf = new SparkConf()
      .setMaster("local[4]")
      .setAppName("Mingdao-Score")
      .set("spark.mongodb.input.uri", "mongodb://192.168.18.129:27017/swift.booking")
      .set("spark.mongodb.output.uri", "mongodb://192.168.18.129:27017/outputDB.collectionName")
//    import com.mongodb.spark.config._
//    val readConfig = ReadConfig(Map("collection" -> "employee", "readPreference.name" -> "secondaryPreferred"), Some(ReadConfig(sc)))
    // The connector also supports the Mongo driver's readPreference setting,
    // which allows reading only from secondary members, e.g.:
    //      .set("spark.mongodb.input.uri", "mongodb://xxx.xxx.xxx.xxx:27017,xxx.xxx.xxx:27017,xxx.xxx.xxx:27017/inputDB.collectionName")
    //      .set("spark.mongodb.output.uri", "mongodb://xxx.xxx.xxx.xxx:27017,xxx.xxx.xxx:27017,xxx.xxx.xxx:27017/outputDB.collectionName")

    val sc = new SparkContext(conf)
    // Create the RDD from the collection named in spark.mongodb.input.uri.
    //    val customRdd = MongoSpark.load(sc, readConfig)
    val originRDD = MongoSpark.load(sc)

    // Build the $match stage of the aggregation pipeline
    // (keeps only documents with businessType == "B2B").
    //    val dateQuery = new BsonDocument()
    //      .append("$gte", new BsonDateTime(start.getTime))
    //      .append("$lt", new BsonDateTime(end.getTime))
    val matchQuery = new Document("$match", BsonDocument.parse("{\"businessType\":\"B2B\"}"))

    // Build the $projection stage: keep only businessType, bookingNo and status.
    val projection1 = new BsonDocument("$project", BsonDocument.parse("{\"businessType\":\"$businessType\",\"bookingNo\":\"$bookingNo\",\"status\":\"$status\"}"))
    val aggregatedRDD = originRDD.withPipeline(Seq(matchQuery, projection1))

    // Example grouping: key each document by its businessType value.
    val rdd1 = aggregatedRDD.keyBy(x=>{
      Map(
        "businessType" -> x.get("businessType")
      )
    })

    //    val rdd2 = rdd1.groupByKey.map(t=>{
    //      (t._1, t._2.map(x => {
    //        x.getString("message").length
    //      }).sum)
    //    })

    rdd1.collect().foreach(x=>{
      println(x)
    })

    // Save to the database/collection specified by spark.mongodb.output.uri.
    // NOTE(review): the original comment spoke of saving "statistics results",
    // but what is saved here is aggregatedRDD (the projected documents),
    // not the grouped rdd1 — confirm which RDD was intended.
    MongoSpark.save(aggregatedRDD)


  }

}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM