scrcpy-client

        python中有一个scrcpy-client库,可以实现Android设备的实时投屏和操控。它和scrcpy实现Android投屏是一样的,都是把一个scrcpy-server.jar文件通过adb推送到Android设备,并利用adb指令执行scrcpy-server.jar开启投屏和操控服务端,电脑端通过python创建客户端来接收视频流数据和发送控制流数据。视频流数据中就是Android实时屏幕数据,控制流数据就是我们在电脑端对Android设备做的操控动作。在scrcpy-client库中作者提供了一个使用PySide6搭建的投屏控制UI界面,可以完成单台Android设备的投屏控制,我们可以自行制作投屏控制界面,完成多台Android设备的投屏控制。

安装指令:pip3 install scrcpy-client

直接使用

        安装好scrcpy-client库后,我们可以通过直接使用作者提供的ui界面来投屏Android设备。

import scrcpy_ui


scrcpy_ui.main()

确保我们的电脑上通过USB连接了一台Android设备,并且Android设备打开了USB调试功能,已允许电脑调式。这时我们就可以通过执行上面的代码,得到Android设备的投屏UI界面,如下图所示:

在这个界面中我们可以使用鼠标在投屏界面点击、滑动,控制Android设备的屏幕。可以通过设备序列号下拉框切换设备,Flip勾选后可以得到镜像屏幕。下方的HOME按钮点击后回到主屏幕,相当于按设备的home键。BACK按钮点击后会返回上一个界面,相当于按设备上的back键。还支持键盘输入,我们可以通过电脑的键盘让Android产生按键事件。

自定义使用

        如果你觉得作者提供的UI界面不能满足你的需求,我们还可以自定义UI界面来实现更多的操作方式。前提是你要会使用PySide6这种UI框架,你必须知道如何在UI界面中使用动态元素,如何实现鼠标点击、移动事件,鼠标滚轮滚动事件,键盘输入事件。如果你还不会使用UI界面相关的框架,可以先去学习一下。如果你会UI相关的框架,就接着往下看。

创建投屏服务

        使用scrcpy中的Client类建立投屏控制服务,Client类中的实例化方法如下:

class Client:
    def __init__(
        self,
        device: Optional[Union[AdbDevice, str, any]] = None,
        max_width: int = 0,
        bitrate: int = 8000000,
        max_fps: int = 0,
        flip: bool = False,
        block_frame: bool = False,
        stay_awake: bool = False,
        lock_screen_orientation: int = LOCK_SCREEN_ORIENTATION_UNLOCKED,
        connection_timeout: int = 3000,
        encoder_name: Optional[str] = None,
    ):

device:Android设备的设备序列号(使用adb devices指令可以查看到)。

max_width:图像帧的最大宽度,默认使用Android广播信息中的帧宽度。

bitrate:比特率,默认8000000比特。

max_fps:最大帧数,默认不限制帧数。

flip:翻转图像(镜像图像),默认不镜像。

block_frame:返回非空帧,默认不返回,返回非空帧可能会阻塞openCv2的渲染线程。

stay_awake:连接USB时Android设备屏幕保持常亮,默认不保持常亮。

lock_screen_orientation:锁定屏幕方向(禁止自动旋转屏幕),默认不锁定。

connection_timeout:连接投屏控制服务(socket服务)超时时间,设定时间内未能成功连接则初始化失败,默认3000毫秒。

encoder_name:编码器名称,可选OMX.google.h264.encoder、OMX.qcom.video.encoder.avc、 c2.qti.avc.encoder、c2.android.avc.encoder,默认自动选择。

        我们通过实例化Client类来得到一个Android设备的投屏控制服务对象,假如Android设备的序列号为123456789,我们可以通过如下代码创建投屏控制服务实例对象:

import scrcpy


server = scrcpy.Client(device='123456789', bitrate=100000000)

start方法

        start是Client的实例方法用于启动投屏控制服务,start可以接收两个参数,一个是threaded,默认为False,为True时表示在子线程中开启投屏控制服务;另一个是daemon_threaded,默认为False,为True时表示给子线程开启进程守护。threaded和daemon_threaded有一个为True时,都会在子线程中开启投屏控制服务,都为False时表示在主线程中开启投屏控制服务。要同时开启多个投屏控制服务时,就需要在子线程中开启投屏控制服务,自己创建子线程也是可以的。

server.start()

add_listener方法

        add_listener是Client的实例方法用于设置监听字典listeners,listeners字典中有两个元素,第一个元素的键为frame表示图像帧元素,第二个元素为init表示初始化元素。两个元素的值都是一个空列表,用来存放函数的。如果想把Android设备的屏幕图像放到UI界面的某个元素上,就需要在UI框架中写一个能接收图像、显示图像的方法,再把这个方法添加到listeners字典的第一个元素列表中。如果想在建立投屏控制服务时做一些操作,就在UI框架中写一个操作相关的方法,在把这个方法放到listeners字典的第二个元素列表中。

    def on_frame(self, frame):  # 在使用PySide6的UI框架中定义了一个用于显示图像的方法
        app.processEvents()
        if frame is not None:
            ratio = self.max_width / max(self.client.resolution)
            image = QImage(
                frame,
                frame.shape[1],
                frame.shape[0],
                frame.shape[1] * 3,
                QImage.Format_BGR888,
            )
            pix = QPixmap(image)  # 处理图像
            pix.setDevicePixelRatio(1 / ratio)  # 设置图像大小
            self.ui.label.setPixmap(pix)  # 在UI界面显示图像
            self.resize(1, 1)

在初始化UI界面时把显示图像的方法加入到listener字典的第一个元素中(key='frame')。

    def __init__(self):
        super().__init()
        self.server = scrcpy.Client(device='123456789', bitrate=100000000)
        self.server.add_listener(scrcpy.EVENT_FRAME, self.on_frame)

这里只是举例,实际上的方法需要根据你的需求自己写。如果想要把Android设备的图像展示在UI界面的某个元素上,就必须写一个展示图像的方法,再把这个方法添加到listener字典的第一个元素中(key='frame')。

