Python 多线程和多进程并发执行
python 中如何选择多线程,多进程实现并行执行脚本
Python 多线程和多进程并发执行
引言
在测试领域,为了提高测试效率,通常采用并行方式执行脚本,可通过多线程和多进程机制来实现,今天来了解一下多线程和多进程的概念,区别以及使用。
多线程
多线程是一种并发执行的机制,它允许程序同时执行多个线程,每个线程都是独立执行的最小单位。线程是在进程内部运行的,多线程共享同一个进程的地址空间,因此它们可以更方便地共享数据和通信。
主要特点和概念
-
轻量级: 线程相对于进程来说更轻量,因为它们共享同一个进程的资源,包括内存空间。线程的创建和切换开销较小。
-
共享内存: 线程在同一进程中共享相同的内存空间,这使得线程之间可以直接访问共享的数据,也容易进行通信。
-
并发执行: 多线程允许程序的不同部分并发执行,从而提高程序的整体性能。每个线程都有自己的执行路径,它们可以同时执行不同的任务。
-
线程安全: 在多线程编程中,需要考虑到多个线程同时访问和修改共享数据可能引发的问题。为了确保线程安全,可能需要使用锁、信号量等同步机制。
-
全局解释锁(GIL): 在 CPython 解释器中,由于全局解释锁的存在,一次只允许一个线程执行 Python 字节码。这使得在多线程中并发执行 CPU 密集型任务时性能提升有限,但对于 I/O 密集型任务仍然有效。
多线程的使用
threading 模块
在 Python中,可以使用 threading 模块来创建和管理线程。使用 threading 模块时,需要手动管理线程的创建和执行。以下是一个简单的多线程示例
import threading
import time
def square_number(number):
result = number * number
time.sleep(1)
print(f"Result for {number}: {result}")
numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
# 创建一个线程池,用于存储线程对象
threads = []
# 定义一个线程执行函数
def thread_worker(num):
square_number(num)
# 启动线程
for num in numbers:
thread = threading.Thread(target=thread_worker, args=(num,))
threads.append(thread)
thread.start()
# 等待所有线程执行完成
for thread in threads:
thread.join()
print("All tasks have finished.")
在这个例子中,我们手动创建了一个线程的线程池。然后,我们定义了一个 thread_worker 函数,该函数接受一个数字并调用 square_number 函数。我们为每个数字创建一个线程,并将这些线程加入到一个列表中。最后,通过遍历线程列表,等待所有线程执行完成。
执行结果:
Result for 2: 4
Result for 1: 1
Result for 4: 16
Result for 3: 9
Result for 5: 25
Result for 7: 49
Result for 10: 100
Result for 9: 81
Result for 6: 36
Result for 8: 64
All tasks have finished.
concurrent.futures 线程池
concurrent.futures.ThreadPoolExecutor 是 Python 标准库中 concurrent.futures 模块提供的一个线程池实现,线程池自动管理了线程的生命周期,使得代码更为简洁,用于简化并发编程。它提供了高级别的接口,使得在多线程环境中提交和管理任务变得更加容易。
import concurrent.futures
import time
def square_number(number):
result = number * number
time.sleep(1)
print(f"Result for {number}: {result}")
# 创建一个包含3个线程的线程池
max_threads = 4
numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
# 提交任务给线程池
futures = [executor.submit(square_number, num) for num in numbers]
# 等待所有任务执行完成
concurrent.futures.wait(futures)
print("All tasks have finished.")
在这个例子中,我们定义了一个简单的任务 square_number,该任务接受一个数字并计算其平方,然后打印结果。我们使用 ThreadPoolExecutor 创建了一个包含 4 个线程的线程池,并使用 submit 方法提交了一系列任务,每个任务处理一个数字。submit 方法会返回一个 concurrent.futures.Future 对象,可以用来监控任务的执行状态。
由于我们限制了线程池中的最大线程数为 4,因此任务会并发执行,但最多只有4个任务同时运行。等待所有任务完成后,程序输出 “All tasks have finished.”。
最后,使用 concurrent.futures.wait 等待一系列任务完成。这个例子中,wait 会阻塞主线程,直到所有的任务都完成。
ThreadPoolExecutor 简化了线程的管理和任务的提交过程,使得在多线程环境中更容易实现并发编程。需要注意的是,与原生的 threading 模块相比,ThreadPoolExecutor 提供了更高级别的抽象,更易于使用。
执行结果:
Result for 3: 9
Result for 1: 1
Result for 4: 16
Result for 2: 4
Result for 7: 49
Result for 6: 36
Result for 5: 25
Result for 8: 64
Result for 10: 100
Result for 9: 81
All tasks have finished.
多进程
多进程是一种并发执行的机制,允许程序同时执行多个独立的进程。每个进程都拥有独立的内存空间,因此它们不会互相干扰。多进程的优点在于能够充分利用多核处理器,实现真正的并行执行,特别适合处理 CPU 密集型的任务。
主要特点和概念:
独立内存空间: 每个进程都有独立的内存空间,不同进程之间的数据不能直接共享。进程之间的通信通常需要使用一些特殊的机制,例如管道、消息队列等。
并行执行: 多进程能够在多个 CPU 核心上并行执行,因此适用于 CPU 密集型任务。每个进程都有自己的 Python 解释器,避免了全局解释锁(GIL)对并行性能的限制。
稳定性: 进程之间相互隔离,一个进程的崩溃通常不会影响其他进程。这提高了系统的稳定性和可靠性。
创建和销毁开销较大: 与线程相比,创建和销毁进程的开销较大,因为每个进程都有独立的资源和状态。
多进程的使用:
multiprocessing 模块
在 Python中,可以使用 multiprocessing 模块来创建和管理多进程。以下是一个简单的多进程示例:
import multiprocessing
import time
def square_number(number):
result = number * number
time.sleep(1)
print(f"Result for {number}: {result}")
if __name__ == '__main__':
# 创建一个包含3个进程的进程池
max_processes = 3
numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
with multiprocessing.Pool(processes=max_processes) as pool:
# 使用map方法将任务分配给进程池
pool.map(square_number, numbers)
print("All tasks have finished.")
在这个例子中,我们创建了一个包含 3 个进程的进程池(max_processes = 3)。使用 pool.map 方法,我们将任务函数 square_number 应用于列表 numbers 中的每个数字,进程池会自动分配任务给空闲的进程。最后,等待所有任务完成后,程序输出 “All tasks have inished.”。
使用 Pool 类可以很方便地实现多进程编程,而不必手动管理进程的创建和销毁。每个进程在执行任务时独立运行,从而实现了并行处理的效果。
注意:
multiprocessing 的某些环境(比如在交互式环境中如 Jupyter Notebook中)使用 if name == ‘main’:是必要的,这是因为在Unix 系统上,multiprocessing 在 fork 子进程时会复制整个进程的状态,包括已经创建的线程,而在 Windows上,由于没有 fork,必须通过重新导入模块来确保每个进程都能正确地运行主程序。if name == ‘main’: 语句确保代码只在主模块中运行,而不是在子进程中运行。这是为了避免多次执行程序,因为在子进程中也会执行导入的代码。这是一个在使用 multiprocessing 模块时常见的实践。
concurrent.futures 进程池
当使用 concurrent.futures.ProcessPoolExecutor 时,你可以使用 submit 方法来提交可调用的对象(函数)给进程池,并获得一个 concurrent.futures.Future 对象,该对象代表异步计算的结果。以下是一个简单的例子:
import concurrent.futures
import time
def square_number(number):
result = number * number
time.sleep(1)
print(f"Result for {number}: {result}")
return result
def processes_work():
# 创建一个包含3个进程的进程池
max_processes = 3
numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
with concurrent.futures.ProcessPoolExecutor(max_workers=max_processes) as executor:
# 使用submit方法提交任务给进程池
futures = [executor.submit(square_number, num) for num in numbers]
# 等待所有任务执行完成
# concurrent.futures.wait(futures)
# 或获取结果
for future in concurrent.futures.as_completed(futures):
result = future.result()
# 在这里处理结果,例如打印或进行其他操作
print("print result: ", result)
print("All tasks have finished.")
if __name__ == '__main__':
processes_work()
在这个例子中,我们使用 concurrent.futures.ProcessPoolExecutor 创建了一个包含 3 个进程的进程池(max_workers = 3)。然后,我们使用 executor.submit 提交了一系列任务,每个任务是 square_number 函数对不同的数字进行平方运算。我们通过 concurrent.futures.as_completed 来获取已完成的任务,并通过 future.result() 获取任务的返回结果。在这个例子中,处理结果的操作是简单地打印结果。
需要注意
,在这个例子中同样使用了 if name == ‘main’: 条件判断,以确保在主模块中运行,避免在 Windows 等环境下可能出现的问题。
执行结果:
Result for 2: 4
Result for 1: 1
print result: 4
print result: 1
Result for 3: 9
print result: 9
Result for 4: 16
Result for 5: 25
print result: 16
print result: 25
Result for 6: 36
print result: 36
Result for 8: 64
Result for 7: 49
print result: 64
print result: 49
Result for 9: 81
print result: 81
Result for 10: 100
print result: 100
All tasks have finished.
选择合适的进程数
选择合适的进程数取决于任务的性质、系统资源和硬件配置。通常,你可以通过试验和性能测试来确定最佳的进程数。以下是一些考虑因素:
- CPU 核心数: 一般来说,进程数不应该超过系统的物理 CPU 核心数,否则可能导致竞争和性能下降。在多核系统上,你可以选择使用与核心数相当的进程数以充分利用硬件。
可以使用 Python 中的 os 模块来获取系统的 CPU 核心数
import os
def get_cpu_core_count():
return os.cpu_count()
if __name__ == '__main__':
core_count = get_cpu_core_count()
if core_count is not None:
print(f"The system has {core_count} CPU core(s).")
else:
print("Unable to determine the number of CPU cores.")
-
任务类型: 如果任务是 CPU 密集型的(需要大量计算),则增加进程数可能会提高性能。但对于 I/O 密集型任务(等待外部资源,如文件 I/O 或网络请求),增加进程数可能不会显著提高性能。
-
系统资源: 考虑系统的可用内存和其他资源。创建太多的进程可能导致内存不足,从而影响整体性能。
-
并发任务数: 任务的并发数也是一个重要的考虑因素。如果有大量的并发任务,适当增加进程数可能有助于更好地并行执行。
-
性能测试: 进行性能测试是确定最佳进程数的有效方法。通过尝试不同的进程数,测量执行时间和资源利用率,以找到最佳的配置。
根据这些因素,你可以根据具体情况来选择一个适当的进程数。注意,增加进程数并不总是能够线性提高性能,因此在选择进程数时需要平衡系统资源和任务的特性。
总结
总体而言,多线程适用于轻量级任务和 I/O 密集型任务,而多进程能够在多个 CPU 核心上真正并行执行,适用于 CPU 密集型任务和需要更高稳定性的场景。选择使用多线程还是多进程取决于任务的性质、硬件配置和系统要求。
I/O 密集型任务
I/O 密集型任务指的是程序在执行过程中主要涉及输入/输出操作(I/O 操作)的任务。这些任务通常涉及从外部设备(如磁盘、网络、数据库)读取或写入数据。在这样的任务中,大部分的时间都花费在等待 I/O 操作的完成上,而不是在计算或处理数据上。
实践结论
:可以对比一下多线程和多进程执行的区别,对于I/O 密集型任务,会发现多进程执行效率更高。
I/O 密集型任务的特点包括:
-
高度依赖外部资源: 这类任务需要频繁地与外部设备进行交互,例如读取文件、从网络下载数据、与数据库通信等。
-
等待时间较长: 由于涉及到外部设备,I/O 操作通常需要较长的时间完成。在这段时间内,程序可以执行其他任务而不是等待 I/O 操作的完成。
-
CPU 利用率较低: 在执行 I/O 操作期间,CPU 大部分时间都是空闲的,因为它不需要进行大量的计算工作。这导致了 CPU 利用率较低。
-
并发性较高: 由于大部分时间都在等待外部操作完成,因此在这期间可以同时执行其他任务,提高了程序的并发性。
-
性能瓶颈在 I/O 操作上: 对于 I/O 密集型任务,性能瓶颈主要出现在等待外部设备完成操作的时间上,而不是在 CPU 处理数据的速度上。
一些典型的 I/O 密集型任务包括网络通信、文件读写、数据库查询等。在这些场景中,使用多线程通常是一种有效的并发处理方式,因为一个线程在等待 I/O 操作完成的同时,其他线程仍然可以执行任务,从而提高整体系统的吞吐量。
CPU 密集型任务
CPU 密集型任务指的是程序在执行过程中主要涉及大量计算或处理大量数据的任务,而不涉及大量的输入/输出操作。在这类任务中,大部分的时间都花费在计算或处理数据上,而不是等待外部设备的操作完成。
CPU 密集型任务的特点包括:
-
大量计算: 这类任务需要进行大量的计算工作,可能涉及复杂的数学运算、算法执行或大规模的数据处理。
-
相对较短的等待时间: 与 I/O 密集型任务不同,CPU 密集型任务的等待时间相对较短,因为它们主要依赖 CPU 进行计算。
-
CPU 利用率高: 在执行 CPU 密集型任务时,CPU 大部分时间都处于繁忙状态,因为它需要进行大量的计算工作。
-
并发性较低: 由于 CPU 密集型任务主要依赖 CPU 进行计算,而不是等待外部设备,因此在执行期间很难同时执行其他任务。
-
性能瓶颈在 CPU 处理速度上: 对于 CPU 密集型任务,性能瓶颈主要出现在 CPU 处理速度上,而不是在等待外部设备完成操作的时间上。
一些典型的 CPU 密集型任务包括科学计算、图像处理、密码学操作等。在这些场景中,通常采用多进程或其他并行计算的方式,以充分利用多核 CPU 的性能。在 Python 中,由于全局解释锁 (GIL) 的存在,使用多线程可能无法充分发挥多核 CPU 的性能,因此对于 CPU 密集型任务,通常使用多进程是更为有效的选择。
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)