文档菜单
文档首页
/
MongoDB 手册
/ / /

$accumulator(聚合)

本页内容

  • 定义
  • 语法
  • 行为
  • 示例
$accumulator

重要

服务器端JavaScript已弃用

从MongoDB 8.0开始,服务器端JavaScript函数($accumulator, $function$where)已被弃用。运行这些函数时,MongoDB会记录一条警告信息。

定义一个自定义的累加器运算符。累加器是维护其状态(例如总计、最大值、最小值和相关数据)的运算符,当文档通过管道时。使用$accumulator运算符来执行自己的JavaScript函数以实现MongoDB查询语言不支持的行为。另请参阅$function

$accumulator运算符可在以下阶段使用

  • $bucket

  • $bucketAuto

  • $group

重要

在聚合运算符内部执行JavaScript可能会降低性能。只有当提供的$accumulator运算符无法满足应用程序需求时,才使用此运算符。

$accumulator运算符的语法如下

{
$accumulator: {
init: <code>,
initArgs: <array expression>, // Optional
accumulate: <code>,
accumulateArgs: <array expression>,
merge: <code>,
finalize: <code>, // Optional
lang: <string>
}
}
字段
类型
描述
字符串或代码

用于初始化状态的函数。The init函数从initArgs数组表达式接收其参数。您可以指定函数定义作为BSON类型的Code或String。

The init函数具有以下形式

function (<initArg1>, <initArg2>, ...) {
...
return <initialState>
}

将数据溢出到磁盘或在一个分片集群中运行查询可能会导致累加器被计算为多个子累加的合并,每个子累加器都通过调用 init() 开始。请确保您的 init()accumulate()merge() 函数与这种执行模型兼容。

数组

可选。传递给 init 函数的参数。

initArgs 的形式如下

[ <initArg1>, <initArg2>, ... ]

重要:当在 $bucketAuto 阶段中使用时,initArgs 不能引用分组键(即,您不能使用 $<fieldName> 语法)。相反,在 $bucketAuto 阶段中,您只能将常量值指定为 initArgs

字符串或代码

用于累加文档的函数。累加函数从当前状态和 累加参数 数组表达式中接收其参数。累加函数的结果成为新状态。您可以将函数定义指定为 BSON 类型 Code 或 String。

累加函数的形式如下

function(state, <accumArg1>, <accumArg2>, ...) {
...
return <newState>
}
数组

传递给累加函数的参数。您可以使用 accumulateArgs 来指定要传递给累加函数的字段值。

accumulateArgs 的形式如下

[ <accumArg1>, <accumArg2>, ... ]
字符串或代码

用于合并两个内部状态的函数。合并必须是 String 或 Code BSON 类型。合并函数返回两个合并状态的合并结果。有关合并函数何时被调用的信息,请参阅 使用 $merge 合并两个状态。

合并函数的形式如下

function (<state1>, <state2>) {
<logic to merge state1 and state2>
return <newState>
}
字符串或代码

可选。用于更新累加结果的函数。

finalize 函数的形式如下

function (state) {
...
return <finalState>
}
字符串

$accumulator 代码中使用的语言。

重要:目前,lang 支持的唯一值是 js

以下步骤概述了$accumulator运算符如何处理文档

  1. 运算符从由init函数定义的初始状态开始。

  2. 对于每个文档,运算符根据accumulate函数更新状态。accumulate函数的第一个参数是当前状态,额外的参数在accumulateArgs数组中指定。

  3. 当运算符需要合并多个中间状态时,它执行merge函数。有关merge函数何时被调用的更多信息,请参阅使用$merge合并两个状态。

  4. 如果已定义了finalize函数,则一旦所有文档都已处理并且状态已相应更新,finalize将状态转换为最终输出。

作为其内部操作的一部分,$accumulator运算符可能需要合并两个单独的中间状态。merge函数指定了运算符应该如何合并两个状态。

合并函数始终一次合并两个状态。如果必须合并超过两个状态,则将两个状态的合并结果与单个状态合并。这个过程会一直重复,直到所有状态都被合并。

例如,$accumulator可能需要在以下场景下合并两个状态

  • 在分片集群上运行$accumulator。操作员需要合并每个分片的输出以获得最终结果。

  • 单个$accumulator操作超过了其指定的内存限制。如果您指定了allowDiskUse选项,操作员将正在进行的操作存储到磁盘上,并在内存中完成操作。一旦操作完成,将使用合并函数将磁盘和内存中的结果合并在一起。

对于init()accumulate()merge()函数,MongoDB处理文档的顺序可能不同,可能与向$accumulator函数指定的文档顺序不同。

例如,考虑一系列文档,其中_id字段是字母表中的字母

{ _id: 'a' },
{ _id: 'b' },
{ _id: 'c' }
...
{ _id: 'z' }

