聚合构建器
本页内容
概述
在本指南中,您可以了解如何使用聚合 类,该类提供静态工厂方法,用于在 MongoDB Java 驱动程序中构建聚合管道阶段。
要了解更多关于聚合的信息,请参阅我们的聚合指南。要了解如何使用可发现、类型安全的 Java 方法创建表达式操作以用于您的聚合,请参阅聚合表达式操作。
提示
为了简洁,您可以选择静态导入以下类的函数,使您的查询更简洁。
聚合
过滤器
投影
排序
累加器
import static com.mongodb.client.model.Aggregates.*; import static com.mongodb.client.model.Filters.*; import static com.mongodb.client.model.Projections.*; import static com.mongodb.client.model.Sorts.*; import static com.mongodb.client.model.Accumulators.*; import static com.mongodb.client.model.search.SearchPath.fieldPath; import static com.mongodb.client.model.search.VectorSearchOptions.exactVectorSearchOptions; import static java.util.Arrays.asList;
本页上的示例假设这些静态导入,包括静态导入asList()
方法。
使用这些方法构建管道阶段,并在您的聚合中将它们指定为列表。
Bson matchStage = match(eq("some_field", "some_criteria")); Bson sortByCountStage = sortByCount("some_field"); collection.aggregate(asList(matchStage, sortByCountStage)).forEach(doc -> System.out.println(doc));
匹配
使用match()
方法创建一个匹配输入文档与指定查询过滤器的$match
管道阶段,过滤掉不匹配的文档。
提示
过滤器可以是实现Bson
的任何类的实例,但与使用过滤器类结合使用更方便。
以下示例创建一个匹配所有title
字段等于"The Shawshank Redemption"的文档的管道阶段。
match(eq("title", "The Shawshank Redemption"));
投影
使用project()
方法创建一个$project
管道阶段,以投影指定的文档字段。聚合中的字段投影遵循与查询中字段投影相同的规则字段投影。
提示
尽管投影可以是实现Bson
的任何类的实例,但与使用投影类结合使用更方便。
以下示例创建一个管道阶段,排除_id
字段,但包括title
和plot
字段。
project(fields(include("title", "plot"), excludeId()));
投影计算字段
在$project
阶段,也可以投影计算字段。
以下示例创建了一个管道阶段,将rated
字段投影到一个名为rating
的新字段中,实际上是对字段进行了重命名。
project(fields(computed("rating", "$rated"), excludeId()));
文档
使用documents()
方法创建一个返回输入值的字面文档的$documents
管道阶段。
重要
如果你在聚合管道中使用$documents
阶段,它必须是管道中的第一个阶段。
以下示例创建了一个创建带有title
字段的示例文档的管道阶段
documents(asList( new Document("title", "The Shawshank Redemption"), new Document("title", "Back to the Future"), new Document("title", "Jurassic Park")));
重要
如果你使用documents()
方法向聚合管道提供输入,你必须在数据库上调用aggregate()
方法,而不是在集合上。
样本
使用sample()
方法创建一个$sample管道阶段,从输入中随机选择文档。
以下示例创建了一个随机选择5个文档的管道阶段
sample(5);
排序
使用sort()
方法创建一个$sort管道阶段,根据指定的标准进行排序。
提示
虽然排序标准可以是实现Bson
的任何类的实例,但与排序器一起使用会更方便。
以下示例创建了一个管道阶段,根据year
字段的值降序排序,然后根据title
字段的值升序排序
sort(orderBy(descending("year"), ascending("title")));
跳过
使用skip()
方法创建一个$skip管道阶段,在将文档传递到下一阶段之前跳过指定的文档数。
以下示例创建了一个管道阶段,跳过了前5
个文档
skip(5);
限制
使用 $limit 管道阶段来限制传递到下一阶段的文档数量。
以下示例创建了一个管道阶段,将文档数量限制为 10
limit(10);
查找
使用 lookup()
方法创建一个 $lookup 管道阶段,在两个集合之间执行连接和无关子查询。
左外连接
以下示例创建了一个管道阶段,在 movies
和 comments
集合之间执行左外连接
它将
movies
中的_id
字段与comments
中的movie_id
字段连接起来它将结果输出到
joined_comments
字段
lookup("comments", "_id", "movie_id", "joined_comments");
全连接和无关子查询
以下示例创建了一个管道阶段,通过项目以及是否有足够的库存来满足订单数量,将两个集合 orders
和 warehouses
连接起来
List<Variable<String>> variables = asList(new Variable<>("order_item", "$item"), new Variable<>("order_qty", "$ordered")); List<Bson> pipeline = asList( match(expr(new Document("$and", asList(new Document("$eq", asList("$$order_item", "$stock_item")), new Document("$gte", asList("$instock", "$$order_qty")))))), project(fields(exclude("stock_item"), excludeId()))); Bson innerJoinLookup = lookup("warehouses", variables, pipeline, "stockdata");
组
使用group()
方法创建一个$group管道阶段,根据指定的表达式对文档进行分组,并为每个不同的分组输出一个文档。
提示
驱动程序包括一个累加器类,其中包含每个支持的累加器的静态工厂方法。
以下示例创建了一个管道阶段,该阶段根据customerId
字段的值对文档进行分组。每个组将quantity
字段的值总和和平均值累加到totalQuantity
和averageQuantity
字段中。
group("$customerId", sum("totalQuantity", "$quantity"), avg("averageQuantity", "$quantity"));
有关累加器运算符的更多信息,请参阅服务器手册中的累加器。
选择-N 累加器
选择项累积器是聚合累积操作符,它根据特定的排序返回顶部和底部元素。使用以下构建器之一创建聚合累积操作符:
提示
只有在运行 MongoDB v5.2 或更高版本时,您才能使用这些选择项累积器执行聚合操作。
请从服务器手册的累积器部分了解您可以与哪些聚合管道阶段一起使用累积操作符。
MinN
构建器 minN()
创建返回包含分组中 n
个最低值的文档数据的 $minN 累积器。
提示
累积器 $minN
和 $bottomN
可以执行类似任务。请参阅$minN 和 $bottomN 累积器的比较,了解每个累积器的推荐用法。
以下示例演示了如何使用 minN()
方法返回按 year
分组的电影中 imdb.rating
的前三个最低值。
group( "$year", minN( "lowest_three_ratings", new BsonString("$imdb.rating"), 3 ));
有关更多信息,请参阅minN() API 文档。
MaxN
maxN()
累加器从包含分组中最高 n
个值的文档中返回数据。
以下示例演示了如何使用 maxN()
方法返回按 year
分组的电影中最高的两个 imdb.rating
值。
group( "$year", maxN( "highest_two_ratings", new BsonString("$imdb.rating"), 2 ));
有关更多信息,请参阅 maxN() API 文档。
FirstN
firstN()
累加器按照指定的排序顺序,从每个分组的第一个 n
个文档中返回数据。
提示
$firstN
和 $topN
累加器可以执行类似的任务。有关每个累加器的推荐用法,请参阅 $firstN 和 $topN 累加器比较。
以下示例演示了如何使用 firstN()
方法返回按顺序进入阶段的第一个四个电影 title
值,按 year
分组。
group( "$year", firstN( "first_four_movies", new BsonString("$title"), 4 ));
请参阅 firstN() API 文档 获取更多信息。
LastN
lastN()
聚合器返回每个分组中指定排序顺序的最后 n
份文档的数据。
以下示例演示了如何使用 lastN()
方法来显示根据其进入阶段的顺序,按 year
分组的最后三部电影的 title
值。
group( "$year", lastN( "last_three_movies", new BsonString("$title"), 3 ));
请参阅 lastN() API 文档 获取更多信息。
顶部
top()
累加器根据指定的排序顺序,从组中的第一个文档返回数据。
以下示例演示了如何使用 top()
方法返回根据 imdb.rating
排序、按 year
分组的最高评分电影的 title
和 imdb.rating
值。
group( "$year", top( "top_rated_movie", descending("imdb.rating"), asList(new BsonString("$title"), new BsonString("$imdb.rating")) ));
有关更多信息,请参阅top() API 文档。
TopN
topN()
累加器返回具有指定字段最高 n
值的文档中的数据。
提示
$firstN
和 $topN
累加器可以执行类似的任务。有关每个累加器的推荐用法,请参阅 $firstN 和 $topN 累加器比较。
以下示例演示了如何使用 topN()
方法返回基于 runtime
值、按 year
分组的三部最长电影的 title
和 runtime
值。
group( "$year", topN( "longest_three_movies", descending("runtime"), asList(new BsonString("$title"), new BsonString("$runtime")), 3 ));
请参阅 topN() API 文档 以获取更多信息。
底部
bottom()
累加器根据指定的排序顺序从组中的最后一个文档返回数据。
以下示例演示了如何使用 bottom()
方法,根据 runtime
值返回最短电影的 title
和 runtime
值,按 year
分组。
group( "$year", bottom( "shortest_movies", descending("runtime"), asList(new BsonString("$title"), new BsonString("$runtime")) ));
请参阅 bottom() API 文档 以获取更多信息。
底部N
底N累加器返回包含指定字段最低n个值的文档中的数据。
提示
累积器 $minN
和 $bottomN
可以执行类似任务。请参阅$minN 和 $bottomN 累积器的比较,了解每个累积器的推荐用法。
以下示例演示了如何使用底N方法返回根据imdb.rating值,按年份分组的前两个评分最低电影的title和imdb.rating值。
group( "$year", bottomN( "lowest_rated_two_movies", descending("imdb.rating"), asList(new BsonString("$title"), new BsonString("$imdb.rating")), 2 ));
有关更多信息,请参阅底N API文档。
展开
使用unwind()
方法创建一个$unwind管道阶段,从输入文档中解构数组字段,为每个数组元素创建一个输出文档。
以下示例为sizes数组中的每个元素创建一个文档。
unwind("$sizes");
保留数组字段缺失或为null的文档,或数组为空的文档。
unwind("$sizes", new UnwindOptions().preserveNullAndEmptyArrays(true));
包含数组索引,在本例中为名为“position”的字段。
unwind("$sizes", new UnwindOptions().includeArrayIndex("position"));
Out
使用out()
方法创建一个将所有文档写入同一数据库中指定集合的$out管道阶段。
重要
在任意聚合管道中,$out
阶段必须是最后一个阶段。
以下示例将管道的结果写入authors
集合
out("authors");
合并
使用merge()
方法创建一个将所有文档合并到指定集合的$merge管道阶段。
重要
在任意聚合管道中,$merge
阶段必须是最后一个阶段。
以下示例使用默认选项将管道合并到authors
集合
merge("authors");
以下示例使用某些选项将管道合并到reporting
数据库中的customers
集合,如果date
和customerId
都匹配,则替换文档,否则插入文档
merge(new MongoNamespace("reporting", "customers"), new MergeOptions().uniqueIdentifier(asList("date", "customerId")) .whenMatched(MergeOptions.WhenMatched.REPLACE) .whenNotMatched(MergeOptions.WhenNotMatched.INSERT));
图查找
使用 graphLookup()
方法创建一个 $graphLookup 管道阶段,该阶段在指定的集合中执行递归搜索以匹配一个文档中指定的字段与另一个文档的指定字段。
以下示例计算了 contacts
集合中用户的社交网络图,递归地匹配 friends
字段中的值到 name
字段。
graphLookup("contacts", "$friends", "friends", "name", "socialNetwork");
使用 GraphLookupOptions
,您可以指定递归的深度以及深度字段的名称(如果需要的话)。在这个例子中,$graphLookup
将递归到两次,并为每个文档创建一个名为 degrees
的字段,其中包含递归深度信息。
graphLookup("contacts", "$friends", "friends", "name", "socialNetwork", new GraphLookupOptions().maxDepth(2).depthField("degrees"));
使用 GraphLookupOptions
,您可以指定一个过滤器,文档必须匹配该过滤器,MongoDB 才会将它们包含在您的搜索中。在这个例子中,只有其 hobbies
字段中包含 "golf" 的链接将被包括。
graphLookup("contacts", "$friends", "friends", "name", "socialNetwork", new GraphLookupOptions().maxDepth(1).restrictSearchWithMatch(eq("hobbies", "golf")));
按计数排序
使用 sortByCount()
方法创建一个 $sortByCount 管道阶段,该阶段根据给定的表达式对文档进行分组,然后按计数降序对这些组进行排序。
提示
$sortByCount
阶段与具有 $sum
累加器的 $group
阶段以及随后的 $sort
阶段相同。
[ { "$group": { "_id": <expression to group on>, "count": { "$sum": 1 } } }, { "$sort": { "count": -1 } } ]
以下示例根据字段 x
的截断值对文档进行分组,并计算每个不同值的计数。
sortByCount(new Document("$floor", "$x"));
替换根节点
使用replaceRoot()
方法创建一个$replaceRoot管道阶段,用于替换每个输入文档为指定的文档。
以下示例将每个输入文档替换为spanish_translation
字段中的嵌套文档
replaceRoot("$spanish_translation");
添加字段
使用addFields()
方法创建一个$addFields管道阶段,用于向文档添加新字段。
提示
当您不希望投影字段包含或排除时,请使用$addFields
。
以下示例向输入文档添加两个新字段a
和b
addFields(new Field("a", 1), new Field("b", 2));
计数
使用count()
方法创建一个$count管道阶段,用于计算进入该阶段的文档数量,并将该值分配给指定的字段名称。如果您未指定字段,则count()
默认字段名为"count"。
提示
$count
阶段是
{ "$group":{ "_id": 0, "count": { "$sum" : 1 } } }
以下示例创建一个输出字段名为"total"的输入文档计数的管道阶段
count("total");
桶
使用 bucket()
方法创建一个 $bucket 管道阶段,自动将数据根据预定义的边界值进行分桶。
以下示例创建一个管道阶段,根据文档的 screenSize
字段的值进行分组,包含下边界,不包含上边界。
bucket("$screenSize", asList(0, 24, 32, 50, 70, 200));
使用 BucketOptions
类指定超出指定边界的默认桶,以及指定额外的累加器。
以下示例创建一个管道阶段,根据文档的 screenSize
字段的值进行分组,计算每个桶中文档的数量,将 screenSize
的值推送到名为 matches
的字段中,并将任何大于 "70" 的屏幕尺寸捕获到名为 "monster" 的桶中,用于极大尺寸的屏幕
提示
驱动程序包括一个累加器类,其中包含每个支持的累加器的静态工厂方法。
bucket("$screenSize", asList(0, 24, 32, 50, 70), new BucketOptions().defaultBucket("monster").output(sum("count", 1), push("matches", "$screenSize")));
BucketAuto
使用 bucketAuto()
方法创建一个 $bucketAuto 管道阶段,自动确定每个桶的边界,以尝试将文档均匀地分配到指定的桶数中。
以下示例创建一个管道阶段,尝试根据文档的 price
字段的值将文档均匀地分配到 10 个桶中。
bucketAuto("$price", 10);
使用 BucketAutoOptions
类来指定一个基于优选数的边界值,并指定额外的累加器。
以下示例创建了一个管道阶段,该阶段将尝试使用文档的price
字段值,将文档均匀地分配到10个桶中,并将桶边界设置为2的幂(2, 4, 8, 16, ...)。它还统计每个桶中的文档数量,并在名为avgPrice
的新字段中计算它们的平均价格。
提示
驱动程序包括一个累加器类,其中包含每个支持的累加器的静态工厂方法。
bucketAuto("$price", 10, new BucketAutoOptions().granularity(BucketGranularity.POWERSOF2) .output(sum("count", 1), avg("avgPrice", "$price")));
细分
使用 facet()
方法创建一个$facet 管道阶段,允许定义并行管道。
以下示例创建了一个执行两个并行聚合的管道阶段
第一个聚合根据文档的
attributes.screen_size
字段将传入的文档分配到5个组中。第二个聚合计算所有制造商的数量并返回它们的计数,限制为前5个。
facet(new Facet("Screen Sizes", bucketAuto("$attributes.screen_size", 5, new BucketAutoOptions().output(sum("count", 1)))), new Facet("Manufacturer", sortByCount("$attributes.manufacturer"), limit(5)));
设置窗口字段
使用 setWindowFields()
方法创建一个 $setWindowFields 管道阶段,该阶段允许使用窗口操作符对集合中指定范围的文档执行操作。
以下示例创建了一个管道阶段,该阶段计算过去一个月每个地区的累积降雨量和平均温度,这些数据来自于 rainfall
和 temperature
字段中的更精细的测量值。
Window pastMonth = Windows.timeRange(-1, MongoTimeUnit.MONTH, Windows.Bound.CURRENT); setWindowFields("$localityId", Sorts.ascending("measurementDateTime"), WindowOutputFields.sum("monthlyRainfall", "$rainfall", pastMonth), WindowOutputFields.avg("monthlyAvgTemp", "$temperature", pastMonth));
加密
使用 densify()
方法创建一个 $densify 管道阶段,生成一系列文档以跨越指定的间隔。
提示
您只能在运行 MongoDB v5.1 或更高版本时使用 $densify()
聚合阶段。
考虑以下从 Atlas 示例天气数据集 中检索到的文档,其中包含对类似 position
字段的测量,每隔一小时一次。
Document{{ _id=5553a..., position=Document{{type=Point, coordinates=[-47.9, 47.6]}}, ts=Mon Mar 05 08:00:00 EST 1984, ... }} Document{{ _id=5553b..., position=Document{{type=Point, coordinates=[-47.9, 47.6]}}, ts=Mon Mar 05 09:00:00 EST 1984, ... }}
假设您需要创建一个管道阶段,对以下文档执行以下操作
在每 15 分钟间隔添加一个文档,其中
ts
值不存在。按
position
字段对文档进行分组。
调用 densify()
聚合阶段构建器以完成这些操作,如下所示
densify( "ts", DensifyRange.partitionRangeWithStep(15, MongoTimeUnit.MINUTE), DensifyOptions.densifyOptions().partitionByFields("position.coordinates"));
以下输出突出显示了由聚合阶段生成的文档,这些文档包含在现有文档之间每 15 分钟的 ts
值
Document{{ _id=5553a..., position=Document{{type=Point, coordinates=[-47.9, 47.6]}}, ts=Mon Mar 05 08:00:00 EST 1984, ... }} Document{{ position=Document{{coordinates=[-47.9, 47.6]}}, ts=Mon Mar 05 08:15:00 EST 1984 }} Document{{ position=Document{{coordinates=[-47.9, 47.6]}}, ts=Mon Mar 05 08:30:00 EST 1984 }} Document{{ position=Document{{coordinates=[-47.9, 47.6]}}, ts=Mon Mar 05 08:45:00 EST 1984 }} Document{{ _id=5553b..., position=Document{{type=Point, coordinates=[-47.9, 47.6]}}, ts=Mon Mar 05 09:00:00 EST 1984, ... }}
有关更多信息,请参阅 densify 包 API 文档。
填
使用 fill()
方法创建一个 $fill 管道阶段,填充 null
和缺失的字段值。
提示
您只能在运行 MongoDB v5.3 或更高版本时使用 $fill()
聚合阶段。
考虑以下包含温度和空气压力测量的每小时间隔的文档
Document{{_id=6308a..., hour=1, temperature=23C, air_pressure=29.74}} Document{{_id=6308b..., hour=2, temperature=23.5C}} Document{{_id=6308c..., hour=3, temperature=null, air_pressure=29.76}}
假设您需要按照以下方式填充文档中的缺失温度和空气压力数据点
使用线性插值计算值,填充 "2" 小时的
air_pressure
字段。将 "3" 小时缺失的
temperature
值设置为 "23.6C"。
调用 fill()
聚合阶段构建器以完成这些操作,如下所示
fill( FillOptions.fillOptions().sortBy(ascending("hour")), FillOutputField.value("temperature", "23.6C"), FillOutputField.linear("air_pressure") );
以下输出突出显示了包含由聚合阶段填充的字段的文档
Document{{_id=6308a..., hour=1, temperature=23C, air_pressure=29.74}} Document{{_id=6308b..., hour=2, temperature=23.5C, air_pressure=29.75}} Document{{_id=6308c..., hour=3, temperature=23.6C, air_pressure=29.76}}
请参阅fill包API文档了解更多信息。
Atlas全文搜索
使用search()
方法创建一个$search管道阶段,指定一个或多个字段的全文搜索。
提示
仅适用于MongoDB v4.2及以上版本的Atlas
此聚合管道操作符仅适用于托管在MongoDB Atlas集群中的集合,这些集群运行v4.2或更高版本,并且受Atlas搜索索引的保护。从Atlas Search文档中了解更多关于所需设置和此操作符的功能。
以下示例创建了一个管道阶段,用于搜索包含单词"Future"的title
字段。
Bson textSearch = Aggregates.search( SearchOperator.text( SearchPath.fieldPath("title"), "Future"));
了解更多关于构建器的信息,请参阅搜索包API文档。
Atlas搜索元数据
使用searchMeta()
方法创建一个$searchMeta管道阶段,该阶段只返回Atlas全文搜索查询的结果元数据部分。
提示
仅在MongoDB v4.4.11及更高版本上的Atlas可用
此聚合管道操作符仅在运行v4.4.11及更高版本的MongoDB Atlas集群上可用。有关版本可用性的详细列表,请参阅MongoDB Atlas文档中的$searchMeta。
以下示例展示了Atlas搜索聚合阶段的count
元数据
Aggregates.searchMeta( SearchOperator.near(2010, 1, SearchPath.fieldPath("year")));
从searchMeta() API 文档中了解更多关于此辅助工具的信息。
Atlas 矢量搜索
重要
要了解哪些版本的 MongoDB Atlas 支持此功能,请参阅 MongoDB Atlas 文档中的 限制。
使用 vectorSearch()
方法创建一个指定 语义搜索 的 $vectorSearch 管道阶段。语义搜索是一种在意义上相似的信息定位搜索类型。
要使用此功能,您必须设置一个矢量搜索索引并索引您的矢量嵌入。有关如何以编程方式创建矢量搜索索引的信息,请参阅索引指南中的 Atlas Search 和矢量搜索索引 部分。有关矢量嵌入的更多信息,请参阅 如何为矢量搜索索引矢量嵌入。
以下示例演示了如何构建一个聚合管道,该管道使用 vectorSearch()
和 project()
方法计算矢量搜索分数
List<Double> queryVector = (asList(-0.0072121937, -0.030757688, -0.012945653)); String indexName = "mflix_movies_embedding_index"; FieldSearchPath fieldSearchPath = fieldPath("plot_embedding"); int limit = 1; VectorSearchOptions options = exactVectorSearchOptions().filter(gte("year", 2016)); List<Bson> pipeline = asList( vectorSearch( fieldSearchPath, queryVector, indexName, limit, options), project( metaVectorSearchScore("vectorSearchScore")));
以下示例演示了如何从先前的聚合管道的结果中打印分数
Document found = collection.aggregate(pipeline).first(); double score = found.getDouble("vectorSearchScore").doubleValue(); System.out.println("vectorSearch score: " + score);
在vectorSearch() API 文档中了解更多关于此辅助程序的信息。