Map-Reduce示例
注意
将聚合管道作为Map-Reduce的替代方案
聚合管道提供比map-reduce操作更好的性能和可用性。
可以使用聚合管道阶段,如$group
和$merge
来重写map-reduce操作。
对于需要自定义功能的map-reduce操作,MongoDB提供了$accumulator
和$function
聚合操作符。使用这些操作符在JavaScript中定义自定义聚合表达式。
在mongosh
中,db.collection.mapReduce()
方法是对mapReduce
命令的包装。以下示例使用db.collection.mapReduce()
方法。
本节中的示例包括没有自定义聚合表达式的聚合管道替代方案。有关使用自定义表达式的替代方案的示例,请参阅Map-Reduce到聚合管道转换示例。
创建一个包含以下文档的示例集合orders
db.orders.insertMany([ { _id: 1, cust_id: "Ant O. Knee", ord_date: new Date("2020-03-01"), price: 25, items: [ { sku: "oranges", qty: 5, price: 2.5 }, { sku: "apples", qty: 5, price: 2.5 } ], status: "A" }, { _id: 2, cust_id: "Ant O. Knee", ord_date: new Date("2020-03-08"), price: 70, items: [ { sku: "oranges", qty: 8, price: 2.5 }, { sku: "chocolates", qty: 5, price: 10 } ], status: "A" }, { _id: 3, cust_id: "Busby Bee", ord_date: new Date("2020-03-08"), price: 50, items: [ { sku: "oranges", qty: 10, price: 2.5 }, { sku: "pears", qty: 10, price: 2.5 } ], status: "A" }, { _id: 4, cust_id: "Busby Bee", ord_date: new Date("2020-03-18"), price: 25, items: [ { sku: "oranges", qty: 10, price: 2.5 } ], status: "A" }, { _id: 5, cust_id: "Busby Bee", ord_date: new Date("2020-03-19"), price: 50, items: [ { sku: "chocolates", qty: 5, price: 10 } ], status: "A"}, { _id: 6, cust_id: "Cam Elot", ord_date: new Date("2020-03-19"), price: 35, items: [ { sku: "carrots", qty: 10, price: 1.0 }, { sku: "apples", qty: 10, price: 2.5 } ], status: "A" }, { _id: 7, cust_id: "Cam Elot", ord_date: new Date("2020-03-20"), price: 25, items: [ { sku: "oranges", qty: 10, price: 2.5 } ], status: "A" }, { _id: 8, cust_id: "Don Quis", ord_date: new Date("2020-03-20"), price: 75, items: [ { sku: "chocolates", qty: 5, price: 10 }, { sku: "apples", qty: 10, price: 2.5 } ], status: "A" }, { _id: 9, cust_id: "Don Quis", ord_date: new Date("2020-03-20"), price: 55, items: [ { sku: "carrots", qty: 5, price: 1.0 }, { sku: "apples", qty: 10, price: 2.5 }, { sku: "oranges", qty: 10, price: 2.5 } ], status: "A" }, { _id: 10, cust_id: "Don Quis", ord_date: new Date("2020-03-23"), price: 25, items: [ { sku: "oranges", qty: 10, price: 2.5 } ], status: "A" } ])
返回每位客户的总价
对orders
集合执行map-reduce操作,按cust_id
分组,并计算每个cust_id
的price
总和
定义处理每个输入文档的map函数
在此函数中,
this
指的是map-reduce操作正在处理的文档。该函数将每个文档的
price
映射到相应的cust_id
,并输出cust_id
和price
。
var mapFunction1 = function() { emit(this.cust_id, this.price); }; 定义相应的reduce函数,具有两个参数
keyCustId
和valuesPrices
valuesPrices
是一个数组,其元素是map函数发出的price
值,并按keyCustId
分组。函数将
valuesPrice
数组缩减为其元素的求和。
var reduceFunction1 = function(keyCustId, valuesPrices) { return Array.sum(valuesPrices); }; 使用
mapFunction1
map函数和reduceFunction1
reduce函数对orders
集合中的所有文档执行map-reduce操作db.orders.mapReduce( mapFunction1, reduceFunction1, { out: "map_reduce_example" } ) 此操作将结果输出到名为
map_reduce_example
的集合中。如果map_reduce_example
集合已存在,则操作将用此map-reduce操作的结果替换其内容。查询
map_reduce_example
集合以验证结果db.map_reduce_example.find().sort( { _id: 1 } ) 此操作返回以下文档
{ "_id" : "Ant O. Knee", "value" : 95 } { "_id" : "Busby Bee", "value" : 125 } { "_id" : "Cam Elot", "value" : 60 } { "_id" : "Don Quis", "value" : 155 }
聚合替代方法
使用可用的聚合管道操作符,您可以重写map-reduce操作,而无需定义自定义函数
db.orders.aggregate([ { $group: { _id: "$cust_id", value: { $sum: "$price" } } }, { $out: "agg_alternative_1" } ])
在
$group
阶段中,根据cust_id
进行分组,并计算value
字段(参见$sum
)。value
字段包含每个cust_id
的总price
。该阶段将以下文档输出到下一阶段
{ "_id" : "Don Quis", "value" : 155 } { "_id" : "Ant O. Knee", "value" : 95 } { "_id" : "Cam Elot", "value" : 60 } { "_id" : "Busby Bee", "value" : 125 } 然后,
$out
将输出写入集合agg_alternative_1
。或者,您也可以使用$merge
来代替$out
。查询
agg_alternative_1
集合以验证结果db.agg_alternative_1.find().sort( { _id: 1 } ) 操作返回以下文档
{ "_id" : "Ant O. Knee", "value" : 95 } { "_id" : "Busby Bee", "value" : 125 } { "_id" : "Cam Elot", "value" : 60 } { "_id" : "Don Quis", "value" : 155 }
使用平均每件物品数量计算订单和总数量
以下示例将展示在所有具有 ord_date
值大于或等于 2020-03-01
的 orders
集合上执行 map-reduce 操作。
示例中的操作
按
item.sku
字段进行分组,并计算每个sku
的订单数量和总订单数量。计算每个
sku
值的平均订单数量,并将结果合并到输出集合中。
在合并结果时,如果现有文档与新结果具有相同的键,则操作将覆盖现有文档。如果没有具有相同键的现有文档,则操作将插入该文档。
示例步骤
定义处理每个输入文档的map函数
在此函数中,
this
指的是map-reduce操作正在处理的文档。对于每个项目,该函数将
sku
与一个新对象value
相关联,该对象包含count
字段(值为1
)和订单的项qty
,并发出sku
(存储在key
中)和value
。
var mapFunction2 = function() { for (var idx = 0; idx < this.items.length; idx++) { var key = this.items[idx].sku; var value = { count: 1, qty: this.items[idx].qty }; emit(key, value); } }; 定义一个带有两个参数
keySKU
和countObjVals
的相应 reduce 函数countObjVals
是一个数组,其元素是由 map 函数传递给 reducer 函数的按keySKU
分组的对象。该函数将
countObjVals
数组减少为一个包含count
和qty
字段的单个对象reducedValue
。在
reducedVal
中,count
字段包含来自单个数组元素的count
字段的和,而qty
字段包含来自单个数组元素的qty
字段的和。
var reduceFunction2 = function(keySKU, countObjVals) { reducedVal = { count: 0, qty: 0 }; for (var idx = 0; idx < countObjVals.length; idx++) { reducedVal.count += countObjVals[idx].count; reducedVal.qty += countObjVals[idx].qty; } return reducedVal; }; 定义一个带有两个参数
key
和reducedVal
的 finalize 函数。该函数修改reducedVal
对象以添加一个名为avg
的计算字段,并返回修改后的对象var finalizeFunction2 = function (key, reducedVal) { reducedVal.avg = reducedVal.qty/reducedVal.count; return reducedVal; }; 使用
mapFunction2
、reduceFunction2
和finalizeFunction2
函数在orders
集合上执行 map-reduce 操作db.orders.mapReduce( mapFunction2, reduceFunction2, { out: { merge: "map_reduce_example2" }, query: { ord_date: { $gte: new Date("2020-03-01") } }, finalize: finalizeFunction2 } ); 此操作使用
query
字段仅选择那些ord_date
大于或等于new Date("2020-03-01")
的文档。然后将结果输出到集合map_reduce_example2
。如果
map_reduce_example2
集合已存在,则操作将现有内容与 map-reduce 操作的结果合并。也就是说,如果现有文档与新结果具有相同的键,则操作将覆盖现有文档。如果没有具有相同键的现有文档,则操作将插入文档。查询
map_reduce_example2
集合以验证结果db.map_reduce_example2.find().sort( { _id: 1 } ) 此操作返回以下文档
{ "_id" : "apples", "value" : { "count" : 4, "qty" : 35, "avg" : 8.75 } } { "_id" : "carrots", "value" : { "count" : 2, "qty" : 15, "avg" : 7.5 } } { "_id" : "chocolates", "value" : { "count" : 3, "qty" : 15, "avg" : 5 } } { "_id" : "oranges", "value" : { "count" : 7, "qty" : 63, "avg" : 9 } } { "_id" : "pears", "value" : { "count" : 1, "qty" : 10, "avg" : 10 } }
聚合替代方案
使用可用的聚合管道操作符,您可以重写map-reduce操作,而无需定义自定义函数
db.orders.aggregate( [ { $match: { ord_date: { $gte: new Date("2020-03-01") } } }, { $unwind: "$items" }, { $group: { _id: "$items.sku", qty: { $sum: "$items.qty" }, orders_ids: { $addToSet: "$_id" } } }, { $project: { value: { count: { $size: "$orders_ids" }, qty: "$qty", avg: { $divide: [ "$qty", { $size: "$orders_ids" } ] } } } }, { $merge: { into: "agg_alternative_3", on: "_id", whenMatched: "replace", whenNotMatched: "insert" } } ] )
$match
阶段仅选择那些ord_date
大于或等于new Date("2020-03-01")
的文档。$unwind
阶段根据items
数组字段拆分文档,为每个数组元素输出一个文档。例如{ "_id" : 1, "cust_id" : "Ant O. Knee", "ord_date" : ISODate("2020-03-01T00:00:00Z"), "price" : 25, "items" : { "sku" : "oranges", "qty" : 5, "price" : 2.5 }, "status" : "A" } { "_id" : 1, "cust_id" : "Ant O. Knee", "ord_date" : ISODate("2020-03-01T00:00:00Z"), "price" : 25, "items" : { "sku" : "apples", "qty" : 5, "price" : 2.5 }, "status" : "A" } { "_id" : 2, "cust_id" : "Ant O. Knee", "ord_date" : ISODate("2020-03-08T00:00:00Z"), "price" : 70, "items" : { "sku" : "oranges", "qty" : 8, "price" : 2.5 }, "status" : "A" } { "_id" : 2, "cust_id" : "Ant O. Knee", "ord_date" : ISODate("2020-03-08T00:00:00Z"), "price" : 70, "items" : { "sku" : "chocolates", "qty" : 5, "price" : 10 }, "status" : "A" } { "_id" : 3, "cust_id" : "Busby Bee", "ord_date" : ISODate("2020-03-08T00:00:00Z"), "price" : 50, "items" : { "sku" : "oranges", "qty" : 10, "price" : 2.5 }, "status" : "A" } { "_id" : 3, "cust_id" : "Busby Bee", "ord_date" : ISODate("2020-03-08T00:00:00Z"), "price" : 50, "items" : { "sku" : "pears", "qty" : 10, "price" : 2.5 }, "status" : "A" } { "_id" : 4, "cust_id" : "Busby Bee", "ord_date" : ISODate("2020-03-18T00:00:00Z"), "price" : 25, "items" : { "sku" : "oranges", "qty" : 10, "price" : 2.5 }, "status" : "A" } { "_id" : 5, "cust_id" : "Busby Bee", "ord_date" : ISODate("2020-03-19T00:00:00Z"), "price" : 50, "items" : { "sku" : "chocolates", "qty" : 5, "price" : 10 }, "status" : "A" } ... $group
阶段按items.sku
分组,为每个 sku 计算值- 数量字段
qty
。数量字段qty
包含了 - 每个
items.sku
的订单总数量(参见$sum
)。
- 数量字段
- 订单 IDs 数组
orders_ids
。订单 IDs 字段包含一个 - 针对每个
items.sku
的不同订单_id
的数组(参见$addToSet
)。
- 订单 IDs 数组
{ "_id" : "chocolates", "qty" : 15, "orders_ids" : [ 2, 5, 8 ] } { "_id" : "oranges", "qty" : 63, "orders_ids" : [ 4, 7, 3, 2, 9, 1, 10 ] } { "_id" : "carrots", "qty" : 15, "orders_ids" : [ 6, 9 ] } { "_id" : "apples", "qty" : 35, "orders_ids" : [ 9, 8, 1, 6 ] } { "_id" : "pears", "qty" : 10, "orders_ids" : [ 3 ] } $project
阶段重新塑造输出文档,以反映 map-reduce 的输出,具有两个字段_id
和value
。在$project
中设置$unwind
阶段根据items
数组字段拆分文档,为每个数组元素输出一个文档。例如{ "_id" : 1, "cust_id" : "Ant O. Knee", "ord_date" : ISODate("2020-03-01T00:00:00Z"), "price" : 25, "items" : { "sku" : "oranges", "qty" : 5, "price" : 2.5 }, "status" : "A" } { "_id" : 1, "cust_id" : "Ant O. Knee", "ord_date" : ISODate("2020-03-01T00:00:00Z"), "price" : 25, "items" : { "sku" : "apples", "qty" : 5, "price" : 2.5 }, "status" : "A" } { "_id" : 2, "cust_id" : "Ant O. Knee", "ord_date" : ISODate("2020-03-08T00:00:00Z"), "price" : 70, "items" : { "sku" : "oranges", "qty" : 8, "price" : 2.5 }, "status" : "A" } { "_id" : 2, "cust_id" : "Ant O. Knee", "ord_date" : ISODate("2020-03-08T00:00:00Z"), "price" : 70, "items" : { "sku" : "chocolates", "qty" : 5, "price" : 10 }, "status" : "A" } { "_id" : 3, "cust_id" : "Busby Bee", "ord_date" : ISODate("2020-03-08T00:00:00Z"), "price" : 50, "items" : { "sku" : "oranges", "qty" : 10, "price" : 2.5 }, "status" : "A" } { "_id" : 3, "cust_id" : "Busby Bee", "ord_date" : ISODate("2020-03-08T00:00:00Z"), "price" : 50, "items" : { "sku" : "pears", "qty" : 10, "price" : 2.5 }, "status" : "A" } { "_id" : 4, "cust_id" : "Busby Bee", "ord_date" : ISODate("2020-03-18T00:00:00Z"), "price" : 25, "items" : { "sku" : "oranges", "qty" : 10, "price" : 2.5 }, "status" : "A" } { "_id" : 5, "cust_id" : "Busby Bee", "ord_date" : ISODate("2020-03-19T00:00:00Z"), "price" : 50, "items" : { "sku" : "chocolates", "qty" : 5, "price" : 10 }, "status" : "A" } ... $group
阶段按items.sku
分组,为每个 sku 计算值数量字段
qty
。数量字段qty
包含使用$sum
计算的每个items.sku
的订单总数量。订单 IDs 数组
orders_ids
。订单 IDs 字段包含使用$addToSet
为每个items.sku
创建的不同订单_id
的数组。
{ "_id" : "chocolates", "qty" : 15, "orders_ids" : [ 2, 5, 8 ] } { "_id" : "oranges", "qty" : 63, "orders_ids" : [ 4, 7, 3, 2, 9, 1, 10 ] } { "_id" : "carrots", "qty" : 15, "orders_ids" : [ 6, 9 ] } { "_id" : "apples", "qty" : 35, "orders_ids" : [ 9, 8, 1, 6 ] } { "_id" : "pears", "qty" : 10, "orders_ids" : [ 3 ] } $project
阶段重新塑造输出文档,以反映 map-reduce 的输出,具有两个字段_id
和value
。在$project
中设置将
value.count
设置为orders_ids
数组的大小,使用$size
。将
value.qty
设置为输入文档的qty
字段。
{ "_id" : "apples", "value" : { "count" : 4, "qty" : 35, "avg" : 8.75 } } { "_id" : "pears", "value" : { "count" : 1, "qty" : 10, "avg" : 10 } } { "_id" : "chocolates", "value" : { "count" : 3, "qty" : 15, "avg" : 5 } } { "_id" : "oranges", "value" : { "count" : 7, "qty" : 63, "avg" : 9 } } { "_id" : "carrots", "value" : { "count" : 2, "qty" : 15, "avg" : 7.5 } } 最后,
$merge
将输出写入集合agg_alternative_3
。如果现有文档具有与新结果相同的键_id
,则操作将覆盖现有文档。如果没有具有相同键的现有文档,则操作将插入文档。查询
agg_alternative_3
集合以验证结果db.agg_alternative_3.find().sort( { _id: 1 } ) 操作返回以下文档
{ "_id" : "apples", "value" : { "count" : 4, "qty" : 35, "avg" : 8.75 } } { "_id" : "carrots", "value" : { "count" : 2, "qty" : 15, "avg" : 7.5 } } { "_id" : "chocolates", "value" : { "count" : 3, "qty" : 15, "avg" : 5 } } { "_id" : "oranges", "value" : { "count" : 7, "qty" : 63, "avg" : 9 } } { "_id" : "pears", "value" : { "count" : 1, "qty" : 10, "avg" : 10 } }