MongoDB change stream 详解
Change Stream指数据的变化事件流,MongoDB从3.6版本开始提供订阅数据变更的功能。Change Stream 是 MongoDB 用于实现变更追踪的解决方案,类似于关系数据库的触发器,但原理不完全相同:**Change Stream 是基于 oplog 实现的,提供推送实时增量的推送功能。**它在 oplog 上开启一个 tailable cursor 来追踪所有复制集上的变更操
什么是 Chang Streams
Change Stream指数据的变化事件流,MongoDB从3.6版本开始提供订阅数据变更的功能。
Change Stream 是 MongoDB 用于实现变更追踪的解决方案,类似于关系数据库的触发器,但原理不完全相同:
实现原理
**Change Stream 是基于 oplog 实现的,提供推送实时增量的推送功能。**它在 oplog 上开启一个 tailable cursor 来追踪所有复制集上的变更操作,最终调用应用中定义的回调函数。
被追踪的变更事件主要包括:
- insert/update/delete:插入、更新、删除;
- drop:集合被删除;
- rename:集合被重命名;
- dropDatabase:数据库被删除;
- invalidate:drop/rename/dropDatabase 将导致 invalidate 被触发, 并关闭 change stream;
从MongoDB 6.0开始,change stream支持DDL事件的更改通知,如createIndexes和dropIndexes事件。
如果只对某些类型的变更事件感兴趣,可以使用使用聚合管道的过滤步骤过滤事件:
var cs = db.user.watch([{
$match:{operationType:{$in:["insert","delete"]}}
}])
db.watch()语法: https://www.mongodb.com/docs/manual/reference/method/db.watch/#example
Change Stream会采用 "readConcern:majority"这样的一致性级别,保证写入的变更不会被回滚。因此:
- 未开启 majority readConcern 的集群无法使用 Change Stream;
- 当集群无法满足 {w: “majority”} 时,不会触发 Change Stream(例如 PSA 架构 中的 S 因故障宕机)。
MongoShell测试
# 窗口1执行 watch()
db.user.watch([],{maxAwaitTimeMS:1000000})
# 窗口2进行一条新增操作
db.user.insert({name:"xxxx"})
# mongodb 6 的版本这里就只是简单的打印一条日志
rs0 [direct: primary] test> db.emp.watch([], {maxAwaitTimeMS:1000000})
ChangeStreamCursor on emp
故障恢复
假设在一系列写入操作的过程中,订阅 Change Stream 的应用在接收到“写3”之后 于 t0 时刻崩溃,重启后后续的变更怎么办?
想要从上次中断的地方继续获取变更流,只需要保留上次变更通知中的 _id 即可。 Change Stream 回调所返回的的数据带有 _id,这个 _id 可以用于断点恢复。例如:
var cs = db.collection.watch([], {resumeAfter: <_id>})
即可从上一条通知中断处继续获取后续的变更通知。
使用场景
-
监控
用户需要及时获取变更信息(例如账户相关的表),ChangeStreams 可以提供监控功能,一旦相关的表信息发生变更,就会将变更的消息实时推送出去。
-
分析平台
例如需要基于增量去分析用户的一些行为,可以基于 ChangeStreams 把数据拉出来,推到下游的计算平台, 比如 类似 Flink、Spark 等计算平台等等。
-
数据同步
基于 ChangeStreams,用户可以搭建额外的 MongoDB 集群,这个集群是从原端的 MongoDB 拉取过来的, 那么这个集群可以做一个热备份,假如源端集群发生 网络不通等等之类的变故,备集群就可以接管服务。 还可以做一个冷备份,如用户基于 ChangeStreams 把数据同步到文件,万一源端数据库发生不可服务, 就可以从文件里恢复出完整的 MongoDB 数据库, 继续提供服务。(当然,此处还需要借助定期全量备份来一同完成恢复) 另外数据同步它不仅仅局限于同一地域,可以跨地域,从北京到上海甚至从中国到美国等等。
-
消息推送
假如用户想实时了解公交车的信息,那么公交车的位置每次变动,都实时推送变更的信息给想了解的用户,用户能够实时收到公交车变更的数据,非常便捷实用。
注意事项
- Change Stream 依赖于 oplog,因此中断时间不可超过 oplog 回收的最大时间窗;
- 在执行 update 操作时,如果只更新了部分数据,那么 Change Stream 通知的也是增量部分;
- 删除数据时通知的仅是删除数据的 _id。
Spring Boot整合Chang Stream
引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>
配置yml
spring:
data:
mongodb:
uri: mongodb://hushang:123456@192.168.75.100:28017,192.168.75.100:28018,192.168.75.100:28019/test?authSource=admin&replicaSet=rs0
配置mongo监听器,用于接收数据库的变更信息
package com.hs.learn.changestream;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.mongodb.core.messaging.Message;
import org.springframework.data.mongodb.core.messaging.MessageListener;
import org.springframework.stereotype.Component;
/**
* @Description: change stream 监听器
* @Author 胡尚
* @Date: 2024/8/1 14:34
*/
@Component
@Slf4j
public class DocumentMessageListener<S,T> implements MessageListener<S, T> {
@Override
public void onMessage(Message<S, T> message) {
// TODO 在监听器方法中完成自己的定制化需求
log.info("Received Message in collection: {}", message.getProperties().getCollectionName());
log.info("trawsource: {}", message.getRaw());
log.info("tconverted: {}", message.getBody());
}
}
配置 mongo监听器的容器MessageListenerContainer,spring启动时会自动启动监听的任务用于接收changestream
package com.hs.learn.config;
import com.hs.learn.changestream.DocumentMessageListener;
import com.mongodb.ReadConcern;
import com.mongodb.ReadPreference;
import com.mongodb.TransactionOptions;
import com.mongodb.WriteConcern;
import com.mongodb.client.model.changestream.FullDocument;
import org.bson.Document;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.mongodb.MongoDatabaseFactory;
import org.springframework.data.mongodb.MongoTransactionManager;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.messaging.ChangeStreamRequest;
import org.springframework.data.mongodb.core.messaging.DefaultMessageListenerContainer;
import org.springframework.data.mongodb.core.messaging.MessageListenerContainer;
import org.springframework.data.mongodb.core.query.Criteria;
import java.util.concurrent.*;
/**
* @Description: mongodb的配置类
* @Author 胡尚
* @Date: 2024/7/31 14:52
*/
@Configuration
public class MongodbConfig {
/**
* change stream的配置
*
* @param template 引入mongodb的依赖后,内置的bean对象
* @param documentMessageListener 自定义的messageListener
* @return
*/
@Bean
public MessageListenerContainer messageListenerContainer(MongoTemplate template, DocumentMessageListener documentMessageListener) {
// 创建一个线程池
Executor executor = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(5));
// 创建一个MessageListenerContainer对象
MessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer(template, executor) {
@Override
public boolean isAutoStartup() {
return true;
}
};
ChangeStreamRequest<Document> changeStreamRequest = ChangeStreamRequest.builder(documentMessageListener)
// 需要监听的集合名
.collection("emp")
//过滤需要监听的操作类型,可以根据需求指定过滤条件
.filter(Aggregation.newAggregation(Aggregation.match(
Criteria.where("operationType").in("insert", "update", "delete"))))
//不设置时,文档更新时,只会发送变更字段的信息,设置UPDATE_LOOKUP会返回文档的全部信息
.fullDocumentLookup(FullDocument.UPDATE_LOOKUP)
.build();
messageListenerContainer.register(changeStreamRequest, Document.class);
return messageListenerContainer;
}
}
测试
mongo shell插入一条文档
rs0 [direct: primary] test> db.emp.insertOne({name: "hushang", age: 24})
{
acknowledged: true,
insertedId: ObjectId("66ab31bec9f5b6d436cdb9d5")
}
控制台输出:
Received Message in collection: emp
trawsource: ChangeStreamDocument{ operationType=insert, resumeToken={"_data": "8266AB31BE000000012B022C0100296E5A1004ED8D395B97A348039DC133ABDBC3800F46645F6964006466AB31BEC9F5B6D436CDB9D50004"}, namespace=test.emp, destinationNamespace=null, fullDocument=Document{{_id=66ab31bec9f5b6d436cdb9d5, name=hushang, age=24}}, documentKey={"_id": {"$oid": "66ab31bec9f5b6d436cdb9d5"}}, clusterTime=Timestamp{value=7398061504999718913, seconds=1722495422, inc=1}, updateDescription=null, txnNumber=null, lsid=null}
tconverted: Document{{_id=66ab31bec9f5b6d436cdb9d5, name=hushang, age=24}}
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)