终究是一个非常痛苦的夜晚。我没想到我会活着这么痛苦。

帮别人做一个小功能。把一个 android 设备里面固定目录下的log 导出到本地,然后打成压缩包.

需求很简单,在听到这个需求之后,我当时的预估时间是10分钟到1小时最多,不会超过1小时。。。。

我没想到,真正让我痛苦的,不是遇到了 windows 文件夹权限的问题,而是没有遇到这个问题,但是我以为我遇到的是这个问题…

这句话有的绕口,但是读完之后你就知道,这句总结还算到位.。

长话短说,我在使用 open()函数的时候,传入了一个文件夹作为参数,比如open("D:\\Games"), 然后 python 一直报错,说权限不足。

PermissionError: [Errno 13] Permission denied: 'D:\\Projects'

如果是一个正常人类可能会去查看API,或者网上找一下原因。我直接去网上找原因了。因为我要打开的这个文件夹比较特殊,是从其他设备上面导过来的,导致我以为是 windows 权限的问题。
因为网上有很多很多提示Windows 权限的报错。

基于此,我在网上找了好久,主要的方向就是,怎么去满足这个权限,不让它open失败。当然,没有效果的,因为根本原因是:通过open(file_path), 这个 file_path如果是一个目录而不是文件的话,就必然报错提示权限不足。
当然,也不能说是一无所获。

我找到了以下几种可能可以避免Windows 权限的解决方案了,只是目前用不上。

  1. 使用管理员权限去执行这个程序脚本。
    具体做法就是,找到 cmd.exe 这个程序,用鼠标右击去通过管理员的权限运行,然后在这个 cmd 窗口里面去执行脚本。
  2. 通过多进程的方式去执行。
    这个就更脑洞大开了。我看到的一个现象是,我通过给目标文件夹设置权限之后,不能立即去打开,但是把程序关闭,再次执行,又可以运行了。(当时情况特殊,后面我无法复现这个场景了)基于这么一个现象,我想到的原因是,因为当前进程跟之前的权限有关联,新启动的进程可以看到目标文件夹已经有权限了,所以用新的进程去打开。

这个地方门道还有点多,我一共使用了两种方式:

  • 第一种是使用 multiprocessing.Process的方式去开启子进程,然后把提权的操作放在这个子进程里面,然后在这个子进程执行结束之后,再去在父进程里面去调用 open() 函数。这个 multiprocessing 还是蛮不错的,它可以开任意多个进程,而且可以传参数,可以进行进程之间的数据共享。
  • 第二种就更牛批了,我都佩服自己,对于第二种实现方式。因为第一种方式也是失败(现在当然知道失败是因为open不能去传文件夹路径作为参数,但是当时不清楚),于是就想到,是不是因为这个父子进程之间也是互相关联的呢,导致在父进程依然不能获取到正确的权限。那么,我直接把当前进程关闭,重新开一个进程,这样总不会互相影响了吧。所以就有了第二种实现方法。大体的实现逻辑就是利用 sys.argv去自己给自己传参数,实现一个脚本在运行不同次数的时候,执行的逻辑不同,然后每次执行都是一个新的进程,完全不会被前面一次执行所影响。

为了实现第二种不互相影响的实现,我还搞了一个临时文件用来存储上一次执行的结果。因为不能互相影响导致前面执行的内存数据,在下次执行的时候无法读取,因为上一次的程序已经执行结束了,这些内存里面的数据也就消失了。那么,我就搞一个文本文件,相当于数据库一样,去记录上一次执行的结果,然后第二次执行的时候可以去读这个文件的内容,然后再执行具体的逻辑。

哎,贴一下代码,不过,因为一直失败,所以前面有的代码没有保留,直接被删除了。不过第二种实现方式倒是保留了,有兴趣的可以看看。

这里最困难不是这两种实现方案,而是 Windows 的文件夹权限处理。这一块的文档不是很多,然后也有一些文档写的比较详情,但是里面的专业术语太多了,看不懂。

cacls Music /p everyone:f /t   ====> 只有 everyone 了,但是要手动确认
cacls.exe Music /t /e /g everyone:F  ====> 多了一个 everyone 但是反应迟钝。
cacls Music /p everyone:f /t /e /g  ====》 无效
echo y|cacls Music /p everyone:f /t    ===> 只有一个 everyone, 并且不需要手动确认了

这几个命令我找了好久才找到,也都实验了。还有一些其他的无效的命令,就没有记录了。
关于使用 cacls 还有 icacls, 以及其他的方式给文件夹设置权限,里面的内容很多,感觉很难。

比如这篇:
PowerShell 修改文件夹及文件权限

