文档菜单
文档首页
/
MongoDB 手册
/ /

Map-Reduce示例

注意

将聚合管道作为Map-Reduce的替代方案

聚合管道提供比map-reduce操作更好的性能和可用性。

可以使用聚合管道阶段,如$group$merge来重写map-reduce操作。

对于需要自定义功能的map-reduce操作,MongoDB提供了$accumulator$function聚合操作符。使用这些操作符在JavaScript中定义自定义聚合表达式。

mongosh中,db.collection.mapReduce()方法是对mapReduce命令的包装。以下示例使用db.collection.mapReduce()方法。

本节中的示例包括没有自定义聚合表达式的聚合管道替代方案。有关使用自定义表达式的替代方案的示例,请参阅Map-Reduce到聚合管道转换示例。

创建一个包含以下文档的示例集合orders

db.orders.insertMany([
{ _id: 1, cust_id: "Ant O. Knee", ord_date: new Date("2020-03-01"), price: 25, items: [ { sku: "oranges", qty: 5, price: 2.5 }, { sku: "apples", qty: 5, price: 2.5 } ], status: "A" },
{ _id: 2, cust_id: "Ant O. Knee", ord_date: new Date("2020-03-08"), price: 70, items: [ { sku: "oranges", qty: 8, price: 2.5 }, { sku: "chocolates", qty: 5, price: 10 } ], status: "A" },
{ _id: 3, cust_id: "Busby Bee", ord_date: new Date("2020-03-08"), price: 50, items: [ { sku: "oranges", qty: 10, price: 2.5 }, { sku: "pears", qty: 10, price: 2.5 } ], status: "A" },
{ _id: 4, cust_id: "Busby Bee", ord_date: new Date("2020-03-18"), price: 25, items: [ { sku: "oranges", qty: 10, price: 2.5 } ], status: "A" },
{ _id: 5, cust_id: "Busby Bee", ord_date: new Date("2020-03-19"), price: 50, items: [ { sku: "chocolates", qty: 5, price: 10 } ], status: "A"},
{ _id: 6, cust_id: "Cam Elot", ord_date: new Date("2020-03-19"), price: 35, items: [ { sku: "carrots", qty: 10, price: 1.0 }, { sku: "apples", qty: 10, price: 2.5 } ], status: "A" },
{ _id: 7, cust_id: "Cam Elot", ord_date: new Date("2020-03-20"), price: 25, items: [ { sku: "oranges", qty: 10, price: 2.5 } ], status: "A" },
{ _id: 8, cust_id: "Don Quis", ord_date: new Date("2020-03-20"), price: 75, items: [ { sku: "chocolates", qty: 5, price: 10 }, { sku: "apples", qty: 10, price: 2.5 } ], status: "A" },
{ _id: 9, cust_id: "Don Quis", ord_date: new Date("2020-03-20"), price: 55, items: [ { sku: "carrots", qty: 5, price: 1.0 }, { sku: "apples", qty: 10, price: 2.5 }, { sku: "oranges", qty: 10, price: 2.5 } ], status: "A" },
{ _id: 10, cust_id: "Don Quis", ord_date: new Date("2020-03-23"), price: 25, items: [ { sku: "oranges", qty: 10, price: 2.5 } ], status: "A" }
])

orders集合执行map-reduce操作,按cust_id分组,并计算每个cust_idprice总和

  1. 定义处理每个输入文档的map函数

    • 在此函数中,this指的是map-reduce操作正在处理的文档。

    • 该函数将每个文档的price映射到相应的cust_id,并输出cust_idprice

    var mapFunction1 = function() {
    emit(this.cust_id, this.price);
    };
  2. 定义相应的reduce函数,具有两个参数keyCustIdvaluesPrices

    • valuesPrices是一个数组,其元素是map函数发出的price值,并按keyCustId分组。

    • 函数将valuesPrice数组缩减为其元素的求和。

    var reduceFunction1 = function(keyCustId, valuesPrices) {
    return Array.sum(valuesPrices);
    };
  3. 使用mapFunction1 map函数和reduceFunction1 reduce函数对orders集合中的所有文档执行map-reduce操作

    db.orders.mapReduce(
    mapFunction1,
    reduceFunction1,
    { out: "map_reduce_example" }
    )

    此操作将结果输出到名为map_reduce_example的集合中。如果map_reduce_example集合已存在,则操作将用此map-reduce操作的结果替换其内容。

  4. 查询map_reduce_example集合以验证结果

    db.map_reduce_example.find().sort( { _id: 1 } )

    此操作返回以下文档

    { "_id" : "Ant O. Knee", "value" : 95 }
    { "_id" : "Busby Bee", "value" : 125 }
    { "_id" : "Cam Elot", "value" : 60 }
    { "_id" : "Don Quis", "value" : 155 }

使用可用的聚合管道操作符,您可以重写map-reduce操作,而无需定义自定义函数

db.orders.aggregate([
{ $group: { _id: "$cust_id", value: { $sum: "$price" } } },
{ $out: "agg_alternative_1" }
])
  1. $group 阶段中,根据 cust_id 进行分组,并计算 value 字段(参见$sum)。value 字段包含每个 cust_id 的总 price

    该阶段将以下文档输出到下一阶段

    { "_id" : "Don Quis", "value" : 155 }
    { "_id" : "Ant O. Knee", "value" : 95 }
    { "_id" : "Cam Elot", "value" : 60 }
    { "_id" : "Busby Bee", "value" : 125 }
  2. 然后,$out 将输出写入集合 agg_alternative_1。或者,您也可以使用 $merge 来代替 $out

  3. 查询 agg_alternative_1 集合以验证结果

    db.agg_alternative_1.find().sort( { _id: 1 } )

    操作返回以下文档

    { "_id" : "Ant O. Knee", "value" : 95 }
    { "_id" : "Busby Bee", "value" : 125 }
    { "_id" : "Cam Elot", "value" : 60 }
    { "_id" : "Don Quis", "value" : 155 }

提示

另请参阅

有关使用自定义聚合表达式的替代方法的示例,请参阅将 Map-Reduce 转换为聚合管道的翻译示例。

以下示例将展示在所有具有 ord_date 值大于或等于 2020-03-01orders 集合上执行 map-reduce 操作。

示例中的操作

  1. item.sku 字段进行分组,并计算每个 sku 的订单数量和总订单数量。

  2. 计算每个 sku 值的平均订单数量,并将结果合并到输出集合中。

在合并结果时,如果现有文档与新结果具有相同的键,则操作将覆盖现有文档。如果没有具有相同键的现有文档,则操作将插入该文档。

示例步骤

  1. 定义处理每个输入文档的map函数

    • 在此函数中,this指的是map-reduce操作正在处理的文档。

    • 对于每个项目,该函数将 sku 与一个新对象 value 相关联,该对象包含 count 字段(值为 1)和订单的项 qty,并发出 sku(存储在 key 中)和 value

    var mapFunction2 = function() {
    for (var idx = 0; idx < this.items.length; idx++) {
    var key = this.items[idx].sku;
    var value = { count: 1, qty: this.items[idx].qty };
    emit(key, value);
    }
    };
  2. 定义一个带有两个参数 keySKUcountObjVals 的相应 reduce 函数

    • countObjVals 是一个数组,其元素是由 map 函数传递给 reducer 函数的按 keySKU 分组的对象。

    • 该函数将 countObjVals 数组减少为一个包含 countqty 字段的单个对象 reducedValue

    • reducedVal 中,count 字段包含来自单个数组元素的 count 字段的和,而 qty 字段包含来自单个数组元素的 qty 字段的和。

    var reduceFunction2 = function(keySKU, countObjVals) {
    reducedVal = { count: 0, qty: 0 };
    for (var idx = 0; idx < countObjVals.length; idx++) {
    reducedVal.count += countObjVals[idx].count;
    reducedVal.qty += countObjVals[idx].qty;
    }
    return reducedVal;
    };
  3. 定义一个带有两个参数 keyreducedVal 的 finalize 函数。该函数修改 reducedVal 对象以添加一个名为 avg 的计算字段,并返回修改后的对象

    var finalizeFunction2 = function (key, reducedVal) {
    reducedVal.avg = reducedVal.qty/reducedVal.count;
    return reducedVal;
    };
  4. 使用 mapFunction2reduceFunction2finalizeFunction2 函数在 orders 集合上执行 map-reduce 操作

    db.orders.mapReduce(
    mapFunction2,
    reduceFunction2,
    {
    out: { merge: "map_reduce_example2" },
    query: { ord_date: { $gte: new Date("2020-03-01") } },
    finalize: finalizeFunction2
    }
    );

    此操作使用 query 字段仅选择那些 ord_date 大于或等于 new Date("2020-03-01") 的文档。然后将结果输出到集合 map_reduce_example2

    如果 map_reduce_example2 集合已存在,则操作将现有内容与 map-reduce 操作的结果合并。也就是说,如果现有文档与新结果具有相同的键,则操作将覆盖现有文档。如果没有具有相同键的现有文档,则操作将插入文档。

  5. 查询 map_reduce_example2 集合以验证结果

    db.map_reduce_example2.find().sort( { _id: 1 } )

    此操作返回以下文档

    { "_id" : "apples", "value" : { "count" : 4, "qty" : 35, "avg" : 8.75 } }
    { "_id" : "carrots", "value" : { "count" : 2, "qty" : 15, "avg" : 7.5 } }
    { "_id" : "chocolates", "value" : { "count" : 3, "qty" : 15, "avg" : 5 } }
    { "_id" : "oranges", "value" : { "count" : 7, "qty" : 63, "avg" : 9 } }
    { "_id" : "pears", "value" : { "count" : 1, "qty" : 10, "avg" : 10 } }

