MongoDB Aggregation Pipelines — The Analytics Engine Inside Your Document Database

Posted on: June 18, 2025 at 10:00 AM

When engineers say “MongoDB doesn’t scale for analytics”, they usually mean they’ve been running find() queries and doing the aggregation in application code. That’s the wrong tool. MongoDB’s aggregation pipeline is a server-side transformation engine built directly into the database — and for variable-structure data, its composable stage model is a natural fit that avoids the impedance mismatch of forcing nested events into flat SQL tables.

This post came out of a real problem: tracking DEX swap events across multiple decentralised exchanges. Thousands of events per hour, variable structure per protocol, nested arrays of token transfers. MongoDB’s document model handled the schema variation without migrations; the aggregation pipeline handled the analytics.

Why document databases for event-driven data

Relational databases model the world as tables with fixed schemas. That works well for entities with stable, known shapes: users, products, orders.

Financial events don’t have stable shapes. A Uniswap V2 swap has a different structure from a V3 swap. An ERC-20 Transfer event is simpler than a Compound liquidation. Protocol upgrades change event signatures. Forcing these into a fixed schema means either a sprawling column list (mostly null) or a JSON blob column that gives up all query capability.

A document model maps naturally to events: each document is one event, exactly as received, with the fields that event actually has. MongoDB stores BSON (binary JSON) — rich types, nested objects, arrays — and can index into nested fields as efficiently as top-level ones.

// A Uniswap V3 swap event stored as a document
{
  _id: ObjectId("..."),
  protocol: "uniswap-v3",
  chain: "ethereum",
  blockNumber: 19847392,
  timestamp: ISODate("2024-11-15T14:23:11Z"),
  transactionHash: "0xabc...",
  pool: "0x88e6a0c2ddd26feeb64f039a2c41296fcb3f5640",
  token0: { address: "0xc02...", symbol: "WETH", decimals: 18 },
  token1: { address: "0xa0b...", symbol: "USDC", decimals: 6 },
  amountIn: { raw: "1500000000000000000", formatted: 1.5, usd: 3782.50 },
  amountOut: { raw: "3782500000", formatted: 3782.5, usd: 3782.50 },
  direction: "buy",
  sender: "0x789...",
  gasUsed: 184293
}

Indexing into this structure is standard:

// Compound index for the queries you'll run
db.swaps.createIndex({ chain: 1, timestamp: -1 });
db.swaps.createIndex({ "token0.symbol": 1, "token1.symbol": 1, timestamp: -1 });
db.swaps.createIndex({ pool: 1, blockNumber: -1 });

The aggregation pipeline mental model

An aggregation pipeline is a sequence of stages. Each stage receives a stream of documents, transforms it, and passes the result to the next stage. The final stage produces the output.

collection → [$match] → [$group] → [$sort] → [$project] → result

This is functionally equivalent to a SQL query, but expressed as a composable array of transformation steps rather than a declarative sentence. You build it incrementally, which makes it easier to debug: run the pipeline up to a given stage, inspect the output, add the next stage.
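
As a minimal sketch of that incremental workflow: the helper below (`upToStage` is a hypothetical name, not part of MongoDB) slices a pipeline so you can run only its first few stages. The pipeline contents are illustrative and assume the `swaps` collection used throughout this post.

```javascript
// Hypothetical helper: return only the first `n` stages of a pipeline,
// so each intermediate result can be inspected on its own.
function upToStage(pipeline, n) {
  return pipeline.slice(0, n);
}

// Illustrative pipeline over the swap documents described above.
const debugPipeline = [
  { $match: { chain: "ethereum" } },
  { $group: { _id: "$pool", volumeUSD: { $sum: "$amountIn.usd" } } },
  { $sort: { volumeUSD: -1 } },
  { $project: { _id: 0, pool: "$_id", volumeUSD: 1 } }
];

// Only $match and $group; the original array is left untouched.
const firstTwoStages = upToStage(debugPipeline, 2);

// In mongosh you might then peek at a few intermediate documents:
//   db.swaps.aggregate([...firstTwoStages, { $limit: 5 }]);
```

Appending a temporary `$limit` keeps the intermediate output small while you check that each stage produces the shape you expect.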

Core stages with examples

$match — filter early, filter hard

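Put $match as early as possible, ideally first. It reduces the working set before any expensive operations, and a $match at the start of the pipeline can use an index the same way find() does. If an index supports the match fields, MongoDB runs an index scan and never reads the non-matching documents.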

// Only ETH/USDC swaps in the last 24 hours with meaningful size
{ $match: {
  chain: "ethereum",
  "token0.symbol": "WETH",
  "token1.symbol": "USDC",
  timestamp: { $gte: new Date(Date.now() - 86_400_000) },
  "amountIn.usd": { $gte: 10_000 } // filter noise
}}

$group — aggregate by key

Equivalent to SQL’s GROUP BY. The _id field defines the grouping key; accumulator operators compute values per group.

// Volume and trade count by pool, per hour
{ $group: {
  _id: {
    pool: "$pool",
    hour: { $dateToString: { format: "%Y-%m-%dT%H:00", date: "$timestamp" } }
  },
  volumeUSD: { $sum: "$amountIn.usd" },
  tradeCount: { $sum: 1 },
  avgTradeUSD: { $avg: "$amountIn.usd" },
  maxTradeUSD: { $max: "$amountIn.usd" }
}}

Accumulator operators: $sum, $avg, $min, $max, $push (collect into array), $addToSet (unique values), $first, $last.
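
To make the accumulator semantics concrete, here is a rough plain-JavaScript equivalent of a $group by pool. It is only a sketch of what the server computes, not how MongoDB implements it; the sample documents are made up, and the hourly part of the key is left out for brevity.

```javascript
// Made-up sample documents, trimmed to the fields the $group stage reads.
const sampleSwaps = [
  { pool: "0xaaa", amountIn: { usd: 12000 } },
  { pool: "0xaaa", amountIn: { usd: 18000 } },
  { pool: "0xbbb", amountIn: { usd: 50000 } }
];

// Rough equivalent of:
//   { $group: { _id: "$pool",
//               volumeUSD:   { $sum: "$amountIn.usd" },
//               tradeCount:  { $sum: 1 },
//               avgTradeUSD: { $avg: "$amountIn.usd" },
//               maxTradeUSD: { $max: "$amountIn.usd" } } }
function groupByPool(docs) {
  const groups = new Map();
  for (const doc of docs) {
    const usd = doc.amountIn.usd;
    const group = groups.get(doc.pool) ??
      { _id: doc.pool, volumeUSD: 0, tradeCount: 0, maxTradeUSD: -Infinity };
    group.volumeUSD += usd;                              // $sum of a field
    group.tradeCount += 1;                               // $sum: 1
    group.maxTradeUSD = Math.max(group.maxTradeUSD, usd); // $max
    groups.set(doc.pool, group);
  }
  // $avg is the running sum divided by the count.
  return [...groups.values()].map(g => ({
    ...g,
    avgTradeUSD: g.volumeUSD / g.tradeCount
  }));
}

const grouped = groupByPool(sampleSwaps);
```

With this sample data, pool 0xaaa ends up with a volume of 30000 across 2 trades (average 15000), and pool 0xbbb with a single 50000 trade.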

$sort and $limit

{ $sort: { volumeUSD: -1 } },
{ $limit: 10 }

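Pair $sort with $limit for top-N queries. When $limit directly follows $sort, MongoDB only keeps the top N documents in memory while sorting instead of materialising the fully sorted result. If the sort runs on an indexed field right after $match, MongoDB can also use the index to return the top N without a full collection scan. A sort on a computed field such as volumeUSD after $group cannot use an index, so there the win comes from a selective $match that keeps the grouped set small.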

$project — shape the output

Include, exclude, or compute fields. 1 = include, 0 = exclude.

{ $project: {
  _id: 0,
  pool: "$_id.pool",
  hour: "$_id.hour",
  volumeUSD: { $round: ["$volumeUSD", 2] },
  tradeCount: 1,
  avgTradeUSD: { $round: ["$avgTradeUSD", 2] }
}}

$unwind — flatten arrays

Documents with arrays are one of the key advantages of a document model. $unwind explodes an array into one document per element — making the array contents groupable and filterable.

// A liquidity provision event has multiple token transfers
{
  txHash: "0xabc...",
  transfers: [
    { token: "WETH", amount: 1.5, direction: "in" },
    { token: "USDC", amount: 3782.5, direction: "in" },
    { token: "UNI-V3-LP", amount: 0.00042, direction: "out" }
  ]
}

// Unwind to group by individual token
{ $unwind: "$transfers" },
{ $group: {
  _id: "$transfers.token",
  totalVolume: { $sum: "$transfers.amount" }
}}
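
If it helps to see what $unwind emits, the sketch below mimics its default behaviour in plain JavaScript with `flatMap`. The second sample event and the `unwindTransfers` name are made up for illustration, and the sketch assumes `transfers` is always an array or missing.

```javascript
// Sample event mirroring the document above (hash shortened for illustration).
const liquidityEvent = {
  txHash: "0xabc",
  transfers: [
    { token: "WETH", amount: 1.5, direction: "in" },
    { token: "USDC", amount: 3782.5, direction: "in" },
    { token: "UNI-V3-LP", amount: 0.00042, direction: "out" }
  ]
};

// Rough equivalent of { $unwind: "$transfers" }:
// one output document per array element, all other fields copied.
// Documents with an empty or missing array produce no output,
// which matches $unwind's default behaviour.
function unwindTransfers(docs) {
  return docs.flatMap(doc =>
    (doc.transfers ?? []).map(transfer => ({ ...doc, transfers: transfer }))
  );
}

const unwound = unwindTransfers([
  liquidityEvent,
  { txHash: "0xdef", transfers: [] } // made-up event with no transfers
]);
```

Here `unwound` holds three documents, all with txHash "0xabc" and a single transfer object each; the event with an empty array disappears. Setting `preserveNullAndEmptyArrays: true` on the real stage is what keeps such documents in the stream.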

$lookup — joins across collections

MongoDB is not “no joins”. $lookup performs a left outer join between collections.

// Enrich swaps with pool metadata from a separate collection
{ $lookup: {
  from: "pools",
  localField: "pool",
  foreignField: "address",
  as: "poolInfo"
}},
{ $unwind: { path: "$poolInfo", preserveNullAndEmptyArrays: true } },
{ $project: {
  pool: 1,
  volumeUSD: 1,
  "poolInfo.fee": 1,
  "poolInfo.tvlUSD": 1
}}

For high-cardinality lookups on hot paths, embed the data you’ll always need directly in the document — a deliberate denormalisation that trades storage for query speed.
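
The left outer join semantics are easy to miss, so here is a plain-JavaScript sketch of what the $lookup above produces. The sample data, the fee and TVL values, and the `lookupPools` name are all invented for illustration.

```javascript
// Made-up grouped swap results and pool metadata.
const swapsByPool = [
  { pool: "0x111", volumeUSD: 250000 },
  { pool: "0x222", volumeUSD: 90000 } // no metadata document for this pool
];
const poolDocs = [
  { address: "0x111", fee: 500, tvlUSD: 120000000 }
];

// Rough equivalent of:
//   { $lookup: { from: "pools", localField: "pool",
//                foreignField: "address", as: "poolInfo" } }
// Every input document survives; `poolInfo` is always an array,
// and it is empty when nothing matches.
function lookupPools(docs, pools) {
  return docs.map(doc => ({
    ...doc,
    poolInfo: pools.filter(p => p.address === doc.pool)
  }));
}

const joined = lookupPools(swapsByPool, poolDocs);
```

The second pool comes back with `poolInfo: []`, which is why the $unwind above sets `preserveNullAndEmptyArrays: true`: without it, pools that have no metadata would silently drop out of the result. On the real collection, an index on the foreign field (`address` here) generally keeps each lookup cheap.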


A complete analytics query

Top 10 trading pools by 24-hour volume on Ethereum, including trade count and average trade size:

db.swaps.aggregate([
  // Stage 1: narrow to Ethereum, last 24 hours
  { $match: {
    chain: "ethereum",
    timestamp: { $gte: new Date(Date.now() - 86_400_000) }
  }},
  // Stage 2: group by pool
  { $group: {
    _id: "$pool",
    volumeUSD: { $sum: "$amountIn.usd" },
    tradeCount: { $sum: 1 },
    avgTradeUSD: { $avg: "$amountIn.usd" },
    tokens: { $addToSet: {
      t0: "$token0.symbol",
      t1: "$token1.symbol"
    }}
  }},
  // Stages 3 and 4: top 10 by volume
  { $sort: { volumeUSD: -1 } },
  { $limit: 10 },
  // Stage 5: clean output
  { $project: {
    _id: 0,
    pool: "$_id",
    // $first as an array operator needs MongoDB 4.4+;
    // on older versions use { $arrayElemAt: ["$tokens", 0] }
    pair: { $first: "$tokens" },
    volumeUSD: { $round: ["$volumeUSD", 0] },
    tradeCount: 1,
    avgTradeUSD: { $round: ["$avgTradeUSD", 0] }
  }}
]);

This runs entirely server-side. With the { chain: 1, timestamp: -1 } index supporting the $match, MongoDB reads only the matching documents, groups them in memory, and returns at most ten result documents. There is no round-trip to application code for intermediate processing.
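
Because a pipeline is just an array of plain objects, it is straightforward to parameterise. The builder below is a sketch under my own assumptions: `topPoolsPipeline` and its option names are hypothetical, and it covers only the first four stages of the query above.

```javascript
// Hypothetical builder: top-N pools by volume for any chain and time window.
// `now` is injectable so the time window is reproducible in tests.
function topPoolsPipeline({ chain, hours = 24, limit = 10, now = new Date() }) {
  const since = new Date(now.getTime() - hours * 3_600_000);
  return [
    { $match: { chain, timestamp: { $gte: since } } },
    { $group: {
      _id: "$pool",
      volumeUSD: { $sum: "$amountIn.usd" },
      tradeCount: { $sum: 1 }
    }},
    { $sort: { volumeUSD: -1 } },
    { $limit: limit }
  ];
}

const ethTopPools = topPoolsPipeline({
  chain: "ethereum",
  now: new Date("2024-11-15T00:00:00Z")
});

// In mongosh, assuming the `swaps` collection from this post:
//   db.swaps.aggregate(ethTopPools, { allowDiskUse: true });
```

The `allowDiskUse` option lets memory-hungry stages such as $group spill to temporary files if a wide time window pushes them past the per-stage memory limit.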


Time series collections

MongoDB 5.0+ includes a native time series collection type optimised for append-heavy, time-ordered data:

db.createCollection("price_ticks", {
  timeseries: {
    timeField: "timestamp",
    metaField: "symbol",
    granularity: "seconds"
  },
  expireAfterSeconds: 7_776_000 // auto-delete after 90 days
});

Internally, measurements are grouped into buckets by metaField value and time range and stored with columnar compression (added in MongoDB 5.2), which significantly reduces storage size and improves range query performance for typical “give me all ticks for BTC/USD in the last hour” queries. For financial tick data this is a better fit than a standard collection.
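
Time series collections are queried with the same aggregation stages. As a sketch, the builder below produces hourly open/high/low/close candles for one symbol. It assumes each tick document carries a numeric `price` field, which is my assumption and not something defined above; `hourlyOhlcPipeline` is likewise a hypothetical name.

```javascript
// Hypothetical builder: hourly OHLC candles from the price_ticks collection.
// Assumes each tick looks like { timestamp, symbol, price }.
function hourlyOhlcPipeline(symbol, since) {
  return [
    { $match: { symbol, timestamp: { $gte: since } } },
    // Sort by time so $first and $last mean "open" and "close".
    { $sort: { timestamp: 1 } },
    { $group: {
      // $dateTrunc (MongoDB 5.0+) rounds each timestamp down to the hour.
      _id: { $dateTrunc: { date: "$timestamp", unit: "hour" } },
      open: { $first: "$price" },
      high: { $max: "$price" },
      low: { $min: "$price" },
      close: { $last: "$price" }
    }},
    { $sort: { _id: 1 } }
  ];
}

const btcCandles = hourlyOhlcPipeline("BTC/USD", new Date("2024-11-15T00:00:00Z"));

// In mongosh:
//   db.price_ticks.aggregate(btcCandles);
```

Grouping on a truncated date rather than a formatted string keeps the bucket key a real date, so the final $sort orders candles chronologically.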


MongoDB vs PostgreSQL — the honest comparison

Neither is universally better. The choice comes down to data shape and query patterns.

| Factor | MongoDB | PostgreSQL |
| --- | --- | --- |
| Variable-structure data | ✅ Natural fit | ❌ Workarounds needed |
| Complex relational queries | ⚠️ $lookup works but awkward | ✅ Native JOINs |
| Schema enforcement | Optional (JSON Schema validation) | Enforced by default |
| Horizontal sharding | ✅ Built-in sharding | ⚠️ Requires Citus or application sharding |
| Full-text search | ✅ Atlas Search (Lucene-backed) | ✅ tsvector/tsquery + GIN indexes (production-grade for many workloads) |
| Time series | ✅ Native collection type | ⚠️ TimescaleDB extension needed |
| ACID transactions | ✅ Multi-document since 4.0 | ✅ Full ACID always |

For event-driven architectures — DeFi protocols, trading systems, IoT, user activity streams — MongoDB’s document model avoids the impedance mismatch between the naturally nested, variable-shape event data and a rigid relational schema. For a system with well-understood, stable entities and complex multi-table reporting, PostgreSQL wins on query expressiveness and tooling maturity.

The pattern I use: PostgreSQL for user/account/order data where ACID guarantees and relational integrity matter; MongoDB for the event log and time-series data where flexibility and throughput matter more.


Index strategy for aggregation queries

Aggregation pipelines benefit from the same index types as regular queries, but placement matters:

// Compound index: matches the $match stage field order
db.swaps.createIndex({ chain: 1, timestamp: -1, "amountIn.usd": 1 });
// Explain the pipeline to verify index usage
// (`pipeline` is the array of stages you would pass to aggregate())
db.swaps.explain("executionStats").aggregate(pipeline);
// Look for IXSCAN (index scan) vs COLLSCAN (full collection scan)
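
Explain output is deeply nested and its exact shape varies between server versions, so a small recursive walker can be handier than reading it by eye. This is a sketch: `collectStages` is a hypothetical helper, and the sample object is a hand-written, heavily simplified stand-in for real explain output.

```javascript
// Hypothetical helper: collect every `stage` name found anywhere
// in an explain() result, regardless of how deeply it is nested.
function collectStages(node, found = new Set()) {
  if (Array.isArray(node)) {
    node.forEach(child => collectStages(child, found));
  } else if (node && typeof node === "object") {
    if (typeof node.stage === "string") found.add(node.stage);
    Object.values(node).forEach(child => collectStages(child, found));
  }
  return found;
}

// Simplified stand-in for explain output; real output has many more fields.
const sampleExplain = {
  stages: [
    { $cursor: { queryPlanner: { winningPlan: {
      stage: "FETCH",
      inputStage: { stage: "IXSCAN", indexName: "chain_1_timestamp_-1" }
    }}}},
    { $group: {} }
  ]
};

const planStages = collectStages(sampleExplain);
const usesIndex = planStages.has("IXSCAN") && !planStages.has("COLLSCAN");
```

Real explain output also lists rejected plans, so for a strict check you may prefer to pass only the winning plan subtree to the walker.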

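Key rules: put $match first so the index can narrow the working set before $group or $unwind, order the compound index fields to match the fields the $match filters on, and confirm with explain() that the plan shows IXSCAN rather than COLLSCAN.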

With the right indexes, the $match stage narrows 10 million swap events to a relevant subset via an index scan. The $group stage still processes matching documents in memory, but with a selective $match the working set stays small. For a query like “top 10 pools in the last 24 hours” over a well-indexed collection, response times are typically in the tens of milliseconds.