文档菜单
文档首页
/ / /
Java 反应式流驱动程序
/

聚合框架

本页内容

  • 先决条件
  • 连接到 MongoDB 部署
  • 执行聚合
  • 使用聚合表达式
  • 解释聚合

聚合管道是一个数据聚合框架,基于数据处理管道的概念。

要了解更多关于聚合的信息,请参阅聚合管道 服务器手册。

您必须设置以下组件以运行本指南中的代码示例

  • Atest.restaurants 集合已从 restaurants.json 文件中填充,该文件位于 文档资源 GitHub。

  • 以下导入语句

import com.mongodb.reactivestreams.client.MongoClients;
import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoCollection;
import com.mongodb.reactivestreams.client.MongoDatabase;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Accumulators;
import com.mongodb.client.model.Projections;
import com.mongodb.client.model.Filters;
import org.bson.Document;

重要

本指南使用 Subscriber 实现,这些实现在本指南的快速入门指南.

首先,连接到 MongoDB 部署,然后声明并定义 MongoDatabaseMongoCollection 实例。

以下代码连接到在 localhost 的独立 MongoDB 部署,端口号为 27017。然后,定义 database 变量以引用 test 数据库,定义 collection 变量以引用 restaurants 集合

MongoClient mongoClient = MongoClients.create();
MongoDatabase database = mongoClient.getDatabase("test");
MongoCollection<Document> collection = database.getCollection("restaurants");

要了解更多关于连接到 MongoDB 部署的信息,请参阅 连接到 MongoDB 教程。

要执行聚合,请将聚合阶段的列表传递给 MongoCollection.aggregate() 方法。驱动程序提供了包含聚合阶段构建器的 Aggregates 辅助类。

在此示例中,聚合管道执行以下任务

  • 使用一个$match阶段来筛选那些categories数组字段包含元素"Bakery"的文档。示例使用Aggregates.match()来构建$match阶段。

  • 使用一个$group阶段来按stars字段分组匹配的文档,累计每个不同stars值的文档数量。示例使用Aggregates.group()来构建$group阶段,并使用Accumulators.sum()来构建累加器表达式。对于在$group阶段使用的累加器表达式,驱动程序提供了Accumulators辅助类。

collection.aggregate(
Arrays.asList(
Aggregates.match(Filters.eq("categories", "Bakery")),
Aggregates.group("$stars", Accumulators.sum("count", 1))
)
).subscribe(new PrintDocumentSubscriber());

对于$group累加器表达式,驱动程序提供了Accumulators辅助类。对于其他聚合表达式,通过使用Document类手动构建表达式。

在以下示例中,聚合管道使用一个$project阶段来返回只有name字段以及计算字段firstCategory,其值为categories数组中的第一个元素。示例使用Aggregates.project()和多个Projections类方法来构建$project阶段

collection.aggregate(
Arrays.asList(
Aggregates.project(
Projections.fields(
Projections.excludeId(),
Projections.include("name"),
Projections.computed(
"firstCategory",
new Document("$arrayElemAt", Arrays.asList("$categories", 0))
)
)
)
)
).subscribe(new PrintDocumentSubscriber());

要解释聚合管道,调用AggregatePublisher.explain()方法

collection.aggregate(
Arrays.asList(
Aggregates.match(Filters.eq("categories", "Bakery")),
Aggregates.group("$stars", Accumulators.sum("count", 1))))
.explain()
.subscribe(new PrintDocumentSubscriber());

返回

批量写入操作