【做一下1】python 监听数据库变化
监听数据库
前言
用的yolov5,作者自己写的loadStream函数就是依据 streams.txt里面的rtsp流地址列表来新建线程,然后实现多路监控的。
大体就是这个图里面说的,我已经是为了个整体业务,去小改了这个loadStreams方法,将那些可用的rtsp流地址保存到一个新的list里面,然后再新建线程。
如何检验可用与否,就是用opencv VideoCapture.isOpened() 慢慢去试,连不上就剔除
上面这个流程,是蠢(还想着定时任务,隔一会重启一下服务器,其实就是潜意识里,想用最直接(不动脑子)的方法糊弄这个问题)。跟老师一说,老师也是说,还是改成触发器吧。拖了十天了,今天趁着还没有干别的事情,脑子还能转起来,来学一下。
技能要求
监听数据库,然后 读写最新数据库信息到文件,然后利用文件重新启动服务器。
准备工作
- pycharm新建一个python文件
- nacicat 打开自己的数据库
- 找一篇 python监听数据库表变化的博客
再去找博客的时候,(我其实之前是找了一些了的),但大多都是对数据库进行轮询,也就是和我上面的定时任务差不多了。可我想要的是触发器,只有当数据库改变的时候,才会执行,这样才叫触发嘛,一直盯着,那叫监控。
然后就看到了这条
确实,那思路就从监听数据库 转变为 用mysql的触发器然后生成的日志文件(只生成rtsp地址那一列变动的日志),然后再用python的watchdog(这个是轮子Python 监控文件增加、修改、删除等变化)监听这个日志文件。 这样就不用监听数据库了,变成了监听文件。(主要是同事的数据库连起来很慢,所以能不监听就不监听。)
另外一个想法,想着能不能通过mysql的触发器,直接调用python服务重启的脚本。这就牵扯到,这个脚本是不是也要放到同事数据库所在的那台机子上。如果是我这个想法的话,那我上面需要监听的那个日志文件,肯定也是在数据库的本机地址上生成的呀,可是我的服务器是部署在autodl的docker里面的,这两个又要怎么互通呢?
好麻烦,要不就 轮询数据库得了。
后期要是这个东西真的要上线的话,那数据库肯定也是在服务器上的吧。mysql部署在docker那就是说日志文件和我项目的代码是在一个地方的。
如果是一个地方的话,那自己访问自己肯定很快的呀,轮询是可以的;或者然后设定触发器,监控触发器生成的文件也是可以的;或者触发器触发然后调用python重启脚本也是可以的。
那暂时不知道未来是什么情况,就把上述可行的路子都看一下。
轮询
写脚本 轮询 写的挺好,罗列了两种需要监听的类型——新增和改变
我业务里面也主要就是处理新增的那些数据,如何将它们加入到streams.txt,所以我就可以在轮询的回调方法那里写“将数据写入streams.txt的文件操作”。
我把他这个代码排版好了之后,就是如下:
import os
import sys
import time
import threading
import pickle
import pymysql
class BaseListener(object):
# 使用一个线程启动监听
def __init__(self):
self.checkpoint = 0
self.stop_flag = True
self.listen_thread = threading.Thread(name="Listener", target=self.do_listen)
self.listen_thread.start()
def start(self):
self.stop_flag = False
def stop(self):
self.stop_flag = True
def set_checkpoint(self, v):
# 设置监听的断点,如果需要可以持久存储在磁盘上
self.checkpoint = v
def get_checkpoint(self):
return self.checkpoint
def do_listen(self):
filename = 'utils/streams.txt'
with open(filename, 'w') as f:
f.write('')
cnt=0
while True:
if not self.stop_flag:
conn = pymysql.connect(host="localhost", user="root", password="123456", charset='utf8')
# 监听用sql语句,应当以id倒排,需要使用 WHERE id > {CHECK_POINT}进行筛选,如
# sql = "SELECT * FROM a WHERE id>{CHECK_POINT} ORDER BY id DESC"
cursor = conn.cursor()
sql = "SELECT id,monitor_name FROM `fall`.`monitor` WHERE id>{CHECK_POINT}"
checkpoint = self.get_checkpoint()
sql_listen = sql.replace("{CHECK_POINT}", str(checkpoint))
# fetchall为读取全部记录的语句
cursor.execute(sql_listen)
# 获取所有记录列表
results = cursor.fetchall()
#记录最后一个游标
rec_id =""
if(len(results)!=0):
cnt += len(results)
rec_id = self.callback(results,cnt)
self.set_checkpoint(rec_id)
# 关闭数据库连接
cursor.close()
conn.close()
# 根据情况设置轮询时间
time.sleep(1)
def callback(self,data):
filename = 'utils/streams.txt'
# 这是do_listen调用的一个回调函数,把数据传过来处理,在子类中实现
with open(filename, 'a+') as f:
f.write(data)
class BaseMonitor(object):
"""
监听数据变化的基类
"""
def __init__(self):
self.prev_data = None
self.stop_flag = True
self.monitor_thread = threading.Thread(name="Monitor", target=self.do_monitor)
self.monitor_thread.start()
def start(self):
self.stop_flag = False
def stop(self):
self.stop_flag = True
def do_monitor(self):
while True:
if not self.stop_flag:
self.execute(self.extra_sql)
data = self.fetchall(self.base_sql)
if data:
str_data = pickle.dumps(data)
if str_data != self.prev_data:
self.callback(data)
self.prev_data = str_data
def callback(self, dictdata):
# 这是do_monitor调用的一个回调函数,把数据传过来处理,在子类中实现
print
"Should be implemented in subclasses!"
class Listener(BaseListener):
def callback(self, data,cnt):
filename = 'utils/streams.txt'
if(cnt!=len(data)):
p = sys.executable
os.execl(p, p, *sys.argv)
rec_id=""
# 这是do_listen调用的一个回调函数,把数据传过来处理,在子类中实现
with open(filename, 'a+') as f:
for i, row in enumerate(data):
ip = row[1]
if(i==0):
f.write(ip)
else:
f.write('\n'+ip)
rec_id = row[0]
return rec_id
class MonitorTest(BaseMonitor):
def callback(self, dictdata):
print
"Monitor:",dictdata
if __name__ == "__main__":
ad = Listener()
ad.start()
其余两个 有机会再写吧
监控日志 触发器调用脚本
然后这个重启功能,在这篇博客表头有提到,所以在pycharm上面无法成功演示,但在服务器linux上应该就ok了。
windows下任务管理器中运行在pycharm或者其他ide下的python应用程序,我们知道此时pycharm是进程,而运行的.py文件是线程功能,这样如果监测进程实现起来比较繁琐,因此可以将.py文件转换为.exe文件使用pyinstaller将py文件转换成.exe可执行文件,这样在windows直接执行.exe文件*
所以任务就这样了,最后还是选择了,轮询的方法,然后每次都是讲streams.txt文档清空,重新写入,当第一次这批写入之后的下一次,就重启。(因为我要重启,重启之后,那些记录的标记呀,都没有了,在博客案例里面,还指望这个标记去记录,然后方便下一次查询)
所以我就是把 记录的数量记录了一下,如果数量增加,我就重启。《——对 我已经变成这个逻辑了
okokok
烦死了,自己太垃了,这么个小功能,吭哧了四五个小时。
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)