使用可用的聚合管道操作符,您可以重写map-reduce操作,而无需定义自定义函数

db.orders.aggregate( [
{ $match: { ord_date: { $gte: new Date("2020-03-01") } } },
{ $unwind: "$items" },
{ $group: { _id: "$items.sku", qty: { $sum: "$items.qty" }, orders_ids: { $addToSet: "$_id" } } },
{ $project: { value: { count: { $size: "$orders_ids" }, qty: "$qty", avg: { $divide: [ "$qty", { $size: "$orders_ids" } ] } } } },
{ $merge: { into: "agg_alternative_3", on: "_id", whenMatched: "replace", whenNotMatched: "insert" } }
] )
  1. $match 阶段仅选择那些 ord_date 大于或等于 new Date("2020-03-01") 的文档。

  2. $unwind 阶段根据 items 数组字段拆分文档,为每个数组元素输出一个文档。例如

    { "_id" : 1, "cust_id" : "Ant O. Knee", "ord_date" : ISODate("2020-03-01T00:00:00Z"), "price" : 25, "items" : { "sku" : "oranges", "qty" : 5, "price" : 2.5 }, "status" : "A" }
    { "_id" : 1, "cust_id" : "Ant O. Knee", "ord_date" : ISODate("2020-03-01T00:00:00Z"), "price" : 25, "items" : { "sku" : "apples", "qty" : 5, "price" : 2.5 }, "status" : "A" }
    { "_id" : 2, "cust_id" : "Ant O. Knee", "ord_date" : ISODate("2020-03-08T00:00:00Z"), "price" : 70, "items" : { "sku" : "oranges", "qty" : 8, "price" : 2.5 }, "status" : "A" }
    { "_id" : 2, "cust_id" : "Ant O. Knee", "ord_date" : ISODate("2020-03-08T00:00:00Z"), "price" : 70, "items" : { "sku" : "chocolates", "qty" : 5, "price" : 10 }, "status" : "A" }
    { "_id" : 3, "cust_id" : "Busby Bee", "ord_date" : ISODate("2020-03-08T00:00:00Z"), "price" : 50, "items" : { "sku" : "oranges", "qty" : 10, "price" : 2.5 }, "status" : "A" }
    { "_id" : 3, "cust_id" : "Busby Bee", "ord_date" : ISODate("2020-03-08T00:00:00Z"), "price" : 50, "items" : { "sku" : "pears", "qty" : 10, "price" : 2.5 }, "status" : "A" }
    { "_id" : 4, "cust_id" : "Busby Bee", "ord_date" : ISODate("2020-03-18T00:00:00Z"), "price" : 25, "items" : { "sku" : "oranges", "qty" : 10, "price" : 2.5 }, "status" : "A" }
    { "_id" : 5, "cust_id" : "Busby Bee", "ord_date" : ISODate("2020-03-19T00:00:00Z"), "price" : 50, "items" : { "sku" : "chocolates", "qty" : 5, "price" : 10 }, "status" : "A" }
    ...
  3. $group 阶段按 items.sku 分组,为每个 sku 计算值

    • 数量字段 qty。数量字段 qty 包含了
      每个 items.sku 的订单总数量(参见 $sum)。
    • 订单 IDs 数组 orders_ids。订单 IDs 字段包含一个
      针对每个 items.sku 的不同订单 _id 的数组(参见 $addToSet)。
    { "_id" : "chocolates", "qty" : 15, "orders_ids" : [ 2, 5, 8 ] }
    { "_id" : "oranges", "qty" : 63, "orders_ids" : [ 4, 7, 3, 2, 9, 1, 10 ] }
    { "_id" : "carrots", "qty" : 15, "orders_ids" : [ 6, 9 ] }
    { "_id" : "apples", "qty" : 35, "orders_ids" : [ 9, 8, 1, 6 ] }
    { "_id" : "pears", "qty" : 10, "orders_ids" : [ 3 ] }
  4. $project 阶段重新塑造输出文档,以反映 map-reduce 的输出,具有两个字段 _idvalue。在 $project 中设置

  5. $unwind 阶段根据 items 数组字段拆分文档,为每个数组元素输出一个文档。例如

    { "_id" : 1, "cust_id" : "Ant O. Knee", "ord_date" : ISODate("2020-03-01T00:00:00Z"), "price" : 25, "items" : { "sku" : "oranges", "qty" : 5, "price" : 2.5 }, "status" : "A" }
    { "_id" : 1, "cust_id" : "Ant O. Knee", "ord_date" : ISODate("2020-03-01T00:00:00Z"), "price" : 25, "items" : { "sku" : "apples", "qty" : 5, "price" : 2.5 }, "status" : "A" }
    { "_id" : 2, "cust_id" : "Ant O. Knee", "ord_date" : ISODate("2020-03-08T00:00:00Z"), "price" : 70, "items" : { "sku" : "oranges", "qty" : 8, "price" : 2.5 }, "status" : "A" }
    { "_id" : 2, "cust_id" : "Ant O. Knee", "ord_date" : ISODate("2020-03-08T00:00:00Z"), "price" : 70, "items" : { "sku" : "chocolates", "qty" : 5, "price" : 10 }, "status" : "A" }
    { "_id" : 3, "cust_id" : "Busby Bee", "ord_date" : ISODate("2020-03-08T00:00:00Z"), "price" : 50, "items" : { "sku" : "oranges", "qty" : 10, "price" : 2.5 }, "status" : "A" }
    { "_id" : 3, "cust_id" : "Busby Bee", "ord_date" : ISODate("2020-03-08T00:00:00Z"), "price" : 50, "items" : { "sku" : "pears", "qty" : 10, "price" : 2.5 }, "status" : "A" }
    { "_id" : 4, "cust_id" : "Busby Bee", "ord_date" : ISODate("2020-03-18T00:00:00Z"), "price" : 25, "items" : { "sku" : "oranges", "qty" : 10, "price" : 2.5 }, "status" : "A" }
    { "_id" : 5, "cust_id" : "Busby Bee", "ord_date" : ISODate("2020-03-19T00:00:00Z"), "price" : 50, "items" : { "sku" : "chocolates", "qty" : 5, "price" : 10 }, "status" : "A" }
    ...
  6. $group 阶段按 items.sku 分组,为每个 sku 计算值

    • 数量字段 qty。数量字段 qty 包含使用 $sum 计算的每个 items.sku 的订单总数量。

    • 订单 IDs 数组 orders_ids。订单 IDs 字段包含使用 $addToSet 为每个 items.sku 创建的不同订单 _id 的数组。

    { "_id" : "chocolates", "qty" : 15, "orders_ids" : [ 2, 5, 8 ] }
    { "_id" : "oranges", "qty" : 63, "orders_ids" : [ 4, 7, 3, 2, 9, 1, 10 ] }
    { "_id" : "carrots", "qty" : 15, "orders_ids" : [ 6, 9 ] }
    { "_id" : "apples", "qty" : 35, "orders_ids" : [ 9, 8, 1, 6 ] }
    { "_id" : "pears", "qty" : 10, "orders_ids" : [ 3 ] }
  7. $project 阶段重新塑造输出文档,以反映 map-reduce 的输出,具有两个字段 _idvalue。在 $project 中设置

    • value.count 设置为 orders_ids 数组的大小,使用 $size

    • value.qty 设置为输入文档的 qty 字段。

    • value.avg 设置为每笔订单的数量的平均值,使用 $divide$size

    { "_id" : "apples", "value" : { "count" : 4, "qty" : 35, "avg" : 8.75 } }
    { "_id" : "pears", "value" : { "count" : 1, "qty" : 10, "avg" : 10 } }
    { "_id" : "chocolates", "value" : { "count" : 3, "qty" : 15, "avg" : 5 } }
    { "_id" : "oranges", "value" : { "count" : 7, "qty" : 63, "avg" : 9 } }
    { "_id" : "carrots", "value" : { "count" : 2, "qty" : 15, "avg" : 7.5 } }
  8. 最后,$merge 将输出写入集合 agg_alternative_3。如果现有文档具有与新结果相同的键 _id,则操作将覆盖现有文档。如果没有具有相同键的现有文档,则操作将插入文档。

  9. 查询 agg_alternative_3 集合以验证结果

    db.agg_alternative_3.find().sort( { _id: 1 } )

    操作返回以下文档

    { "_id" : "apples", "value" : { "count" : 4, "qty" : 35, "avg" : 8.75 } }
    { "_id" : "carrots", "value" : { "count" : 2, "qty" : 15, "avg" : 7.5 } }
    { "_id" : "chocolates", "value" : { "count" : 3, "qty" : 15, "avg" : 5 } }
    { "_id" : "oranges", "value" : { "count" : 7, "qty" : 63, "avg" : 9 } }
    { "_id" : "pears", "value" : { "count" : 1, "qty" : 10, "avg" : 10 } }

提示

另请参阅

有关使用自定义聚合表达式的替代方案,请参阅 Map-Reduce 到聚合管道转换示例。

返回

并发