python 多进程apply_async、map_async、pool.imap的用法
多进程
1. apply_async
pool.apply_async
是 Python 中 multiprocessing 模块的一部分,用于异步地执行一个函数。当你使用 apply_async 方法时,它会立即返回一个 AsyncResult 对象,而不是等待函数执行完成。这允许你继续执行程序的其他部分,而不必等待函数执行完成。
apply_async
适合用于各个进程之间及结果互不影响,比如大批量处理数据的场景,能显著提升效率。
from multiprocessing import Pool
def square(x):
return x * x
if __name__ == '__main__':
with Pool(4) as p: # 创建一个有4个进程的进程池
result = p.apply_async(square, (10,)) # 异步执行square函数
print(result.get()) # 获取执行结果
在这个例子中,square 函数被异步地执行,并且我们可以通过调用 AsyncResult 对象的 get 方法来获取结果。get 方法会阻塞,直到结果可用。
apply_async
还可以接受一个 callback
参数,这是一个在任务完成时会被调用的函数
def my_callback(result):
print("Result: ", result)
result = p.apply_async(square, (10,), callback=my_callback)
在这个例子中,当 square 函数执行完成后,my_callback 函数会被调用,并且执行结果会作为参数传递给 my_callback。
使用 apply_async
可以有效地利用多核处理器,提高程序的执行效率。
使用 apply_async 方法并行处理大量数据通常涉及以下几个步骤:
-
1.
定义工作函数
:这个函数将对单个数据项进行处理。它应该能够接受一个参数,因为 apply_async 会将数据项作为单个参数传递给这个函数。 -
2.
创建进程池
:使用 multiprocessing.Pool 创建一个进程池,你可以根据你的机器的CPU核心数来决定进程池的大小。 -
3.
使用 apply_async 提交任务
:对于数据集中的每个数据项,使用 apply_async 将工作函数和数据项提交给进程池。这会异步地执行工作函数。 -
4.
收集结果
:对于每个提交的任务,你可以使用返回的 AsyncResult 对象的 get 方法来获取结果,或者使用 map_async 方法来简化结果收集过程。 -
5.
关闭进程池
:在所有任务提交后,使用 close 方法关闭进程池,这会阻止更多的任务提交。然后使用 join 方法等待所有进程完成。
下面是一个处理大量数据的示例:
from multiprocessing import Pool
# 定义工作函数
def process_data(data_item):
# 这里是处理数据的逻辑
result = data_item * 2 # 假设的处理逻辑
return result
if __name__ == '__main__':
# 创建一个进程池
with Pool(4) as pool:
# 假设我们有大量数据需要处理
data = [1, 2, 3, 4, 5, ...] # 这里只是示例,实际数据可能来自文件或数据库
# 使用 apply_async 提交任务
results = [pool.apply_async(process_data, (item,)) for item in data]
# 收集结果
processed_data = [result.get() for result in results]
# 打印处理后的数据
print(processed_data)
# 进程池会自动关闭
在这个例子中,我们定义了一个 process_data
函数来处理单个数据项。我们创建了一个进程池
,并为数据集中的每个数据项提交了一个任务。然后我们收集了所有任务的结果,并打印了处理后的数据。
如果你的数据量非常大
,你可能会考虑使用 pool.map_async 来简化代码
,它会自动处理任务的提交和结果的收集:
from multiprocessing import Pool
def process_data(data_item):
return data_item * 2
if __name__ == '__main__':
with Pool(4) as pool:
data = [1, 2, 3, 4, 5, ...] # 大量数据
processed_data = pool.map_async(process_data, data).get()
print(processed_data)
在这个简化的例子中,map_async
接受工作函数和数据列表,返回一个 AsyncResult
对象,我们可以通过调用 get 方法来获取所有处理后的数据。这种方法更简洁,但在某些情况下可能不如单独使用 apply_async
灵活。
2. map_async
- 功能:
map_async
是 map 函数的异步版本`,它将一个函数应用于一个迭代器的每个元素。 - 使用场景: 当你有一个
迭代器
(如列表或元组)并且需要对其中的每个元素应用同一个函数时,使用 map_async 是合适的。它适用于批量处理相似任务的场景。 - 结果处理: map_async 返回一个 AsyncResult 对象,你可以通过调用 get() 方法来获取所有任务的结果,这些结果通常以列表的形式返回。
from multiprocessing import Pool
def square(x):
return x * x
if __name__ == '__main__':
with Pool(4) as pool:
result = pool.map_async(square, range(5))
print(result.get()) # 获取结果
3 pool.imap
pool.imap
是 Python multiprocessing.Pool 中的一种方法,用于在进程池中分布式地逐步处理输入数据序列,并逐项返回结果。它的行为类似于内置函数 map,但以迭代器的形式顺序
返回结果,支持逐步消费。
案例1
from multiprocessing.pool import Pool, ThreadPool
from threading import Thread
results = ThreadPool(NUM_THREADS).imap(fn, iter_data) # fn 是函数名, iter_data 可迭代数据
for result in results:
print(result)
案例2
https://github1s.com/ultralytics/yolov5/blob/master/utils/dataloaders.py
def cache_labels(self, path=Path("./labels.cache"), prefix=""):
"""Caches dataset labels, verifies images, reads shapes, and tracks dataset integrity."""
x = {} # dict
nm, nf, ne, nc, msgs = 0, 0, 0, 0, [] # number missing, found, empty, corrupt, messages
desc = f"{prefix}Scanning {path.parent / path.stem}..."
with Pool(NUM_THREADS) as pool:
pbar = tqdm(
pool.imap(verify_image_label, zip(self.im_files, self.label_files, repeat(prefix))),
desc=desc,
total=len(self.im_files),
bar_format=TQDM_BAR_FORMAT,
)
for im_file, lb, shape, segments, nm_f, nf_f, ne_f, nc_f, msg in pbar:
nm += nm_f
nf += nf_f
ne += ne_f
nc += nc_f
if im_file:
x[im_file] = [lb, shape, segments]
if msg:
msgs.append(msg)
pbar.desc = f"{desc} {nf} images, {nm + ne} backgrounds, {nc} corrupt"
pbar.close()
if msgs:
LOGGER.info("\n".join(msgs))
if nf == 0:
LOGGER.warning(f"{prefix}WARNING ⚠️ No labels found in {path}. {HELP_URL}")
x["hash"] = get_hash(self.label_files + self.im_files)
x["results"] = nf, nm, ne, nc, len(self.im_files)
x["msgs"] = msgs # warnings
x["version"] = self.cache_version # cache version
try:
np.save(path, x) # save cache for next time
path.with_suffix(".cache.npy").rename(path) # remove .npy suffix
LOGGER.info(f"{prefix}New cache created: {path}")
except Exception as e:
LOGGER.warning(f"{prefix}WARNING ⚠️ Cache directory {path.parent} is not writeable: {e}") # not writeable
return x
pool.imap(verify_image_label, zip(self.im_files, self.label_files, repeat(prefix)))
- 执行
pool.imap
返回result作为一个迭代器,然后通过tqdm包装为进度条。 - pool.imap为每个可迭代的数据执行
verify_image_label
优缺点
- 优点:
- 按需加载结果: pool.imap 不会一次性把所有任务结果加载到内存中,因此适合处理大规模数据。
- 顺序输出: 结果按输入顺序返回,即使某些任务耗时较短,也会等待前面的任务完成后输出。
- 缺点:
- 等待顺序: 如果某些任务很耗时,后面的任务即使已经完成,也需要等待之前任务完成后才能返回。
适用场景
- 任务耗时分布均匀: 每个任务的执行时间差不多。
- 结果需要按输入顺序返回: 顺序重要时使用。
- 内存有限: 可以逐步处理大规模数据
4. 对比
4.1 apply_async 和 map_async对比
apply_async
和 map_async
都是Python multiprocessing
模块中的函数,用于在进程池中异步地执行任务。它们的主要区别在于它们处理任务的方式和适用场景。
- 灵活性:
apply_async
更灵活因为它允许你为每个任务传递不同的参数。而 map_async 则将同一个函数应用于迭代器的每个元素。 - 结果收集:使用 map_async 可以更方便地收集所有任务的结果,因为它们会作为一个列表返回。而使用 apply_async 时,你需要为每个任务单独处理结果。
- 适用性:如果你的任务是独立的并且参数不同,使用 apply_async。如果你需要对一个数据集合中的每个元素执行相同的操作,使用 map_async 更合适。
4.2 pool.imap 和 map_async对比
pool.imap 和 pool.map_async
都是 Python 中 multiprocessing.Pool
提供的并行任务处理方法,但它们在执行流程、返回结果的方式和适用场景上有所不同。
4.2.1 pool.imap
-
以迭代器(generator)的形式逐步返回结果。
-
按输入顺序分发任务,并且按输入顺序返回结果,每处理完一项任务就立即生成一个结果。
优点:
- 逐步返回结果,可以节省内存。
- 按输入顺序生成结果,保证结果顺序一致性。
缺点:
- 因为需要按顺序返回结果,所以即使后面的任务先完成,也需要等待前面的任务
示例代码
from multiprocessing import Pool
def square(x):
return x ** 2
if __name__ == "__main__":
with Pool(4) as pool:
results = pool.imap(square, range(10)) # 返回一个生成器
for result in results: # 按顺序逐步获取结果
print(result)
输出(按顺序返回结果):
0
1
4
9
16
...
81
4.2 pool.map_async
-
返回一个 AsyncResult 对象,可以通过 .get() 获取所有结果。
-
所有任务会被分发到子进程池中,并行执行。任务完成后,所有
结果一次性
返回(与内置的 map 类似,但是异步执行
)。
优点:
-
无需逐步消费结果,可选择等待所有任务完成后统一获取。
-
可以在结果未完成前做其他事情(异步优势)。
缺点: -
会占用更多内存,因为结果会在所有任务完成后一次性返回。
示例代码
from multiprocessing import Pool
def square(x):
return x ** 2
if __name__ == "__main__":
with Pool(4) as pool:
async_result = pool.map_async(square, range(10)) # 异步提交任务
print("Doing other tasks while waiting for results...")
results = async_result.get() # 获取所有结果(阻塞,等待完成)
print(results)
输出(一次性返回结果):
Doing other tasks while waiting for results...
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
对比总结
5. 案例
案例1
process_num = 10
data_list =[] # 可迭代对象
with Pool(processes=process_num) as pool:
r = pool.map_async(process_, args_list)
return_list = r.get()
案例2
import multiprocessing
from multiprocessing import Pool
from tqdm import tqdm
# 多线程处理的函数
# 注意process函数需要有返回值,如果没有返回值可以 return None
def process_(x):
return x ** 2
def run_with_multiprocessing():
process_num = 10 # 设置进程数
data_list = [4,6,7,8] # 一个可迭代的数据
# 使用进程池执行
with Pool(processes=process_num) as pool:
# tqdm 显示任务进度
results = list(tqdm(pool.imap(process_, data_list), total=len(data_list )))
案例3
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
def run_with_multithreading():
process_num = 10 # 设置线程数
data_list = [4,6,7,8]
results = []
with ThreadPoolExecutor(max_workers=process_num) as executor:
# 提交任务
future_to_args = {
executor.submit(process_,data ) for args in data_list
}
# tqdm 显示任务进度
for future in tqdm(as_completed(future_to_args), total=len(future_to_args)):
try:
results.append(future.result())
except Exception as e:
print(f"Task failed: {e}")
return results
注意多线程执行的函数,需要有返回值
,如果没有返回值,直接Return None
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)