Change Stream生产实战建议及触发事件

实战建议

如果我们对那些打开了change stream的集合或数据库进行删除或重命名操作,当change stream游标到操作日志中的那个节点时会被关闭。带有fullDocument:updateLookup选项的change stream游标可能会在查询文档是返回null

尝试在一个已经删除的集合上恢复一个change stream会引发错误。任何发生在从change stream中捕捉到的最后一个事件和集合删除之间的数据修改都会丢失。

change stream响应的文档必须与16MB的BSON文档限制保持一致。根据开启change stream的集合上的文档大小而决定,如果结果的通知文档超过了16MB大小的限制,通知可能会失败。例如,在配置的change stream上的更新操作返回完整的根性文档,或者对小于或等于大小限制的文档进行插入或替换操作。

复制集

对于有仲裁节点的复制集,如果大部分数据存储节点不可用,而导致操作无法变为大部分提交的状态时,change stream可能会保持空闲的状态。

例如,考虑一个有两个数据存储和一个仲裁节点的3成员复制集。如果从节点挂了(由于故障或升级),写操作无法成为大多数提交的情况。那么change stream保持打开状态,但是不会发送任何通知。

在这种场景下,应用可以获取到所有发生在宕机时间内的操作,只要应用中接收的最后一个操作仍然在复制集的操作日志中。

如果估计会有一段比较长的宕机时间,例如升级或者一个较大的事故,考虑增大操作日志的大小,以保证操作能被保留一段大于估计宕机的时间。使用rs.printReplicationInfo()来检索操作日志状态的信息,包括操作日志的大小和操作的时间范围。

分片集群

change stream通过利用一个全球逻辑时钟来提供一个分片之间总共的修改顺序。MongoDB保证保留了修改的顺序, 并且change stream通知可以被安全地按照接收的顺序被解析。例如,在一个3分片的分片集群上打开的change stream游标根据在这所有3个分片上事件发生的完整顺序来返回修改通知。

为了保证修改的完整顺序,mongos会检查每个分片的每个修改通知来查看分片是否有最新的修改。集合上只有很少活动甚至没有活动的一个或更多分片的分片集群(或者是“冷的”)会对change stream的响应时间有负面影响,因为mongos仍然必须检查这些冷分片来保证修改的完整顺序。这个影响可能在对地理分离的分片上表现得更加明显,或者大部分操作的工作负载集中在集群中某些分片的子集时也会有影响。

如果一个分片集合有高级别的行为,mongo可能无法跟上所有分片上的变化。考虑在这些类型的集合上利用通知进行过滤。例如,传递一个$match管道配置来过滤只发送插入操作。

对于分片集合,使用multi:true的更新选项可能会造成在该集合上打开的所有change stream发送孤立文档的通知。

目前,一个未分片的集合被分片时,直到change stream赶上第一个分片迁移,change stream通知文档中的documentKey只会包含文档的_id字段,而不是完整的片键。

修改事件

下面的文档表示一个change stream响应文档中可能包含的所有可能字段。

{
   _id : { <BSON Object> },
   "operationType" : "<operation>",
   "fullDocument" : { <document> },
   "ns" : {
      "db" : "<database>",
      "coll" : "<collection"
   },
   "documentKey" : { "_id" : <ObjectId> },
   "updateDescription" : {
      "updatedFields" : { <document> },
      "removedFields" : [ "<field>", ... ]
   }
}

一些字段只出现在一些特定的操作中,例如更新。下面的表格描述了change stream响应文档中的各个字段。

字段 类型 描述
_id 文档 与操作相关的元数据。在恢复一个change stream时使用这个文档作为一个resumeAfter参数的resumeToken值。
operationType 字符串 发生的操作类型,可能是下面值的任意一种:插入(insert)、删除(delete)、替换(replace)、更新(update)、废弃(invalidate)。
fullDocument 文档 操作创建会修改的文档。对于插入和替换操作,这表示该操作创建的新文档。对于删除操作,这个字段被省略掉了,因为该文档不再存在。对于更新操作,只有在将fullDocument设置为updateLookup时对change stream进行配置时,这个字段才会出现。然后该字段为被这个更新操作修改的最新大部分提交版本的该文档信息。这个文档可能会与updateDescription中的得到的修改不一样,如果其他大部分提交的操作在原始的更新操作和全文档查找的过程中对文档进行了修改。
ns 文档 change stream所在的数据库和集合的命名空间。
ns.db 字符串 数据库的名称。
ns.coll 字符串 集合的名称。
documentKey 文档 这个操作创建会修改的文档的objectID。对于分片集合,也会返回该文档完整的片键。如果_id字段已经是片键的一个部分,那么该字段就不会重复出现。
updateDescription 文档 一个描述该更新操作更新或删除字段的文档。这个文档和字段只会在opreationTypeupdate时会出现。
updateDescription.updateFields 文档 一个键值对应被更新操作中修改字段的文档、每个字段的值对应这些字段的新值,而不是这个操作造成的新值。
updateDescription.removedFields 数组 一个被更新操作删除的字段数组。

