mongoDB實現MapReduce


一、MongoDB Map Reduce

Map-Reduce是一種計算模型,簡單的說就是將大批量的工作(數據)分解(MAP)執行,然后再將結果合並成最終結果(REDUCE)。MongoDB提供的Map-Reduce非常靈活,對於大規模數據分析也相當實用。

基本語法:

db.collection.mapReduce(
   function() {emit(key,value);},  //map 函數
   function(key,values) {return reduceFunction}, //reduce 函數
   {
      out: collection,
      query: document,
      sort: document,
      limit: number
   }
)

使用 MapReduce 要實現兩個函數 Map 函數和 Reduce 函數,Map 函數調用 emit(key, value), 遍歷 collection 中所有的記錄, 將 key 與 value 傳遞給 Reduce 函數進行處理。

Map 函數必須調用 emit(key, value) 返回鍵值對。

參數說明:

  • map :映射函數 (生成鍵值對序列,作為 reduce 函數參數)。
  • reduce 統計函數,reduce函數的任務就是將key-values變成key-value,也就是把values數組變成一個單一的值value。。
  • out 統計結果存放集合 (不指定則使用臨時集合,在客戶端斷開后自動刪除)。
  • query 一個篩選條件,只有滿足條件的文檔才會調用map函數。(query。limit,sort可以隨意組合)
  • sort 和limit結合的sort排序參數(也是在發往map函數前給文檔排序),可以優化分組機制
  • limit 發往map函數的文檔數量的上限(要是沒有limit,單獨使用sort的用處不大)

二、示例

我們通過下面的一個例子來理解上面的概念

mongodb的student集合中存在以下數據:

/* 1 */
{
    "_id" : ObjectId("5c735e26b21aeac107319873"),
    "stu_name" : "張三",
    "course" : "英語",
    "score" : 70,
    "level" : "C"
}

/* 2 */
{
    "_id" : ObjectId("5c735e26b21aeac107319874"),
    "stu_name" : "張三",
    "course" : "數學",
    "score" : 95,
    "level" : "A"
}

/* 3 */
{
    "_id" : ObjectId("5c735e26b21aeac107319875"),
    "stu_name" : "張三",
    "course" : "語文",
    "score" : 91,
    "level" : "A"
}

/* 4 */
{
    "_id" : ObjectId("5c735e26b21aeac107319876"),
    "stu_name" : "張三",
    "course" : "歷史",
    "score" : 98,
    "level" : "A"
}

/* 5 */
{
    "_id" : ObjectId("5c735e26b21aeac107319877"),
    "stu_name" : "李四",
    "course" : "數學",
    "score" : 88,
    "level" : "B"
}

/* 6 */
{
    "_id" : ObjectId("5c735e26b21aeac107319878"),
    "stu_name" : "李四",
    "course" : "英語",
    "score" : 93,
    "level" : "A"
}

/* 7 */
{
    "_id" : ObjectId("5c735e26b21aeac107319879"),
    "stu_name" : "李四",
    "course" : "語文",
    "score" : 99,
    "level" : "A"
}

要求:統計出每個學生的level為A的成績的總和,並按學生名字進行分組顯示

其執行的邏輯過程如下圖所示:

在mongo shell里面執行:

db.student.mapReduce( 
   function() { emit(this.stu_name,this.score); }, 
   function(key, values) {return Array.sum(values)}, 
      {  
         query:{level:"A"},  
         out:"total_score" 
      }
)
/* 1 */
{
    "result" : "total_score",
    "timeMillis" : 171.0,
    "counts" : {
        "input" : 5,
        "emit" : 5,
        "reduce" : 2,
        "output" : 2
    },
    "ok" : 1.0
}

結果表明,共有 5 個符合查詢條件("level":"A")的student, 在map函數中生成了 5 個鍵值對文檔,最后使用reduce函數將相同的鍵值分為 2 組。

