文档菜单
文档首页
/
MongoDB 手册
/ /

执行增量Map-Reduce

在本页

  • 数据设置
  • 当前集合的初始Map-Reduce
  • 后续增量Map-Reduce
  • 聚合替代方案

注意

作为替代方案的聚合管道

从MongoDB 5.0开始,map-reduce 已弃用

有关聚合管道替代map-reduce的示例,请参阅

本节提供了一个不使用自定义函数的聚合管道替代map-reduce的示例。有关使用自定义函数的示例,请参阅从Map-Reduce到聚合管道。

要执行map-reduce操作,MongoDB提供了 mapReduce 命令和,在mongosh,的 db.collection.mapReduce() 包装方法。

如果map-reduce数据集持续增长,您可能希望执行增量map-reduce,而不是每次都在整个数据集上执行map-reduce操作。

要执行增量map-reduce

  1. 在当前集合上运行一个map-reduce作业,并将结果输出到单独的集合。

  2. 当您有更多数据要处理时,使用

    • 带有 query 参数的后续map-reduce作业,该参数指定仅匹配 文档的条件。

    • 带有 out 参数的作业,该参数指定将新结果合并到现有输出集合的 reduce 操作。

考虑以下示例,您计划在一个 usersessions 集合上安排一个map-reduce操作,每天结束时运行。

《usersessions》集合包含每天记录用户会话的文档,例如

db.usersessions.insertMany([
{ userid: "a", start: ISODate('2020-03-03 14:17:00'), length: 95 },
{ userid: "b", start: ISODate('2020-03-03 14:23:00'), length: 110 },
{ userid: "c", start: ISODate('2020-03-03 15:02:00'), length: 120 },
{ userid: "d", start: ISODate('2020-03-03 16:45:00'), length: 45 },
{ userid: "a", start: ISODate('2020-03-04 11:05:00'), length: 105 },
{ userid: "b", start: ISODate('2020-03-04 13:14:00'), length: 120 },
{ userid: "c", start: ISODate('2020-03-04 17:00:00'), length: 130 },
{ userid: "d", start: ISODate('2020-03-04 15:37:00'), length: 65 }
])

