快速掌握mongoDB(二)——聚合管道和MapReduce


  上一節簡單介紹了一下mongoDB的增刪改查操作,這一節將介紹其聚合操作。我們在使用mysql、sqlserver時經常會用到一些聚合函數,如sum/avg/max/min/count等,mongoDB也提供了豐富的聚合功能,讓我們可以方便地進行數據的分析和計算。這里主要介紹兩種聚合方式:聚合管道和MapReduce.

1 聚合管道

  官網文檔:https://docs.mongodb.com/manual/core/aggregation-pipeline/

  聚合管道(aggregation pipeline),顧名思義是基於管道模式的聚合框架,簡單的說就是在聚合管道中前一個階段處理的結果會作為后一階段處理的輸入,documents通過管道中的各個階段處理后得到最終的聚合結果。聚合管道的語法: db.aggregate( [ { stage1 },{ stage2} ... ] ) 。

  我們看一個官網提供的栗子就理解了:

//准備測試數據
db.orders.insertMany([
  {cust_id:"A123",amount:500,status:"A"},
  {cust_id:"A123",amount:250,status:"A"},
  {cust_id:"B212",amount:200,status:"A"},
  {cust_id:"A123",amount:300,status:"D"}
])
//聚合操作
db.orders.aggregate([
    {$match:{status:"A"}},
    {$group:{_id:"$cust_id",total:{$sum:"$amount"}}}
])

   執行上邊的命令,結果如下:

  上邊的聚合過程一共有兩個階段,如下圖所示:

  第一階段:$match會篩選出status="A"的documents,並把篩選出的document傳給下一個階段;

  第二階段:$group將上一階段傳過來的documents通過cust_id進行分組,並合計各組的amount。

  通過官網的栗子我們大概知道了集合管道的基本執行流程,下邊我們通過幾個栗子來理解幾個常用的聚合運算符。

栗子1:$lookup,$match,$project,$group,$sort,$skip,$limit,$out

  首先看一個訂單庫存的栗子,我們將分步查看各個階段的聚合結果:

////准備測試數據
//訂單
 db.orders.insertMany([
    { "_id" : 1, "item" : "almonds", "price" : 12, "quantity" : 2 },
    { "_id" : 2, "item" : "bread", "price" : 8, "quantity" : 3 },
    { "_id" : 3, "item" : "pecans", "price" : 20, "quantity" : 1 },
    { "_id" : 4, "item" : "pecans", "price" : 20, "quantity" : 3 },
    { "_id" : 5, "item" : "cashews", "price" : 25, "quantity" :2},
    { "_id" : 6 }
 ])

//庫存
 db.inventory.insertMany([
    { "_id" : 1, "sku" : "almonds", description: "product 1", "instock" : 120 },
    { "_id" : 2, "sku" : "bread", description: "product 2", "instock" : 80 },
    { "_id" : 3, "sku" : "cashews", description: "product 3", "instock" : 60 },
    { "_id" : 4, "sku" : "pecans", description: "product 4", "instock" : 70 },
    { "_id" : 5, "sku": null, description: "Incomplete" },
    { "_id" : 6 }
 ])
   
    
db.orders.aggregate([
//$lookup實現collection連接,作用類似於sql中的join
   {
     $lookup:
       {
         from: "inventory",//要join的集合
         localField: "item",//連接時的本地字段
         foreignField: "sku",//連接時的外部字段
         as: "inventory_docs"//連接后添加數組字段的名字
       }
  },
//$match過濾document,結果集合只包含符合條件的集合
  {$match:
      {price:{$gt:0}}
  },
//$project用於獲取結果字段,可以新增字段;也可以對已存在的字段重新賦值
  {
      $project:{
          _id:0,
          item:1,
          price:1,
          quantity:1,
          totalprice:{$multiply:["$price","$quantity"]}
          }
  },
//$group實現分組,_id是必須的,用於指定分組的字段;這里查詢各個分組的totalprice的最大值
    {
        $group:{_id:"$item",maxtotalprice:{$max:"$totalprice"}}
     },
//$sort用於排序,1表示正序,-1表示倒序
     {$sort:{maxtotalprice:-1}},
//$skip用於跳過指定條數的document,和linq中的skip作用一樣
    {$skip:1},
//limit用於指定傳遞給下一階段的document條數,和mysql的limit作用一樣
   {$limit:2},
//$out用於將結果存在指定的collection中,如果collection不存在則新建一個,如果存在則用結果集合覆蓋以前的值
    {$out:"resultCollection"} 
])

 $ lookup階段 用於collection連接,作用類似於sql中的join,經過該階段結果如下:

[
    {
        "_id" : 1,
        "item" : "almonds",
        "price" : 12,
        "quantity" : 2,
        "inventory_docs" : [
            {
                "_id" : 1,
                "sku" : "almonds",
                "description" : "product 1",
                "instock" : 120
            }
        ]
    },
    {
        "_id" : 2,
        "item" : "bread",
        "price" : 8,
        "quantity" : 3,
        "inventory_docs" : [
            {
                "_id" : 2,
                "sku" : "bread",
                "description" : "product 2",
                "instock" : 80
            }
        ]
    },
    {
        "_id" : 3,
        "item" : "pecans",
        "price" : 20,
        "quantity" : 1,
        "inventory_docs" : [
            {
                "_id" : 4,
                "sku" : "pecans",
                "description" : "product 4",
                "instock" : 70
            }
        ]
    },
    {
        "_id" : 4,
        "item" : "pecans",
        "price" : 20,
        "quantity" : 3,
        "inventory_docs" : [
            {
                "_id" : 4,
                "sku" : "pecans",
                "description" : "product 4",
                "instock" : 70
            }
        ]
    },
    {
        "_id" : 5,
        "item" : "cashews",
        "price" : 25,
        "quantity" : 2,
        "inventory_docs" : [
            {
                "_id" : 3,
                "sku" : "cashews",
                "description" : "product 3",
                "instock" : 60
            }
        ]
    },
    {
        "_id" : 6,
        "inventory_docs" : [
            {
                "_id" : 5,
                "sku" : null,
                "description" : "Incomplete"
            },
            {
                "_id" : 6
            }
        ]
    }
]
View Code

$match階段 用於篩選出符合過濾條件的documents,相當於sql中的where

[
    {
        "_id" : 1,
        "item" : "almonds",
        "price" : 12,
        "quantity" : 2,
        "inventory_docs" : [
            {
                "_id" : 1,
                "sku" : "almonds",
                "description" : "product 1",
                "instock" : 120
            }
        ]
    },
    {
        "_id" : 2,
        "item" : "bread",
        "price" : 8,
        "quantity" : 3,
        "inventory_docs" : [
            {
                "_id" : 2,
                "sku" : "bread",
                "description" : "product 2",
                "instock" : 80
            }
        ]
    },
    {
        "_id" : 3,
        "item" : "pecans",
        "price" : 20,
        "quantity" : 1,
        "inventory_docs" : [
            {
                "_id" : 4,
                "sku" : "pecans",
                "description" : "product 4",
                "instock" : 70
            }
        ]
    },
    {
        "_id" : 4,
        "item" : "pecans",
        "price" : 20,
        "quantity" : 3,
        "inventory_docs" : [
            {
                "_id" : 4,
                "sku" : "pecans",
                "description" : "product 4",
                "instock" : 70
            }
        ]
    },
    {
        "_id" : 5,
        "item" : "cashews",
        "price" : 25,
        "quantity" : 2,
        "inventory_docs" : [
            {
                "_id" : 3,
                "sku" : "cashews",
                "description" : "product 3",
                "instock" : 60
            }
        ]
    }
]
View Code

$project階段 用於設置查詢的字段,想到於sql中的select field1,field2...

[
    {
        "item" : "almonds",
        "price" : 12,
        "quantity" : 2,
        "totalprice" : 24
    },
    {
        "item" : "bread",
        "price" : 8,
        "quantity" : 3,
        "totalprice" : 24
    },
    {
        "item" : "pecans",
        "price" : 20,
        "quantity" : 1,
        "totalprice" : 20
    },
    {
        "item" : "pecans",
        "price" : 20,
        "quantity" : 3,
        "totalprice" : 60
    },
    {
        "item" : "cashews",
        "price" : 25,
        "quantity" : 2,
        "totalprice" : 50
    }
]
View Code

$group 用於分組,相當於sql中的group by,經過該階段結果如下:

[
    { "_id" : "cashews","maxtotalprice" : 50},
    { "_id" : "pecans","maxtotalprice" : 60},
    { "_id" : "bread", "maxtotalprice" : 24},
    { "_id" : "almonds","maxtotalprice" : 24}
]

$sort階段 用於排序,相當於sql中的 sort by,其中值為1表示正序,-1表示反序,經過該階段結果如下:

[
    { "_id" : "pecans","maxtotalprice" : 60},
    { "_id" : "cashews","maxtotalprice" : 50},
    { "_id" : "bread", "maxtotalprice" : 24},
    { "_id" : "almonds","maxtotalprice" : 24}
]

$skip階段 用於跳過指定條數的document,和linq中的skip作用一樣,經過該階段結果如下:

[
    { "_id" : "cashews", "maxtotalprice" : 50},
    { "_id" : "bread","maxtotalprice" : 24},
    { "_id" : "almonds", "maxtotalprice" : 24}
]

$limit階段 用於指定傳遞給下一階段的document條數,相當於mysql中的limit,和linq中的take作用一樣,經過該階段結果如下:

[
    {"_id" : "cashews","maxtotalprice" : 50},
    {"_id" : "bread","maxtotalprice" : 24}
]

$out階段,用於將結果存在指定的collection中,如果collection不存在則新建一個,如果存在則用結果集合覆蓋以前的值,這里我們將 db.resultCollection.find() 查看resultCollection,結果如下:

[
    { "_id" : "cashews", "maxtotalprice" : 50},
    { "_id" : "bread", "maxtotalprice" : 24}
]

栗子2:$addFields,$unwind,$count

  看一個用戶分析的栗子,添加測試數據:

//測試數據
db.userinfos.insertMany([
    { _id:1, name: "王五", age: 25, roles:["vip","gen" ]},
    { _id:2, name: "趙六", age: 26, roles:["vip"]},
    { _id:3, name: "田七", age: 27}
])

$addFields  用於給所有的document添加新字段,如果字段名已經存在的話,用新值替代舊值

db.userinfos.aggregate([
  {$addFields:{address:'上海',age:30}}
])

  執行上邊的命令后,結果如下:

[{"_id":1,"name":"王五","age":30,"roles":["vip","gen"],"address":"上海"},
 {"_id":2,"name":"趙六","age":30,"roles":["vip"],"address":"上海"},
 {"_id":3,"name":"田七","age":30,"address":"上海"}]

$unwind  $unwind用於對數組字段解構,數組中的每個元素都分解成一個單獨的document

 db.userinfos.aggregate([
    {$unwind:"$roles"}
  ])

  執行命令后,結果如下:

[{"_id" : 1,"name" : "王五","age" : 25,"roles" : "vip"},
 {"_id" : 1,"name" : "王五","age" : 25,"roles" : "gen"},
 {"_id" : 2,"name" : "趙六","age" : 26,"roles" : "vip"}]

$count 用於獲取document的條數,輸入的值為字段名字,用法十分簡單

db.userinfos.aggregate([
    {$count:"mycount"}
])

  執行命令后,結果如下:

[ { "mycount" : 3 } ]

栗子3 $bucket,$bucketAuto,$sample

  看一個網上書店的栗子,首先添加測試數據:

db.books.insertMany([
   { "_id" : 1, "title" : "《白鯨》", "artist" : "赫爾曼・梅爾維爾(美)", "year" : 1845,"price" : NumberDecimal("199.99") },
    { "_id" : 2, "title" : "《懺悔錄》", "artist" : "讓・雅克・盧梭(法)", "year" : 1782,"price" : NumberDecimal("280.00") },
    { "_id" : 3, "title" : "《罪與罰》", "artist" : "陀斯妥耶夫斯基(俄)", "year" : 1866,"price" : NumberDecimal("76.04") },
    { "_id" : 4, "title" : "《復活》", "artist" : "列夫・托爾斯泰(俄)","year" : 1899,"price" : NumberDecimal("167.30") },
    { "_id" : 5, "title" : "《九三年》", "artist" : "維克多・雨果(法)", "year" : 1895,"price" : NumberDecimal("483.00") },
    { "_id" : 6, "title" : "《尤利西斯》", "artist" : "詹姆斯・喬伊斯(愛爾蘭)", "year" : 1922,"price" : NumberDecimal("385.00") },
    { "_id" : 7, "title" : "《魔山》", "artist" : "托馬斯・曼(德)", "year" : 1924/* No price*/ },
    { "_id" : 8, "title" : "《永別了,武器》", "artist" : "歐內斯特・海明威(美)", "year" : 1929,"price" : NumberDecimal("118.42") }
])