remove_listener方法

        remove_listener方法是Client的实例方法用于移除监听字典listeners中的某个方法,如果我们想不再显示图像时,可以把显示图像的方法从listeners中移除掉。

    def no_display(self):
        self.server.remove_listener(scrcpy.EVENT_FRAME, self.on_frame)

stop方法

        stop方法是Client的实例方法用于结束投屏控制服务,在我们关闭投屏UI界面时需要结束掉投屏控制服务,及时释放内存资源。

    def closeEvent(self, _):
        self.server.stop()

控制方法

        我们已经把Android设备的屏幕投射到了电脑上,现在就需要通过一些控制Android设备的方法来操作Android设备。Client的实例属性中有一个control属性,是通过实例化ControlSender类来得到的,ControlSender类就是专门用来控制Android设备的操作类。

    self.control = ControlSender(self)

所以我们想要控制Android就需要通过投屏控制对象的control属性。

keycode方法

    @inject(const.TYPE_INJECT_KEYCODE)
    def keycode(
        self, keycode: int, action: int = const.ACTION_DOWN, repeat: int = 0
    ) -> bytes:

        keycode方法是ControlSender类的实例方法用于向Android设备发送按键事件。keycode方法可以接收3个参数,第一个参数keycode表示键值(你需要了解adb键值);第二个参数action表示按下还是抬起,默认是按下;第三个参数repeat表示重复操作次数,想重复按几次。

    def click_home(self):
        self.server.control.keycode(scrcpy.KEYCODE_HOME, scrcpy.ACTION_DOWN)
        self.server.control.keycode(scrcpy.KEYCODE_HOME, scrcpy.ACTION_UP)

点击home键,先按下再抬起,完成一次按键。

text方法

    @inject(const.TYPE_INJECT_TEXT)
    def text(self, text: str) -> bytes:

        text方法是ControlSender类的实例方法用于向Android中输入文本,前提是Android设备中的某个输入框被激活了。text方法接收一个参数,就是我们要在Android设备中输入的文本内容。

    def input_text(self, text):
        self.server.control.text(text)

touch方法

    def touch(
        self, x: int, y: int, action: int = const.ACTION_DOWN, touch_id: int = -1
    ) -> bytes:

        touch方法是ControlSender类的实例方法用于Android设备屏幕的多点触控。touch方法可以接收4个参数,前两个参数为触点的x坐标和y坐标;第三个参数action为按下、移动、抬起;第四个参数为触控事件id,默认为-1,你可以设置不同的id来同时执行多个触控事件,达到多点触控的目的。

    def mouse_move(self, evt: QMouseEvent):
        focused_widget = QApplication.focusWidget()
        if focused_widget is not None:
            focused_widget.clearFocus()
        ratio = self.max_width / max(self.one_client.resolution)
        self.server.control.touch(evt.position().x() / ratio, evt.position().y() / ratio, scrcpy.ACTION_MOVE)

scroll方法

    @inject(const.TYPE_INJECT_SCROLL_EVENT)
    def scroll(self, x: int, y: int, h: int, v: int) -> bytes:

        scroll方法是ControlSender类的实例方法用于Android设备屏幕的滚动事件。scroll方法可以接收4个参数,前两个为滚动点的坐标位置;第三个参数为水平滚动距离;第四个参数为垂直滚动距离。

    def on_wheel(self):
        """鼠标滚轮滚动事件"""

        def wheel(evt: QWheelEvent):
            ratio = self.max_width / max(self.one_client.resolution)
            position_x = evt.position().x() / ratio
            position_y = evt.position().y() / ratio
            angle_x = evt.angleDelta().x()
            angle_y = evt.angleDelta().y()
            if angle_y > 0:
                angle_y = 1
            else:
                angle_y = -1
            self.server.control.scroll(position_x, position_y, angle_x, angle_y)

        return wheel

写出这个方法后,我们就可以使用鼠标滚轮来控制Android设备的屏幕上下滚动了。

back_or_turn_screen_on方法

    @inject(const.TYPE_BACK_OR_SCREEN_ON)
    def back_or_turn_screen_on(self, action: int = const.ACTION_DOWN) -> bytes:

        back_or_turn_screen_on方法是ControlSender类的实例方法用于按返回键,并且如果屏幕关闭了还会唤醒屏幕。只接收一个参数action为按下或抬起。

    def click_back(self):
        self.server.control.back_or_turn_screen_on(scrcpy.ACTION_DOWN)
        self.server.control.back_or_turn_screen_on(scrcpy.ACTION_UP)

expand_notification_panel方法

    @inject(const.TYPE_EXPAND_NOTIFICATION_PANEL)
    def expand_notification_panel(self) -> bytes:

        expand_notification_panel方法是ControlSender类的实例方法用于打开Android设备的下拉通知栏。

    def open_notification(self):
        self.server.control.expand_notification_panel()

expand_settings_panel方法

    @inject(const.TYPE_EXPAND_SETTINGS_PANEL)
    def expand_settings_panel(self) -> bytes:

        expand_settings_panel方法是ControlSender类的实例方法用于打开Android设备的下拉菜单栏。

    def open_settings(self):
        self.server.control.expand_settings_panel()

collapse_panels方法

    @inject(const.TYPE_COLLAPSE_PANELS)
    def collapse_panels(self) -> bytes:

        collapse_panels方法是ControlSender类的实例方法用于收起Android设备的下拉通知栏或菜单栏。

    def close_panel(self):
        self.server.control.collapse_panelsl()

get_clipboard方法

    def get_clipboard(self) -> str:

        get_clipboard方法是ControlSender类的实例方法用于获取Android设备粘贴板中的内容。在Android设备上复制的文本,我们可以通过这个方法把文本获取出来。

    def get_android_clipboard(self):
        return self.server.control.get_clipboard()

set_clipboard方法

    @inject(const.TYPE_SET_CLIPBOARD)
    def set_clipboard(self, text: str, paste: bool = False) -> bytes:

        set_clipboard方法是ControlSender类的实例方法用于设置Android设备粘贴板中的内容。set_clipboard方法可以接收两个参数,第一个参数text为要设置到粘贴板中的文本内容;第二个参数paste为粘贴状态,默认为False,当为True时会立即把文本粘贴到输入框中(Android设备的光标在某个输入框中时)。

    def set_android_clipboard(self, text: str, paste=False):
        self.server.control.set_clipboard(text, paste)

set_screen_power_mode方法

    @inject(const.TYPE_SET_SCREEN_POWER_MODE)
    def set_screen_power_mode(self, mode: int = scrcpy.POWER_MODE_NORMAL) -> bytes:

        set_screen_power_mode方法是ControlSender类的实例方法用于Android设备的屏幕电源模式。默认为正常状态表示开启Android设备的屏幕电源,此时Android设备的屏幕为正常状态。还可以设置为关闭状态(scrcpy.POWER_MODE_OFF),此时Android设备的屏幕为关闭状态,但并不是灭屏状态(屏幕电源关了和灭屏是两回事),投屏界面还是能看到屏幕。通过这种方式可以在投屏操控Android设备时减少Android设备的电源消耗。

    def set_screen_power_mode(self, mode=2):
        self.server.control.set_screen_power_mode(mode)

totate_device方法

    @inject(const.TYPE_ROTATE_DEVICE)
    def rotate_device(self) -> bytes:

        totate_device方法是ControlSender类的实例方法用于旋转Android设备的屏幕。

    def totate_screen(self):
        self.server.control.totate_device()

swipe方法

    def swipe(
        self,
        start_x: int,
        start_y: int,
        end_x: int,
        end_y: int,
        move_step_length: int = 5,
        move_steps_delay: float = 0.005,
    ) -> None:

        swipe方法是ControlSender类的实例方法用于滑动Android设备的屏幕。这个方法是对touch方法的封装,相当于一点触控。swipe方法可以接收6个参数,前4个参数为滑动的起始坐标和终止坐标;第5个参数为步长(每次滑动的距离),默认为5个坐标单位;第6个参数为每滑动一步停顿的时间,默认0.005秒。

    def swipe_event(self, start_x: int, start_y: int, end_x: int, end_y: int, step: int, delay: float):
        self.server.control.swipe(start_x, start_y, end_x, end_y, step, delay)

结语

        我们通过在python的UI框架中使用上面这些方法,就能实现Android设备的投屏控制了,这个投屏控制的应用要做成什么样子完全由你自己的需求和审美来决定。如果你想同时操作多台Android可以创建多个投屏控制服务,然后把这些服务放到一个列表或字典中(最好是字典),来实现控制设备的切换,达到单独控制某台设备或同时操作多台设备的目的。

模型(示例)

        我看评论区都想要代码,这里就为大家提供了一个用于参考的模型。可以同时操控多台设备,也可以选择性的操作某台设备。

# -*- coding: utf-8 -*-

import sys
import threading
import scrcpy
from PySide6.QtGui import QMouseEvent, QImage, QPixmap, QKeyEvent
from adbutils import adb
from PySide6.QtCore import *
from PySide6.QtWidgets import QApplication, QWidget, QPushButton, QHBoxLayout, QVBoxLayout, \
    QCheckBox, QLabel, QGridLayout, QSpacerItem, QSizePolicy
from numpy import ndarray

# 创建QApplication对象
if not QApplication.instance():
    app = QApplication([])
else:
    app = QApplication.instance()


items = [i.serial for i in adb.device_list()]  # 设备列表
client_dict = {}  # 设备scrcpy客服端字典
# 为所有设备建立scrcpy服务
for i in items:
    client_dict[i] = scrcpy.Client(device=i, bitrate=1000000000)


def thread_ui(func, *args):
    """
    开启一个新线程任务\n
    :param func: 要执行的线程函数;
    :param args: 函数中需要传入的参数 Any
    :return:
    """
    t = threading.Thread(target=func, args=args)  # 定义新线程
    t.setDaemon(True)  # 开启线程守护
    t.start()  # 执行线程


class SignThread(QThread):
    """信号线程"""

    def __new__(cls, parent: QWidget, func, *types: type):
        cls.__update_date = Signal(*types, name=func.__name__)  # 定义信号(*types)一个信号中可以有一个或多个类型的数据(int,str,list,...)
        return super().__new__(cls)  # 使用父类__new__方法创建SignThread实例对象

    def __init__(self, parent: QWidget, func, *types: type):
        """
        信号线程初始化\n
        :param parent: 界面UI控件
        :param func: 信号要绑定的方法
        :param types: 信号类型,可以是一个或多个(type,...)
        """
        super().__init__(parent)  # 初始化父类
        self.__update_date.connect(func)  # 绑定信号与方法

    def send_sign(self, *args):
        """
        使用SignThread发送信号\n
        :param args: 信号的内容
        :return:
        """
        self.__update_date.emit(*args)  # 发送信号元组(type,...)


