import com.mongodb.hadoop.MongoOutputFormat
import org.apache.hadoop.conf.Configuration
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.bson.BasicBSONObject
import com.mongodb.BasicDBObject
import com.mongodb.hadoop.io.MongoUpdateWritable

/**
 * Created by Administrator on 2016/8/23 0023.
 */
object TestMongoDB {

  def main(args: Array[String]): Unit = {
    System.setProperty("hadoop.home.dir", "E:/hadoop-2.4.1/")
    val conf = new SparkConf().setAppName("macType").setMaster("local[10]")
    val sc = new SparkContext(conf)
    // Each line of the input file becomes one MongoDB document.
    val data = sc.textFile("F:/aa.txt")
    insertResultToMongoDB(data)
  }

  def insertResultToMongoDB(rdd1: RDD[String]): Unit = {
    val config = new Configuration()
    // Output collection (database.collection)
    config.set("mongo.output.uri", "mongodb://localhost:27017/admin.zz")

    // Wrap every line in a MongoUpdateWritable so the connector issues an
    // upsert instead of a plain insert.
    val rdd = rdd1.map { (s: String) =>
      val bson = new BasicBSONObject()
      val time = System.currentTimeMillis()
      bson.put("_id", s)
      bson.put("xxx", time.toString)
      (s, new MongoUpdateWritable(
        new BasicDBObject("_id", s),     // query: match on _id
        new BasicDBObject("$set", bson), // update operation
        true,                            // upsert if no document matches
        false,                           // do not update multiple documents
        false                            // do not replace the matched document
      ))
    }

    // Debug output; with a local master this prints on the driver's console.
    for (e <- rdd) {
      println(e._1 + " " + e._2)
    }
    println("-------------------")

    // MongoOutputFormat ignores the output path, hence the placeholder URI.
    rdd.saveAsNewAPIHadoopFile(
      "file:///this-is-completely-unused",
      classOf[Object],
      classOf[MongoUpdateWritable],
      classOf[MongoOutputFormat[Object, MongoUpdateWritable]],
      config)
  }
}
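
To compile and run this snippet you need the mongo-hadoop connector (which provides MongoOutputFormat and MongoUpdateWritable) and the MongoDB Java driver on the classpath, in addition to Spark itself. The sbt coordinates below are a sketch rather than part of the original example, and the version numbers are assumptions; pick versions that match your Spark and MongoDB installation.

// build.sbt (sketch; artifact versions are assumptions)
libraryDependencies ++= Seq(
  "org.apache.spark"         %% "spark-core"        % "1.6.3" % "provided",
  "org.mongodb.mongo-hadoop" %  "mongo-hadoop-core" % "2.0.2",
  "org.mongodb"              %  "mongo-java-driver" % "3.2.2"
)

Run against a local mongod, every line s of F:/aa.txt ends up as an upserted document of the form { "_id": s, "xxx": "<timestamp>" } in the admin.zz collection; rerunning the job updates the xxx field instead of creating duplicates, because each update is keyed on _id.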