插入(insert)事件

下面的例子展示了一个插入(insert)事件:

{
   _id: { < Resume Token > },
   operationType: 'insert',
   ns: {
      db: 'engineering',
      coll: 'users'
   },
   documentKey: {
      userName: 'alice123',
      _id: ObjectId("599af247bb69cd89961c986d")
   },
   fullDocument: {
      _id: ObjectId("599af247bb69cd89961c986d"),
      userName: 'alice123',
      name: 'Alice'
   }
}

documentKey字段同时包含了_iduserName字段。这表明engineering.users是一个分片集合,其中片键为userName_id字段。

fullDocument文档表示了插入时的文档版本。

更新(update)事件

下面的示例展示了一个更新(update)事件:

{
   _id: { < Resume Token > },
   operationType: 'update',
   ns: {
      db: 'engineering',
      coll: 'users'
   },
   documentKey: {
      _id: ObjectId("58a4eb4a30c75625e00d2820")
   },
   updateDescription: {
      updatedFields: {
         email: 'alice@10gen.com'
      },
      removedFields: ['phoneNumber']
   }
}

下面的示例展示了一个打开了fullDocument: updateLookup选项的change stream的更新事件:

{
   _id: { < Resume Token > },
   operationType: 'update',
   ns: {
      db: 'engineering',
      coll: 'users'
   },
   documentKey: {
      _id: ObjectId("58a4eb4a30c75625e00d2820")
   },
   updateDescription: {
      updatedFields: {
         email: 'alice@10gen.com'
      },
      removedFields: ['phoneNumber']
   },
   fullDocument: {
      _id: ObjectId("58a4eb4a30c75625e00d2820"),
      name: 'Alice',
      userName: 'alice123',
      email: 'alice@10gen.com',
      team: 'replication'
   }
}

fullDocument文档表示更新文档最近的大部分提交版本。fullDocument可能会与更新操作时的文档有所不同,根据在更新操作和文档查找操作之间发生的大部分提交操作数目而确定。

替换(replace)事件

下面的示例展示了一个替换(replace)操作:

{
   _id: { < Resume Token > },
   operationType: 'replace',
   ns: {
      db: 'engineering',
      coll: 'users'
   },
   documentKey: {
      _id: ObjectId("599af247bb69cd89961c986d")
   },
   fullDocument: {
      _id: ObjectId("599af247bb69cd89961c986d"),
      userName: 'alice123',
      name: 'Alice'
   }
}

replace操作使用更新命令,由两个阶段构成:
– 删除documentKey的原始文档
– 使用相同的documentKey来插入新文档

replace事件的fullDocument表示替换文档在插入之后的文档。

删除(delete)事件

下面的事件展示了一个删除(delete)事件:

{
   _id: { < Resume Token > },
   "ns":"engineering.users",
   "documentKey":{
      "_id":{
         "$oid":"599af247bb69cd89961c986d"
      }
   },
   operationType: 'delete',
   ns: {
      db: 'engineers',
      coll: 'users'
   },
   documentKey: {
      _id: ObjectId("599af247bb69cd89961c986d")
   }
}

fullDocument文档被省略了,因为该文档在change stream游标向客户端发送了删除事件之后不再存在。

废弃(invalidate)事件

下面的示例展示了一个废弃(invalidate)事件:

{
   _id: { < Resume Token > },
   operationType: 'invalidate'
}

除了_idoperationType之外的所有字段都被省略了。

废弃事件一般发生在dropDatebase/drop或者renameCollection这些会影响被观察集合的命令之后。废弃事件关闭change stream游标和发送任何缓存的数据不再与服务器保持一致的信号。

注意: 废弃change stream无法被恢复。尝试在一个被重命名或者删除的集合上恢复change stream将会触发错误。


以上就是本周博客的技术内容啦。另外想啰嗦两句,对自己的这一周做个小小的总结。1. 工作中把短期画像相关的代码重新撸了一遍,完成了v1.2的可用版本,然后开始弄排重的事情,慢慢开始做一些算法沾边的事情;2. 办了一张健身卡,开始坚持每天锻炼1个小时左右,感觉整个人的精气神都好了很多,特别棒,希望能继续坚持下去; 3. 开始继续使用百词斩了,向张老板学习,努力学英语;4. 第一次洗牙,注意身体的健康,争取每天12点之前睡觉,睡前喝杯牛奶,敷面膜,刷牙。希望这一周能够坚持上面的事情,然后开始多看些书~

打赏

mickey

记录生活,写给几十年后的自己。