Basics of Aggregation Pipeline in MongoDB

Sumit Bopche
6 min read · May 10, 2021

The aggregation pipeline is a framework based on the concept of data processing pipelines. Documents enter a multi-stage pipeline that transforms them into an aggregated result: data flows from one stage to the next, and at the end of the pipeline we get the transformed output, ready to use. The MongoDB aggregation pipeline consists of stages that we can combine as our requirements demand.

Import the sample data into your local MongoDB:

  1. Download the sample data from https://github.com/ozlerhakan/mongodb-json-files/blob/master/datasets/grades.json
  2. Run the following commands in a terminal

(Run the mongod command in another terminal tab first to start the local MongoDB.)

$ mongoimport -d sampleData -c grades grades.json
$ mongo
> show dbs
> use sampleData

Sample Document:

(Full sample document: https://gist.github.com/sumitbopche01/a77d80b8e45ccc28f3092d9c5586577e)
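Each document in grades.json has roughly this shape (an illustrative excerpt, not the gist itself; the exact values will differ in your copy of the dataset):

{
  "_id" : ObjectId("..."),
  "student_id" : 0,
  "class_id" : 28,
  "scores" : [
    { "type" : "exam", "score" : 39.17 },
    { "type" : "quiz", "score" : 78.44 },
    { "type" : "homework", "score" : 20.81 }
  ]
}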

Stages

$match: The $match stage filters documents. Place $match as early in the aggregation pipeline as possible. If $match is the first stage, it can take advantage of indexes, which speeds up our queries.

$match limits the total number of documents in the aggregation pipeline which minimizes the amount of processing down the pipe.

db.grades.aggregate([{$match: {student_id: 1 }}]).pretty()

This will return all the documents having student_id = 1.

db.grades.aggregate([
  { $match: {
      scores: {
        $elemMatch: {
          score: { $gt: 90 },
          type: "exam"
        }
      }
  } },
  { $limit: 2 }
]).pretty()

Here we are finding all the documents that contain a score greater than 90 with type "exam". We need $elemMatch because the scores field is an array, and both conditions must hold for the same array element.

The $elemMatch operator matches documents that contain an array field with at least one element that matches all the specified query criteria.

$limit: Limits the number of documents passed to the next stage in the pipeline. In the above example, only two documents matching the condition are returned because we set the limit to 2 in the second stage. Place the $limit stage right after the $match stage to decrease the number of documents processed by later stages. If we instead place $limit at the end of the pipeline, the earlier stages perform their operations on every document that came out of $match, which increases query execution time.

$sort: As the name suggests, it is used for sorting the documents. The $sort stage can use an index as long as it is not preceded by a $project, $unwind, or $group stage.
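For example, a minimal sketch (1 sorts ascending, -1 descending), combined with $limit to keep the output small:

db.grades.aggregate([
  { $sort: { student_id: 1 } },
  { $limit: 3 }
])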

$count:

db.grades.aggregate([
  { $match: {
      scores: {
        $elemMatch: {
          score: { $gt: 90 },
          type: "exam"
        }
      }
  } },
  { $count: "above90ExamScore" }
])

First Stage ($match): It finds the documents matching our criteria. The field "scores" is an array of objects, and to compare fields within a single array element we need the $elemMatch operator. Only documents containing at least one score object with a score greater than 90 and type "exam" pass to the next stage.

Second Stage ($count): The $count stage returns a count of the remaining documents in the aggregation pipeline and assigns the value to a field called above90ExamScore.

$project: $project is used in a MongoDB query for the inclusion and exclusion of fields, the suppression of the _id field, the addition of new computed fields, and the resetting of the values of existing fields.

$project passes along only the specified fields, which can be existing fields from the input documents or new computed fields.

Note that, unlike $addFields (covered below), $project drops every field we do not explicitly include.

db.grades.aggregate([
  { $match: {} },
  { $limit: 3 },
  { $project: {
      _id: 1,
      student_id: 1,
      class_id: 1
  } }
])

First Stage ($match): Returns all documents, as we have not specified any filter condition.

Second Stage ($limit): Passes only the first 3 documents to the next stage.

Third Stage ($project): Here we select only the _id, student_id, and class_id fields. Other fields in the documents will not be returned in the output.

db.grades.aggregate([
  { $match: {} },
  { $limit: 3 },
  { $project: {
      _id: 1,
      student_id: 1,
      class_id: 0
  } }
])

In the above query, we have specified class_id as 0, which gives an error:

"errmsg" : "Bad projection specification, cannot exclude fields other than '_id' in an inclusion projection: { _id: 1.0, student_id: 1.0, class_id: 0.0 }

Only _id can be mixed freely with other values: apart from _id, the projection must be homogeneous, either all inclusions (1) or all exclusions (0). The following query is therefore valid:

db.grades.aggregate([
  { $match: {} },
  { $limit: 3 },
  { $project: {
      _id: 0,
      student_id: 1,
      class_id: 1
  } }
])

We can also assign values to new computed fields, like this:

db.grades.aggregate([
  { $match: {} },
  { $project: {
      _id: 0,
      score_types: "$scores.type"
  } },
  { $limit: 1 }
]).pretty()

We will get a result like:

{
  "score_types" : [
    "exam",
    "quiz",
    "homework",
    "homework",
    "homework"
  ]
}

Now, if we want unique score types, one solution looks like this:

db.grades.aggregate([
  { $match: {} },
  { $project: {
      _id: 0,
      score_types: "$scores.type"
  } },
  { $unwind: "$score_types" },
  { $group: {
      _id: null,
      types: { $addToSet: "$score_types" }
  } }
]).pretty()

This gives the result:

{ "_id" : null, "types" : [ "quiz", "exam", "homework" ] }

Here we unwind the items of the score_types array (so each array element becomes its own document) and then collect the types into the types array. $addToSet adds only unique values to the array; it is available as an accumulator in the $group stage.
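An equivalent, slightly shorter pipeline (a sketch on the same dataset) unwinds the scores array directly and collects the distinct type values, skipping the intermediate $project:

db.grades.aggregate([
  { $unwind: "$scores" },
  { $group: {
      _id: null,
      types: { $addToSet: "$scores.type" }
  } }
])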

$addFields:

The $addFields stage is equivalent to a $project stage that explicitly includes all existing fields and then adds new ones.

As the name implies, $addFields adds fields to a document, whereas with $project we can selectively remove and retain fields.

$addFields only allows us to extend incoming pipeline documents with new computed fields or to modify existing fields; it never removes any.

db.grades.aggregate([
  { $match: {} },
  { $limit: 2 },
  { $addFields: {
      school_name: "Aggregate School"
  } }
]).pretty()

This will add a new field “school_name” in every document.
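$addFields can also compute a field from existing data. As a sketch (the avg_score field name is our own choice), we can attach each student's average score by applying the $avg expression to the scores array:

db.grades.aggregate([
  { $limit: 2 },
  { $addFields: {
      avg_score: { $avg: "$scores.score" }
  } }
]).pretty()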

$group:

{
  $group: {
    _id: <grouping criteria/expression>,
    <fieldName>: <accumulator expression>,
    ...
  }
}

The expression we specify for _id becomes the criterion the $group stage uses to categorize and bundle documents together. _id is a required field and can be set to null.

If we want to accumulate all documents rather than just a subset, we use null as the grouping criterion.

{
  $group: {
    _id: null,
    count: { $sum: 1 }
  }
}
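Grouping on an actual field works the same way. For example (a sketch on the grades dataset, where each document represents one student/class pair), counting how many class documents exist per student:

db.grades.aggregate([
  { $group: {
      _id: "$student_id",
      classes: { $sum: 1 }
  } },
  { $sort: { _id: 1 } },
  { $limit: 3 }
])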

$facet:

What if you want to apply different aggregation logic on the same input in a single aggregation call? $facet is here to help you out.

{
  $facet: {
    <outputField1>: [ <stage1>, <stage2>, ... ],
    <outputField2>: [ <stage1>, <stage2>, ... ],
    ...
  }
}

Many use cases require the ability to manipulate, inspect, and analyze data across multiple dimensions. The $facet stage runs several sub-pipelines on the same input; stages such as $sortByCount, $bucket, and $bucketAuto are commonly used inside it to perform this multi-faceted aggregation.

$sortByCount: is equivalent to a $group stage that counts documents per distinct value, followed by a $sort in descending order of count.

db.grades.aggregate([
  { $match: {} },
  { $sortByCount: "$class_id" }
])
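The same result can be produced by hand, which makes the equivalence explicit:

db.grades.aggregate([
  { $group: { _id: "$class_id", count: { $sum: 1 } } },
  { $sort: { count: -1 } }
])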

$bucket:

  • we must always specify at least 2 values in boundaries
  • boundaries must all be of the same data type

db.grades.aggregate([
  { $match: {} },
  { $bucket: {
      groupBy: "$class_id",
      boundaries: [ 0, 2, 4, 6, 8, 10, Infinity ]
  } }
])

$bucketAuto:

db.grades.aggregate([
  { $match: {} },
  { $bucketAuto: {
      groupBy: "$class_id",
      buckets: 5
  } }
])

Multiple Facets:

db.grades.aggregate([
  { $match: {} },
  { $facet: {
      autoBucketResults: [
        { $bucketAuto: {
            groupBy: "$class_id",
            buckets: 5
        } }
      ],
      manualBucketResults: [
        { $bucket: {
            groupBy: "$class_id",
            boundaries: [ 0, 2, 4, 6, 8, 10, Infinity ]
        } }
      ]
  } }
])

This returns two result sets, letting us apply different aggregation logic to the same data in a single aggregation call.

{
  "autoBucketResults" : [
    { "_id" : { "min" : 0, "max" : 8 }, "count" : 35 },
    { "_id" : { "min" : 8, "max" : 13 }, "count" : 31 },
    { "_id" : { "min" : 13, "max" : 20 }, "count" : 33 },
    { "_id" : { "min" : 20, "max" : 26 }, "count" : 33 },
    { "_id" : { "min" : 26, "max" : 30 }, "count" : 24 }
  ],
  "manualBucketResults" : [
    { "_id" : 0, "count" : 8 },
    { "_id" : 2, "count" : 8 },
    { "_id" : 4, "count" : 9 },
    { "_id" : 6, "count" : 10 },
    { "_id" : 8, "count" : 12 },
    { "_id" : 10, "count" : 109 }
  ]
}

Performance Optimisation:

  • When $limit and $sort are close together, a very performant top-k sort can be performed: the server only allocates memory for the final number of documents (see the sketch after this list).
  • Transforming data in a pipeline stage prevents us from using indexes in the stages that follow, so we should place $sort before applying any transformation.
  • Once the server encounters a stage that cannot use indexes, none of the following stages can use indexes either.
  • The Aggregation Framework can automatically project fields if the shape of the final document depends only on those fields in the input document, so there is no need to add a $project stage just for this.
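As a sketch of the top-k pattern, this pipeline lets the server keep only the 5 highest student_id documents in memory while sorting, instead of the whole collection:

db.grades.aggregate([
  { $sort: { student_id: -1 } },
  { $limit: 5 }
])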

Memory Constraints:

  • Results are subject to the 16 MB document limit: the best way to mitigate this is by using $limit and $project.
  • Each pipeline stage can use up to 100 MB of RAM.
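If a stage needs more than 100 MB, the aggregation can be allowed to spill to disk by passing the allowDiskUse option to db.collection.aggregate(), for example:

db.grades.aggregate(
  [ { $sort: { student_id: 1 } } ],
  { allowDiskUse: true }
)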

LinkedIn: https://www.linkedin.com/in/sumit-bopche-609131a0/