按照以下方式运行第一个Map-Reduce操作

  1. 定义一个映射函数,将userid映射到一个包含字段total_timecountavg_time的对象

    var mapFunction = function() {
    var key = this.userid;
    var value = { total_time: this.length, count: 1, avg_time: 0 };
    emit( key, value );
    };
  2. 定义一个相应的reduce函数,具有两个参数keyvalues,用于计算总时间和计数。其中key对应于userid,而values是一个数组,其元素对应于在mapFunction中映射到userid的各个对象。

    var reduceFunction = function(key, values) {
    var reducedObject = { total_time: 0, count:0, avg_time:0 };
    values.forEach(function(value) {
    reducedObject.total_time += value.total_time;
    reducedObject.count += value.count;
    });
    return reducedObject;
    };
  3. 定义一个带有两个参数keyreducedValue的finalize函数。该函数修改reducedValue文档以添加另一个字段average,并返回修改后的文档。

    var finalizeFunction = function(key, reducedValue) {
    if (reducedValue.count > 0)
    reducedValue.avg_time = reducedValue.total_time / reducedValue.count;
    return reducedValue;
    };
  4. 使用mapFunctionreduceFunctionfinalizeFunction函数对usersessions集合执行map-reduce。将结果输出到集合session_stats。如果session_stats集合已存在,则该操作将替换其内容

    db.usersessions.mapReduce(
    mapFunction,
    reduceFunction,
    {
    out: "session_stats",
    finalize: finalizeFunction
    }
    )
  5. 查询session_stats集合以验证结果

    db.session_stats.find().sort( { _id: 1 } )

    该操作返回以下文档

    { "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } }
    { "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } }
    { "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } }
    { "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } }

随后,随着usersessions集合的增长,您可以运行额外的map-reduce操作。例如,向usersessions集合添加新文档

db.usersessions.insertMany([
{ userid: "a", ts: ISODate('2020-03-05 14:17:00'), length: 130 },
{ userid: "b", ts: ISODate('2020-03-05 14:23:00'), length: 40 },
{ userid: "c", ts: ISODate('2020-03-05 15:02:00'), length: 110 },
{ userid: "d", ts: ISODate('2020-03-05 16:45:00'), length: 100 }
])

在一天结束时,对usersessions集合执行增量map-reduce,但使用query字段仅选择新文档。将结果输出到集合session_stats,但用增量map-reduce的结果reduce内容

db.usersessions.mapReduce(
mapFunction,
reduceFunction,
{
query: { ts: { $gte: ISODate('2020-03-05 00:00:00') } },
out: { reduce: "session_stats" },
finalize: finalizeFunction
}
);

查询session_stats集合以验证结果

db.session_stats.find().sort( { _id: 1 } )

该操作返回以下文档

{ "_id" : "a", "value" : { "total_time" : 330, "count" : 3, "avg_time" : 110 } }
{ "_id" : "b", "value" : { "total_time" : 270, "count" : 3, "avg_time" : 90 } }
{ "_id" : "c", "value" : { "total_time" : 360, "count" : 3, "avg_time" : 120 } }
{ "_id" : "d", "value" : { "total_time" : 210, "count" : 3, "avg_time" : 70 } }

作为map-reduce的替代方案,您可以使用一个聚合管道,该管道结合了$group$merge阶段,以更灵活的操作实现相同的结果。

重新创建usersessions集合

db.usersessions.drop();
db.usersessions.insertMany([
{ userid: "a", start: ISODate('2020-03-03 14:17:00'), length: 95 },
{ userid: "b", start: ISODate('2020-03-03 14:23:00'), length: 110 },
{ userid: "c", start: ISODate('2020-03-03 15:02:00'), length: 120 },
{ userid: "d", start: ISODate('2020-03-03 16:45:00'), length: 45 },
{ userid: "a", start: ISODate('2020-03-04 11:05:00'), length: 105 },
{ userid: "b", start: ISODate('2020-03-04 13:14:00'), length: 120 },
{ userid: "c", start: ISODate('2020-03-04 17:00:00'), length: 130 },
{ userid: "d", start: ISODate('2020-03-04 15:37:00'), length: 65 }
])

使用可用的聚合管道操作符,您可以重写map-reduce示例而无需定义自定义函数

db.usersessions.aggregate([
{ $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } },
{ $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } },
{ $merge: {
into: "session_stats_agg",
whenMatched: [ { $set: {
"value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] },
"value.count": { $add: [ "$value.count", "$$new.value.count" ] },
"value.avg": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] }, { $add: [ "$value.count", "$$new.value.count" ] } ] }
} } ],
whenNotMatched: "insert"
}}
])
  1. $group按照userid分组并计算

    • 使用$sum运算符计算total_time

    • 使用$sum运算符计算count

    • 使用$avg运算符计算avg_time

    该操作返回以下文档

    { "_id" : "c", "total_time" : 250, "count" : 2, "avg_time" : 125 }
    { "_id" : "d", "total_time" : 110, "count" : 2, "avg_time" : 55 }
    { "_id" : "a", "total_time" : 200, "count" : 2, "avg_time" : 100 }
    { "_id" : "b", "total_time" : 230, "count" : 2, "avg_time" : 115 }
  2. 《$project》阶段重新塑形输出文档,以反映map-reduce的输出,包含两个字段_idvalue。如果不需要反映_idvalue结构,此阶段是可选的。

    { "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } }
    { "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } }
    { "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } }
    { "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } }
  3. 《$merge》阶段将结果输出到session_stats_agg集合。如果现有文档与新结果具有相同的_id,则操作将指定的管道应用于从结果和现有文档中计算total_time、count和avg_time。如果在session_stats_agg中没有具有相同_id的现有文档,则操作将插入文档。

  4. 查询session_stats_agg集合以验证结果

    db.session_stats_agg.find().sort( { _id: 1 } )

    该操作返回以下文档

    { "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } }
    { "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } }
    { "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } }
    { "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } }
  5. usersessions集合添加新文档

    db.usersessions.insertMany([
    { userid: "a", ts: ISODate('2020-03-05 14:17:00'), length: 130 },
    { userid: "b", ts: ISODate('2020-03-05 14:23:00'), length: 40 },
    { userid: "c", ts: ISODate('2020-03-05 15:02:00'), length: 110 },
    { userid: "d", ts: ISODate('2020-03-05 16:45:00'), length: 100 }
    ])
  6. 在管道开始处添加一个$match阶段以指定日期过滤器

    db.usersessions.aggregate([
    { $match: { ts: { $gte: ISODate('2020-03-05 00:00:00') } } },
    { $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } },
    { $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } },
    { $merge: {
    into: "session_stats_agg",
    whenMatched: [ { $set: {
    "value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] },
    "value.count": { $add: [ "$value.count", "$$new.value.count" ] },
    "value.avg_time": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] }, { $add: [ "$value.count", "$$new.value.count" ] } ] }
    } } ],
    whenNotMatched: "insert"
    }}
    ])
  7. 查询session_stats_agg集合以验证结果

    db.session_stats_agg.find().sort( { _id: 1 } )

    该操作返回以下文档

    { "_id" : "a", "value" : { "total_time" : 330, "count" : 3, "avg_time" : 110 } }
    { "_id" : "b", "value" : { "total_time" : 270, "count" : 3, "avg_time" : 90 } }
    { "_id" : "c", "value" : { "total_time" : 360, "count" : 3, "avg_time" : 120 } }
    { "_id" : "d", "value" : { "total_time" : 210, "count" : 3, "avg_time" : 70 } }
  8. 可选。为了避免每次运行时都需要修改聚合管道的$match日期条件,您可以在辅助函数中封装聚合

    updateSessionStats = function(startDate) {
    db.usersessions.aggregate([
    { $match: { ts: { $gte: startDate } } },
    { $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } },
    { $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } },
    { $merge: {
    into: "session_stats_agg",
    whenMatched: [ { $set: {
    "value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] },
    "value.count": { $add: [ "$value.count", "$$new.value.count" ] },
    "value.avg_time": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] }, { $add: [ "$value.count", "$$new.value.count" ] } ] }
    } } ],
    whenNotMatched: "insert"
    }}
    ]);
    };

    然后,要运行,只需将开始日期传递给updateSessionStats()函数

    updateSessionStats(ISODate('2020-03-05 00:00:00'))

提示

另请参阅

返回

示例