聚合查詢結構體系
我們都知道Mongo中聚合是由$match,$project等聚合項組成,所以在C# Driver中具有兩種類型:聚合管道(PipelineDefinition)和聚合管道項(IPipelineStageDefinition) ,下面先來看一下聚合管道項的結構體系
IPipelineStageDefinition
IPipelineStageDefinition接口是聚合管道項的頂級接口,這個接口中只定義了一些獲取輸入類型和輸出類型的簡單的屬性
public interface IPipelineStageDefinition
{
// 輸入類型
Type InputType { get; }
// 獲取管道操作名稱
string OperatorName { get; }
// 輸出類型
Type OutputType { get; }
// 獲取一個IRenderedIRenderedPipelineStageDefinition
// IRenderedPipelineStageDefinition是一個Stage提供對象,下面會介紹
IRenderedPipelineStageDefinition Render(IBsonSerializer inputSerializer, IBsonSerializerRegistry serializerRegistry);
// 獲取當前管道項的字符串格式,例:{ \"$match\" : { \"_id\" : \"402066782845407232\" } }
string ToString(IBsonSerializer inputSerializer, IBsonSerializerRegistry serializerRegistry);
}
這個接口具有一個PipelineStageDefinition派生類,這個類是一個抽象類,在這個抽象類中只多了兩個隱式轉換,
public abstract class PipelineStageDefinition<TInput, TOutput> : IPipelineStageDefinition
{
// 將一個BsonDocument對象轉換為管道項
public static implicit operator PipelineStageDefinition<TInput, TOutput>(BsonDocument document);
// 將一個json字符串轉換為管道項
public static implicit operator PipelineStageDefinition<TInput, TOutput>(string json);
}
用過C# Driver的朋友都應該知道我們使用Driver時經常使用這種隱式轉換,例如經常使用FilterDefinition便可使用json字符串直接賦值,這也是Driver強大的地方。
FilterDefinition<BsonDocument> filter = "{_id:123}";
其實這兩個隱式轉換如果翻源碼就會看到直接創建了這個抽象類的實現類對象
public static implicit operator PipelineStageDefinition<TInput, TOutput>(BsonDocument document)
{
if (document == null)
return null;
return new BsonDocumentPipelineStageDefinition<TInput, TOutput>(document);
}
public static implicit operator PipelineStageDefinition<TInput, TOutput>(string json)
{
if (json == null)
return null;
return new JsonPipelineStageDefinition<TInput, TOutput>(json);
}
也就是說這個抽象類具有這么兩個派生類BsonDocumentPipelineStageDefinition,JsonPipelineStageDefinition 這兩個類型就是使用Bsondocument對象和json字符串進行實例化聚合管道項
PipelineStageDefinition其它派生類
如果僅僅使用,只使用上面那兩個派生類即可,但實際上IPipelineStageDefinition的派生類還有兩個:
DelegatedPipelineStageDefinition:由一個Func<IBsonSerializer
, IBsonSerializerRegistry, RenderedPipelineStageDefinition 委托創建的實例對象SortPipelineStageDefinition:排序項的實例對象
其實這兩個派生類在使用上根本不需要知道,它們的訪問級別是internal,也就是說在使用時根本無法創建這兩個派生類的實例對象,其實這兩個類都是PipelineStageDefinition實例在調用Match() ,Project() ,Sort() 方法時進行內部創建的,這個下面再說
對於SortPipelineStageDefinition和DelegatedPipelineStageDefinition這兩個派生類其實內部特別簡單,但是卻又扯到了另外一個類型
internal class SortPipelineStageDefinition<TInput> : PipelineStageDefinition<TInput, TInput>
{
public SortPipelineStageDefinition(SortDefinition<TInput> sort)
{
Sort = sort;
}
// 排序條件對象
public SortDefinition<TInput> Sort { get; private set; }
// 操作項名稱
public override string OperatorName => "$sort";
public override RenderedPipelineStageDefinition<TInput> Render(IBsonSerializer<TInput> inputSerializer, IBsonSerializerRegistry serializerRegistry)
{
var renderedSort = Sort.Render(inputSerializer, serializerRegistry);
var document = new BsonDocument(OperatorName, renderedSort);
return new RenderedPipelineStageDefinition<TInput>(OperatorName, document, inputSerializer);
}
}
internal sealed class DelegatedPipelineStageDefinition<TInput, TOutput> : PipelineStageDefinition<TInput, TOutput>
{
// 委托緩存
private readonly Func<IBsonSerializer<TInput>, IBsonSerializerRegistry, RenderedPipelineStageDefinition<TOutput>> _renderer;
public DelegatedPipelineStageDefinition(string operatorName, Func<IBsonSerializer<TInput>, IBsonSerializerRegistry, RenderedPipelineStageDefinition<TOutput>> renderer)
{
_renderer = renderer;
}
// 獲取RenderedPipelineStageDefinition 直接返回委托調用
public override RenderedPipelineStageDefinition<TOutput> Render(IBsonSerializer<TInput> inputSerializer, IBsonSerializerRegistry serializerRegistry)
{
return _renderer(inputSerializer, serializerRegistry);
}
}
通過上面代碼可以看到這兩個派生類型特別簡單,感覺就是一個簡單的代理,一切都指向於RenderedPipelineStageDefinition這個類型,也就是在真正執行聚合操作時可能使用的就是這個類型,這個在這先留一下懸念,因為RenderedPipelineStageDefinition這個類型還涉及到了整個聚合管道對象和執行操作,等到下面再講解。
PipelineStageDefinitionBuilder
下面來說一下PipelineStageDefinitionBuilder這個類型,顧名思義,這是一個創建PipelineStageDefinition的類型,它是一個靜態類,內部具有創建各種的使用方法,這個類型中方法特別多,也不一一細講,只講三個方法,也就是上面提到的Match() ,Project() ,Sort()
public static class PipelineStageDefinitionBuilder
{
// match
public static PipelineStageDefinition<TInput, TInput> Match<TInput>(
FilterDefinition<TInput> filter)
{
const string operatorName = "$match";
var stage = new DelegatedPipelineStageDefinition<TInput, TInput>(
operatorName,
(s, sr) => new RenderedPipelineStageDefinition<TInput>(operatorName, new BsonDocument(operatorName, filter.Render(s, sr)), s));
return stage;
}
// project
public static PipelineStageDefinition<TInput, TOutput> Project<TInput, TOutput>(ProjectionDefinition<TInput, TOutput> projection)
{
const string operatorName = "$project";
var stage = new DelegatedPipelineStageDefinition<TInput, TOutput>(
operatorName,
(s, sr) =>
{
var renderedProjection = projection.Render(s, sr);
BsonDocument document;
if (renderedProjection.Document == null)
document = new BsonDocument();
else
document = new BsonDocument(operatorName, renderedProjection.Document);
return new RenderedPipelineStageDefinition<TOutput>(operatorName, document, renderedProjection.ProjectionSerializer);
});
return stage;
}
// sort
public static PipelineStageDefinition<TInput, TInput> Sort<TInput>(
SortDefinition<TInput> sort)
{
return new SortPipelineStageDefinition<TInput>(sort);
}
}
上面就是這三個方法的源代碼,三個方法分別使用FilterDefinition,ProjectionDefinition,SortDefinition實例創建PipelineStageDefinition對象,而所創建的也是后面講的那兩個派生類,這也驗證了上面所說的兩個類型的用途。
PipelineStageDefinition類總結
從上面一步步可以得知,Driver為我們提供了三種創建聚合項的辦法,其實這三種也應用於driver的各種使用上
-
BsonDocument創建
-
json字符串創建
-
使用PipelineStageDefinitionBuilder進行創建
PipelineDefinition
說完管道項,下面就說一下整個聚合管道的操作類PipelineDefinition以及它的派生類
首先PipelineDefinition這個父級類型,它跟PipelineStageDefinition一樣是一個抽象類型,並且和PipelineStageDefinition相同的是它也有一個Render方法和兩個隱式轉換,多了幾個靜態的創建方法,使得更具有擴展性
public abstract class PipelineDefinition<TInput, TOutput>
{
public abstract RenderedPipelineDefinition<TOutput> Render(IBsonSerializer<TInput> inputSerializer, IBsonSerializerRegistry serializerRegistry);
// 使用管道項集合創建一個PipelineStagePipelineDefinition實例對象
public static PipelineDefinition<TInput, TOutput> Create(
IEnumerable<IPipelineStageDefinition> stages,
IBsonSerializer<TOutput> outputSerializer = null)
{
return new PipelineStagePipelineDefinition<TInput, TOutput>(stages, outputSerializer);
}
// 使用BsonDocument集合創建一個BsonDocumentStagePipelineDefinition對象
public static PipelineDefinition<TInput, TOutput> Create(
IEnumerable<BsonDocument> stages,
IBsonSerializer<TOutput> outputSerializer = null)
{
return new BsonDocumentStagePipelineDefinition<TInput, TOutput>(stages, outputSerializer);
}
// 使用json字符串集合創建一個BsonDocumentStagePipelineDefinition對象
public static PipelineDefinition<TInput, TOutput> Create(
IEnumerable<string> stages,
IBsonSerializer<TOutput> outputSerializer = null)
{
return Create(stages?.Select(s => BsonDocument.Parse(s)), outputSerializer);
}
// 隱式轉換
// 將IPipelineStageDefinition集合隱式轉換為PipelineStagePipelineDefinition對象
public static implicit operator PipelineDefinition<TInput, TOutput>(List<IPipelineStageDefinition> stages)
{
return Create(stages);
}
// 將BsonDocument集合隱式轉換為BsonDocumentStagePipelineDefinition對象
public static implicit operator PipelineDefinition<TInput, TOutput>(List<BsonDocument> stages)
{
return Create(stages);
}
}
注:PipelineDefinition類中還封裝了數組參數和其它內容,有興趣的朋友可以自己去看看
上面類型可以看出PipelineDefinition做了很多封裝,為了使用更加便捷。從上面也看到了兩個派生類型:PipelineStagePipelineDefinition和BsonDocumentStagePipelineDefinition
其實PipelineDefinition派生類型一共有7個,我們能用到的是6個,我將這個7個類型分為:創建性,改變性和外部不可用性這三種,下面先來看看創建性
注:其實嚴格意義上是兩種,外部不可用的派生類型屬於創建性,外部不可用的派生類型也只是在特定情況下被內部用到。
創建性派PipelineDefinition
創建性有3個,其中兩個就是上面基類中創建的兩個派生類型,另外一個是EmptyPipelineDefinition,顧名思義這是一個空的管道,這個跟創建空條件那個是極其相似的( Builders
// EmptyPipelineDefinition
public sealed class EmptyPipelineDefinition<TInput> : PipelineDefinition<TInput, TInput>
{
public override IEnumerable<IPipelineStageDefinition> Stages => Enumerable.Empty<IPipelineStageDefinition>();
//
public override RenderedPipelineDefinition<TInput> Render(IBsonSerializer<TInput> inputSerializer, IBsonSerializerRegistry serializerRegistry)
{
var documents = Enumerable.Empty<BsonDocument>();
return new RenderedPipelineDefinition<TInput>(documents, _inputSerializer ?? inputSerializer);
}
}
// PipelineStagePipelineDefinition
public sealed class PipelineStagePipelineDefinition<TInput, TOutput> : PipelineDefinition<TInput, TOutput>
{
private readonly IList<IPipelineStageDefinition> _stages;
public PipelineStagePipelineDefinition(IEnumerable<IPipelineStageDefinition> stages, IBsonSerializer<TOutput> outputSerializer = null)
{
_stages = stages;
_outputSerializer = outputSerializer;
}
public override IEnumerable<IPipelineStageDefinition> Stages => _stages;
public override RenderedPipelineDefinition<TOutput> Render(IBsonSerializer<TInput> inputSerializer, IBsonSerializerRegistry serializerRegistry)
{
// 當前集合進行存儲當前聚合管道所有聚合項的BsonDocument
var pipeline = new List<BsonDocument>();
IBsonSerializer currentSerializer = inputSerializer;
foreach (var stage in _stages)
{
// 獲取每一個聚合項的RenderedPipelineDefinition
// 然后獲取每個聚合項RenderedPipelineDefinition中的Bsondocument
var renderedStage = stage.Render(currentSerializer, serializerRegistry);
currentSerializer = renderedStage.OutputSerializer;
if (renderedStage.Document.ElementCount > 0)
{
pipeline.Add(renderedStage.Document);
}
}
return new RenderedPipelineDefinition<TOutput>(
pipeline,
_outputSerializer ?? (currentSerializer as IBsonSerializer<TOutput>) ?? serializerRegistry.GetSerializer<TOutput>());
}
// BsonDocumentStagePipelineDefinition
public sealed class BsonDocumentStagePipelineDefinition<TInput, TOutput> : PipelineDefinition<TInput, TOutput>
{
private readonly List<BsonDocument> _stages;
public BsonDocumentStagePipelineDefinition(IEnumerable<BsonDocument> stages, IBsonSerializer<TOutput> outputSerializer = null)
{
_stages = stages;
_outputSerializer = outputSerializer;
}
public override IBsonSerializer<TOutput> OutputSerializer => _outputSerializer;
public IList<BsonDocument> Documents
{
get { return _stages; }
}
// 獲取當前聚合的所有聚合項
public override IEnumerable<IPipelineStageDefinition> Stages => _stages.Select(s => new BsonDocumentPipelineStageDefinition<TInput, TOutput>(s, _outputSerializer));
public override RenderedPipelineDefinition<TOutput> Render(IBsonSerializer<TInput> inputSerializer, IBsonSerializerRegistry serializerRegistry)
{
return new RenderedPipelineDefinition<TOutput>(
_stages,
_outputSerializer ?? (inputSerializer as IBsonSerializer<TOutput>) ?? serializerRegistry.GetSerializer<TOutput>());
}
}
上面是這個三個派生類型基本實現,基本上也都沒什么特別的地方,而邏輯也是在Render()這個方法中,EmptyPipelineDefinition中創建了一個空的Bsondocument對象集合實例化的RenderedPipelineDefinition,而BsonDocumentStagePipelineDefinition和PipelineStagePipelineDefinition分別以傳入的Bsondocument集合和從管道項對象中調用的Render()中獲取Bsondocument集合。從這里可以得出2點
1.RenderedPipelineStageDefinition的作用是為了提供其內部的Bsondocument然后創建RenderedPipelineDefinition對象
2.RenderedPipelineStageDefinition和RenderedPipelineDefinition的關系就像BsonDocumentPipelineStageDefinition和BsonDocumentStagePipelineDefinition關系類似,一個對應管道項,一個對應管道
至此,一切的源頭都指向了<span style="color:#009BDB">RenderedPipelineDefinition</span>這個類,但是這個類在下面再介紹,先來看一下改變性的<span style="color:#009BDB">PipelineDefinition</span>
改變性PipelineDefinition
為什么我叫它為改變性呢,因為它是在一個已有PipelineDefinition基礎上進行的添加或者替換,下面來看看這三個派生類型
PrependedStagePipelineDefinition:在一個PipelineDefinition管道前面添加一個管道項
AppendedStagePipelineDefinition:在一個PipelineDefinition管道后面添加一個管道項
ReplaceOutputSerializerPipelineDefinition:替換一個PipelineDefinition的序列化對象類型
其實看到這三個派生類就知道其作用了,所以在這里也不進行詳細介紹了,只貼出它們的構造方法,有興趣的朋友可以翻閱源碼
// PrependedStagePipelineDefinition
public sealed class PrependedStagePipelineDefinition<TInput, TIntermediate, TOutput> : PipelineDefinition<TInput, TOutput>
{
public PrependedStagePipelineDefinition(
PipelineStageDefinition<TInput, TIntermediate> stage,
PipelineDefinition<TIntermediate, TOutput> pipeline,
IBsonSerializer<TOutput> outputSerializer = null)
{
}
}
// AppendedStagePipelineDefinition
public sealed class AppendedStagePipelineDefinition<TInput, TIntermediate, TOutput> : PipelineDefinition<TInput, TOutput>
{
public AppendedStagePipelineDefinition(
PipelineDefinition<TInput, TIntermediate> pipeline,
PipelineStageDefinition<TIntermediate, TOutput> stage,
IBsonSerializer<TOutput> outputSerializer = null)
{
}
}
// ReplaceOutputSerializerPipelineDefinition
public sealed class ReplaceOutputSerializerPipelineDefinition<TInput, TIntermediate, TOutput> : PipelineDefinition<TInput, TOutput>
{
public ReplaceOutputSerializerPipelineDefinition(
PipelineDefinition<TInput, TIntermediate> pipeline,
IBsonSerializer<TOutput> outputSerializer = null)
{
}
}
外部不可用派生類
這個外部不可用的派生類型是OptimizingPipelineDefinition ,按照翻譯看起來像最優的管道,其實在執行操作時都會現將外部定義的PipelineDefinition轉換為OptimizingPipelineDefinition 類型,首先先看看這個類型的定義
internal class OptimizingPipelineDefinition<TInput, TOutput> : PipelineDefinition<TInput, TOutput>
{
private readonly PipelineDefinition<TInput, TOutput> _wrapped;
public OptimizingPipelineDefinition(PipelineDefinition<TInput, TOutput> wrapped)
{
_wrapped = wrapped;
}
/// <inheritdoc />
public override IEnumerable<IPipelineStageDefinition> Stages => _wrapped.Stages;
public override RenderedPipelineDefinition<TOutput> Render(IBsonSerializer<TInput> inputSerializer, IBsonSerializerRegistry serializerRegistry)
{
var rendered = _wrapped.Render(inputSerializer, serializerRegistry);
if (rendered.Documents.Count > 1)
{
// 如果有可能,進行組合一下$match
var firstStage = rendered.Documents[0].GetElement(0);
var secondStage = rendered.Documents[1].GetElement(0);
if (firstStage.Name == "$match" && secondStage.Name == "$match")
{
var combinedFilter = Builders<BsonDocument>.Filter.And(
(BsonDocument)firstStage.Value,
(BsonDocument)secondStage.Value);
var combinedStage = new BsonDocument("$match", combinedFilter.Render(BsonDocumentSerializer.Instance, serializerRegistry));
rendered.Documents[0] = combinedStage;
rendered.Documents.RemoveAt(1);
}
}
return rendered;
}
}
可以看到只是在Render()代碼中進行了一個輕微的優化操作,這個優化類是針對OfType情況進行優化的,唯一的使用地方是在FilteredMongoCollectionBase這個抽象類中,而這個抽象類的實現類是OfTypeMongoCollection
internal abstract class FilteredMongoCollectionBase<TDocument> : MongoCollectionBase<TDocument>, IFilteredMongoCollection<TDocument>
{
// 創建OptimizingPipelineDefinition
private PipelineDefinition<TDocument, TResult> CreateFilteredPipeline<TResult>(PipelineDefinition<TDocument, TResult> pipeline)
{
var filterStage = PipelineStageDefinitionBuilder.Match(_filter);
var filteredPipeline = new PrependedStagePipelineDefinition<TDocument, TDocument, TResult>(filterStage, pipeline);
return new OptimizingPipelineDefinition<TDocument, TResult>(filteredPipeline);
}
}
internal class OfTypeMongoCollection<TRootDocument, TDerivedDocument> : FilteredMongoCollectionBase<TDerivedDocument>
where TDerivedDocument : TRootDocument
{
}
PipelineDefinitionBuilder類型
PipelineDefinitionBuilder類型是管道系列的一個幫助類,這個與PipelineStageDefinitionBuilder類相似,但又不盡相同,PipelineDefinitionBuilder中定義的都是PipelineDefinition對象的擴展方法,定義了一系列方便的方法
public static class PipelineDefinitionBuilder
{
// $match
public static PipelineDefinition<TInput, TOutput> Match<TInput, TOutput>(
this PipelineDefinition<TInput, TOutput> pipeline,
FilterDefinition<TOutput> filter)
{
return pipeline.AppendStage(PipelineStageDefinitionBuilder.Match(filter));
}
// $project
public static PipelineDefinition<TInput, TOutput> Project<TInput, TIntermediate, TOutput>(
this PipelineDefinition<TInput, TIntermediate> pipeline,
ProjectionDefinition<TIntermediate, TOutput> projection)
{
return pipeline.AppendStage(PipelineStageDefinitionBuilder.Project(projection));
}
// AppendStage
public static PipelineDefinition<TInput, TOutput> AppendStage<TInput, TIntermediate, TOutput> (
this PipelineDefinition<TInput, TIntermediate> pipeline,
PipelineStageDefinition<TIntermediate, TOutput> stage,
IBsonSerializer<TOutput> outputSerializer = null)
{
return new AppendedStagePipelineDefinition<TInput, TIntermediate, TOutput>(pipeline, stage, outputSerializer);
}
public static PipelineDefinition<TInput, TOutput> As<TInput, TIntermediate, TOutput>(
this PipelineDefinition<TInput, TIntermediate> pipeline,
IBsonSerializer<TOutput> outputSerializer = null)
{
return new ReplaceOutputSerializerPipelineDefinition<TInput, TIntermediate, TOutput>(pipeline, outputSerializer);
}
}
其實可以看出從上面幾個個方法可以看出其本質還是使用AppendedStagePipelineDefinition和ReplaceOutputSerializerPipelineDefinition。Match()和Project()都是調用了AppendStage(),而這個方法是創建了一個新的AppendedStagePipelineDefinition對象返回。而As()也是創建了一個新的ReplaceOutputSerializerPipelineDefinition返回。其本質沒變,但是可以使得整個driver多了擴展性,更加方便了使用。有的聚合項像$addFields並沒有封裝方法,可能使用率不大,所以並沒有封裝,像這樣的直接就調用AppendStage()即可
PipelineDefinition類總結
通過上面介紹其實可以看出來了,Mongo的C# Driver中聚合操作使用起來特別方便,使用時先創建聚合項對象再創建聚合管道對象還是直接創建聚合管道對象或者直接使用隱式轉換都可以。其實不止聚合,C# Driver中各個操作基本都是如此,使用起來都特別方便,既然創建聚合管道實例的方法特別多,所以在這也就不一一列出,只簡單的列出幾個
1.先實例化聚合項,再實例化聚合管道對象
2.直接使用隱式轉換進行創建聚合管道對象
3.使用擴展方法進行創建
RenderedPipelineStageDefinition和RenderedPipelineDefinition介紹
下面我們來說說RenderedPipelineStageDefinition和RenderedPipelineDefinition這兩個類,這兩個類叫做聚合項和聚合管道的提供者,它們真正提供了聚合的語句。上面已經簡單說過,它們分別是聚合項實例和聚合管道實例創建的,並且在PipelineStagePipelineDefinition中也可以看到RenderedPipelineDefinition是根據RenderedPipelineStageDefinition內部BsonDocument進行實例化的,下面先來看一看這兩個類型的內部結構
// RenderedPipelineStageDefinition
public class RenderedPipelineStageDefinition<TOutput> : IRenderedPipelineStageDefinition
{
private string _operatorName;
private BsonDocument _document;
private IBsonSerializer<TOutput> _outputSerializer;
public RenderedPipelineStageDefinition(string operatorName, BsonDocument document, IBsonSerializer<TOutput> outputSerializer)
{
_operatorName = Ensure.IsNotNull(operatorName, nameof(operatorName));
_document = Ensure.IsNotNull(document, nameof(document));
_outputSerializer = Ensure.IsNotNull(outputSerializer, nameof(outputSerializer));
}
public BsonDocument Document
{
get { return _document; }
}
public IBsonSerializer<TOutput> OutputSerializer
{
get { return _outputSerializer; }
}
public string OperatorName
{
get { return _operatorName; }
}
IBsonSerializer IRenderedPipelineStageDefinition.OutputSerializer
{
get { return _outputSerializer; }
}
}
// RenderedPipelineDefinition
public class RenderedPipelineDefinition<TOutput>
{
private List<BsonDocument> _documents;
private IBsonSerializer<TOutput> _outputSerializer;
public RenderedPipelineDefinition(IEnumerable<BsonDocument> documents, IBsonSerializer<TOutput> outputSerializer)
{
_documents = Ensure.IsNotNull(documents, nameof(documents)).ToList();
_outputSerializer = Ensure.IsNotNull(outputSerializer, nameof(outputSerializer));
}
public IList<BsonDocument> Documents
{
get { return _documents; }
}
public IBsonSerializer<TOutput> OutputSerializer
{
get { return _outputSerializer; }
}
}
可以看到其實最重要是就是內部的BsonDocument這個屬性,那么這個屬性里面是什么呢,我們先來看一下
可以看出BsonDocument其實存放就是一個聚合項的json字符串,也就是
注:這個Render()是以序列化器類型實例和序列化注冊實例進行序列化為字符串的
然后我來驗證聚合的最后執行操作,也就是RenderedPipelineDefinition的作用,這個操作是在MongoCollectionImpl中,從下面代碼可以看出,使用Render()方法獲取聚合管道的真實語句。然后由此語句執行,由此可以看出其實一切的PipelineDefinition對象最終都是生成RenderedPipelineDefinition對象,這個對象攜帶着執行語句的json字符串形式。
internal sealed class MongoCollectionImpl<TDocument> : MongoCollectionBase<TDocument>
{
public override IAsyncCursor<TResult> Aggregate<TResult>(IClientSessionHandle session, PipelineDefinition<TDocument, TResult> pipeline, AggregateOptions options, CancellationToken cancellationToken = default(CancellationToken))
{
// 獲取當前聚合管道對象的語句
var renderedPipeline = pipeline.Render(_documentSerializer, _settings.SerializerRegistry);
options = options ?? new AggregateOptions();
var last = renderedPipeline.Documents.LastOrDefault();
if (last != null && last.GetElement(0).Name == "$out")
{
var aggregateOperation = CreateAggregateToCollectionOperation(renderedPipeline, options);
ExecuteWriteOperation(session, aggregateOperation, cancellationToken);
var findOperation = CreateAggregateToCollectionFindOperation(last, renderedPipeline.OutputSerializer, options);
var forkedSession = session.Fork();
var deferredCursor = new DeferredAsyncCursor<TResult>(
() => forkedSession.Dispose(),
ct => ExecuteReadOperation(forkedSession, findOperation, ReadPreference.Primary, ct),
ct => ExecuteReadOperationAsync(forkedSession, findOperation, ReadPreference.Primary, ct));
return deferredCursor;
}
else
{
var aggregateOperation = CreateAggregateOperation(renderedPipeline, options);
return ExecuteReadOperation(session, aggregateOperation, cancellationToken);
}
}
}
聚合操作的執行方法
上面說了整個聚合管道類的體系,下面說一下最后調用的執行方法
執行方法調用的是IMongoCollection對象的Aggregate()方法,這個方法在IMongoCollection類中具有兩個重載,都是需要PipelineDefinition為參數的。
在這個方法中還有一個AggregateOptions參數。這個類是執行聚合的一些選擇操作。比如是否使用游標,如果內存不足情況下是否允許使用磁盤等等。。
IAsyncCursor<TResult> Aggregate<TResult>(PipelineDefinition<TDocument, TResult> pipeline, AggregateOptions options = null, CancellationToken cancellationToken = default(CancellationToken));
Aggregate()方法會返回一個IAsyncCursor實例,這個對象代表一個游標。
其實在IMongoCollectionExtensions這個擴展類中還具有Aggregate()方法,這個方法也算是另外一種用法。因為這個方法參數並沒有PipelineDefinition對象,並且返回類型也不再是IAsyncCursor,而是一個IAggregateFluent類型。IAggregateFluent類型具有一系列方法
public static IAggregateFluent<TDocument> Aggregate<TDocument>(this IMongoCollection<TDocument> collection, AggregateOptions options = null);