公告推出MongoDB 8.0,史上最快的MongoDB! 阅读更多 >>推出MongoDB 8.0,史上最快的MongoDB! >>

如何在MongoDB中使用变更流?

什么是变更流?

变更流是数据库变化到应用的一个实时流。使用变更流,应用可以实时响应单个集合、数据库或整个部署中的数据变化。对于依赖于数据变化通知的应用,变更流至关重要。

以下是一些你可能会找到变更流的用例

  • 分析仪表板 - 变更流可以为应用提供审计跟踪。
  • 物联网事件跟踪 - 变更流可以用于检测和调整系统,以跟踪互联网设备跟踪的事件 - 例如,跟踪设备是否移出地理围栏区域。变更流可以被过滤,只检测那些超出此范围的事件,并在事件发生时触发警报。
  • 实时交易应用 - 变更流可以用于跟踪金融数据的变化,并实时对其做出反应。

如果你使用的是MongoDB 3.6或更高版本,变更流已经内置,利用它们非常简单。

让我们看看MongoDB变更流的主要功能,一些与这些流相关的选项(例如修改流输出),最后我们将深入代码,看看如何使用Python和Node.js实现MongoDB变更流。

MongoDB变更流的功能

MongoDB变更流提供了一个高级API,可以通知应用MongoDB数据库、集合或集群的变化,而不需要轮询(这会带来更高的开销)。以下是一些变更流的特性,可能有助于你理解变更流的工作原理以及它们可以用作什么。

  • 可过滤
    • 应用可以过滤变化,只接收它们需要的变更通知。
  • 可恢复 - 变更流是可恢复的,因为每个响应都包含一个恢复令牌。使用该令牌,应用可以从它断开连接的地方继续流(如果它曾经断开过)。
  • 有序
    • 变更通知以数据库更新的相同顺序发生。
  • 持久性 - 变更流仅包括多数提交的变化。这样做是为了确保每个被监听应用看到的变化在故障场景(如选举新主节点)中都是持久的。
  • 安全性 - 只有有权读取集合的用户才能在集合上创建变更流。
  • 易于使用
    • Change Streams API 的语法使用现有的 MongoDB 驱动程序和查询语言。

MongoDB Change Streams 的可用性

为了在 MongoDB 中使用 change streams,您的环境必须满足一些要求。

在 MongoDB 4.0 及更早版本中,只有当默认启用读取关注点 "majority" 支持时,change streams 才可用。读取关注点 “majority” 保证读取的文档是持久的,并保证不会回滚。然而,从 MongoDB 4.2 开始,无论是否支持读取关注点 "majority",change streams 都是可用的。

在 MongoDB Atlas 中使用 Change Streams

如果您想尝试 MongoDB change streams 但没有设置支持它们的开发生态,您可以在 MongoDB Atlas 注册账户并选择免费集群选项。几分钟后,您将拥有一个支持 change streams 且终身免费的集群。

setting up change streams in mongodb atlas

打开 Change Stream

要为副本集打开 change stream,您可以从任何数据承载成员发出 open change stream 操作。对于分片集群,您必须从 mongos 二进制文件发出 open change stream 操作。

幸运的是,大多数 MongoDB 驱动程序支持使用与您应该很熟悉的语法使用 change streams。让我们看看使用 Node.js 中的官方 MongoDB Node 驱动程序 和 Python 中的 PyMongo 的示例,看看它实际上有多简单。

MongoDB Change Streams Node.js 示例

此示例为一个集合打开 change stream 并迭代游标以检索 change stream 文档。它假设您已连接到 MongoDB 副本集并访问了一个包含评论集合的数据库。

在这里,我们使用流来处理评论集合中的所有更改事件

conn = new Mongo("YOUR_CONNECTION_STRING");
db = conn.getDB('blog');
const collection = db.collection('comment');
const changeStream = collection.watch();
changeStream.on('change', next => {
  // do something when there is a change in the comment collection
});