class MyWindow(QWidget):
    """UI界面"""

    def __init__(self):
        """UI界面初始化"""
        super().__init__()  # 初始化父级
        self.setWindowTitle('多台手机投屏控制示例(python & scrcpy)')  # 设置窗口标题
        self.max_width = 600  # 设置手机投屏宽度
        # 定义元素
        self.check_box = QCheckBox("控制所有设备")  # 定义是否控制所有设备选择框
        self.back_button = QPushButton("BACK")  # 定义返回键
        self.home_button = QPushButton("HOME")  # 定义home键
        self.recent_button = QPushButton("RECENT")  # 定义最近任务键
        self.video = QLabel("设备屏幕信息加载......")  # 定义手机投屏控制标签
        self.video.setStyleSheet("border-width: 3px;border-style: solid;border-color: black;")  # 定义投屏标签样式
        self.video_list = []  # 定义手机投屏标签列表
        for i in items:
            self.video_list.append(QLabel(i))  # 把投屏标签加入列表
        self.main_layout = QHBoxLayout(self)  # 定义主布局容器
        self.frame_layout = QVBoxLayout()  # 定义投屏操控框容器
        self.button_layout = QHBoxLayout()
        self.device_layout = QVBoxLayout()  # 定义投屏容器
        self.list_layout = QGridLayout()  # 定义投屏列表布局容器
        self.spacer = QSpacerItem(20, 40, QSizePolicy.Minimum, QSizePolicy.Expanding)  # 弹性空间
        self.device_spacer = QSpacerItem(20, 40, QSizePolicy.Minimum, QSizePolicy.Expanding)  # 弹性空间
        self.v_spacer = QSpacerItem(40, 20, QSizePolicy.Expanding, QSizePolicy.Minimum)  # 弹性空间
        # 页面布局
        self.main_layout.addLayout(self.frame_layout)
        self.main_layout.addLayout(self.device_layout)
        self.main_layout.addItem(self.v_spacer)
        self.frame_layout.addWidget(self.video)
        self.frame_layout.addLayout(self.button_layout)
        self.frame_layout.addWidget(self.check_box)
        self.frame_layout.addItem(self.spacer)
        self.button_layout.addWidget(self.back_button)
        self.button_layout.addWidget(self.home_button)
        self.button_layout.addWidget(self.recent_button)
        self.device_layout.addLayout(self.list_layout)
        self.device_layout.addItem(self.device_spacer)
        # 交互事件
        self.back_button.clicked.connect(self.click_key(scrcpy.KEYCODE_BACK))
        self.home_button.clicked.connect(self.click_key(scrcpy.KEYCODE_HOME))
        self.recent_button.clicked.connect(self.click_key(scrcpy.KEYCODE_APP_SWITCH))
        self.video.mousePressEvent = self.mouse_event(scrcpy.ACTION_DOWN)
        self.video.mouseMoveEvent = self.mouse_event(scrcpy.ACTION_MOVE)
        self.video.mouseReleaseEvent = self.mouse_event(scrcpy.ACTION_UP)
        self.keyPressEvent = self.on_key_event(scrcpy.ACTION_DOWN)
        self.keyReleaseEvent = self.on_key_event(scrcpy.ACTION_UP)
        # 所有设备屏幕有序排布,最多15台设备,可按需修改
        if len(items) > 0:
            self.now_device = items[0]
            self.now_client = client_dict[items[0]]
            self.now_client.add_listener(scrcpy.EVENT_FRAME, self.main_frame)
            for num in range(len(items)):
                self.video_list[num].setStyleSheet("border-width: 3px;border-style: solid;border-color: black;")
                self.video_list[num].mousePressEvent = self.switch_video(items[num])
                client = client_dict[items[num]]
                client.add_listener(scrcpy.EVENT_FRAME, self.on_frame(num, client))
                if num < 5:
                    self.list_layout.addWidget(self.video_list[num], 0, num, 1, 1)
                elif num < 10:
                    self.list_layout.addWidget(self.video_list[num], 1, num - 5, 1, 1)
                elif num < 15:
                    self.list_layout.addWidget(self.video_list[num], 2, num - 10, 1, 1)

        self.mouse_thread = SignThread(self, self.mouse_exe, int, int, int)
        self.main_thread = SignThread(self, self.main_exe, ndarray)
        self.on_thread = SignThread(self, self.on_exe, int, int, ndarray)

    def click_key(self, key_value: int):
        """
        按键事件\n
        :param key_value: 键值
        :return:
        """

        def key_event():
            if self.check_box.isChecked():
                for i in client_dict:
                    client_dict[i].control.keycode(key_value, scrcpy.ACTION_DOWN)
                    client_dict[i].control.keycode(key_value, scrcpy.ACTION_UP)
            else:
                self.now_client.control.keycode(key_value, scrcpy.ACTION_DOWN)
                self.now_client.control.keycode(key_value, scrcpy.ACTION_UP)

        return key_event

    def switch_video(self, device: str):
        """
        切换设备屏幕为主控屏幕\n
        :param device: 设备序列号
        :return:
        """

        def now_video(evt: QMouseEvent):
            app.processEvents()
            self.now_client.remove_listener(scrcpy.EVENT_FRAME, self.main_frame)
            self.now_client = client_dict[device]
            self.now_client.add_listener(scrcpy.EVENT_FRAME, self.main_frame)
            self.now_client.control.keycode(224, scrcpy.ACTION_DOWN)
            self.now_client.control.keycode(224, scrcpy.ACTION_UP)
            bound = self.now_client.resolution
            self.now_client.control.swipe(bound[0] / 2, bound[1] / 2, bound[0] / 2, bound[1] / 2 - 20)
            self.now_client.control.swipe(bound[0] / 2, bound[1] / 2 - 20, bound[0] / 2, bound[1] / 2)
            self.now_device = device

        return now_video

    def main_frame(self, frame: ndarray):
        """
        监听设备屏幕数据,设置控制窗口图像\n
        :param frame: 图像帧
        :return:
        """
        if frame is not None:
            self.main_thread.send_sign(frame)

    def main_exe(self, frame):
        """
        主控屏幕图像设置\n
        :param frame: 图像帧
        :return:
        """
        ratio = self.max_width / max(self.now_client.resolution)
        image = QImage(
            frame,
            frame.shape[1],
            frame.shape[0],
            frame.shape[1] * 3,
            QImage.Format_BGR888,
        )
        pix = QPixmap(image)
        pix.setDevicePixelRatio(1 / ratio)
        self.video.setPixmap(pix)

    def on_frame(self, num: int, client: scrcpy.Client):
        """
        监听设备屏幕数据,设置小窗口图像\n
        :param num: 设备投屏序号
        :param client: scrcpy服务
        :return:
        """

        def client_frame(frame: ndarray):
            if frame is not None:
                self.on_thread.send_sign(num, max(client.resolution), frame)

        return client_frame

    def on_exe(self, num: int, resolution: int, frame):
        """
        小窗口图像设置\n
        :param num: 设备投屏序号
        :param resolution: 设备宽度
        :param frame: 图像帧
        :return:
        """
        ratio = 300 / resolution
        image = QImage(
            frame,
            frame.shape[1],
            frame.shape[0],
            frame.shape[1] * 3,
            QImage.Format_BGR888,
        )
        pix = QPixmap(image)
        pix.setDevicePixelRatio(1 / ratio)
        self.video_list[num].setPixmap(pix)

    def mouse_event(self, action=scrcpy.ACTION_DOWN):
        """
        鼠标事件\n
        :param action: 事件类型
        :return: 对应的事件函数
        """

        def event(evt: QMouseEvent):
            focused_widget = QApplication.focusWidget()
            if focused_widget is not None:
                focused_widget.clearFocus()
            ratio = self.max_width / max(self.now_client.resolution)
            self.mouse_thread.send_sign(evt.position().x() / ratio, evt.position().y() / ratio, action)

        return event

    def mouse_exe(self, x: int, y: int, action: int):
        """
        执行鼠标事件\n
        :param x: x坐标
        :param y: y坐标
        :param action: 事件类型
        :return:
        """
        if self.check_box.isChecked():
            for i in client_dict:
                client_dict[i].control.touch(x, y, action)
        else:
            self.now_client.control.touch(x, y, action)

    def on_key_event(self, action=scrcpy.ACTION_DOWN):
        """
        键盘按键事件\n
        :param action: 事件类型
        :return: 对应的事件函数
        """

        def handler(evt: QKeyEvent):
            code = self.key_code(evt.key())
            if code != -1:
                if self.check_box.isChecked():
                    for i in client_dict:
                        client_dict[i].control.keycode(code, action)
                else:
                    self.now_client.control.keycode(code, action)

        return handler

    @staticmethod
    def key_code(code):
        """
        Map qt keycode ti android keycode

        Args:
            code: qt keycode
            android keycode, -1 if not founded
        """
        if 48 <= code <= 57:
            return code - 48 + 7
        if 65 <= code <= 90:
            return code - 65 + 29
        if 97 <= code <= 122:
            return code - 97 + 29

        hard_code = {
            35: scrcpy.KEYCODE_POUND,
            42: scrcpy.KEYCODE_STAR,
            44: scrcpy.KEYCODE_COMMA,
            46: scrcpy.KEYCODE_PERIOD,
            32: scrcpy.KEYCODE_SPACE,
            16777219: scrcpy.KEYCODE_DEL,
            16777248: scrcpy.KEYCODE_SHIFT_LEFT,
            16777220: scrcpy.KEYCODE_ENTER,
            16777217: scrcpy.KEYCODE_TAB,
            16777249: scrcpy.KEYCODE_CTRL_LEFT,
            16777235: scrcpy.KEYCODE_DPAD_UP,
            16777237: scrcpy.KEYCODE_DPAD_DOWN,
            16777234: scrcpy.KEYCODE_DPAD_LEFT,
            16777236: scrcpy.KEYCODE_DPAD_RIGHT,
        }
        if code in hard_code:
            return hard_code[code]

        print(f"Unknown keycode: {code}")
        return -1

    def closeEvent(self, _):
        """窗口关闭事件"""
        for i in client_dict:
            client_dict[i].stop()  # 关闭scrcpy服务


def main():
    for i in client_dict:
        thread_ui(client_dict[i].start)  # 给每一台设备单独开启一个scrcpy服务线程
    widget = MyWindow()  # 实例化UI线程
    widget.resize(1200, 800)  # 设置窗口大小
    widget.show()  # 展示窗口
    sys.exit(app.exec())  # 持续刷新窗口


if __name__ == '__main__':
    main()

更新

        对于scrcpy在某些Android设备上不能建立socket服务有多种原因,有可能是Android系统版本太低,有可能是生产该设备的厂商对该设备的Android系统做了限制。如果你使用Github上最新的scrcpy可以投屏,但使用scrcpy-client不能投屏很正常。因为scrcpy-client使用的是scrcpy1.2的版本,且scrcpy-client一直没有更新,而scrcpy一直在更新。关于我们自己如何同步scrcpy和scrcpy-client,我在这里举个例子。

        首先我们因该到Github上下载最新的scrcpy源码和scrcpy应用程序,如果你有scrcpy的编译环境也可以不用下载scrcpy应用程序,直接自己编译scrcpy-server.jar包。

        https://github.com/Genymobile/scrcpy

下载后我们会得到一个scrcpy-master压缩包和scrcpy应用程序压缩包,解压应用程序压缩包得到scrcpy-server文件,解压scrcpy-master压缩包得到scrcpy源码。

        用scrcpy-server替换掉site-packages(python存放第三方函数库的文件夹,应该知道在哪吧)中scrcpy里面的scrcpy-server.jar文件(替换,scrcpy-server增加jar后缀)。

        然后我们就根据scrcpy源码来修改scrcpy-client,主要是core.py和control.py两个文件。我下载的是scrcpy2.5的源码,我们在目录scrcpy-master\app\src下找到server.c文件并使用编辑器(我用的是Notepad++)打开,如果你不懂C语言也不要害怕。找到execute_server函数如下:

static sc_pid
execute_server(struct sc_server *server,
               const struct sc_server_params *params) {
    sc_pid pid = SC_PROCESS_NONE;

    const char *serial = server->serial;
    assert(serial);

    const char *cmd[128];
    unsigned count = 0;
    cmd[count++] = sc_adb_get_executable();
    cmd[count++] = "-s";
    cmd[count++] = serial;
    cmd[count++] = "shell";
    cmd[count++] = "CLASSPATH=" SC_DEVICE_SERVER_PATH;
    cmd[count++] = "app_process";

#ifdef SERVER_DEBUGGER
# define SERVER_DEBUGGER_PORT "5005"
    cmd[count++] =
# ifdef SERVER_DEBUGGER_METHOD_NEW
        /* Android 9 and above */
        "-XjdwpProvider:internal -XjdwpOptions:transport=dt_socket,suspend=y,"
        "server=y,address="
# else
        /* Android 8 and below */
        "-agentlib:jdwp=transport=dt_socket,suspend=y,server=y,address="
# endif
            SERVER_DEBUGGER_PORT;
#endif
    cmd[count++] = "/"; // unused
    cmd[count++] = "com.genymobile.scrcpy.Server";
    cmd[count++] = SCRCPY_VERSION;

    unsigned dyn_idx = count; // from there, the strings are allocated
#define ADD_PARAM(fmt, ...) do { \
        char *p; \
        if (asprintf(&p, fmt, ## __VA_ARGS__) == -1) { \
            goto end; \
        } \
        cmd[count++] = p; \
    } while(0)

    ADD_PARAM("scid=%08x", params->scid);
    ADD_PARAM("log_level=%s", log_level_to_server_string(params->log_level));

    if (!params->video) {
        ADD_PARAM("video=false");
    }
    if (params->video_bit_rate) {
        ADD_PARAM("video_bit_rate=%" PRIu32, params->video_bit_rate);
    }
    if (!params->audio) {
        ADD_PARAM("audio=false");
    }
    if (params->audio_bit_rate) {
        ADD_PARAM("audio_bit_rate=%" PRIu32, params->audio_bit_rate);
    }
    if (params->video_codec != SC_CODEC_H264) {
        ADD_PARAM("video_codec=%s",
                  sc_server_get_codec_name(params->video_codec));
    }
    if (params->audio_codec != SC_CODEC_OPUS) {
        ADD_PARAM("audio_codec=%s",
            sc_server_get_codec_name(params->audio_codec));
    }
    if (params->video_source != SC_VIDEO_SOURCE_DISPLAY) {
        assert(params->video_source == SC_VIDEO_SOURCE_CAMERA);
        ADD_PARAM("video_source=camera");
    }
    if (params->audio_source == SC_AUDIO_SOURCE_MIC) {
        ADD_PARAM("audio_source=mic");
    }
    if (params->max_size) {
        ADD_PARAM("max_size=%" PRIu16, params->max_size);
    }
    if (params->max_fps) {
        ADD_PARAM("max_fps=%" PRIu16, params->max_fps);
    }
    if (params->lock_video_orientation != SC_LOCK_VIDEO_ORIENTATION_UNLOCKED) {
        ADD_PARAM("lock_video_orientation=%" PRIi8,
                  params->lock_video_orientation);
    }
    if (server->tunnel.forward) {
        ADD_PARAM("tunnel_forward=true");
    }
    if (params->crop) {
        ADD_PARAM("crop=%s", params->crop);
    }
    if (!params->control) {
        // By default, control is true
        ADD_PARAM("control=false");
    }
    if (params->display_id) {
        ADD_PARAM("display_id=%" PRIu32, params->display_id);
    }
    if (params->camera_id) {
        ADD_PARAM("camera_id=%s", params->camera_id);
    }
    if (params->camera_size) {
        ADD_PARAM("camera_size=%s", params->camera_size);
    }
    if (params->camera_facing != SC_CAMERA_FACING_ANY) {
        ADD_PARAM("camera_facing=%s",
            sc_server_get_camera_facing_name(params->camera_facing));
    }
    if (params->camera_ar) {
        ADD_PARAM("camera_ar=%s", params->camera_ar);
    }
    if (params->camera_fps) {
        ADD_PARAM("camera_fps=%" PRIu16, params->camera_fps);
    }
    if (params->camera_high_speed) {
        ADD_PARAM("camera_high_speed=true");
    }
    if (params->show_touches) {
        ADD_PARAM("show_touches=true");
    }
    if (params->stay_awake) {
        ADD_PARAM("stay_awake=true");
    }
    if (params->video_codec_options) {
        ADD_PARAM("video_codec_options=%s", params->video_codec_options);
    }
    if (params->audio_codec_options) {
        ADD_PARAM("audio_codec_options=%s", params->audio_codec_options);
    }
    if (params->video_encoder) {
        ADD_PARAM("video_encoder=%s", params->video_encoder);
    }
    if (params->audio_encoder) {
        ADD_PARAM("audio_encoder=%s", params->audio_encoder);
    }
    if (params->power_off_on_close) {
        ADD_PARAM("power_off_on_close=true");
    }
    if (!params->clipboard_autosync) {
        // By default, clipboard_autosync is true
        ADD_PARAM("clipboard_autosync=false");
    }
    if (!params->downsize_on_error) {
        // By default, downsize_on_error is true
        ADD_PARAM("downsize_on_error=false");
    }
    if (!params->cleanup) {
        // By default, cleanup is true
        ADD_PARAM("cleanup=false");
    }
    if (!params->power_on) {
        // By default, power_on is true
        ADD_PARAM("power_on=false");
    }
    if (params->list & SC_OPTION_LIST_ENCODERS) {
        ADD_PARAM("list_encoders=true");
    }
    if (params->list & SC_OPTION_LIST_DISPLAYS) {
        ADD_PARAM("list_displays=true");
    }
    if (params->list & SC_OPTION_LIST_CAMERAS) {
        ADD_PARAM("list_cameras=true");
    }
    if (params->list & SC_OPTION_LIST_CAMERA_SIZES) {
        ADD_PARAM("list_camera_sizes=true");
    }

#undef ADD_PARAM

    cmd[count++] = NULL;

#ifdef SERVER_DEBUGGER
    LOGI("Server debugger waiting for a client on device port "
         SERVER_DEBUGGER_PORT "...");
    // From the computer, run
    //     adb forward tcp:5005 tcp:5005
    // Then, from Android Studio: Run > Debug > Edit configurations...
    // On the left, click on '+', "Remote", with:
    //     Host: localhost
    //     Port: 5005
    // Then click on "Debug"
#endif
    // Inherit both stdout and stderr (all server logs are printed to stdout)
    pid = sc_adb_execute(cmd, 0);

end:
    for (unsigned i = dyn_idx; i < count; ++i) {
        free((char *) cmd[i]);
    }

    return pid;
}

