将 Map-Reduce 转换为聚合管道
聚合管道比map-reduce操作提供更好的性能和可用性。
Map-reduce操作可以使用聚合管道阶段重写,例如使用$group
和$merge
。
对于需要自定义功能的map-reduce操作,MongoDB提供了$accumulator
和$function
聚合操作符。使用这些操作符在JavaScript中定义自定义聚合表达式。
Map-reduce表达式可以重写,如下文所述。
Map-Reduce到聚合管道转换表
该表仅是一个近似翻译。例如,该表显示了使用$project
对mapFunction
的近似翻译。
然而,
mapFunction
逻辑可能需要额外的阶段,例如当逻辑包含对数组的迭代时:function() { this.items.forEach(function(item){ emit(item.sku, 1); }); } 那么,聚合管道将包括一个
$unwind
和一个$project
:{ $unwind: "$items" }, { $project: { emits: { key: "$items.sku", value: 1 } } }, $project
中的emits
字段可能被命名为其他名称。为了视觉比较,选择了字段名称emits
。
Map-Reduce | 聚合管道 |
---|---|
db.collection.mapReduce( <mapFunction>, <reduceFunction>, { query: <queryFilter>, sort: <sortOrder>, limit: <number>, finalize: <finalizeFunction>, out: <collection> } ) | db.collection.aggregate( [ { $match: <查询过滤器> }, { $sort: <排序顺序> }, { $limit: <数量> }, { $project: { emits: { k: <表达式>, v: <表达式> } } }, { $unwind: "$emits" }, { $group: { _id: "$emits.k", value: { $accumulator: { init: <初始化代码>, accumulate: <reduceFunction>, accumulateArgs: [ "$emits.v" ], merge: <reduceFunction>, finalize: <finalizeFunction>, lang: "js" } } } }, { $out: <集合> } ] ) |
db.collection.mapReduce( <mapFunction>, <reduceFunction>, { query: <queryFilter>, sort: <sortOrder>, limit: <number>, finalize: <finalizeFunction>, out: { replace: <集合>, db:<数据库> } } ) | db.collection.aggregate( [ { $match: <查询过滤器> }, { $sort: <排序顺序> }, { $limit: <数量> }, { $project: { emits: { k: <表达式>, v: <表达式> } } }, { $unwind: "$emits" }, { $group: { _id: "$emits.k", value: { $accumulator: { init: <初始化代码>, accumulate: <reduceFunction>, accumulateArgs: [ "$emits.v" ], merge: <reduceFunction>, finalize: <finalizeFunction>, lang: "js" } } } }, { $out: { db: <数据库>, coll: <集合> } } ] ) |
db.collection.mapReduce( <mapFunction>, <reduceFunction>, { query: <queryFilter>, sort: <sortOrder>, limit: <number>, finalize: <finalizeFunction>, out: { merge: <集合>, db: <数据库> } } ) | db.collection.aggregate( [ { $match: <查询过滤器> }, { $sort: <排序顺序> }, { $limit: <数量> }, { $project: { emits: { k: <表达式>, v: <表达式> } } }, { $unwind: "$emits" }, { $group: { _id: "$emits.k", value: { $accumulator: { init: <初始化代码>, accumulate: <reduceFunction>, accumulateArgs: [ "$emits.v" ], merge: <reduceFunction>, finalize: <finalizeFunction>, lang: "js" } } } }, { $merge: { into: { db: <数据库>, coll: <集合> }, on: "_id", whenMatched: "replace", whenNotMatched: "insert" } }, ] ) |
db.collection.mapReduce( <mapFunction>, <reduceFunction>, { query: <queryFilter>, sort: <sortOrder>, limit: <number>, finalize: <finalizeFunction>, out: { reduce: <集合>, db: <数据库> } } ) | db.collection.aggregate( [ { $match: <查询过滤器> }, { $sort: <排序顺序> }, { $limit: <数量> }, { $project: { emits: { k: <表达式>, v: <表达式> } } }, { $unwind: "$emits" }, { $group: { _id: "$emits.k", value: { $accumulator: { init: <初始化代码>, accumulate: <reduceFunction>, accumulateArgs: [ "$emits.v" ], merge: <reduceFunction>, finalize: <finalizeFunction>, lang: "js" } } } }, { $merge: { into: { db: <数据库>, coll: <集合> }, on: "_id", whenMatched: [ { $project: { value: { $function: { body: <reduceFunction>, args: [ "$_id", [ "$value", "$$new.value" ] ], lang: "js" } } } } ], whenNotMatched: "insert" } }, ] ) |
db.collection.mapReduce( <mapFunction>, <reduceFunction>, { query: <queryFilter>, sort: <sortOrder>, limit: <number>, finalize: <finalizeFunction>, out: { inline: 1 } } ) | db.collection.aggregate( [ { $match: <查询过滤器> }, { $sort: <排序顺序> }, { $limit: <数量> }, { $project: { emits: { k: <表达式>, v: <表达式> } } }, { $unwind: "$emits" }, { $group: { _id: "$emits.k", value: { $accumulator: { init: <初始化代码>, accumulate: <reduceFunction>, accumulateArgs: [ "$emits.v" ], merge: <reduceFunction>, finalize: <finalizeFunction>, lang: "js" } } } } ] ) |
示例
各种map-reduce表达式可以使用聚合管道操作符重写,例如 $group
、$merge
等,无需自定义函数。但是,为了说明目的,以下示例提供了两种替代方案。
示例 1
以下是对 orders
集合进行的 map-reduce 操作,按 cust_id
进行分组,并计算每个 cust_id
的 price
总和。
var mapFunction1 = function() { emit(this.cust_id, this.price); }; var reduceFunction1 = function(keyCustId, valuesPrices) { return Array.sum(valuesPrices); }; db.orders.mapReduce( mapFunction1, reduceFunction1, { out: "map_reduce_example" } )
方案1:(推荐)您可以将操作重写为一个聚合管道,无需将 map-reduce 函数转换为等效的管道阶段。
db.orders.aggregate([ { $group: { _id: "$cust_id", value: { $sum: "$price" } } }, { $out: "agg_alternative_1" } ])
方案2:(仅用于说明)以下聚合管道提供了各种 map-reduce 函数的翻译,使用 $accumulator
定义自定义函数。
db.orders.aggregate( [ { $project: { emit: { key: "$cust_id", value: "$price" } } }, /* equivalent to the map function */ { $group: { /* equivalent to the reduce function */ _id: "$emit.key", valuesPrices: { $accumulator: { init: function() { return 0; }, initArgs: [], accumulate: function(state, value) { return state + value; }, accumulateArgs: [ "$emit.value" ], merge: function(state1, state2) { return state1 + state2; }, lang: "js" } } } }, { $out: "agg_alternative_2" } ] )
首先,
$project
阶段输出具有emit
字段的文档。该emit
字段是一个文档,包含以下字段:key
包含文档的cust_id
值;value
包含文档的price
值。
{ "_id" : 1, "emit" : { "key" : "Ant O. Knee", "value" : 25 } } { "_id" : 2, "emit" : { "key" : "Ant O. Knee", "value" : 70 } } { "_id" : 3, "emit" : { "key" : "Busby Bee", "value" : 50 } } { "_id" : 4, "emit" : { "key" : "Busby Bee", "value" : 25 } } { "_id" : 5, "emit" : { "key" : "Busby Bee", "value" : 50 } } { "_id" : 6, "emit" : { "key" : "Cam Elot", "value" : 35 } } { "_id" : 7, "emit" : { "key" : "Cam Elot", "value" : 25 } } { "_id" : 8, "emit" : { "key" : "Don Quis", "value" : 75 } } { "_id" : 9, "emit" : { "key" : "Don Quis", "value" : 55 } } { "_id" : 10, "emit" : { "key" : "Don Quis", "value" : 25 } } 然后,
$group
使用$accumulator
操作符来累加所发出的值:{ "_id" : "Don Quis", "valuesPrices" : 155 } { "_id" : "Cam Elot", "valuesPrices" : 60 } { "_id" : "Ant O. Knee", "valuesPrices" : 95 } { "_id" : "Busby Bee", "valuesPrices" : 125 } 最后,
$out
将输出写入集合agg_alternative_2
。或者,您也可以使用$merge
来代替$out
。
示例2
以下是对 orders
集合进行的 map-reduce 操作,按 items.sku
字段进行分组,并计算每个 sku 的订单数和总订购量。操作然后计算每个 sku 值的每订单平均数量,并将结果合并到输出集合中。
var mapFunction2 = function() { for (var idx = 0; idx < this.items.length; idx++) { var key = this.items[idx].sku; var value = { count: 1, qty: this.items[idx].qty }; emit(key, value); } }; var reduceFunction2 = function(keySKU, countObjVals) { reducedVal = { count: 0, qty: 0 }; for (var idx = 0; idx < countObjVals.length; idx++) { reducedVal.count += countObjVals[idx].count; reducedVal.qty += countObjVals[idx].qty; } return reducedVal; }; var finalizeFunction2 = function (key, reducedVal) { reducedVal.avg = reducedVal.qty/reducedVal.count; return reducedVal; }; db.orders.mapReduce( mapFunction2, reduceFunction2, { out: { merge: "map_reduce_example2" }, query: { ord_date: { $gte: new Date("2020-03-01") } }, finalize: finalizeFunction2 } );
方案1:(推荐)您可以将操作重写为一个聚合管道,无需将 map-reduce 函数转换为等效的管道阶段。
db.orders.aggregate( [ { $match: { ord_date: { $gte: new Date("2020-03-01") } } }, { $unwind: "$items" }, { $group: { _id: "$items.sku", qty: { $sum: "$items.qty" }, orders_ids: { $addToSet: "$_id" } } }, { $project: { value: { count: { $size: "$orders_ids" }, qty: "$qty", avg: { $divide: [ "$qty", { $size: "$orders_ids" } ] } } } }, { $merge: { into: "agg_alternative_3", on: "_id", whenMatched: "replace", whenNotMatched: "insert" } } ] )
方案2:(仅用于说明)以下聚合管道提供了各种 map-reduce 函数的翻译,使用 $accumulator
定义自定义函数。
db.orders.aggregate( [ { $match: { ord_date: {$gte: new Date("2020-03-01") } } }, { $unwind: "$items" }, { $project: { emit: { key: "$items.sku", value: { count: { $literal: 1 }, qty: "$items.qty" } } } }, { $group: { _id: "$emit.key", value: { $accumulator: { init: function() { return { count: 0, qty: 0 }; }, initArgs: [], accumulate: function(state, value) { state.count += value.count; state.qty += value.qty; return state; }, accumulateArgs: [ "$emit.value" ], merge: function(state1, state2) { return { count: state1.count + state2.count, qty: state1.qty + state2.qty }; }, finalize: function(state) { state.avg = state.qty / state.count; return state; }, lang: "js"} } } }, { $merge: { into: "agg_alternative_4", on: "_id", whenMatched: "replace", whenNotMatched: "insert" } } ] )
$match
阶段仅选择ord_date
大于或等于new Date("2020-03-01")
的文档。$unwind
阶段通过items
数组字段分解文档,为每个数组元素输出一个文档。例如{ "_id" : 1, "cust_id" : "Ant O. Knee", "ord_date" : ISODate("2020-03-01T00:00:00Z"), "price" : 25, "items" : { "sku" : "oranges", "qty" : 5, "price" : 2.5 }, "status" : "A" } { "_id" : 1, "cust_id" : "Ant O. Knee", "ord_date" : ISODate("2020-03-01T00:00:00Z"), "price" : 25, "items" : { "sku" : "apples", "qty" : 5, "price" : 2.5 }, "status" : "A" } { "_id" : 2, "cust_id" : "Ant O. Knee", "ord_date" : ISODate("2020-03-08T00:00:00Z"), "price" : 70, "items" : { "sku" : "oranges", "qty" : 8, "price" : 2.5 }, "status" : "A" } { "_id" : 2, "cust_id" : "Ant O. Knee", "ord_date" : ISODate("2020-03-08T00:00:00Z"), "price" : 70, "items" : { "sku" : "chocolates", "qty" : 5, "price" : 10 }, "status" : "A" } { "_id" : 3, "cust_id" : "Busby Bee", "ord_date" : ISODate("2020-03-08T00:00:00Z"), "price" : 50, "items" : { "sku" : "oranges", "qty" : 10, "price" : 2.5 }, "status" : "A" } { "_id" : 3, "cust_id" : "Busby Bee", "ord_date" : ISODate("2020-03-08T00:00:00Z"), "price" : 50, "items" : { "sku" : "pears", "qty" : 10, "price" : 2.5 }, "status" : "A" } { "_id" : 4, "cust_id" : "Busby Bee", "ord_date" : ISODate("2020-03-18T00:00:00Z"), "price" : 25, "items" : { "sku" : "oranges", "qty" : 10, "price" : 2.5 }, "status" : "A" } { "_id" : 5, "cust_id" : "Busby Bee", "ord_date" : ISODate("2020-03-19T00:00:00Z"), "price" : 50, "items" : { "sku" : "chocolates", "qty" : 5, "price" : 10 }, "status" : "A" } ... 在
$project
阶段,输出包含一个emit
字段的文档。该emit
字段是一个包含以下字段的文档:key
字段包含items.sku
的值;value
字段包含一个包含qty
值和count
值的文档。
{ "_id" : 1, "emit" : { "key" : "oranges", "value" : { "count" : 1, "qty" : 5 } } } { "_id" : 1, "emit" : { "key" : "apples", "value" : { "count" : 1, "qty" : 5 } } } { "_id" : 2, "emit" : { "key" : "oranges", "value" : { "count" : 1, "qty" : 8 } } } { "_id" : 2, "emit" : { "key" : "chocolates", "value" : { "count" : 1, "qty" : 5 } } } { "_id" : 3, "emit" : { "key" : "oranges", "value" : { "count" : 1, "qty" : 10 } } } { "_id" : 3, "emit" : { "key" : "pears", "value" : { "count" : 1, "qty" : 10 } } } { "_id" : 4, "emit" : { "key" : "oranges", "value" : { "count" : 1, "qty" : 10 } } } { "_id" : 5, "emit" : { "key" : "chocolates", "value" : { "count" : 1, "qty" : 5 } } } ... $group
阶段使用$accumulator
操作符来累加所发出的count
和qty
,并计算avg
字段{ "_id" : "chocolates", "value" : { "count" : 3, "qty" : 15, "avg" : 5 } } { "_id" : "oranges", "value" : { "count" : 7, "qty" : 63, "avg" : 9 } } { "_id" : "carrots", "value" : { "count" : 2, "qty" : 15, "avg" : 7.5 } } { "_id" : "apples", "value" : { "count" : 4, "qty" : 35, "avg" : 8.75 } } { "_id" : "pears", "value" : { "count" : 1, "qty" : 10, "avg" : 10 } } 最后,
$merge
将输出写入到集合agg_alternative_4
。如果现有文档与新结果具有相同的键_id
,则操作将覆盖现有文档。如果不存在具有相同键的现有文档,则操作将插入该文档。