聚合框架
聚合管道是一个数据聚合框架,基于数据处理管道的概念。
要了解更多关于聚合的信息,请参阅聚合管道中的服务器手册。
先决条件
您必须设置以下组件才能运行本指南中的代码示例
A
test.restaurants
集合需用从restaurants.json
文件中的文档填充,该文件位于 文档资源 GitHub。以下导入语句
import org.mongodb.scala._ import org.mongodb.scala.model.Aggregates._ import org.mongodb.scala.model.Accumulators._ import org.mongodb.scala.model.Filters._ import org.mongodb.scala.model.Projections._
注意
本指南使用 Observable
隐式转换,如《快速入门指南》所述快速入门指南.
连接到 MongoDB 部署
首先,连接到 MongoDB 部署,然后声明并定义 MongoDatabase
和 MongoCollection
实例。
以下代码连接到在 localhost
上运行的独立 MongoDB 部署的 27017
端口。然后,定义 database
变量以引用 test
数据库,并将 collection
变量定义为引用 restaurants
集合
val mongoClient: MongoClient = MongoClient() val database: MongoDatabase = mongoClient.getDatabase("test") val collection: MongoCollection[Document] = database.getCollection("restaurants")
有关连接到 MongoDB 部署的更多信息,请参阅连接到 MongoDB 指南。
执行聚合
要执行聚合,请将一个聚合阶段列表传递给 MongoCollection.aggregate()
方法。驱动程序提供了包含聚合阶段构建器的 Aggregates
辅助类。
在这个例子中,聚合管道执行以下任务
使用
$match
阶段来筛选包含categories
数组字段包含元素"Bakery"
的文档。该示例使用Aggregates.filter()
来构建$match
阶段。
使用
$group
阶段按stars
字段对匹配的文档进行分组,并为每个不同的stars
值累计文档计数。示例使用Aggregates.group()
来构建$group
阶段,并使用Accumulators.sum()
来构建累加器表达式。对于在$group
阶段中使用的累加器表达式,驱动程序提供了Accumulators
辅助类。
collection.aggregate(Seq( Aggregates.filter(Filters.equal("categories", "Bakery")), Aggregates.group("$stars", Accumulators.sum("count", 1)) )).printResults()
使用聚合表达式
对于 $group
累加器表达式,驱动程序提供了 Accumulators
辅助类。对于其他聚合表达式,请使用 Document
类手动构建表达式。
在以下示例中,聚合管道使用 $project
阶段仅返回 name
字段和计算字段 firstCategory
,其值为 categories
数组中的第一个元素。示例使用 Aggregates.project()
和各种 Projections
类方法来构建 $project
阶段
collection.aggregate( Seq( Aggregates.project( Projections.fields( Projections.excludeId(), Projections.include("name"), Projections.computed( "firstCategory", Document("$arrayElemAt"-> Seq("$categories", 0)) ) ) ) ) ).printResults()
解释聚合
要$explain
一个聚合管道,调用AggregatePublisher.explain()
方法
collection.aggregate( Seq(Aggregates.filter(Filters.eq("categories", "Bakery")), Aggregates.group("$stars", Accumulators.sum("count", 1))) ).explain().printResults()