具體參數說明:

  • result:儲存結果的collection的名字,這是個臨時集合,MapReduce的連接關閉后自動就被刪除了。
  • timeMillis:執行花費的時間,毫秒為單位
  • input:滿足條件被發送到map函數的文檔個數
  • emit:在map函數中emit被調用的次數,也就是所有集合中的數據總量
  • ouput:結果集合中的文檔個數(count對調試非常有幫助)
  • ok:是否成功,成功為1
  • err:如果失敗,這里可以有失敗原因,不過從經驗上來看,原因比較模糊,作用不大

查看真正的統計結果:

 

三、用spring-boot-starter-data-mongodb來實現上面的操作

 1、新建maven工程:mongo-mapreduce

引入springboot依賴和mongodb依賴

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.mongo.mapreduce</groupId>
    <artifactId>mongo-mapreduce</artifactId>
    <version>1.0-SNAPSHOT</version>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.4.1.RELEASE</version>
    </parent>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-mongodb</artifactId>
            <exclusions>
                <exclusion>
                    <artifactId>spring-boot-starter-logging</artifactId>
                    <groupId>org.springframework.boot</groupId>
                </exclusion>
            </exclusions>
        </dependency>

    </dependencies>
</project>

2、創建配置文件application.yml,map函數:map.js,reduce函數:reduce.js

server:
  port: 8084
  context-path: /
spring:
  data:
    mongodb:
      uri: mongodb://admin:admin@172.16.1.11:27017,172.16.1.11:27018/testdb?AutoConnectRetry=true

map.js

function() {
    emit(this.stu_name,this.score);
}

reduce.js

function(key,values) {
    var sum = 0;
    for (var i = 0; i < values.length; i++)
        sum += values[i];
    return sum;
}

3、創建springboot啟動主類

package com.mongo.mapreduce;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author Administrator
 * @date 2019/02/25
 */
@SpringBootApplication
public class Application {
    public static void main(String[] args){
        SpringApplication.run(Application.class, args);
    }
}

4、創建接收mapreduce結果的實體類

package com.mongo.mapreduce.model;

/**
 * @author Administrator
 * @date 2019/02/25
 */
public class MapReduceResult {
    private String id;
    private Integer value;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public Integer getValue() {
        return value;
    }

    public void setValue(Integer value) {
        this.value = value;
    }
}

5、創建controller

package com.mongo.mapreduce.controller;

import com.mongo.mapreduce.model.MapReduceResult;
import com.sun.beans.decoder.ValueObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.mapreduce.MapReduceOptions;
import org.springframework.data.mongodb.core.mapreduce.MapReduceResults;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;

/**
 * @author Administrator
 * @date 2019/02/25
 */
@RestController
@RequestMapping("/map-reduce")
public class TestController {

    @Autowired
    private MongoTemplate mongoTemplate;

    @RequestMapping(value = "/result",method = RequestMethod.GET)
    public void postTest(){
        //刪除_id不等於空的數據,等於刪除所有數據,目的是清空上一次mapreduce的結果
        Criteria criteria=new Criteria("_id");
        criteria.ne("");
        Query query = new Query(criteria);
        mongoTemplate.remove(query,"total_score");

        //執行map reduce操作
        Criteria criteria1=new Criteria("level");
        criteria1.is("A");
        Query query1 = new Query(criteria1);
        MapReduceOptions options = MapReduceOptions.options();
        options.outputCollection("total_score");
        options.outputTypeReduce();
        MapReduceResults<MapReduceResult> reduceResults =
                mongoTemplate.mapReduce(query1,"student",
                        "classpath:map.js",
                        "classpath:reduce.js",
                        options,
                        MapReduceResult.class);
        for(MapReduceResult reduceResult:reduceResults){
            System.out.println("map reduce的結果如下:=========");
            System.out.println("姓名:"+reduceResult.getId()+",A的總分:"+reduceResult.getValue());
        }
    }
}

6、用postman調用

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM