yma16-logo

⭐前言

大家好,我是yma16,本文分享python_uiautoanimation实现自动化微信聊天。

uiautoanimation简介

uiautomation封装了微软UIAutomation API,支持自动化Win32,MFC,WPF,Modern UI(Metro UI), Qt, IE, Firefox(version<=56 or >=60

开源文档:https://github.com/yinkaisheng/Python-UIAutomation-for-Windows?tab=readme-ov-file

python系列文章
python爬虫_基本数据类型
python爬虫_函数的使用
python爬虫_requests的使用
python爬虫_selenuim可视化质量分
python爬虫_django+vue3可视化csdn用户质量分
python爬虫_正则表达式获取天气预报并用echarts折线图显示
python爬虫_requests获取bilibili锻刀村系列的字幕并用分词划分可视化词云图展示

⭐微软inspect工具定位元素

💖工具查找属性

官方地址:https://learn.microsoft.com/zh-cn/windows/win32/winauto/inspect-objects
打开inspect点击元素即可查找属性,这里我查找微信小程序平台的窗口
在这里插入图片描述

⭐查找微信窗口

💖命令行查找运行窗口

pip 安装 automation 在scripts目录下运行automation.py

automation.py -t 0
# 打印当前激活窗口的所有控件
automation.py -r -d 1 -t 0
# 打印桌面(树的根控件 )和它的第一层子窗口(TopLevel顶层窗口)

⭐查找微信的聊天窗口

首先查找微信主窗口

import uiautomation as auto
if __name__=='__main__':
    global wechatWindow
    wechatWindow = auto.WindowControl(searchDepth=1, Name="微信", ClassName='WeChatMainWndForPC')
    print('wechatWindow',wechatWindow)

查找窗口
在这里插入图片描述

⭐封装发送消息

查找输入框
在这里插入图片描述

def sendMsg(wechatWindow,msg):
    # 群聊
    edit = wechatWindow.EditControl(Name='微信小程序平台')
    prefix=""
    edit.SendKeys(prefix+msg)
    sendButton = wechatWindow.ButtonControl(Name='发送(S)')
    sendButton.Click()

⭐定时查询消息

定位消息元素
定时查找消息
遇到@回复消息

import uiautomation as auto
import json,time
import requests,json

API_KEY = "API_KEY "
SECRET_KEY = "SECRET_KEY "

def receive_commend(url, message):
    payload = json.dumps({
        "messages": [
            {
                "role": "user",
                "content": message
            }
        ]
    })
    headers = {
        'Content-Type': 'application/json'
    }
    response = requests.request("POST", url, headers=headers, data=payload)
    if response.status_code == 200:
        return response.json()['result']
    else:
        return response.text

def get_access_token():
    url = "https://aip.baidubce.com/oauth/2.0/token"
    params = {"grant_type": "client_credentials", "client_id": API_KEY, "client_secret": SECRET_KEY}
    return str(requests.post(url, params=params).json().get("access_token"))

def ai_answer(question):
    url = "https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/completions?access_token=" + get_access_token()
    return receive_commend(url, question)


def sendMsg(wechatWindow,msg):
    # 群聊
    edit = wechatWindow.EditControl(ProcessId='微信小程序平台')
    prefix=""
    edit.SendKeys(prefix+msg)
    sendButton = wechatWindow.ButtonControl(Name='发送(S)')
    sendButton.Click()

def writeJson(data,name):
    with open("./db/"+name, 'w', encoding='utf-8') as write_f:
        write_f.write(json.dumps(data, indent=4, ensure_ascii=False))

def saveOldChatJson(data):
    writeJson(data,'oldChat.json')

def saveNewChatJson(data):
    writeJson(data,'newChat.json')


def openNewJson():
    with open("./db/newChat.json", 'r', encoding='utf-8') as open_f:
        content = open_f.read()
        readJson = json.loads(content)
        return readJson

def openOldJson():
    with open("./db/oldChat.json", 'r', encoding='utf-8') as open_f:
        content = open_f.read()
        readJson = json.loads(content)
        return readJson


def getMsg(wechatWindow):
    oldJson=openOldJson()
    newChatFileJson=openNewJson()
    wechatWindow.SetFocus()
    messages = wechatWindow.ListControl(Name='消息')
    replyRes = []
    chatRes=[]
    # 备份
    newJson=oldJson
    markFont = '@robot\u2005'
    sendTime='empty_time'
    isCall = '否'
    isFinish = '否'
    # 新的聊天记录
    newChatJson={}
    print('messages',messages)
    map_index=0
    max_len=len(messages.GetChildren())
    for i in range(1):
        if max_len <=0:
            break
        message=messages.GetChildren()[max_len-1]
        #     跳过

        print('message',message)
        content = message.Name
        print('content',content)

        if content in ["查看更多", "以下为新消息","以下是新消息"]:
            print('匹配新消息')
            continue
        details = message.GetChildren()[0].GetChildren()
        if len(details) == 0:
            sendTime = content
            continue
        name=''
        detail=''
        me=''
        print('details', details)
        nickname, detail, me = details
        name = nickname.Name

        print('nickname, detail, me',nickname, detail, me)
        if me.Name:
            name = me.Name
        if name == markFont:
            continue
        if not (content == "[图片]" or content.startswith("[语音]")):
            details = detail.GetChildren()
            if len(details) == 0:
                continue
            detail = details[-1].GetChildren()[0].GetChildren()[0].GetChildren()[0]
            details = detail.GetChildren()

            if len(details) != 0:
                link_title = details[0].Name
                link_content = details[1].Name
                content += link_title+'/n'+link_content
        recieveMsg=content.strip()
        # 本人 聊天跳过(出现在newChat.json则跳过)
        if  sendTime in newChatFileJson.keys() \
            or isinstance(sendTime,str) \
                and sendTime in oldJson.keys() \
                and'isFinish' in oldJson[sendTime].keys() \
                and oldJson[sendTime]['isFinish'] == '是'\
                and oldJson[sendTime]['content']==recieveMsg:
            print('time empty___',oldJson[sendTime])
            continue
        elif(markFont in recieveMsg and name != markFont ):
            isCall='是'
            askQuestion=recieveMsg.replace(markFont,'')
            print(askQuestion)
            if askQuestion:
                prefix='@'+name+'\u2005关于“'+askQuestion+'”的回复: '
                sendMsg(wechatWindow,prefix+ai_answer(askQuestion))
                isFinish='是'
                # 写入数据
                replyRes.append([sendTime, name, recieveMsg, isCall, isFinish])
        #       新的聊天记录
        chatRes.append([sendTime, name, recieveMsg])


    # 替换 旧消息
    for item in replyRes:
        timeItem=item[0]
        if isinstance(timeItem,str):
            # 已回复的消息
            newJson[str(timeItem)]={
                "name":item[1],
                "content":item[2],
                "isFinish":item[4]
            }
    # 替换
    for item in chatRes:
        timeItem=item[0]
        if isinstance(timeItem,str):
            # 新的消息回复
            newChatJson[str(timeItem)]={
                "name":item[1],
                "content":item[2]
            }
    # 已回复的记录
    saveOldChatJson(newJson)
    # 新的消息回复
    saveNewChatJson(newChatJson)
    re_run()

def re_run():
    time.sleep(3)
    global wechatWindow
    getMsg(wechatWindow)

if __name__=='__main__':
    global wechatWindow
    wechatWindow = auto.WindowControl(searchDepth=1, Name="微信", ClassName='WeChatMainWndForPC')
    print('wechatWindow',wechatWindow)
    getMsg(wechatWindow)



定时轮询查找消息调用chatgpt回复
在这里插入图片描述

⭐结束

本文分享到这结束,如有错误或者不足之处欢迎指出!
在这里插入图片描述

👍 点赞,是我创作的动力!
⭐️ 收藏,是我努力的方向!
✏️ 评论,是我进步的财富!
💖 感谢你的阅读!

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