MongoDB中的change stream

3.6版中的新功能。
change stream允许应用容易地获取实时的数据流变化,并且没有跟踪日志的风险。应用可以使用change stream来订阅一个集合上的所有数据变化,并且立刻进行响应。

打开一个change stream

我们可以在一个复制集或分片集群中打开一个change stream。对于一个分片集群,我们必须在mongos上发送打开change stream的操作。

复制集或者分片几区必须使用复制集协议版本1(pv1)和WiredTiger存储引擎(可以加密)。

python为例
下面的Python示例假设我们已经连接到一个MongoDB复制集并且对包含inventory集合的数据库有读取权限。

下面的代码在一个复制集上打开一个change streamchange stream被绑定在一个集合上,变化流上的文档使用一个游标进行迭代。只要对MongoDB的部署保持打开并且集合存在,这个游标一直保持打开状态,直到被显式关闭。

cursor = db.inventory.watch()
document = next(cursor)

可以对change stream游标进行迭代来检索数据修改事件的通知。

注意:一个未关闭游标的生命周期是依赖语言的。

修改change stream输出

我们可以在配置change stream时通过提供一个或多个下列管道阶段来控制change stream的输出:
$macth
$project
$addFields
$replaceRoot
$redact

查询更新操作的完整文档

默认地,在更新操作中,change stream只会返回字段的变化量。但是,我们可以配置change stream返回被更新文档最近的重大提交版本。

如果想要返回被更新文档最近的重要提交版本,向db.collection.watch()方法传递full_document='updateLookup'的参数。

在下面的示例中,所有的更新通知都会包含一个full_document字段来表示这个更新操作的文档当前的版本。

cursor = db.inventory.watch(full_documennt='updateLookup')
document = next(cursor)

注意:如果在更新操作之后但是在查找操作之前,有一个或者多个对更新文档修改的重大提交操作,返回的整个文档有可能与更新操作时的文档完全不一样。
然而,在change stream文档中包含的增量一般能够正确地描述被观察到应用到change stream事件的集合变化。

恢复change stream

change stream是可以恢复的,只要日志有足够的历史来定位应用收到的最后一个操作。

每个change stream响应文档有一个_id字段,里面有一个包含恢复令牌的文档。通过向change stream传递_id来恢复从指定操作开始的通知。

在下面的示例中,resume_token包含了change stream通知idresume_after修改函数接收一个参数,被解析为一个恢复令牌。将resume_token传递给resume_after修改函数,可以将change stream导流到尝试从恢复令牌中指定操作开始的恢复通知。

resume_token = document.get("_id")
cursor = db.inventory.watch(resume_after=resume_token)
document = next(cursor)

只要这个操作还没有被轮转出操作日志,change stream可以成功恢复通知。

注意:无效的change stream无法恢复。尝试在一个已经删除或者被重命名的集合上恢复change stream会报错。

使用实例

change stream对可靠的业务系统的架构非常有用,一旦数据修改持久化之后通知下游系统。例如,在实现提取、转化和载入(ETL)服务、跨平台同步、功能合作和通知服务时,change stream可以节约开发者的时间。

存取控制

对于要求认证和授权的部署,应用只能在它们有读权限的集合上打开change stream

事件通知

change stream只会对那些已经保存到复制集中大多数数据存储成员的数据修改进行通知、这保证了通知只会被大部分提交修改的情况下被触发(在失效的场景下是可持久的)。

例如,考虑在一个3成员复制集的主成员上打开change stream游标。如果客户端发送一个插入操作,change stream只会在这个插入已经保存到大部分数据存储成员时通知给应用数据的变化。

打赏

mickey

记录生活,写给几十年后的自己。