前言

用的yolov5,作者自己写的loadStream函数就是依据 streams.txt里面的rtsp流地址列表来新建线程,然后实现多路监控的。

在这里插入图片描述
大体就是这个图里面说的,我已经是为了个整体业务,去小改了这个loadStreams方法,将那些可用的rtsp流地址保存到一个新的list里面,然后再新建线程。

如何检验可用与否,就是用opencv VideoCapture.isOpened() 慢慢去试,连不上就剔除

上面这个流程,是蠢(还想着定时任务,隔一会重启一下服务器,其实就是潜意识里,想用最直接(不动脑子)的方法糊弄这个问题)。跟老师一说,老师也是说,还是改成触发器吧。拖了十天了,今天趁着还没有干别的事情,脑子还能转起来,来学一下。

技能要求

监听数据库,然后 读写最新数据库信息到文件,然后利用文件重新启动服务器。

准备工作

  1. pycharm新建一个python文件
  2. nacicat 打开自己的数据库
  3. 找一篇 python监听数据库表变化的博客

再去找博客的时候,(我其实之前是找了一些了的),但大多都是对数据库进行轮询,也就是和我上面的定时任务差不多了。可我想要的是触发器,只有当数据库改变的时候,才会执行,这样才叫触发嘛,一直盯着,那叫监控。

然后就看到了这条
在这里插入图片描述
确实,那思路就从监听数据库 转变为 用mysql的触发器然后生成的日志文件(只生成rtsp地址那一列变动的日志),然后再用python的watchdog(这个是轮子Python 监控文件增加、修改、删除等变化)监听这个日志文件。 这样就不用监听数据库了,变成了监听文件。(主要是同事的数据库连起来很慢,所以能不监听就不监听。)
另外一个想法,想着能不能通过mysql的触发器,直接调用python服务重启的脚本。这就牵扯到,这个脚本是不是也要放到同事数据库所在的那台机子上。如果是我这个想法的话,那我上面需要监听的那个日志文件,肯定也是在数据库的本机地址上生成的呀,可是我的服务器是部署在autodl的docker里面的,这两个又要怎么互通呢?
好麻烦,要不就 轮询数据库得了。

后期要是这个东西真的要上线的话,那数据库肯定也是在服务器上的吧。mysql部署在docker那就是说日志文件和我项目的代码是在一个地方的。
如果是一个地方的话,那自己访问自己肯定很快的呀,轮询是可以的;或者然后设定触发器,监控触发器生成的文件也是可以的;或者触发器触发然后调用python重启脚本也是可以的。

那暂时不知道未来是什么情况,就把上述可行的路子都看一下。

轮询

写脚本 轮询 写的挺好,罗列了两种需要监听的类型——新增和改变
我业务里面也主要就是处理新增的那些数据,如何将它们加入到streams.txt,所以我就可以在轮询的回调方法那里写“将数据写入streams.txt的文件操作”。
我把他这个代码排版好了之后,就是如下:

import os
import sys
import time
import threading
import pickle
import pymysql


class BaseListener(object):
    # 使用一个线程启动监听
    def __init__(self):
        self.checkpoint = 0
        self.stop_flag = True
        self.listen_thread = threading.Thread(name="Listener", target=self.do_listen)
        self.listen_thread.start()

    def start(self):
        self.stop_flag = False


    def stop(self):
        self.stop_flag = True


    def set_checkpoint(self, v):
        # 设置监听的断点,如果需要可以持久存储在磁盘上
        self.checkpoint = v


    def get_checkpoint(self):
        return self.checkpoint


    def do_listen(self):
        filename = 'utils/streams.txt'
        with open(filename, 'w') as f:
            f.write('')
        cnt=0
        while True:
            if not self.stop_flag:
                conn = pymysql.connect(host="localhost", user="root", password="123456", charset='utf8')
                # 监听用sql语句,应当以id倒排,需要使用 WHERE id > {CHECK_POINT}进行筛选,如
                # sql = "SELECT * FROM a WHERE id>{CHECK_POINT} ORDER BY id DESC"
                cursor = conn.cursor()
                sql = "SELECT id,monitor_name FROM `fall`.`monitor` WHERE id>{CHECK_POINT}"
                checkpoint = self.get_checkpoint()
                sql_listen = sql.replace("{CHECK_POINT}", str(checkpoint))

                # fetchall为读取全部记录的语句
                cursor.execute(sql_listen)
                # 获取所有记录列表
                results = cursor.fetchall()
                #记录最后一个游标
                rec_id =""
                if(len(results)!=0):
                    cnt += len(results)
                    rec_id = self.callback(results,cnt)
                    self.set_checkpoint(rec_id)
                # 关闭数据库连接
                cursor.close()
                conn.close()
                # 根据情况设置轮询时间
                time.sleep(1)

    def callback(self,data):
        filename = 'utils/streams.txt'
        # 这是do_listen调用的一个回调函数,把数据传过来处理,在子类中实现
        with open(filename, 'a+') as f:
            f.write(data)


class BaseMonitor(object):
    """
    监听数据变化的基类
    """
    def __init__(self):
        self.prev_data = None
        self.stop_flag = True
        self.monitor_thread = threading.Thread(name="Monitor", target=self.do_monitor)
        self.monitor_thread.start()
    def start(self):
        self.stop_flag = False
    def stop(self):
        self.stop_flag = True
    def do_monitor(self):
        while True:
            if not self.stop_flag:
                self.execute(self.extra_sql)
                data = self.fetchall(self.base_sql)
            if data:
                str_data = pickle.dumps(data)
            if str_data != self.prev_data:
                self.callback(data)
                self.prev_data = str_data
    def callback(self, dictdata):
        # 这是do_monitor调用的一个回调函数,把数据传过来处理,在子类中实现
        print
        "Should be implemented in subclasses!"
class Listener(BaseListener):
    def callback(self, data,cnt):
        filename = 'utils/streams.txt'
        if(cnt!=len(data)):
            p = sys.executable
            os.execl(p, p, *sys.argv)

        rec_id=""
        # 这是do_listen调用的一个回调函数,把数据传过来处理,在子类中实现
        with open(filename, 'a+') as f:
            for i, row in enumerate(data):
                ip = row[1]
                if(i==0):
                    f.write(ip)
                else:
                    f.write('\n'+ip)
                rec_id = row[0]
        return rec_id

class MonitorTest(BaseMonitor):
    def callback(self, dictdata):
        print
        "Monitor:",dictdata

if __name__ == "__main__":
    ad = Listener()
    ad.start()

其余两个 有机会再写吧
监控日志 触发器调用脚本
然后这个重启功能,在这篇博客表头有提到,所以在pycharm上面无法成功演示,但在服务器linux上应该就ok了。

windows下任务管理器中运行在pycharm或者其他ide下的python应用程序,我们知道此时pycharm是进程,而运行的.py文件是线程功能,这样如果监测进程实现起来比较繁琐,因此可以将.py文件转换为.exe文件使用pyinstaller将py文件转换成.exe可执行文件,这样在windows直接执行.exe文件*

所以任务就这样了,最后还是选择了,轮询的方法,然后每次都是讲streams.txt文档清空,重新写入,当第一次这批写入之后的下一次,就重启。(因为我要重启,重启之后,那些记录的标记呀,都没有了,在博客案例里面,还指望这个标记去记录,然后方便下一次查询)
所以我就是把 记录的数量记录了一下,如果数量增加,我就重启。《——对 我已经变成这个逻辑了
okokok
烦死了,自己太垃了,这么个小功能,吭哧了四五个小时。

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