聚合框架
聚合管道是一个数据聚合框架,基于数据处理管道的概念。
要了解更多关于聚合的信息,请参阅聚合管道 服务器手册。
先决条件
您必须设置以下组件以运行本指南中的代码示例
A
test.restaurants
集合已从restaurants.json
文件中填充,该文件位于 文档资源 GitHub。以下导入语句
import com.mongodb.reactivestreams.client.MongoClients; import com.mongodb.reactivestreams.client.MongoClient; import com.mongodb.reactivestreams.client.MongoCollection; import com.mongodb.reactivestreams.client.MongoDatabase; import com.mongodb.client.model.Aggregates; import com.mongodb.client.model.Accumulators; import com.mongodb.client.model.Projections; import com.mongodb.client.model.Filters; import org.bson.Document;
重要
本指南使用 Subscriber
实现,这些实现在本指南的快速入门指南.
连接到 MongoDB 部署
首先,连接到 MongoDB 部署,然后声明并定义 MongoDatabase
和 MongoCollection
实例。
以下代码连接到在 localhost
的独立 MongoDB 部署,端口号为 27017
。然后,定义 database
变量以引用 test
数据库,定义 collection
变量以引用 restaurants
集合
MongoClient mongoClient = MongoClients.create(); MongoDatabase database = mongoClient.getDatabase("test"); MongoCollection<Document> collection = database.getCollection("restaurants");
要了解更多关于连接到 MongoDB 部署的信息,请参阅 连接到 MongoDB 教程。
执行聚合
要执行聚合,请将聚合阶段的列表传递给 MongoCollection.aggregate()
方法。驱动程序提供了包含聚合阶段构建器的 Aggregates
辅助类。
在此示例中,聚合管道执行以下任务
使用一个
$match
阶段来筛选那些categories
数组字段包含元素"Bakery"
的文档。示例使用Aggregates.match()
来构建$match
阶段。
使用一个
$group
阶段来按stars
字段分组匹配的文档,累计每个不同stars
值的文档数量。示例使用Aggregates.group()
来构建$group
阶段,并使用Accumulators.sum()
来构建累加器表达式。对于在$group
阶段使用的累加器表达式,驱动程序提供了Accumulators
辅助类。
collection.aggregate( Arrays.asList( Aggregates.match(Filters.eq("categories", "Bakery")), Aggregates.group("$stars", Accumulators.sum("count", 1)) ) ).subscribe(new PrintDocumentSubscriber());
使用聚合表达式
对于$group
累加器表达式,驱动程序提供了Accumulators
辅助类。对于其他聚合表达式,通过使用Document
类手动构建表达式。
在以下示例中,聚合管道使用一个$project
阶段来返回只有name
字段以及计算字段firstCategory
,其值为categories
数组中的第一个元素。示例使用Aggregates.project()
和多个Projections
类方法来构建$project
阶段
collection.aggregate( Arrays.asList( Aggregates.project( Projections.fields( Projections.excludeId(), Projections.include("name"), Projections.computed( "firstCategory", new Document("$arrayElemAt", Arrays.asList("$categories", 0)) ) ) ) ) ).subscribe(new PrintDocumentSubscriber());
解释聚合
要解释聚合管道,调用AggregatePublisher.explain()
方法
collection.aggregate( Arrays.asList( Aggregates.match(Filters.eq("categories", "Bakery")), Aggregates.group("$stars", Accumulators.sum("count", 1)))) .explain() .subscribe(new PrintDocumentSubscriber());