其中cmd是一个字符串列表,用来保存adb指令;ADD_PARAM是一个宏(预编译的函数),其作用就是往cmd中添加元素。execute_server的逻辑也很简单,主要就是根据一些条件来选择性的执行adb指令。这个adb指令就是用来在Android设备上建立scrcpy服务的。所以我们现在要找到scrcpy-client中用来在Android设备上建立scrcpy服务的函数。

        site-packages(python存放第三方函数库的文件夹,应该知道在哪吧)中scrcpy里面的core.py用编辑器(我用的是Pycharm)打开,也可以使用Ctrl+鼠标左键的方式直接打开core.py(编辑器中跳转到函数的方式,不用多说)。在core.py中找到__deploy_server函数如下:

    def __deploy_server(self) -> None:
        """
        Deploy server to android device
        """
        jar_name = "scrcpy-server.jar"
        server_file_path = os.path.join(
            os.path.abspath(os.path.dirname(__file__)), jar_name
        )
        self.device.sync.push(server_file_path, f"/data/local/tmp/{jar_name}")
        commands = [
            f"CLASSPATH=/data/local/tmp/{jar_name}",
            "app_process",
            "/",
            "com.genymobile.scrcpy.Server",
            "1.20",  # Scrcpy server version
            "info",  # Log level: info, verbose...
            f"{self.max_width}",  # Max screen width (long side)
            f"{self.bitrate}",  # Bitrate of video
            f"{self.max_fps}",  # Max frame per second
            f"{self.lock_screen_orientation}",  # Lock screen orientation: LOCK_SCREEN_ORIENTATION
            "true",  # Tunnel forward
            "-",  # Crop screen
            "false",  # Send frame rate to client
            "true",  # Control enabled
            "0",  # Display id
            "false",  # Show touches
            "true" if self.stay_awake else "false",  # Stay awake
            "-",  # Codec (video encoding) options
            self.encoder_name or "-",  # Encoder name
            "false",  # Power off screen after server closed
        ]

        self.__server_stream: AdbConnection = self.device.shell(
            commands,
            stream=True,
        )

        # Wait for server to start
        self.__server_stream.read(10)

我们可以发现execute_server函数和__deploy_server函数都是通过执行adb指令来建立scrcpy服务的。execute_server中的cmd跟__deploy_server中的commands是一个东西,cmd中的字符串包含=号类似于关键字传参(键=值),commands中的字符串没有=号类似于位置传参。由此可见scrcpy1.2使用的还是位置传参,scrcpy2.5使用的是关键字传参。所以我们应该把__deploy_server函数修改为如下形式:

    def __deploy_server(self) -> None:
        """
        Deploy server to android device
        """
        jar_name = "scrcpy-server.jar"
        server_file_path = os.path.join(
            os.path.abspath(os.path.dirname(__file__)), jar_name
        )
        self.device.sync.push(server_file_path, f"/data/local/tmp/{jar_name}")
        commands = [
            f"CLASSPATH=/data/local/tmp/{jar_name}",
            "app_process",
            "/",
            "com.genymobile.scrcpy.Server",
            "2.5",  # Scrcpy server version
            "log_level=info",  # Log level: info, verbose...
            "audio=false",
        ]
        if self.bitrate:  # Bitrate of video
            commands.append(f"video_bit_rate={self.bitrate}")
        if self.encoder_name:
            commands.append(f"video_encoder={self.encoder_name}")
        if self.max_width:  # Max screen width (long side)
            commands.append(f"max_size={self.max_width}")
        if self.max_fps:  # Max frame per second
            commands.append(f"max_fps={self.max_fps}")
        if self.lock_screen_orientation != LOCK_SCREEN_ORIENTATION_UNLOCKED:
            commands.append(f"lock_video_orientation={self.lock_screen_orientation}")
        if self.stay_awake:
            commands.append("stay_awake=true")
        commands.append("tunnel_forward=true")  # server link mode forward or reverse
        commands.append("display_id=0")
        commands.append("power_off_on_close=false")
        # "-",  # Crop screen
        # "false",  # Send frame rate to client

        self.__server_stream: AdbConnection = self.device.shell(
            commands,
            stream=True,
        )

        # Wait for server to start
        self.__server_stream.read(10)

现在我们就可以成功的建立scrcpy服务了,投屏没问题了但是控制还有问题,接下来就修改控制代码。(为了适配最新的scrcpy请把python中的av库升级到最新版本,视频流的编码和解码要匹配,投屏效果会好一些)

        要知道怎样修改控制部分的代码,需要查看scrcpy整套的控制代码,代码量很多,如果你不会C语言可能看不懂。但我们可以取巧,要知道在编码环节中存在必不可少的单元测试环节,特别是模块化的多人项目。因此我们只需要查看单元测试用例就能知道控制函数的传参方式,我们只需要把相应的参数传递给对应的函数,就能正常的控制设备了。

        在目录scrcpy-master\app\tests中找到test_control_msg_serialize.c文件并使用编辑器打开,我们可以看到这里面展示了所有注入事件函数的传参方式。再打开site-packages中scrcpy里面的control.py文件,逐个对比两边注入函数传递的参数是否有区别。

        我们可以发现触摸屏幕注入事件的参数有区别,test_control_msg_serialize.c中的test_serialize_inject_touch_event函数如下:

static void test_serialize_inject_touch_event(void) {
    struct sc_control_msg msg = {
        .type = SC_CONTROL_MSG_TYPE_INJECT_TOUCH_EVENT,
        .inject_touch_event = {
            .action = AMOTION_EVENT_ACTION_DOWN,
            .pointer_id = UINT64_C(0x1234567887654321),
            .position = {
                .point = {
                    .x = 100,
                    .y = 200,
                },
                .screen_size = {
                    .width = 1080,
                    .height = 1920,
                },
            },
            .pressure = 1.0f,
            .action_button = AMOTION_EVENT_BUTTON_PRIMARY,
            .buttons = AMOTION_EVENT_BUTTON_PRIMARY,
        },
    };

    uint8_t buf[SC_CONTROL_MSG_MAX_SIZE];
    size_t size = sc_control_msg_serialize(&msg, buf);
    assert(size == 32);

    const uint8_t expected[] = {
        SC_CONTROL_MSG_TYPE_INJECT_TOUCH_EVENT,
        0x00, // AKEY_EVENT_ACTION_DOWN
        0x12, 0x34, 0x56, 0x78, 0x87, 0x65, 0x43, 0x21, // pointer id
        0x00, 0x00, 0x00, 0x64, 0x00, 0x00, 0x00, 0xc8, // 100 200
        0x04, 0x38, 0x07, 0x80, // 1080 1920
        0xff, 0xff, // pressure
        0x00, 0x00, 0x00, 0x01, // AMOTION_EVENT_BUTTON_PRIMARY (action button)
        0x00, 0x00, 0x00, 0x01, // AMOTION_EVENT_BUTTON_PRIMARY (buttons)
    };
    assert(!memcmp(buf, expected, sizeof(expected)));
}

