1. apply_async

pool.apply_async 是 Python 中 multiprocessing 模块的一部分,用于异步地执行一个函数。当你使用 apply_async 方法时,它会立即返回一个 AsyncResult 对象,而不是等待函数执行完成。这允许你继续执行程序的其他部分,而不必等待函数执行完成。

apply_async适合用于各个进程之间及结果互不影响,比如大批量处理数据的场景,能显著提升效率。

from multiprocessing import Pool

def square(x):
    return x * x

if __name__ == '__main__':
    with Pool(4) as p:  # 创建一个有4个进程的进程池
        result = p.apply_async(square, (10,))  # 异步执行square函数
        print(result.get())  # 获取执行结果

在这个例子中,square 函数被异步地执行,并且我们可以通过调用 AsyncResult 对象的 get 方法来获取结果。get 方法会阻塞,直到结果可用。

apply_async 还可以接受一个 callback 参数,这是一个在任务完成时会被调用的函数

def my_callback(result):
    print("Result: ", result)

result = p.apply_async(square, (10,), callback=my_callback)

在这个例子中,当 square 函数执行完成后,my_callback 函数会被调用,并且执行结果会作为参数传递给 my_callback。

使用 apply_async 可以有效地利用多核处理器,提高程序的执行效率。

使用 apply_async 方法并行处理大量数据通常涉及以下几个步骤:

  • 1.定义工作函数:这个函数将对单个数据项进行处理。它应该能够接受一个参数,因为 apply_async 会将数据项作为单个参数传递给这个函数。

  • 2.创建进程池:使用 multiprocessing.Pool 创建一个进程池,你可以根据你的机器的CPU核心数来决定进程池的大小。

  • 3.使用 apply_async 提交任务:对于数据集中的每个数据项,使用 apply_async 将工作函数和数据项提交给进程池。这会异步地执行工作函数。

  • 4.收集结果:对于每个提交的任务,你可以使用返回的 AsyncResult 对象的 get 方法来获取结果,或者使用 map_async 方法来简化结果收集过程。

  • 5.关闭进程池:在所有任务提交后,使用 close 方法关闭进程池,这会阻止更多的任务提交。然后使用 join 方法等待所有进程完成。

下面是一个处理大量数据的示例:

from multiprocessing import Pool

# 定义工作函数
def process_data(data_item):
    # 这里是处理数据的逻辑
    result = data_item * 2  # 假设的处理逻辑
    return result

if __name__ == '__main__':
    # 创建一个进程池
    with Pool(4) as pool:
        # 假设我们有大量数据需要处理
        data = [1, 2, 3, 4, 5, ...]  # 这里只是示例,实际数据可能来自文件或数据库

        # 使用 apply_async 提交任务
        results = [pool.apply_async(process_data, (item,)) for item in data]

        # 收集结果
        processed_data = [result.get() for result in results]

        # 打印处理后的数据
        print(processed_data)

    # 进程池会自动关闭

在这个例子中,我们定义了一个 process_data 函数来处理单个数据项。我们创建了一个进程池,并为数据集中的每个数据项提交了一个任务。然后我们收集了所有任务的结果,并打印了处理后的数据。

如果你的数据量非常大,你可能会考虑使用 pool.map_async 来简化代码,它会自动处理任务的提交和结果的收集:

from multiprocessing import Pool

def process_data(data_item):
    return data_item * 2

if __name__ == '__main__':
    with Pool(4) as pool:
        data = [1, 2, 3, 4, 5, ...]  # 大量数据
        processed_data = pool.map_async(process_data, data).get()

    print(processed_data)

在这个简化的例子中,map_async 接受工作函数和数据列表,返回一个 AsyncResult 对象,我们可以通过调用 get 方法来获取所有处理后的数据。这种方法更简洁,但在某些情况下可能不如单独使用 apply_async 灵活。

2. map_async

  • 功能map_async 是 map 函数的异步版本`,它将一个函数应用于一个迭代器的每个元素。
  • 使用场景: 当你有一个迭代器(如列表或元组)并且需要对其中的每个元素应用同一个函数时,使用 map_async 是合适的。它适用于批量处理相似任务的场景。
  • 结果处理: map_async 返回一个 AsyncResult 对象,你可以通过调用 get() 方法来获取所有任务的结果,这些结果通常以列表的形式返回。
from multiprocessing import Pool

def square(x):
    return x * x

if __name__ == '__main__':
    with Pool(4) as pool:
        result = pool.map_async(square, range(5))
        print(result.get())  # 获取结果

3 pool.imap

pool.imap 是 Python multiprocessing.Pool 中的一种方法,用于在进程池中分布式地逐步处理输入数据序列,并逐项返回结果。它的行为类似于内置函数 map,但以迭代器的形式顺序返回结果,支持逐步消费。

案例1

from multiprocessing.pool import Pool, ThreadPool
from threading import Thread
results = ThreadPool(NUM_THREADS).imap(fn, iter_data)  # fn 是函数名, iter_data 可迭代数据
for result in results:
    print(result)

案例2
https://github1s.com/ultralytics/yolov5/blob/master/utils/dataloaders.py

 def cache_labels(self, path=Path("./labels.cache"), prefix=""):
     """Caches dataset labels, verifies images, reads shapes, and tracks dataset integrity."""
     x = {}  # dict
     nm, nf, ne, nc, msgs = 0, 0, 0, 0, []  # number missing, found, empty, corrupt, messages
     desc = f"{prefix}Scanning {path.parent / path.stem}..."
     with Pool(NUM_THREADS) as pool:
         pbar = tqdm(
             pool.imap(verify_image_label, zip(self.im_files, self.label_files, repeat(prefix))),
             desc=desc,
             total=len(self.im_files),
             bar_format=TQDM_BAR_FORMAT,
         )
         for im_file, lb, shape, segments, nm_f, nf_f, ne_f, nc_f, msg in pbar:
             nm += nm_f
             nf += nf_f
             ne += ne_f
             nc += nc_f
             if im_file:
                 x[im_file] = [lb, shape, segments]
             if msg:
                 msgs.append(msg)
             pbar.desc = f"{desc} {nf} images, {nm + ne} backgrounds, {nc} corrupt"

     pbar.close()
     if msgs:
         LOGGER.info("\n".join(msgs))
     if nf == 0:
         LOGGER.warning(f"{prefix}WARNING ⚠️ No labels found in {path}. {HELP_URL}")
     x["hash"] = get_hash(self.label_files + self.im_files)
     x["results"] = nf, nm, ne, nc, len(self.im_files)
     x["msgs"] = msgs  # warnings
     x["version"] = self.cache_version  # cache version
     try:
         np.save(path, x)  # save cache for next time
         path.with_suffix(".cache.npy").rename(path)  # remove .npy suffix
         LOGGER.info(f"{prefix}New cache created: {path}")
     except Exception as e:
         LOGGER.warning(f"{prefix}WARNING ⚠️ Cache directory {path.parent} is not writeable: {e}")  # not writeable
     return x
  pool.imap(verify_image_label, zip(self.im_files, self.label_files, repeat(prefix)))
  • 执行pool.imap 返回result作为一个迭代器,然后通过tqdm包装为进度条。
  • pool.imap为每个可迭代的数据执行verify_image_label

优缺点

  • 优点:
    • 按需加载结果: pool.imap 不会一次性把所有任务结果加载到内存中,因此适合处理大规模数据。
    • 顺序输出: 结果按输入顺序返回,即使某些任务耗时较短,也会等待前面的任务完成后输出。
  • 缺点:
    • 等待顺序: 如果某些任务很耗时,后面的任务即使已经完成,也需要等待之前任务完成后才能返回。

适用场景

  • 任务耗时分布均匀: 每个任务的执行时间差不多。
  • 结果需要按输入顺序返回: 顺序重要时使用。
  • 内存有限: 可以逐步处理大规模数据

4. 对比

4.1 apply_async 和 map_async对比

apply_async map_async 都是Python multiprocessing模块中的函数,用于在进程池中异步地执行任务。它们的主要区别在于它们处理任务的方式和适用场景。

  • 灵活性:apply_async 更灵活因为它允许你为每个任务传递不同的参数。而 map_async 则将同一个函数应用于迭代器的每个元素。
  • 结果收集:使用 map_async 可以更方便地收集所有任务的结果,因为它们会作为一个列表返回。而使用 apply_async 时,你需要为每个任务单独处理结果。
  • 适用性:如果你的任务是独立的并且参数不同,使用 apply_async。如果你需要对一个数据集合中的每个元素执行相同的操作,使用 map_async 更合适。

4.2 pool.imap 和 map_async对比

pool.imap 和 pool.map_async 都是 Python 中 multiprocessing.Pool 提供的并行任务处理方法,但它们在执行流程、返回结果的方式和适用场景上有所不同。

4.2.1 pool.imap

  • 以迭代器(generator)的形式逐步返回结果。

  • 按输入顺序分发任务,并且按输入顺序返回结果,每处理完一项任务就立即生成一个结果。

优点:

  • 逐步返回结果,可以节省内存。
  • 按输入顺序生成结果,保证结果顺序一致性。

缺点:

  • 因为需要按顺序返回结果,所以即使后面的任务先完成,也需要等待前面的任务

示例代码

from multiprocessing import Pool

def square(x):
    return x ** 2

if __name__ == "__main__":
    with Pool(4) as pool:
        results = pool.imap(square, range(10))  # 返回一个生成器
        for result in results:  # 按顺序逐步获取结果
            print(result)

输出(按顺序返回结果):

0
1
4
9
16
...
81

4.2 pool.map_async

  • 返回一个 AsyncResult 对象,可以通过 .get() 获取所有结果。

  • 所有任务会被分发到子进程池中,并行执行。任务完成后,所有结果一次性返回(与内置的 map 类似,但是异步执行)。

优点:

  • 无需逐步消费结果,可选择等待所有任务完成后统一获取。

  • 可以在结果未完成前做其他事情(异步优势)。
    缺点:

  • 会占用更多内存,因为结果会在所有任务完成后一次性返回。

示例代码

from multiprocessing import Pool

def square(x):
    return x ** 2

if __name__ == "__main__":
    with Pool(4) as pool:
        async_result = pool.map_async(square, range(10))  # 异步提交任务
        print("Doing other tasks while waiting for results...")

        results = async_result.get()  # 获取所有结果(阻塞,等待完成)
        print(results)

输出(一次性返回结果):

Doing other tasks while waiting for results...
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

对比总结
在这里插入图片描述

5. 案例

  • 案例1
 process_num  = 10
 data_list =[]  # 可迭代对象
 with Pool(processes=process_num) as pool:
     r =  pool.map_async(process_, args_list)
     return_list = r.get()
  • 案例2
import multiprocessing
from multiprocessing import Pool
from tqdm import tqdm

# 多线程处理的函数
# 注意process函数需要有返回值,如果没有返回值可以  return None
def process_(x):
    return x ** 2


def run_with_multiprocessing():
    process_num = 10  # 设置进程数
    data_list = [4,6,7,8]  # 一个可迭代的数据
    # 使用进程池执行
    with Pool(processes=process_num) as pool:
        # tqdm 显示任务进度
        results = list(tqdm(pool.imap(process_, data_list), total=len(data_list )))

  • 案例3
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm

def run_with_multithreading():
    process_num = 10  # 设置线程数
	
	data_list = [4,6,7,8]
    results = []
    with ThreadPoolExecutor(max_workers=process_num) as executor:
        # 提交任务
        future_to_args = {
            executor.submit(process_,data ) for args in data_list 
        }

        # tqdm 显示任务进度
        for future in tqdm(as_completed(future_to_args), total=len(future_to_args)):
            try:
                results.append(future.result())
            except Exception as e:
                print(f"Task failed: {e}")

    return results

注意多线程执行的函数,需要有返回值,如果没有返回值,直接Return None

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