变更流是数据库变化到应用的一个实时流。使用变更流,应用可以实时响应单个集合、数据库或整个部署中的数据变化。对于依赖于数据变化通知的应用,变更流至关重要。
以下是一些你可能会找到变更流的用例
如果你使用的是MongoDB 3.6或更高版本,变更流已经内置,利用它们非常简单。
让我们看看MongoDB变更流的主要功能,一些与这些流相关的选项(例如修改流输出),最后我们将深入代码,看看如何使用Python和Node.js实现MongoDB变更流。
MongoDB变更流提供了一个高级API,可以通知应用MongoDB数据库、集合或集群的变化,而不需要轮询(这会带来更高的开销)。以下是一些变更流的特性,可能有助于你理解变更流的工作原理以及它们可以用作什么。
为了在 MongoDB 中使用 change streams,您的环境必须满足一些要求。
在 MongoDB 4.0 及更早版本中,只有当默认启用读取关注点 "majority" 支持时,change streams 才可用。读取关注点 “majority” 保证读取的文档是持久的,并保证不会回滚。然而,从 MongoDB 4.2 开始,无论是否支持读取关注点 "majority",change streams 都是可用的。
如果您想尝试 MongoDB change streams 但没有设置支持它们的开发生态,您可以在 MongoDB Atlas 注册账户并选择免费集群选项。几分钟后,您将拥有一个支持 change streams 且终身免费的集群。
要为副本集打开 change stream,您可以从任何数据承载成员发出 open change stream 操作。对于分片集群,您必须从 mongos 二进制文件发出 open change stream 操作。
幸运的是,大多数 MongoDB 驱动程序支持使用与您应该很熟悉的语法使用 change streams。让我们看看使用 Node.js 中的官方 MongoDB Node 驱动程序 和 Python 中的 PyMongo 的示例,看看它实际上有多简单。
此示例为一个集合打开 change stream 并迭代游标以检索 change stream 文档。它假设您已连接到 MongoDB 副本集并访问了一个包含评论集合的数据库。
在这里,我们使用流来处理评论集合中的所有更改事件
conn = new Mongo("YOUR_CONNECTION_STRING");
db = conn.getDB('blog');
const collection = db.collection('comment');
const changeStream = collection.watch();
changeStream.on('change', next => {
// do something when there is a change in the comment collection
});
但我们也可以通过迭代 change stream 游标来使用迭代器处理更改事件
const changeStreamIterator = collection.watch();
const next = await changeStreamIterator.next();
使用 Python 打开 change stream 非常相似且同样简单。在 Python 中,我们为集合打开 change stream 并迭代游标以检索 change stream 文档。此示例假设您已连接到 MongoDB 副本集并访问了一个包含库存集合的数据库。
cursor = db.inventory.watch()
document = next(cursor)
根据您的需求自定义 change stream 同样简单。您通过在配置 change stream 时提供以下管道阶段的数组之一或多个来控制 change stream 的输出。有关每个阶段的作用的详细信息,您可以 查看文档。
这允许您将完整的 change stream 过滤到只想监听的那些更改。例如,以下是使用 Node.js 和 Python 修改 change stream 输出的示例。
Node.js
const pipeline = [
{ $match: { 'fullDocument.username': 'alice' } },
{ $addFields: { newField: 'this is an added field!' } }
];
const collection = db.collection('inventory');
const changeStream = collection.watch(pipeline);
changeStream.on('change', next => {
// process next document
});
Python
pipeline = [
{'$match': {'fullDocument.username': 'alice'}},
{'$addFields': {'newField': 'this is an added field!'}}
]
cursor = db.inventory.watch(pipeline=pipeline)
document = next(cursor)
如果更改流断开连接,它可以重新建立并从上次停止的地方开始监听。这是可能的,因为每个更改流事件都附带一个恢复令牌。
{
_id: <resumeToken>,
operationType: 'update'
...
}
客户端可以通过传递此恢复令牌来重新建立更改流,并将能够从上次停止的地方开始。
MongoDB驱动程序也会尝试自动恢复一次,以防错误是暂时性错误,例如网络错误。即使在这种情况下,您也可以自己访问此恢复令牌并编写自己的重试逻辑。
在这里,我们通过存储_id缓存更改的恢复令牌。在发生错误时,我们尝试使用此缓存的恢复令牌重新建立更改流。
在Node.js中
changeStream.on('change', (change) => {
console.log(change)
cachedResumeToken = change["_id"]
})
changeStream.on('error', () => {
if (cachedResumeToken) {
establishChangeStream(cachedResumeToken)
}
})
在Python中,代码更简单
resume_token = cursor.resume_token
cursor = db.inventory.watch(resume_after=resume_token)
document = next(cursor)
更改流通过利用MongoDB的复制过程,将MongoDB数据库转换为实时数据库。它们监视MongoDB中的复制,为需要实时数据但不涉及oplog跟踪风险或轮询开销的外部应用程序提供API。有关详细信息,请参阅官方更改流文档。
更改流是一个实时流,从您的MongoDB数据库流向您的应用程序,包含所有数据库更改。
除非您在字段或多个字段上创建唯一索引,否则MongoDB允许重复项。
您可以通过实现更改流将MongoDB用作实时数据库。
MongoDB没有对触发器的原生支持。但通过使用可以通知外部应用程序任何文档更改的更改流,您可以从头开始使用您选择的编程语言创建自己的触发器,或者您可以在Atlas App Services中创建触发器。