利用MongoDB中oplog机制实现准实时数据的操作监控
前言
最近有一个需求是要实时获取到新插入到MongoDB的数据,而插入程序本身已经有一套处理逻辑,所以不方便直接在插入程序里写相关程序,传统的数据库大多自带这种触发器机制,但是Mongo没有相关的函数可以用(也可能我了解的太少了,求纠正),当然还有一点是需要python实现,于是收集整理了一个相应的实现方法。
一、引子
首先可以想到,这种需求其实很像数据库的主从备份机制,从数据库之所以能够同步主库是因为存在某些指标来做控制,我们知道MongoDB虽然没有现成触发器,但是它能够实现主从备份,所以我们就从它的主从备份机制入手。
二、OPLOG
首先,需要以master模式来打开mongod守护,命令行使用–master,或者配置文件增加master键为true。
此时,我们可以在Mongo的系统库local里见到新增的collection——oplog,此时oplog.$main里就会存储进oplog信息,如果此时还有充当从数据库的Mongo存在,就会还有一些slaves的信息,由于我们这里并不是主从同步,所以不存在这些集合。
再来看看oplog结构:
ts : Timestamp(6417682881216249, 1), 时间戳
h : NumberLong(0), 长度
v : 2,
op : n , 操作类型
ns : , 操作的库和集合
o2 : _id update条件
o : {} 操作值,即document
这里需要知道op的几种属性:
insert, i
update, u
remove(delete), d
cmd, c
noop, n 空操作
从上面的信息可以看出,我们只要不断读取到ts来做对比,然后根据op即可判断当前出现的是什么操作,相当于使用程序实现了一个从数据库的接收端。
三、CODE
在Github上找到了别人的实现方式,不过它的函数库太老旧,所以在他的基础上进行修改。
Github地址:https://github.com/RedBeard0531/mongo-oplog-watcher
mongo_oplog_watcher.py如下:
#!/usr/bin/python import pymongo import re import time from pprint import pprint # pretty printer from pymongo.errors import AutoReconnect class OplogWatcher(object): def __init__(self, db=None, collection=None, poll_time=1.0, connection=None, start_now=True): if collection is not None: if db is None: raise ValueError("must specify db if you specify a collection") self._ns_filter = db + "." + collection elif db is not None: self._ns_filter = re.compile(r"^%s\." % db) else: self._ns_filter = None self.poll_time = poll_time self.connection = connection or pymongo.Connection() if start_now: self.start() @staticmethod def __get_id(op): id = None o2 = op.get("o2") if o2 is not None: id = o2.get("_id") if id is None: id = op["o"].get("_id") return id def start(self): oplog = self.connection.local["oplog.$main"] ts = oplog.find().sort("$natural", -1)[0]["ts"] while True: if self._ns_filter is None: filter = {} else: filter = {"ns": self._ns_filter} filter["ts"] = {"$gt": ts} try: cursor = oplog.find(filter, tailable=True) while True: for op in cursor: ts = op["ts"] id = self.__get_id(op) self.all_with_noop(ns=op["ns"], ts=ts, op=op["op"], id=id, raw=op) time.sleep(self.poll_time) if not cursor.alive: break except AutoReconnect: time.sleep(self.poll_time) def all_with_noop(self, ns, ts, op, id, raw): if op == "n": self.noop(ts=ts) else: self.all(ns=ns, ts=ts, op=op, id=id, raw=raw) def all(self, ns, ts, op, id, raw): if op == "i": self.insert(ns=ns, ts=ts, id=id, obj=raw["o"], raw=raw) elif op == "u": self.update(ns=ns, ts=ts, id=id, mod=raw["o"], raw=raw) elif op == "d": self.delete(ns=ns, ts=ts, id=id, raw=raw) elif op == "c": self.command(ns=ns, ts=ts, cmd=raw["o"], raw=raw) elif op == "db": self.db_declare(ns=ns, ts=ts, raw=raw) def noop(self, ts): pass def insert(self, ns, ts, id, obj, raw, **kw): pass def update(self, ns, ts, id, mod, raw, **kw): pass def delete(self, ns, ts, id, raw, **kw): pass def command(self, ns, ts, cmd, raw, **kw): pass def db_declare(self, ns, ts, **kw): pass class OplogPrinter(OplogWatcher): def all(self, **kw): pprint (kw) print #newline if __name__ == "__main__": OplogPrinter()
首先是实现一个数据库的初始化,设定一个延迟时间(准实时):
self.poll_time = poll_time self.connection = connection or pymongo.MongoClient()
主要的函数是start() ,实现一个时间的比对并进行相应字段的处理:
def start(self): oplog = self.connection.local["oplog.$main"] #读取之前提到的库 ts = oplog.find().sort("$natural", -1)[0]["ts"] #获取一个时间边际 while True: if self._ns_filter is None: filter = {} else: filter = {"ns": self._ns_filter} filter["ts"] = {"$gt": ts} try: cursor = oplog.find(filter) #对此时间之后的进行处理 while True: for op in cursor: ts = op["ts"] id = self.__get_id(op) self.all_with_noop(ns=op["ns"], ts=ts, op=op["op"], id=id, raw=op) #可以指定处理插入监控,更新监控或者删除监控等 time.sleep(self.poll_time) if not cursor.alive: break except AutoReconnect: time.sleep(self.poll_time)
循环这个start函数,在all_with_noop这里就可以编写相应的监控处理逻辑。
这样就可以实现一个简易的准实时Mongo数据库操作监控器,下一步就可以配合其他操作来对新入库的程序进行相应处理。
本篇文章到此结束,如果您有相关技术方面疑问可以联系我们技术人员远程解决,感谢大家支持本站!
我想要获取技术服务或软件
服务范围:MySQL、ORACLE、SQLSERVER、MongoDB、PostgreSQL 、程序问题
服务方式:远程服务、电话支持、现场服务,沟通指定方式服务
技术标签:数据恢复、安装配置、数据迁移、集群容灾、异常处理、其它问题
本站部分文章参考或来源于网络,如有侵权请联系站长。
数据库远程运维 利用MongoDB中oplog机制实现准实时数据的操作监控
相关文章
- MongoDB下根据数组大小进行查询的方法
- 掌握 MongoDB 数据类型,提升数据存储性能(mongodb数据类型)
- 基于Linux环境下MongoDB数据库的使用(linuxcmdb)
- 限制MongoDB单文档大小限制:解决方案(mongodb单个文档)
- Python中MongoDB使用详解编程语言
- MongoDB监控软件:有效精確管理你的数据库(mongodb监控软件)
- MongoDB: 启动你的新服务(mongodb开启服务)
- MongoDB数据库性能监控实践(mongodb数据库监控)
- MongoDB 分片搭建:实现无限可能(mongodb分片搭建)
- MongoDB清空表:操作指南(mongodb清空表)
- MongoDB实时备份:实现完美数据安全(mongodb实时备份)
- 删除MongoDB:利用条件删除数据(mongodb条件)
- MongoDB与Redis:不同点异同之辨(mongodb和redis的区别)
- MongoDB服务搭建路上的成功之路(mongodb建立服务)
- MongoDB 数据更新:高效、灵活、实时(mongodb数据更新)
- MongoDB 监控:打造高效、安全的数据库管理系统(mongodb监控)
- MongoDB潜在的不足及改善(mongodb不足)
- 利用 MongoDB 快速搭建副本集(mongodb创建副本集)
- 化查询利用MongoDB实现结构化查询(mongodb结构)
- MongoDB实例:简单易学利用起来(mongodb例子)
- MongoDB数据库中添加字段的操作(mongodb添加字段)
- MongoDB胜出:探索为什么它是一款领先的数据库解决方案(mongodbwin)
- MongoDB 负载均衡策略:优化数据处理效率(mongodb负载均衡)
- MongoDB删除命令如何操作?(mongodb删除命令)
- 安装MongoDB,接入PHP开发新篇章(php安装mongodb)
- MongoDB开发指南:构建高效应用(mongodb开发手册)
- C#对MongoDB进行增删改查的简单操作实例