一、MongoDB Map Reduce
Map-Reduce是一種計算模型,簡單的說就是將大批量的工作(數據)分解(MAP)執行,然后再將結果合並成最終結果(REDUCE)。MongoDB提供的Map-Reduce非常靈活,對於大規模數據分析也相當實用。
基本語法:
db.collection.mapReduce( function() {emit(key,value);}, //map 函數 function(key,values) {return reduceFunction}, //reduce 函數 { out: collection, query: document, sort: document, limit: number } )
使用 MapReduce 要實現兩個函數 Map 函數和 Reduce 函數,Map 函數調用 emit(key, value), 遍歷 collection 中所有的記錄, 將 key 與 value 傳遞給 Reduce 函數進行處理。
Map 函數必須調用 emit(key, value) 返回鍵值對。
參數說明:
- map :映射函數 (生成鍵值對序列,作為 reduce 函數參數)。
- reduce 統計函數,reduce函數的任務就是將key-values變成key-value,也就是把values數組變成一個單一的值value。。
- out 統計結果存放集合 (不指定則使用臨時集合,在客戶端斷開后自動刪除)。
- query 一個篩選條件,只有滿足條件的文檔才會調用map函數。(query。limit,sort可以隨意組合)
- sort 和limit結合的sort排序參數(也是在發往map函數前給文檔排序),可以優化分組機制
- limit 發往map函數的文檔數量的上限(要是沒有limit,單獨使用sort的用處不大)
二、示例
我們通過下面的一個例子來理解上面的概念
mongodb的student集合中存在以下數據:
/* 1 */ { "_id" : ObjectId("5c735e26b21aeac107319873"), "stu_name" : "張三", "course" : "英語", "score" : 70, "level" : "C" } /* 2 */ { "_id" : ObjectId("5c735e26b21aeac107319874"), "stu_name" : "張三", "course" : "數學", "score" : 95, "level" : "A" } /* 3 */ { "_id" : ObjectId("5c735e26b21aeac107319875"), "stu_name" : "張三", "course" : "語文", "score" : 91, "level" : "A" } /* 4 */ { "_id" : ObjectId("5c735e26b21aeac107319876"), "stu_name" : "張三", "course" : "歷史", "score" : 98, "level" : "A" } /* 5 */ { "_id" : ObjectId("5c735e26b21aeac107319877"), "stu_name" : "李四", "course" : "數學", "score" : 88, "level" : "B" } /* 6 */ { "_id" : ObjectId("5c735e26b21aeac107319878"), "stu_name" : "李四", "course" : "英語", "score" : 93, "level" : "A" } /* 7 */ { "_id" : ObjectId("5c735e26b21aeac107319879"), "stu_name" : "李四", "course" : "語文", "score" : 99, "level" : "A" }
要求:統計出每個學生的level為A的成績的總和,並按學生名字進行分組顯示
其執行的邏輯過程如下圖所示:
在mongo shell里面執行:
db.student.mapReduce( function() { emit(this.stu_name,this.score); }, function(key, values) {return Array.sum(values)}, { query:{level:"A"}, out:"total_score" } )
/* 1 */ { "result" : "total_score", "timeMillis" : 171.0, "counts" : { "input" : 5, "emit" : 5, "reduce" : 2, "output" : 2 }, "ok" : 1.0 }
結果表明,共有 5 個符合查詢條件("level":"A")的student, 在map函數中生成了 5 個鍵值對文檔,最后使用reduce函數將相同的鍵值分為 2 組。
具體參數說明:
- result:儲存結果的collection的名字,這是個臨時集合,MapReduce的連接關閉后自動就被刪除了。
- timeMillis:執行花費的時間,毫秒為單位
- input:滿足條件被發送到map函數的文檔個數
- emit:在map函數中emit被調用的次數,也就是所有集合中的數據總量
- ouput:結果集合中的文檔個數(count對調試非常有幫助)
- ok:是否成功,成功為1
- err:如果失敗,這里可以有失敗原因,不過從經驗上來看,原因比較模糊,作用不大
查看真正的統計結果:
三、用spring-boot-starter-data-mongodb來實現上面的操作
1、新建maven工程:mongo-mapreduce
引入springboot依賴和mongodb依賴
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.mongo.mapreduce</groupId> <artifactId>mongo-mapreduce</artifactId> <version>1.0-SNAPSHOT</version> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.4.1.RELEASE</version> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-mongodb</artifactId> <exclusions> <exclusion> <artifactId>spring-boot-starter-logging</artifactId> <groupId>org.springframework.boot</groupId> </exclusion> </exclusions> </dependency> </dependencies> </project>
2、創建配置文件application.yml,map函數:map.js,reduce函數:reduce.js
server: port: 8084 context-path: / spring: data: mongodb: uri: mongodb://admin:admin@172.16.1.11:27017,172.16.1.11:27018/testdb?AutoConnectRetry=true
map.js
function() { emit(this.stu_name,this.score); }
reduce.js
function(key,values) { var sum = 0; for (var i = 0; i < values.length; i++) sum += values[i]; return sum; }
3、創建springboot啟動主類
package com.mongo.mapreduce; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; /** * @author Administrator * @date 2019/02/25 */ @SpringBootApplication public class Application { public static void main(String[] args){ SpringApplication.run(Application.class, args); } }
4、創建接收mapreduce結果的實體類
package com.mongo.mapreduce.model; /** * @author Administrator * @date 2019/02/25 */ public class MapReduceResult { private String id; private Integer value; public String getId() { return id; } public void setId(String id) { this.id = id; } public Integer getValue() { return value; } public void setValue(Integer value) { this.value = value; } }
5、創建controller
package com.mongo.mapreduce.controller; import com.mongo.mapreduce.model.MapReduceResult; import com.sun.beans.decoder.ValueObject; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.mongodb.core.MongoTemplate; import org.springframework.data.mongodb.core.mapreduce.MapReduceOptions; import org.springframework.data.mongodb.core.mapreduce.MapReduceResults; import org.springframework.data.mongodb.core.query.Criteria; import org.springframework.data.mongodb.core.query.Query; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; import java.util.List; /** * @author Administrator * @date 2019/02/25 */ @RestController @RequestMapping("/map-reduce") public class TestController { @Autowired private MongoTemplate mongoTemplate; @RequestMapping(value = "/result",method = RequestMethod.GET) public void postTest(){ //刪除_id不等於空的數據,等於刪除所有數據,目的是清空上一次mapreduce的結果 Criteria criteria=new Criteria("_id"); criteria.ne(""); Query query = new Query(criteria); mongoTemplate.remove(query,"total_score"); //執行map reduce操作 Criteria criteria1=new Criteria("level"); criteria1.is("A"); Query query1 = new Query(criteria1); MapReduceOptions options = MapReduceOptions.options(); options.outputCollection("total_score"); options.outputTypeReduce(); MapReduceResults<MapReduceResult> reduceResults = mongoTemplate.mapReduce(query1,"student", "classpath:map.js", "classpath:reduce.js", options, MapReduceResult.class); for(MapReduceResult reduceResult:reduceResults){ System.out.println("map reduce的結果如下:========="); System.out.println("姓名:"+reduceResult.getId()+",A的總分:"+reduceResult.getValue()); } } }
6、用postman調用