接下来,考虑一个按_id字段排序的文档的聚合管道,然后使用一个$accumulator函数来连接_id字段值

[
{
$sort: { _id: 1 }
},
{
$group: {
_id: null,
alphabet: {
$accumulator: {
init: function() {
return ""
},
accumulate: function(state, letter) {
return(state + letter)
},
accumulateArgs: [ "$_id" ],
merge: function(state1, state2) {
return(state1 + state2)
},
lang: "js"
}
}
}
}
]

MongoDB不能保证按排序顺序处理文档,这意味着alphabet字段不一定被设置为abc...z

由于这种行为,请确保您的$accumulator函数不需要按特定顺序处理和返回文档。

要使用 $accumulator,您必须启用服务器端脚本。

如果您不使用 $accumulator(或 $function$wheremapReduce),请禁用服务器端脚本

另请参阅 ➤ 使用安全配置选项运行 MongoDB。

MongoDB 6.0版本升级了用于服务器端JavaScript$accumulator$function$where表达式的内部JavaScript引擎,从MozJS-60升级到MozJS-91。MozJS-91移除了MozJS-60中存在的几个已弃用、非标准的数组和字符串函数。

有关移除的数组和字符串函数的完整列表,请参阅6.0兼容性说明。

注意

此示例通过使用$accumulator运算符来实施MongoDB已支持的$avg运算符,旨在说明具有熟悉逻辑的$accumulator运算符的行为和语法。此示例的目标不是实现新功能,而是展示其逻辑。

mongosh 中,创建一个名为 books 的示例集合,包含以下文档

db.books.insertMany([
{ "_id" : 8751, "title" : "The Banquet", "author" : "Dante", "copies" : 2 },
{ "_id" : 8752, "title" : "Divine Comedy", "author" : "Dante", "copies" : 1 },
{ "_id" : 8645, "title" : "Eclogues", "author" : "Dante", "copies" : 2 },
{ "_id" : 7000, "title" : "The Odyssey", "author" : "Homer", "copies" : 10 },
{ "_id" : 7020, "title" : "Iliad", "author" : "Homer", "copies" : 10 }
])

以下操作 groups 文档按 author 分组,并使用 $accumulator 计算每位作者的书籍平均副本数

db.books.aggregate([
{
$group :
{
_id : "$author",
avgCopies:
{
$accumulator:
{
init: function() { // Set the initial state
return { count: 0, sum: 0 }
},
accumulate: function(state, numCopies) { // Define how to update the state
return {
count: state.count + 1,
sum: state.sum + numCopies
}
},
accumulateArgs: ["$copies"], // Argument required by the accumulate function
merge: function(state1, state2) { // When the operator performs a merge,
return { // add the fields from the two states
count: state1.count + state2.count,
sum: state1.sum + state2.sum
}
},
finalize: function(state) { // After collecting the results from all documents,
return (state.sum / state.count) // calculate the average
},
lang: "js"
}
}
}
}
])

此操作返回以下结果

{ "_id" : "Dante", "avgCopies" : 1.6666666666666667 }
{ "_id" : "Homer", "avgCopies" : 10 }

$accumulator 定义了一个初始状态,其中 countsum 均设置为 0。对于 $accumulator 处理的每个文档,它通过以下方式更新状态

  • count 增加 1

  • 将文档的 copies 字段的值添加到 sumaccumulate 函数可以访问 copies 字段,因为它包含在 accumulateArgs 字段中。

处理每个文档时,accumulate 函数返回更新后的状态。

所有文档处理完成后,终结函数将副本的总数除以文档的计数来获得平均值。这样就不需要保持运行的平均值,因为终结函数接收所有文档的累计总数和计数。

此操作等同于以下管道,该管道使用$avg运算符

db.books.aggregate([
{
$group : {
_id : "$author",
avgCopies: { $avg: "$copies" }
}
}
])

您可以使用initArgs选项来改变$accumulator的初始状态。如果您想,例如

  • 使用不在您状态中的字段的值来影响您的状态,或者

  • 根据正在处理的组设置不同的初始状态。

mongosh中,创建一个名为restaurants的样本集合,包含以下文档

db.restaurants.insertMany([
{ "_id" : 1, "name" : "Food Fury", "city" : "Bettles", "cuisine" : "American" },
{ "_id" : 2, "name" : "Meal Macro", "city" : "Bettles", "cuisine" : "Chinese" },
{ "_id" : 3, "name" : "Big Crisp", "city" : "Bettles", "cuisine" : "Latin" },
{ "_id" : 4, "name" : "The Wrap", "city" : "Onida", "cuisine" : "American" },
{ "_id" : 5, "name" : "Spice Attack", "city" : "Onida", "cuisine" : "Latin" },
{ "_id" : 6, "name" : "Soup City", "city" : "Onida", "cuisine" : "Chinese" },
{ "_id" : 7, "name" : "Crave", "city" : "Pyote", "cuisine" : "American" },
{ "_id" : 8, "name" : "The Gala", "city" : "Pyote", "cuisine" : "Chinese" }
])

