执行增量Map-Reduce
注意
作为替代方案的聚合管道
从MongoDB 5.0开始,map-reduce 已弃用
与 map-reduce 相比,您应使用 聚合管道。聚合管道比map-reduce提供了更好的性能和可用性。
对于需要自定义功能的map-reduce操作,您可以使用
$accumulator
和$function
聚合运算符。您可以使用这些运算符在JavaScript中定义自定义聚合表达式。
有关聚合管道替代map-reduce的示例,请参阅
本节提供了一个不使用自定义函数的聚合管道替代map-reduce的示例。有关使用自定义函数的示例,请参阅从Map-Reduce到聚合管道。
要执行map-reduce操作,MongoDB提供了 mapReduce
命令和,在mongosh
,的 db.collection.mapReduce()
包装方法。
如果map-reduce数据集持续增长,您可能希望执行增量map-reduce,而不是每次都在整个数据集上执行map-reduce操作。
要执行增量map-reduce
在当前集合上运行一个map-reduce作业,并将结果输出到单独的集合。
当您有更多数据要处理时,使用
带有
query
参数的后续map-reduce作业,该参数指定仅匹配 新 文档的条件。带有
out
参数的作业,该参数指定将新结果合并到现有输出集合的reduce
操作。
考虑以下示例,您计划在一个 usersessions
集合上安排一个map-reduce操作,每天结束时运行。
数据设置
《usersessions》集合包含每天记录用户会话的文档,例如
db.usersessions.insertMany([ { userid: "a", start: ISODate('2020-03-03 14:17:00'), length: 95 }, { userid: "b", start: ISODate('2020-03-03 14:23:00'), length: 110 }, { userid: "c", start: ISODate('2020-03-03 15:02:00'), length: 120 }, { userid: "d", start: ISODate('2020-03-03 16:45:00'), length: 45 }, { userid: "a", start: ISODate('2020-03-04 11:05:00'), length: 105 }, { userid: "b", start: ISODate('2020-03-04 13:14:00'), length: 120 }, { userid: "c", start: ISODate('2020-03-04 17:00:00'), length: 130 }, { userid: "d", start: ISODate('2020-03-04 15:37:00'), length: 65 } ])
当前集合的初始Map-Reduce
按照以下方式运行第一个Map-Reduce操作
定义一个映射函数,将
userid
映射到一个包含字段total_time
、count
和avg_time
的对象var mapFunction = function() { var key = this.userid; var value = { total_time: this.length, count: 1, avg_time: 0 }; emit( key, value ); }; 定义一个相应的reduce函数,具有两个参数
key
和values
,用于计算总时间和计数。其中key
对应于userid
,而values
是一个数组,其元素对应于在mapFunction
中映射到userid
的各个对象。var reduceFunction = function(key, values) { var reducedObject = { total_time: 0, count:0, avg_time:0 }; values.forEach(function(value) { reducedObject.total_time += value.total_time; reducedObject.count += value.count; }); return reducedObject; }; 定义一个带有两个参数
key
和reducedValue
的finalize函数。该函数修改reducedValue
文档以添加另一个字段average
,并返回修改后的文档。var finalizeFunction = function(key, reducedValue) { if (reducedValue.count > 0) reducedValue.avg_time = reducedValue.total_time / reducedValue.count; return reducedValue; }; 使用
mapFunction
、reduceFunction
和finalizeFunction
函数对usersessions
集合执行map-reduce。将结果输出到集合session_stats
。如果session_stats
集合已存在,则该操作将替换其内容db.usersessions.mapReduce( mapFunction, reduceFunction, { out: "session_stats", finalize: finalizeFunction } ) 查询
session_stats
集合以验证结果db.session_stats.find().sort( { _id: 1 } ) 该操作返回以下文档
{ "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } } { "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } } { "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } } { "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } }
后续增量Map-Reduce
随后,随着usersessions
集合的增长,您可以运行额外的map-reduce操作。例如,向usersessions
集合添加新文档
db.usersessions.insertMany([ { userid: "a", ts: ISODate('2020-03-05 14:17:00'), length: 130 }, { userid: "b", ts: ISODate('2020-03-05 14:23:00'), length: 40 }, { userid: "c", ts: ISODate('2020-03-05 15:02:00'), length: 110 }, { userid: "d", ts: ISODate('2020-03-05 16:45:00'), length: 100 } ])
在一天结束时,对usersessions
集合执行增量map-reduce,但使用query
字段仅选择新文档。将结果输出到集合session_stats
,但用增量map-reduce的结果reduce
内容
db.usersessions.mapReduce( mapFunction, reduceFunction, { query: { ts: { $gte: ISODate('2020-03-05 00:00:00') } }, out: { reduce: "session_stats" }, finalize: finalizeFunction } );
查询session_stats
集合以验证结果
db.session_stats.find().sort( { _id: 1 } )
该操作返回以下文档
{ "_id" : "a", "value" : { "total_time" : 330, "count" : 3, "avg_time" : 110 } } { "_id" : "b", "value" : { "total_time" : 270, "count" : 3, "avg_time" : 90 } } { "_id" : "c", "value" : { "total_time" : 360, "count" : 3, "avg_time" : 120 } } { "_id" : "d", "value" : { "total_time" : 210, "count" : 3, "avg_time" : 70 } }
聚合替代方案
作为map-reduce的替代方案,您可以使用一个聚合管道,该管道结合了$group
和$merge
阶段,以更灵活的操作实现相同的结果。
重新创建usersessions
集合
db.usersessions.drop(); db.usersessions.insertMany([ { userid: "a", start: ISODate('2020-03-03 14:17:00'), length: 95 }, { userid: "b", start: ISODate('2020-03-03 14:23:00'), length: 110 }, { userid: "c", start: ISODate('2020-03-03 15:02:00'), length: 120 }, { userid: "d", start: ISODate('2020-03-03 16:45:00'), length: 45 }, { userid: "a", start: ISODate('2020-03-04 11:05:00'), length: 105 }, { userid: "b", start: ISODate('2020-03-04 13:14:00'), length: 120 }, { userid: "c", start: ISODate('2020-03-04 17:00:00'), length: 130 }, { userid: "d", start: ISODate('2020-03-04 15:37:00'), length: 65 } ])
使用可用的聚合管道操作符,您可以重写map-reduce示例而无需定义自定义函数
db.usersessions.aggregate([ { $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } }, { $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } }, { $merge: { into: "session_stats_agg", whenMatched: [ { $set: { "value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] }, "value.count": { $add: [ "$value.count", "$$new.value.count" ] }, "value.avg": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] }, { $add: [ "$value.count", "$$new.value.count" ] } ] } } } ], whenNotMatched: "insert" }} ])
$group
按照userid
分组并计算使用
$sum
运算符计算total_time
使用
$sum
运算符计算count
使用
$avg
运算符计算avg_time
该操作返回以下文档
{ "_id" : "c", "total_time" : 250, "count" : 2, "avg_time" : 125 } { "_id" : "d", "total_time" : 110, "count" : 2, "avg_time" : 55 } { "_id" : "a", "total_time" : 200, "count" : 2, "avg_time" : 100 } { "_id" : "b", "total_time" : 230, "count" : 2, "avg_time" : 115 } 《$project》阶段重新塑形输出文档,以反映map-reduce的输出,包含两个字段
_id
和value
。如果不需要反映_id
和value
结构,此阶段是可选的。{ "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } } { "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } } { "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } } { "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } } 《$merge》阶段将结果输出到
session_stats_agg
集合。如果现有文档与新结果具有相同的_id
,则操作将指定的管道应用于从结果和现有文档中计算total_time、count和avg_time。如果在session_stats_agg
中没有具有相同_id
的现有文档,则操作将插入文档。查询
session_stats_agg
集合以验证结果db.session_stats_agg.find().sort( { _id: 1 } ) 该操作返回以下文档
{ "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } } { "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } } { "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } } { "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } } 向
usersessions
集合添加新文档db.usersessions.insertMany([ { userid: "a", ts: ISODate('2020-03-05 14:17:00'), length: 130 }, { userid: "b", ts: ISODate('2020-03-05 14:23:00'), length: 40 }, { userid: "c", ts: ISODate('2020-03-05 15:02:00'), length: 110 }, { userid: "d", ts: ISODate('2020-03-05 16:45:00'), length: 100 } ]) 在管道开始处添加一个
$match
阶段以指定日期过滤器db.usersessions.aggregate([ { $match: { ts: { $gte: ISODate('2020-03-05 00:00:00') } } }, { $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } }, { $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } }, { $merge: { into: "session_stats_agg", whenMatched: [ { $set: { "value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] }, "value.count": { $add: [ "$value.count", "$$new.value.count" ] }, "value.avg_time": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] }, { $add: [ "$value.count", "$$new.value.count" ] } ] } } } ], whenNotMatched: "insert" }} ]) 查询
session_stats_agg
集合以验证结果db.session_stats_agg.find().sort( { _id: 1 } ) 该操作返回以下文档
{ "_id" : "a", "value" : { "total_time" : 330, "count" : 3, "avg_time" : 110 } } { "_id" : "b", "value" : { "total_time" : 270, "count" : 3, "avg_time" : 90 } } { "_id" : "c", "value" : { "total_time" : 360, "count" : 3, "avg_time" : 120 } } { "_id" : "d", "value" : { "total_time" : 210, "count" : 3, "avg_time" : 70 } } 可选。为了避免每次运行时都需要修改聚合管道的
$match
日期条件,您可以在辅助函数中封装聚合updateSessionStats = function(startDate) { db.usersessions.aggregate([ { $match: { ts: { $gte: startDate } } }, { $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } }, { $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } }, { $merge: { into: "session_stats_agg", whenMatched: [ { $set: { "value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] }, "value.count": { $add: [ "$value.count", "$$new.value.count" ] }, "value.avg_time": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] }, { $add: [ "$value.count", "$$new.value.count" ] } ] } } } ], whenNotMatched: "insert" }} ]); }; 然后,要运行,只需将开始日期传递给
updateSessionStats()
函数updateSessionStats(ISODate('2020-03-05 00:00:00'))