在MongoDB中實現聚合函數


在MongoDB中實現聚合函數

隨着組織產生的數據爆炸性增長,從GB到TB,從TB到PB,傳統的數據庫已經無法通過垂直擴展來管理如此之大數據。傳統方法存儲和處理數據的成本將會隨着數據量增長而顯著增加。這使得很多組織都在尋找一種經濟的解決方案,比如NoSQL數據庫,它提供了所需的數據存儲和處理能力、擴展性和成本效率。NoSQL數據庫不使用SQL作為查詢語言。這種數據庫有多種不同的類型,比如文檔結構存儲、鍵值結構存儲、圖結構、對象數據庫等等。

我們在本文中使用的NoSQL是MongoDB,它是一種開源的文檔數據庫系統,開發語言為C++。它提供了一種高效的面向文檔的存儲結構,同時支持通過MapReduce程序來處理所存儲的文檔;它的擴展性很好,而且支持自動分區。Mapreduce可以用來實現數據聚合。它的數據以BSON(二進制JSON)格式存儲,在存儲結構上支持動態schema,並且允許動態查詢。和RDBMS的SQL查詢不同,Mongo查詢語言以JSON表示。

MongoDB提供了一個聚合框架,其中包括常用功能,比如count、distinct和group。然而更多的高級聚合函數,比如sum、average、max、min、variance(方差)和standard deviation(標准差)等需要通過MapReduce來實現。

這篇文章描述了在MongoDB存儲的文檔上使用MapReduce來實現通用的聚合函數,如sum、average、max、min、variance和standard deviation;聚合的典型應用包括銷售數據的業務報表,比如將各地區的數據分組后計算銷售總和、財務報表等。

我們從本文示例應用所需軟件的安裝開始。

軟件安裝

首先在本地機器上安裝並設置MongoDB服務。

  • Mongo網站上下載MongoDB,解壓到本地目錄,比如C:>Mongo
  • 在上一個文件夾內創建數據目錄。比如:C:\Mongo\Data   
    • 如果數據文件存放在其他地方,那么在用mongod.exe命令啟動MongoDB時,需要在命令行加參數—-dbpath
  • 啟動服務   
    • MongoDB提供了兩種方式:mongod.exe以后台進程啟動;mongo.exe啟動命令行界面,可做管理操作。這兩個可執行文件都位於Mongo\bin目錄下;
    • 進入Mongo安裝目錄的bin目錄下,比如:C:> cd Mongo\bin
    • 有兩種啟動方式,如下:

      mongod.exe –dbpath C:\Mongo\data
      或者       
      mongod.exe –config mongodb.config
              mongodb.config是Mongo\bin目錄下的配置文件,需要在此配置文件中指定數據目錄(比如,dbpath= C:\Mongo\Data)的位置。
  • 連接到MongoDB,到這一步,mongo后台服務已經啟動,可以通過http://localhost:27017查看。 MongoDB啟動運行后,我們接下來看它的聚合函數。

實現聚合函數

在關系數據庫中,我們可以在數值型字段上執行包含預定義聚合函數的SQL語句,比如,SUM()、COUNT()、MAX()和MIN()。但是在MongoDB中,需要通過MapReduce功能來實現聚合以及批處理,它跟SQL里用來實現聚合的GROUP BY從句比較類似。下一節將描述關系數據庫中SQL方式實現的聚合和相應的通過MongoDB提供的MapReduce實現的聚合。

為了討論這個主題,我們考慮如下所示的Sales表,它以MongoDB中的反范式形式呈現。

Sales表

#

列名

數據類型

1

OrderId

INTEGER

2

OrderDate

STRING

3

Quantity

INTEGER

4

SalesAmt

DOUBLE

5

Profit

DOUBLE

6

CustomerName

STRING

7

City

STRING

8

State

STRING

9

ZipCode

STRING

10

Region

STRING

11

ProductId

INTEGER

12

ProductCategory

STRING

13

ProductSubCategory

STRING

14

ProductName

STRING

15

ShipDate

STRING

 

基於SQL和MapReduce的實現