传递了TOUCH_EVENT、ACTION、pointer id、x坐标(100)、y坐标(200)、屏幕宽(1080)、屏幕高(1920)、pressure、action button、buttons一共10个参数。

control.py中的touch函数如下:

    @inject(const.TYPE_INJECT_TOUCH_EVENT)
    def touch(
        self, x: int, y: int, action: int = const.ACTION_DOWN, touch_id: int = -1
    ) -> bytes:
        """
        Touch screen

        Args:
            x: horizontal position
            y: vertical position
            action: ACTION_DOWN | ACTION_UP | ACTION_MOVE
            touch_id: Default using virtual id -1, you can specify it to emulate multi finger touch
        """
        x, y = max(x, 0), max(y, 0)
        return struct.pack(
            ">BqiiHHHi",
            action,
            touch_id,
            int(x),
            int(y),
            int(self.parent.resolution[0]),
            int(self.parent.resolution[1]),
            0xFFFF,
            1,
        )

传递了TOUCH_EVENT(装饰器中传入)、action、touch_id、x坐标、y坐标、屏幕宽、屏幕高、pressure、action button一共9个参数,少了一个参数buttons,至于buttons有什么作用,我没去研究,你感兴趣可以去研究一下,我们传1就可以了。至于>BqiiHHHi是什么意思,你去搜一下python中struct的用法就知道了,它是C语言数据类型的格式化指代字符串,不同的字符代表C语言中不同类型的数据。由于串口通信使用的是C语言的数据类型,所以要把python的数据类型转换为C语言的数据类型。还有touch_id不能为-1,为-1时会一直按着屏幕。所以我们可以把touch函数改为如下形式:

    @inject(const.TYPE_INJECT_TOUCH_EVENT)
    def touch(self, x: int, y: int, action: int = const.ACTION_DOWN, touch_id: int = 1) -> bytes:
        """
        Touch screen

        Args:
            x: horizontal position
            y: vertical position
            action: ACTION_DOWN | ACTION_UP | ACTION_MOVE
            touch_id: Default using virtual id 1, you can specify it to emulate multi finger touch
        """
        x, y = max(x, 0), max(y, 0)
        return struct.pack(
            ">BqiiHHHii",
            action,
            touch_id,
            int(x),
            int(y),
            int(self.parent.resolution[0]),
            int(self.parent.resolution[1]),
            0xFFFF,
            1,
            1,
        )

同样的scroll函数也需要改成如下形式:

    @inject(const.TYPE_INJECT_SCROLL_EVENT)
    def scroll(self, x: int, y: int, h: int, v: int) -> bytes:
        """
        Scroll screen

        Args:
            x: horizontal position
            y: vertical position
            h: horizontal movement
            v: vertical movement
        """

        x, y = max(x, 0), max(y, 0)
        return struct.pack(
            ">iiHHeei",
            int(x),
            int(y),
            int(self.parent.resolution[0]),
            int(self.parent.resolution[1]),
            float(h),
            float(v),
            1,
        )

set_clipboard函数也需要改成如下形式:

    @inject(const.TYPE_SET_CLIPBOARD)
    def set_clipboard(self, text: str, paste: bool = False) -> bytes:
        """
        Set clipboard

        Args:
            text: the string you want to set
            paste: paste now
        """
        buffer = text.encode("utf-8")
        return struct.pack(">q?i", len(buffer), paste, len(buffer)) + buffer

 至于get_clipboard函数我们改成如下形式:

    @inject(const.TYPE_GET_CLIPBOARD)
    def get_clipboard(self, copy_type: int = const.COPY_KEY_COPY):
        """
        Get clipboard

        Args:
            copy_type: COPY_KEY_NONE | COPY_KEY_COPY | COPY_KEY_CUT
        """
        return struct.pack(">B", copy_type)

肯定会报错的,我们需要再const.py中增加如下代码:

# Copy type
COPY_KEY_NONE = 0
COPY_KEY_COPY = 1
COPY_KEY_CUT = 2

再在scrcpy中增加一个receiver.py文件如下:

import socket
import struct
import pandas.io.clipboard as cb


class ControlReceiver:
    def __init__(self, receiver: socket.socket):
        self.receiver = receiver

    def run_receiver(self):
        while True:
            try:
                (msg_type,) = struct.unpack(">B", self.receiver.recv(1))
            except ConnectionAbortedError:
                break
            if msg_type == 0:
                try:
                    (length,) = struct.unpack(">i", self.receiver.recv(4))
                    text = self.receiver.recv(length).decode("utf-8")
                    if text != cb.paste():
                        cb.copy(text)
                except struct.error:
                    pass

然后在core.py的start函数中增加两行代码:

    def start(self, threaded: bool = False, daemon_threaded: bool = False) -> None:
        """
        Start listening video stream

        Args:
            threaded: Run stream loop in a different thread to avoid blocking
            daemon_threaded: Run stream loop in a daemon thread to avoid blocking
        """
        assert self.alive is False

        self.__deploy_server()
        self.__init_server_connection()
        # 增加两行代码
        receiver = ControlReceiver(self.control_socket)
        threading.Thread(target=receiver.run_receiver, daemon=True).start()
        # 增加结束
        self.alive = True
        self.__send_to_listeners(EVENT_INIT)

        if threaded or daemon_threaded:
            self.stream_loop_thread = threading.Thread(
                target=self.__stream_loop, daemon=daemon_threaded
            )
            self.stream_loop_thread.start()
        else:
            self.__stream_loop()

至于form .receiver import ControlReceiver不需要我写了吧!

如此我们就可以在电脑和手机之间进行双向复制粘贴内容了。

        我不想再写了,够多了,也解释清楚了,自己琢磨吧!

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