假设一个应用程序允许用户查询这些数据来查找餐厅。对于用户所在的城镇,显示更多结果可能很有用。在这个例子中,我们假设用户的城镇存储在一个名为 userProfileCity 的变量中。

以下聚合管道 groups 按照城市对文档进行分组。该操作使用 $accumulator 来根据餐厅的城市是否与用户配置文件中的城市匹配来显示不同数量的结果

注意

要在 mongosh 中执行此示例,请将 initArgs 中的 <userProfileCity> 替换为包含实际城市值的字符串,例如 Bettles

1db.restaurants.aggregate([
2{
3 $group :
4 {
5 _id : { city: "$city" },
6 restaurants:
7 {
8 $accumulator:
9 {
10 init: function(city, userProfileCity) { // Set the initial state
11 return {
12 max: city === userProfileCity ? 3 : 1, // If the group matches the user's city, return 3 restaurants
13 restaurants: [] // else, return 1 restaurant
14 }
15 },
16
17 initArgs: ["$city", <userProfileCity>], // Argument to pass to the init function
18
19 accumulate: function(state, restaurantName) { // Define how to update the state
20 if (state.restaurants.length < state.max) {
21 state.restaurants.push(restaurantName);
22 }
23 return state;
24 },
25
26 accumulateArgs: ["$name"], // Argument required by the accumulate function
27
28 merge: function(state1, state2) {
29 return {
30 max: state1.max,
31 restaurants: state1.restaurants.concat(state2.restaurants).slice(0, state1.max)
32 }
33 },
34
35 finalize: function(state) { // Adjust the state to only return field we need
36 return state.restaurants
37 }
38
39 lang: "js"
40 }
41 }
42 }
43}
44])

如果 userProfileCity 的值为 Bettles,此操作返回以下结果

{ "_id" : { "city" : "Bettles" }, "restaurants" : { "restaurants" : [ "Food Fury", "Meal Macro", "Big Crisp" ] } }
{ "_id" : { "city" : "Onida" }, "restaurants" : { "restaurants" : [ "The Wrap" ] } }
{ "_id" : { "city" : "Pyote" }, "restaurants" : { "restaurants" : [ "Crave" ] } }

如果 userProfileCity 的值为 Onida,此操作返回以下结果

{ "_id" : { "city" : "Bettles" }, "restaurants" : { "restaurants" : [ "Food Fury" ] } }
{ "_id" : { "city" : "Onida" }, "restaurants" : { "restaurants" : [ "The Wrap", "Spice Attack", "Soup City" ] } }
{ "_id" : { "city" : "Pyote" }, "restaurants" : { "restaurants" : [ "Crave" ] } }

如果 userProfileCity 的值为 Pyote,此操作返回以下结果

{ "_id" : { "city" : "Bettles" }, "restaurants" : { "restaurants" : [ "Food Fury" ] } }
{ "_id" : { "city" : "Onida" }, "restaurants" : { "restaurants" : [ "The Wrap" ] } }
{ "_id" : { "city" : "Pyote" }, "restaurants" : { "restaurants" : [ "Crave", "The Gala" ] } }

如果 userProfileCity 的值为任何其他值,此操作返回以下结果

{ "_id" : { "city" : "Bettles" }, "restaurants" : { "restaurants" : [ "Food Fury" ] } }
{ "_id" : { "city" : "Onida" }, "restaurants" : { "restaurants" : [ "The Wrap" ] } }
{ "_id" : { "city" : "Pyote" }, "restaurants" : { "restaurants" : [ "Crave" ] } }

函数 init 定义了一个初始状态,其中包含 maxrestaurants 字段。max 字段设置了该特定组餐厅的最大数量。如果文档的 city 字段与 userProfileCity 匹配,则该组最多包含 3 家餐厅。否则,如果文档 _iduserProfileCity 不匹配,则该组最多包含一家餐厅。init 函数接收来自 initArgs 数组的 cityuserProfileCity 参数。

对于 $accumulator 处理的每个文档,只要该名称不会使 restaurants 数组的长度超过 max 值,它就会将该餐厅的 name 推送到 restaurants 数组。在处理每个文档时,accumulate 函数返回更新后的状态。

函数 merge 定义了如何合并两个状态。该函数将每个状态中的 restaurant 数组连接起来,并使用 slice() 方法限制结果数组的长度,以确保不超过 max 值。

处理完所有文档后,finalize 函数修改结果状态,只返回餐厅的名称。如果没有这个函数,max 字段也会包含在输出中,这并不满足应用程序的任何需求。

返回

$abs