还有好几篇也是这种比较详细的,但是真的看不懂…

// 第二种实现方案的完整代码:(可以正常运行的)

import os
import shutil
import subprocess
import sys
import time
import zipfile

default_pull_path = "/sdcard/Android/data/com.iauto.navi/files/bllog/"
default_place_path = str(os.getcwd())
default_temp_file = ".pull_temp.txt"
key_pull_path = "KEY_PULL_PATH"
key_place_path = "KEY_PLACE_PATH"
key_target_path = "KEY_TARGET_PATH"


def _read_stored_mappings():
    mappings = {}
    if not os.path.exists(default_temp_file):
        return mappings
    with open(default_temp_file, mode='r', newline='') as f:
        lines = f.readlines()
        for line in lines:
            if key_place_path in line:
                value = line.strip().split(",")[-1]
                mappings[key_place_path] = value
            if key_pull_path in line:
                value = line.strip().split(",")[-1]
                mappings[key_pull_path] = value
            if key_target_path in line:
                value = line.strip().split(",")[-1]
                mappings[key_target_path] = value
    return mappings


def _write_2_mapping_file(key, value):
    dd = _read_stored_mappings()
    dd[key] = value
    with open(default_temp_file, mode='w+', newline='') as f:
        for k, v in dd.items():
            f.write(k)
            f.write(",")
            f.write(v)
            f.write("\n")


def _write_full_2_mapping_file(mappings):
    with open(default_temp_file, mode='w+', newline='') as f:
        for k, v in mappings:
            f.write(k)
            f.write(",")
            f.write(v)
            f.write("\n")


def _e_slow_cmd(command="git -h"):
    p = subprocess.Popen(command, shell=True)
    return_code = p.wait()
    print("result code: ", return_code)
    return return_code == 0


def _exe(command):
    p = subprocess.Popen(command, shell=True)
    return_code = p.wait()
    return return_code == 0


def _zip_dirs(output_zip_path, target_dir) -> tuple[bool, str]:
    """
    make a zip file (output_zip_path) from target_dir.
    :param output_zip_path: like /home/tony/logs/abc.zip
    :param target_dir: like /home/tony/resources/
    :return: output_zip_path
    """
    # try:
    #     with open(target_dir, mode='r'): ### 在 windows 上面必然报错,只能针对 文件,不能针对 目录。
    #         pass
    # except Exception as ex:
    #     traceback.print_exc()
    #     return False, "zip error: {}".format(ex)
    #     pass
    with zipfile.ZipFile(output_zip_path, "w") as zip_obj:
        # 压缩目录
        # zipfile没有直接压缩目录的功能,要压缩目录只能遍历目录一个一个文件压。
        for root, fd, files in os.walk(target_dir):
            print("root:", root)
            print("fd:", fd)
            fpath = root.replace(target_dir, os.path.basename(target_dir))
            # 如果想要目录为空时仍将该目录压缩进去,该目录也要压缩一遍;反之请将以下行注释掉
            # zip_obj.write(root)
            for fn in files:
                # 拼接文件完整目录,不然只用文件名代码找不到文件
                tmp_file_path = str(os.path.join(root, fn))
                zip_obj.write(tmp_file_path, str(os.path.join(fpath, fn)))

    return True, ""


def _first_time():
    args = sys.argv
    return len(args) <= 1 or len(args) >= 3 or (len(args) == 2 and not args[1].strip().isdigit())


def _is_windows():
    return sys.platform == "win32"


def _restart_file_resource_mgr():
    if _is_windows():
        s1 = _exe("taskkill /f /im explorer.exe")
        time.sleep(1)
        s2 = _exe("start explorer.exe")
        return s1 and s2
    else:
        return True


def _change_dir_mod(target_dir):
    print("change_dir_mod.....")
    if _is_windows():
        # cacls.exe Music /t /e /g everyone:F
        command = "echo y|cacls {} /p everyone:f /t".format(target_dir)
        # command = "cacls.exe {} /t /e /g everyone:F".format(target_dir)
    else:
        command = "chmod -R u+w {}".format(target_dir)
    success = _e_slow_cmd(command)
    print("_change_dir_mod cmd?", command)
    print("_change_dir_mod success?", success)
    return success


def _get_pull_and_place_paths() -> tuple[str, str]:
    mappings = _read_stored_mappings()

    if key_pull_path in mappings and key_place_path in mappings:
        return mappings[key_pull_path], mappings[key_place_path]
    else:
        pull_p = default_pull_path
        place_p = os.getcwd()
        # TODO
        pull_p = "/sdcard/Music"
        place_p = "C:\\Users\\Stone\\Desktop\\logs"
        args = sys.argv
        if len(args) == 2 and not args[1].strip().isdigit():
            pull_p = args[1].strip()
        elif len(args) == 3:
            pull_p = args[1].strip()
            place_p = args[2].strip()
        mappings[key_pull_path] = pull_p
        mappings[key_place_path] = place_p
        _write_2_mapping_file(key_pull_path, pull_p)
        _write_2_mapping_file(key_place_path, place_p)
        return pull_p, place_p


def _get_target_dir_path(place_path=None, end_dir_name=None):
    print("_get_target_dir_path,,,a={},b={}".format(place_path, end_dir_name))
    # traceback.print_stack()
    mappings = _read_stored_mappings()
    print("_get_target_dir_path, mp:", mappings)
    if key_target_path in mappings:
        print("_get_target_dir_path, 1 mp:", mappings)
        return mappings[key_target_path]
    else:
        print("_get_target_dir_path, 2 mp:", mappings)
        value = os.path.join(place_path, end_dir_name)
        mappings[key_target_path] = value
        _write_2_mapping_file(key_target_path, value)
        return value


def s0_pull_log_2_local() -> tuple[bool, str]:
    cmd = "adb pull {} {}".format(*_get_pull_and_place_paths())
    ss = _e_slow_cmd(cmd)
    return ss, "" if not ss else "[{}] failed.".format(cmd)


def s1_chmod_local_dir() -> tuple[bool, str]:
    pull_path, place_path = _get_pull_and_place_paths()
    print("s1_chmod_local_dir 1", pull_path, place_path)
    if not os.path.isdir(place_path):
        return False, "place_path:[{}] is not a valid dir.".format(place_path)
    try:
        end_dir_name = list(filter(lambda name: len(name) > 0, pull_path.split("/")))[-1]
    except Exception as e:
        print("error of filter:", e)
        return False, "get last pulled dir name failed."
    if not end_dir_name or len(end_dir_name) <= 0:
        print("pull path: [{}] is not a dir. GET OUT!!!!".format(pull_path))
        return False, "get last pulled dir name failed."
    target_dir = _get_target_dir_path(place_path, end_dir_name)
    print("s1_chmod_local_dir 2", target_dir)
    if not os.path.isdir(target_dir):
        return False, "[{}] is not a dir. GET OUT!!!".format(target_dir)
    success = _change_dir_mod(target_dir)
    print("s1_chmod_local_dir 3", success)
    return success, "" if success else "chmod [{}] failed. GET OUT!!!!".format(target_dir)


def s2_zip_local_dir() -> tuple[bool, str]:
    pull_path, place_path = _get_pull_and_place_paths()
    print("pull_path, place_path ", pull_path, place_path)
    time_str = time.strftime("%Y_%m_%d_%H_%M_%S", time.localtime())
    zip_fn = "log_{}.zip".format(time_str)
    zip_fp = os.path.join(place_path, zip_fn)
    print("_get_target_dir_path()", _get_target_dir_path())
    return _zip_dirs(zip_fp, _get_target_dir_path())


def s3_delete_local_dir() -> tuple[bool, str]:
    target_dir = _get_target_dir_path()
    shutil.rmtree(target_dir)
    os.remove(default_temp_file)
    return True, ""


def handle_args():
    print("handle_args in process: {}, {}".format(os.getcwd(), os.getpid()))
    # success, text = False, "execute failed"
    args = sys.argv
    fp = args[0]
    if _first_time():
        print("exec 0")
        success, text = s0_pull_log_2_local()
        if not success:
            print(text)
        else:
            _exe("python {} {}".format(fp, 1))
    else:
        arg = args[1]
        num = int(arg.strip())
        if 1 == num:
            print("exec 1")
            success, text = s1_chmod_local_dir()
            if not success:
                print(text)
            else:
                if _is_windows():
                    print("sleep 1 before zip in windows")
                    time.sleep(1)
                    print("sleep 1 done.....")

                _exe("python {} {}".format(fp, 2))
        elif 2 == num:
            print("exec 2")
            time.sleep(1)
            print("exec 2, after sleep...")
            success, text = s2_zip_local_dir()
            if not success:
                print(text)
            else:
                _exe("python {} {}".format(fp, 3))
            pass
        elif 3 == num:
            print("exec 3")
            s3_delete_local_dir()
            print("step all done...")


if __name__ == '__main__':
    handle_args()
    # s2_zip_local_dir()
    ss = 'C:\\Users\\Stone\\Desktop\\logs\\Music\\.thumbnails\\.database_uuid'
    s2 = "C:\\Users\\Stone\\Desktop\\normal\\nets.txt"
    # with open(ss):
    #     pass

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