但我们也可以通过迭代 change stream 游标来使用迭代器处理更改事件

const changeStreamIterator = collection.watch();
const next = await changeStreamIterator.next();

MongoDB Change Streams Python 示例

使用 Python 打开 change stream 非常相似且同样简单。在 Python 中,我们为集合打开 change stream 并迭代游标以检索 change stream 文档。此示例假设您已连接到 MongoDB 副本集并访问了一个包含库存集合的数据库。

cursor = db.inventory.watch()
document = next(cursor)

修改 MongoDB Change Streams 的输出

根据您的需求自定义 change stream 同样简单。您通过在配置 change stream 时提供以下管道阶段的数组之一或多个来控制 change stream 的输出。有关每个阶段的作用的详细信息,您可以 查看文档

  • $addFields
  • $match
  • $project
  • $replaceRoot
  • $replaceWith(从 MongoDB 4.2 开始可用)
  • $redact
  • $set(从 MongoDB 4.2 开始可用)
  • $unset(从 MongoDB 4.2 开始可用)

这允许您将完整的 change stream 过滤到只想监听的那些更改。例如,以下是使用 Node.js 和 Python 修改 change stream 输出的示例。

Node.js

const pipeline = [
  { $match: { 'fullDocument.username': 'alice' } },
  { $addFields: { newField: 'this is an added field!' } }
];

const collection = db.collection('inventory');
const changeStream = collection.watch(pipeline);
changeStream.on('change', next => {
  // process next document
});

Python

pipeline = [
    {'$match': {'fullDocument.username': 'alice'}},
    {'$addFields': {'newField': 'this is an added field!'}}
]
cursor = db.inventory.watch(pipeline=pipeline)
document = next(cursor)

恢复 Change Streams

如果更改流断开连接,它可以重新建立并从上次停止的地方开始监听。这是可能的,因为每个更改流事件都附带一个恢复令牌。

{
  _id: <resumeToken>,
  operationType: 'update'
  ...
}

客户端可以通过传递此恢复令牌来重新建立更改流,并将能够从上次停止的地方开始。

MongoDB驱动程序也会尝试自动恢复一次,以防错误是暂时性错误,例如网络错误。即使在这种情况下,您也可以自己访问此恢复令牌并编写自己的重试逻辑。

在这里,我们通过存储_id缓存更改的恢复令牌。在发生错误时,我们尝试使用此缓存的恢复令牌重新建立更改流。

在Node.js中

changeStream.on('change', (change) => {
    console.log(change)
    cachedResumeToken = change["_id"]
})

changeStream.on('error', () => {
    if (cachedResumeToken) {
        establishChangeStream(cachedResumeToken)
    }
})

在Python中,代码更简单

resume_token = cursor.resume_token
cursor = db.inventory.watch(resume_after=resume_token)
document = next(cursor)

结论

更改流通过利用MongoDB的复制过程,将MongoDB数据库转换为实时数据库。它们监视MongoDB中的复制,为需要实时数据但不涉及oplog跟踪风险或轮询开销的外部应用程序提供API。有关详细信息,请参阅官方更改流文档

常见问题解答

MongoDB中的更改流是什么?

更改流是一个实时流,从您的MongoDB数据库流向您的应用程序,包含所有数据库更改。


MongoDB允许重复项吗?

除非您在字段或多个字段上创建唯一索引,否则MongoDB允许重复项。


MongoDB是实时数据库吗?

您可以通过实现更改流将MongoDB用作实时数据库。


MongoDB有触发器吗?

MongoDB没有对触发器的原生支持。但通过使用可以通知外部应用程序任何文档更改的更改流,您可以从头开始使用您选择的编程语言创建自己的触发器,或者您可以在Atlas App Services中创建触发器。


准备好开始了吗?

启动新的集群或将应用程序迁移到MongoDB Atlas,无需停机。无需信用卡。