$bucket  按范圍分組,手動指定各個分組的邊界,用法如下:

db.books.aggregate( [
  {
    $bucket: {
      groupBy: "$price",           //用於分組的表達式,這里使用價格進行分組
      boundaries: [ 0, 200, 400 ], //分組邊界,這里分組邊界為[0,200),[200,400)和其他
      default: "Other",            //不在[0,200)和[200,400)范圍內的數據放在_id為Other的bucket中
      output: {
        "count": { $sum: 1 },      //bucket中document的個數
        "titles" : { $push: {title:"$title",price:"$price"} }  //bucket中的titles
      }
    }
  }
] )

  執行上邊的聚合操作,結果如下:

[
    {
        "_id" : 0,
        "count" : 4,
        "titles" : [
            {"title" : "《白鯨》","price" : NumberDecimal("199.99")},
            {"title" : "《罪與罰》","price" : NumberDecimal("76.04")},
            {"title" : "《復活》","price" : NumberDecimal("167.30")},
            {"title" : "《永別了,武器》","price" : NumberDecimal("118.42")}
        ]
    },
    {
        "_id" : 200,
        "count" : 2,
        "titles" : [
            {"title" : "《懺悔錄》","price" : NumberDecimal("280.00")},
            {"title" : "《尤利西斯》","price" : NumberDecimal("385.00")}
        ]
    },
    {
        "_id" : "Other",
        "count" : 2,
        "titles" : [
            {"title" : "《九三年》","price" : NumberDecimal("483.00")},
            {"title" : "《魔山》"}
        ]
    }
]
View Code

$bucketAuto 和$bucket作用類似,區別在於$bucketAuto不指定分組的邊界,而是指定分組的個數,分組邊界是自動生成的

 db.books.aggregate([
    {
        $bucketAuto: {
            groupBy: "$year",
            buckets: 3,
            output:{
                count:{$sum:1},
                title:{$push:{title:"$title",year:"$year"}}
             }
        }
   }
  ])

  執行上邊的聚合操作,結果如下:

[
    {
        "_id" : {
            "min" : 1782,
            "max" : 1895
        },
        "count" : 3,
        "title" : [
            {"title" : "《懺悔錄》","year" : 1782},
            {"title" : "《白鯨》","year" : 1845},
            {"title" : "《罪與罰》","year" : 1866}
        ]
    },
    {
        "_id" : {
            "min" : 1895,
            "max" : 1924
        },
        "count" : 3,
        "title" : [
            {"title" : "《九三年》","year" : 1895},
            {"title" : "《復活》","year" : 1899},
            {"title" : "《尤利西斯》","year" : 1922}
        ]
    },
    {
        "_id" : {
            "min" : 1924,
            "max" : 1929
        },
        "count" : 2,
        "title" : [
            {"title" : "《魔山》","year" : 1924},
            {"title" : "《永別了,武器》","year" : 1929}
        ]
    }
]
View Code

$sample 用於隨機抽取指定數量的document

db.books.aggregate([
    { $sample:{size:2} }
])    

  執行后隨即抽取了2條document,結果如下,注意因為是隨即抽取的,所以每次執行的結果不同。

[{"_id":6,"title":"《尤利西斯》","artist":"詹姆斯・喬伊斯(愛爾蘭)","year":1922,"price":NumberDecimal("385.00")},
 {"_id":8,"title":"《永別了,武器》","artist":"歐內斯特・海明威(美)","year":1929,"price":NumberDecimal("118.42")}]

  通過上邊三個栗子,我們應該已經對聚合管道有了一定的理解,其實各種聚合運算符的用法都比較簡單,怎么靈活的組合各種聚合操作以達到聚合的目的才是我們考慮的重點。

2 mapReduce

  MapReduce是一種編程模型,用於大規模數據集的並行運算,MapReduce采用"分而治之"的思想,簡單的說就是:將待處理的大數據集分為若干個小數據集,對各個小數據集進行計算獲取中間結果,最后整合中間結果獲取最終的結果。mongoDB也實現了MapReduce,用法還是比較簡單的,語法如下:

db.collection.mapReduce(
   function() {emit(key,value);},                  //map 函數
   function(key,values) {return reduceFunction},   //reduce 函數
   {
      out: collection,   //輸出
      query: document,   //查詢條件,在Map函數執行前過濾出符合條件的documents
      sort: document,    //再Map前對documents進行排序
      limit: number      //發送給map函數的document條數
   }
)

其中:map: 映射函數,遍歷符合query查詢條件的所有document,獲取一系列的鍵值對key-values,如果一個key對應多個value,多個value以數組形式保存。

     reduce: 統計函數,reduce函數的參數是map函數發送的key/value集合

      out:  指定將統計結果存放在哪個collection中 (不指定則使用臨時集合,在客戶端斷開后自動刪除)。

     query:篩選條件,只有滿足條件的文檔才會發送給map函數;

     sort:和limit結合使用,在將documents發往map函數前先排序;

     limit:和sort結合使用,設置發往map函數的document條數。

  我們通過官網的栗子來理解mapReduce的用法,命令如下:

//添加測試數據
db.orders.insertMany([
  {cust_id:"A123",amount:500,status:"A"},
  {cust_id:"A123",amount:250,status:"A"},
  {cust_id:"B212",amount:200,status:"A"},
  {cust_id:"A123",amount:300,status:"D"}
])
//mapReduce 
db.orders.mapReduce(
    function() {emit(this.cust_id,this.amount)},       //map函數,遍歷documents key為cust_id值,values為amount值,或者數組[amount1,amount2...]
    function(key,values){return  Array.sum(values)},   //reduce函數,返回合計結果(只會統計values是數組)
    {
        query:{status:"A"},   //過濾條件,只向map函數發送status="A"的documents
        out:"myresultColl"    //結果存放在myresultColl集合中,如果沒有名字為myresultCOll的集合則新建一個,如果集合存在的話覆蓋舊值
    }
)
printjson(db.myresultColl.find() .toArray())

mongoDB的MapReduce可以簡單分為兩個階段:

Map階段:

  栗子中的map函數為 function() {emit(this.cust_id,this.amount)} ,執行map函數前先進行query過濾,找到 status=A 的documents,然后將過濾后的documents發送給map函數,map函數遍歷documents,將cust_id作為key,amount作為value,如果一個cust_id有多個amount值時,value為數組[amount1,amount2..],栗子的map函數獲取的key/value集合中有兩個key/value對: {“A123”:[500,250]}和{“B212”:200} 

Reduce階段:

  reduce函數封裝了我們的聚合邏輯,reduce函數會逐個計算map函數傳過去的key/value對,在上邊栗子中的reduce函數的目的是計算amount的總和。

  上邊栗子最終結果存放在集合myresultColl中(out的作用),通過命令 db.myresultColl.find() 查看結果如下:

  {"_id" : "A123","value" : 750},
   {"_id" : "B212","value" : 200}
]

   MapReduce屬於輕量級的集合框架,沒有聚合管道那么多運算符,聚合的思路也比較簡單:先寫一個map函數用於確定需要整合的key/value對(就是我們感興趣的字段),然后寫一個reduce函數,其內部封裝着我們的聚合邏輯,reduce函數會逐一處理map函數獲取的key/value對,以此獲取聚合結果。

小結

  本節通過幾個栗子簡單介紹了mongoDB的聚合框架:集合管道和MapReduce,聚合管道提供了十分豐富的運算符,讓我們可以方便地進行各種聚合操作,因為聚合管道使用的是mongoDB內置函數所以計算效率一般不會太差。需要注意:①管道處理結果會放在一個document中,所以處理結果不能超過16M(Bson格式的最大尺寸),②聚合過程中內存不能超過100M(可以通過設置{“allowDiskUse”: True}來解決);MapReduce的map函數和reduce函數都是我們自定義的js函數,這種聚合方式比聚合管道更加靈活,我們可以通過編寫js代碼來處理復雜的聚合任務,MapReduce的缺點是聚合的邏輯需要我們自己編碼實現。綜上,對於一些簡單的固定的聚集操作推薦使用聚合管道,對於一些復雜的、大量數據集的聚合任務推薦使用MapReduce。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM