mongodb 使用 change stream 监听数据变动 php 示例

技术 置顶 精帖
0 576
peng49
peng49 2022-09-08 17:13:30
 

本地测试环境搭建

测试环境

系统 版本
centos 7
mongodb 4.4.6
php 7.4

生成认证文件

  1. openssl rand -base64 756 > mongo.key
  2. #修改mongo.key文件的权限
  3. chmod 600 mongo.key

启动容器

  1. docker run -it -d --name mongo-relicat-set -p 27017:27017 -v `pwd`/mongo.key:/mongo.key --restart always -e MONGO_INITDB_ROOT_USERNAME=root -e MONGO_INITDB_ROOT_PASSWORD=Admin@123 mongo:4.4.6 mongod --replSet rs0 --keyFile /mongo.key

mongo命令行操作示例

控制台1

  1. $ mongo # 进入mongo控制台
  2. rs0:PRIMARY> use admin # 使用admin数据库
  3. switched to db admin
  4. rs0:PRIMARY> db.auth('root','Admin@123') # 用户名密码认证
  5. 1 # 1表示认证成功
  6. rs0:PRIMARY> use test # 切换到test数据库
  7. switched to db test
  8. rs0:PRIMARY> db.watch([],{maxAwaitTimeMS:60000}) # 监听当前数据库test的所有变动

控制台2

  1. $ mongo # 进入mongo控制台
  2. rs0:PRIMARY> use admin # 使用admin数据库
  3. switched to db admin
  4. rs0:PRIMARY> db.auth('root','Admin@123') # 用户名密码认证
  5. 1 # 1表示认证成功
  6. rs0:PRIMARY> use test # 切换到test数据库
  7. switched to db test
  8. rs0:PRIMARY> db.users.insertMany(
  9. [
  10. { username:"user01", name:"1"},
  11. { username:"user02", name:"2"}
  12. ]
  13. )
  14. # 结果如下,成功新增两条数据
  15. {
  16. "acknowledged" : true,
  17. "insertedIds" : [
  18. ObjectId("6319a9203df851bacd5446b6"),
  19. ObjectId("6319a9203df851bacd5446b7")
  20. ]
  21. }

此时控制台1的效果如下

  1. rs0:PRIMARY> db.watch([],{maxAwaitTimeMS:30000})
  2. { "_id" : { "_data" : "826319AACA000000012B022C0100296E5A1004B4A3BF6163714E6EBED5BBD210175F7346645F696400646319AACA2A72408C1964CAD40004" }, "operationType" : "insert", "clusterTime" : Timestamp(1662626506, 1), "fullDocument" : { "_id" : ObjectId("6319aaca2a72408c1964cad4"), "username" : "user01", "name" : "1" }, "ns" : { "db" : "test", "coll" : "users" }, "documentKey" : { "_id" : ObjectId("6319aaca2a72408c1964cad4") } }
  3. { "_id" : { "_data" : "826319AACA000000022B022C0100296E5A1004B4A3BF6163714E6EBED5BBD210175F7346645F696400646319AACA2A72408C1964CAD50004" }, "operationType" : "insert", "clusterTime" : Timestamp(1662626506, 2), "fullDocument" : { "_id" : ObjectId("6319aaca2a72408c1964cad5"), "username" : "user02", "name" : "2" }, "ns" : { "db" : "test", "coll" : "users" }, "documentKey" : { "_id" : ObjectId("6319aaca2a72408c1964cad5") } }

php示例

安装扩展

  1. pecl install mongodb

在php.ini文件中添加如下内容

  1. extension=mongodb.so

可用php -m检查配置是否生效

composer安装 mongodb/mongodb 库

  1. composer require mongodb/mongodb

示例代码:

change_stream_demo01.php

  1. <?php
  2. include_once 'vendor/autoload.php';
  3. $client = new MongoDB\Client(
  4. 'mongodb://192.168.1.100:27017',
  5. [
  6. 'username' => 'root',
  7. 'password' => 'Admin@123',
  8. 'authSource' => 'admin',
  9. ]
  10. );
  11. $options = [
  12. 'maxAwaitTimeMS' => 30000,
  13. 'fullDocument' => 'updateLookup'
  14. ];
  15. //最后处理的一条记录,如果有记录,从记录的位置开始
  16. if ($lastContent = @file_get_contents('last.txt')) {
  17. $options['resumeAfter'] = new MongoDB\Model\BSONDocument(json_decode($lastContent, true));
  18. }
  19. //$client->selectCollection('watch','users')->watch([],$options); //监听指定集合的变动
  20. //监听数据库的变动
  21. $changeStream = $client->selectDatabase('watch')
  22. ->watch([], $options);
  23. while (true) {
  24. $changeStream->next();//如果没有变动数据,阻塞进程,最长时间为 maxAwaitTimeMS 设置的值
  25. if (!$changeStream->valid()) {
  26. continue;
  27. }
  28. $row = $changeStream->current();
  29. var_dump(json_encode($row));
  30. $item = json_decode(json_encode($row), true);
  31. //保存处理成功的最后一条日志
  32. file_put_contents('last.txt', json_encode($item['_id']));
  33. }

运行php change_stream_demo01.php后,在指定集合中新增两条数据,效果如下:

  1. $ php change_stream_demo01.php
  2. string(391) "{"_id":{"_data":"826319AC79000000012B022C0100296E5A1004B4A3BF6163714E6EBED5BBD210175F7346645F696400646319AC792A72408C1964CAD60004"},"operationType":"insert","clusterTime":{"$timestamp":{"t":1662626937,"i":1}},"fullDocument":{"_id":{"$oid":"6319ac792a72408c1964cad6"},"username":"user01","name":"1"},"ns":{"db":"test","coll":"users"},"documentKey":{"_id":{"$oid":"6319ac792a72408c1964cad6"}}}"
  3. string(391) "{"_id":{"_data":"826319AC79000000022B022C0100296E5A1004B4A3BF6163714E6EBED5BBD210175F7346645F696400646319AC792A72408C1964CAD70004"},"operationType":"insert","clusterTime":{"$timestamp":{"t":1662626937,"i":2}},"fullDocument":{"_id":{"$oid":"6319ac792a72408c1964cad7"},"username":"user02","name":"2"},"ns":{"db":"test","coll":"users"},"documentKey":{"_id":{"$oid":"6319ac792a72408c1964cad7"}}}"

参考链接
MongoDB 4.2 内核解析 - Change Stream
MongoDB Change Stream之一——上手及初体验 - 腾讯云开发者社区-腾讯云
db.watch() 官网说明
MongoDB PHP库

回帖
登录
忘记密码?