我們提供了一個查詢的樣例集,這些查詢使用聚合函數、過濾條件和分組從句,及其等效的MapReduce實現,即MongoDB實現SQL中GROUP BY的等效方式。在MongoDB存儲的文檔上執行聚合操作非常有用,這種方式的一個限制是聚合函數(比如,SUM、AVG、MIN、MAX)需要通過mapper和reducer函數來定制化實現。

MongoDB沒有原生態的用戶自定義函數(UDFs)支持。但是它允許使用db.system.js.save命令來創建並保存JavaScript函數,JavaScript函數可以在MapReduce中復用。下表是一些常用的聚合函數的實現。稍后,我們會討論這些函數在MapReduce任務中的使用。

聚合函數

Javascript 函數

SUM

db.system.js.save( { _id : "Sum" ,
value : function(key,values)
{
    var total = 0;
    for(var i = 0; i < values.length; i++)
        total += values[i];
    return total;
}});

AVERAGE

db.system.js.save( { _id : "Avg" ,
value : function(key,values)
{
    var total = Sum(key,values);
    var mean = total/values.length;
    return mean;
}});

MAX

db.system.js.save( { _id : "Max" ,
value : function(key,values)
{
    var maxValue=values[0];
    for(var i=1;i

MIN

db.system.js.save( { _id : "Min" ,
value : function(key,values)
{
    var minValue=values[0];
    for(var i=1;i

VARIANCE

db.system.js.save( { _id : "Variance" ,
value : function(key,values)
{
    var squared_Diff = 0;
    var mean = Avg(key,values);
    for(var i = 0; i < values.length; i++)
    {
        var deviation = values[i] - mean;
        squared_Diff += deviation * deviation;
    }
    var variance = squared_Diff/(values.length);
    return variance;
}});

STD DEVIATION

db.system.js.save( { _id : "Standard_Deviation"
, value : function(key,values)
{
    var variance = Variance(key,values);
    return Math.sqrt(variance);
}});

 

SQL和MapReduce腳本在四種不同的用例場景中實現聚合函數的代碼片段如下表所示。

1.各地區的平均訂單量

下面的查詢是用來獲取不同地區的平均訂單量。

SQL Query

MapReduce Functions

SELECT

db.sales.runCommand(
{
mapreduce : "sales" ,

 

City,

State,

Region,

map:function()
{ // emit function handles the group by
        emit( {
        // Key
        city:this.City,
        state:this.State,
        region:this.Region},
        // Values
        this.Quantity);
},

 

AVG(Quantity)

reduce:function(key,values)
{
    var result = Avg(key, values);
    return result;
}

FROM sales

 

GROUP BY City, State, Region

// Group By is handled by the emit(keys, values)
 line in the map() function above
 
out : { inline : 1 } });

2.產品的分類銷售總額

下面的查詢是用來獲取產品的分類銷售額,根據產品類別的層級分組。在下面例子中,不同的產品類別作為個體維度,它們也可以被稱為更復雜的基於層次的維度。

SQL 查詢

MapReduce 函數

SELECT

db.sales.runCommand(
{
mapreduce : "sales" ,

 

ProductCategory, ProductSubCategory, ProductName,

map:function()
{
        emit(
        // Key
        {key0:this.ProductCategory,
        key1:this.ProductSubCategory,
        key2:this.ProductName},
        // Values
        this.SalesAmt);
},

 

SUM(SalesAmt)

reduce:function(key,values)
{
    var result = Sum(key, values);
    return result;
}

FROM sales

 

GROUP BY ProductCategory, ProductSubCategory, ProductName

// Group By is handled by the emit(keys, values) 
line in the map() function above
 
out : { inline : 1 } });
 

 

3. 一種產品的最大利潤

下面的查詢是用來獲取一個給定產品基於過濾條件的最大利潤。

SQL查詢

MapReduce 函數

SELECT

db.sales.runCommand(
{
mapreduce : "sales" ,

 

 

ProductId, ProductName,

map:function()
{
    if(this.ProductId==1)
        emit( {
            key0:this.ProductId,
            key1:this.ProductName},
            this.Profit);
},

 

MAX(SalesAmt)

reduce:function(key,values)
{
    var maxValue=Max(key,values);
    return maxValue;
}

FROM sales

 

WHERE ProductId=’1’

// WHERE condition implementation is provided in 
map() function

GROUP BY ProductId, ProductName

// Group By is handled by the emit(keys, values) 
line in the map() function above
 
out : { inline : 1 } });

 

4. 總量、總銷售額、平均利潤

這個場景的需求是計算訂單的總數、總銷售額和平均利潤,訂單ID在1到10之間,發貨時間在2011年的1月1日到12月31日之間。下面的查詢是用來執行多個聚合,比如,在指定年份以及指定的不同區域和產品類別范圍里訂單的總數、總銷售額和平均利潤。

SQL 查詢

MapReduce 函數

SELECT

db.sales.runCommand(
{ mapreduce : "sales" ,

 

 

Region,

ProductCategory,

ProductId,

map:function()
{
    emit( {
        // Keys
        region:this.Region,
        productCategory:this.ProductCategory,
        productid:this.ProductId},

        // Values
        {quantSum:this.Quantity,
        salesSum:this.SalesAmt,
        avgProfit:this.Profit} );
}

 

 

 

Sum(Quantity),

Sum(Sales),

Avg(Profit)

reduce:function(key,values)
{
    var result=
{quantSum:0,salesSum:0,avgProfit:0};
    var count = 0;
    values.forEach(function(value)
    {
        // Calculation of Sum(Quantity)
        result.quantSum += values[i].quantSum;
        // Calculation of Sum(Sales)
        result.salesSum += values[i].salesSum;
        result.avgProfit += values[i].avgProfit;
        count++;
    }
    // Calculation of Avg(Profit)
    result.avgProfit = result.avgProfit / count;
    return result;
},

FROM Sales

 

WHERE

 

Orderid between 1 and 10 AND

Shipdate BETWEEN ‘01/01/2011’ and

‘12/31/2011’

query : {
        "OrderId" : { "$gt" : 1 },
        "OrderId" : { "$lt" : 10 },
        "ShipDate" : { "$gt" : "01/01/2011" },
        "ShipDate" : { "$lt" : "31/12/2011" },
},

GROUP BY

Region, ProductCategory, ProductId

// Group By is handled by the emit(keys, values) 
line in the map() function above

LIMIT 3;

limit : 3,
 
out : { inline : 1 } });

既然我們已經看了在不同業務場景下的聚合函數的代碼示例,接下來我們准備來測試這些函數。

測試聚合函數

MongoDB的MapReduce功能通過數據庫命令來調用。Map和Reduce函數在前面章節里已經使用JavaScript實現。下面是執行MapReduce函數的語法。

db.runCommand(

    { mapreduce : <collection>,

        map : <mapfunction>,

        reduce : <reducefunction>

        [, query : <query filter object>]

        [, sort : <sorts the input objects using this key. Useful for 
 optimization, like sorting by the emit key for fewer reduces>]

        [, limit : <number of objects to return from collection>]

        [, out : <see output options below>]

        [, keeptemp: <true|false>]

        [, finalize : <finalizefunction>]

        [, scope : <object where fields go into javascript global scope >]

        [, jsMode : true]

        [, verbose : true]

    }

)


Where the Output Options include:

{ replace : "collectionName" }

{ merge : "collectionName"

{ reduce : "collectionName" }

{ inline : 1}

 

 

下面是用來保存聚合函數並在MapReduce中使用的命令。

啟動Mongo命令行並設置表

  • 確保Mongo后台進程在運行,然后執行mongo.exe啟動Mongo命令行。
  • 使用命令切換數據庫:use mydb
  • 使用命令查看Sales表的內容:db.sales.find()

find命令的輸出如下:

{ "_id" : ObjectId("4f7be0d3e37b457077c4b13e"), "_class" : "com.infosys.mongo.Sales", "orderId" : 1, "orderDate" : "26/03/2011",
"quantity" : 20, "salesAmt" : 200, "profit" : 150, "customerName" : "CUST1", "productCategory" : "IT", "productSubCategory" : "software", 
"productName" : "Grad", "productId" : 1 }
{ "_id" : ObjectId("4f7be0d3e37b457077c4b13f"), "_class" : "com.infosys.mongo.Sales", "orderId" : 2, "orderDate" : "23/05/2011", 
"quantity" : 30, "salesAmt" : 200, "profit" : 40, "customerName" : "CUST2", "productCategory" : "IT", "productSubCategory" : "hardware",
 "productName" : "HIM", "productId" : 1 }
{ "_id" : ObjectId("4f7be0d3e37b457077c4b140"), "_class" : "com.infosys.mongo.Sales", "orderId" : 3, "orderDate" : "22/09/2011",
 "quantity" : 40, "salesAmt" : 200, "profit" : 80, "customerName" : "CUST1", "productCategory" : "BT", "productSubCategory" : "services",
 "productName" : "VOCI", "productId" : 2 }
{ "_id" : ObjectId("4f7be0d3e37b457077c4b141"), "_class" : "com.infosys.mongo.Sales", "orderId" : 4, "orderDate" : "21/10/2011", 
"quantity" : 30, "salesAmt" : 200, "profit" : 20, "customerName" : "CUST3", "productCategory" : "BT", "productSubCategory" : "hardware", 
"productName" : "CRUD", "productId" : 2 }
{ "_id" : ObjectId("4f7be0d3e37b457077c4b142"), "_class" : "com.infosys.mongo.Sales", "orderId" : 5, "orderDate" : "21/06/2011", 
"quantity" : 50, "salesAmt" : 200, "profit" : 20, "customerName" : "CUST3", "productCategory" : "BT", "productSubCategory" : "hardware", 
"productName" : "CRUD", "productId" : 1 }

 

創建並保存聚合函數

  • 通過MongoDB命令行窗口執行如下命令:
> db.system.js.save( { _id : "Sum" ,
value : function(key,values)
{
    var total = 0;
    for(var i = 0; i < values.length; i++)
        total += values[i];
    return total;
}}); 
  • 在示例表Sales表上執行MapReduce程序
> db.sales.runCommand(
{
mapreduce : "sales" ,
map:function()
{
emit(
{key0:this.ProductCategory,
key1:this.ProductSubCategory,
key2:this.ProductName},
this.SalesAmt);
},
reduce:function(key,values)
{
    var result = Sum(key, values);
    return result;
}
out : { inline : 1 } });

輸出如下:

"results" : [
        {
                "_id" : {
                        "key0" : "BT",
                        "key1" : "hardware",
                        "key2" : "CRUD"
                },
                "value" : 400
        },
        {
                "_id" : {
                        "key0" : "BT",
                        "key1" : "services",
                        "key2" : "VOCI"
                },
                "value" : 200
        },
        {
                "_id" : {
                        "key0" : "IT",
                        "key1" : "hardware",
                        "key2" : "HIM"
                },
                "value" : 200
        },

        {
                "_id" : {
                        "key0" : "IT",
                        "key1" : "software",
                        "key2" : "Grad"
                },
                "value" : 200
        }
],
"timeMillis" : 1,
"timing" : {
        "mapTime" : NumberLong(1),
        "emitLoop" : 1,
        "total" : 1
},
"counts" : {
        "input" : 5,
        "emit" : 5,
        "output" : 4
},
"ok" : 1

總結

MongoDB提供了面向文檔的存儲結構,可以很容易擴展支持TB級數據。同時也提供了Map Reduce功能,可以通過批處理方式使用類SQL函數來實現數據聚合。在這篇文章中,我們描述了安裝MongoDB並使用MapReduce特性執行聚合函數的過程,也提供了簡單SQL聚合的MapReduce示例實現。在MongoDB中,更復雜的聚合函數也可以通過使用MapReduce功能實現。

原文鏈接:http://www.infoq.com/articles/implementing-aggregation-functions-in-mongodb


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM