Multiprocessing是一个类似于threading模块的生成多进程的包,这个包提供了本地和远程的进程并发。使用multiprocessing能够有效的解决python因为在GIL(全局解释锁)下在CPU密集型任务中的瓶颈问题,允许使用多核处理器来运行python脚本程序。官方介绍https://docs.python.org/2/library/multiprocessing.html。
multiprocessing生成进程大致是一下的流程:
1.通过 multiprocessing.Process生成进程对象
2.调用start()方法启动进程
3.调用join()方法,阻塞主进程知道 子进程执行完毕
下面实例代码展示如何通过multiprocessing创建子进程执行任务
# -*- coding: utf-8 -*- import multiprocessing import os def func(m): print("called function in process : %s , process id is %s" % (str(m), str(os.getpid()))) if __name__ == '__main__': ProcessJob = [] for i in range(5): # 依次开启5个进程 p = multiprocessing.Process(target=func, args=(i,)) ProcessJob.append(p) p.start() # 启动进程 p.join() # 阻塞进程直至 当前进程中的任务完成
结果:
进程的命名
对与每个进程在创建的时候,可以通过name参数对进程名称进行命名 例如
# -*- coding: utf-8 -*- import multiprocessing import os def func(m): name = multiprocessing.current_process().name print("current process name is " + name) print("called function in process : %s , process id is %s" % (str(m), str(os.getpid()))) if __name__ == '__main__': ProcessJob = [] for i in range(5): # 依次开启5个进程 p = multiprocessing.Process(target=func,name='PROCESS_NAME_' + str(i), args=(i,)) ProcessJob.append(p) p.start() # 启动进程 p.join() # 阻塞进程直至 当前进程中的任务完成
在子类中使用进程
在子类使用进程,可以使类继承自multiprocessing.Process达到效果
# -*- coding: utf-8 -*- import multiprocessing import os class MyProcess(multiprocessing.Process): def run(self): # 重写 run函数 print("called rub method in process: %s, process id is : %s" % (multiprocessing.current_process().name, os.getpid())) if __name__ == '__main__': jobs = [] for i in range(5): p = MyProcess() # 创建对象 jobs.append(p) p.start() # 启动进程对象 p.join() # 阻塞进程对象
结果:
called rub method in process: MyProcess-1, process id is : 1524
called rub method in process: MyProcess-2, process id is : 1525
called rub method in process: MyProcess-3, process id is : 1526
called rub method in process: MyProcess-4, process id is : 1527
called rub method in process: MyProcess-5, process id is : 1528
进程间如何交换数据对象
多进程间进行交换数据, multiprocessing提供了两个数据通道,一个是队列,一个是通道。
1. 通过队列进行数据交换
创建一个进程共享的队列,这个共享队列是线程安全与进程安全的 在操作该共享队列的时候不需要通过锁来保持访问的安全性
通过multiprocessing.Queue()来创建共享队列。
2.通过管道进行数据交换
管道创建之后会返回一对连接对象, 每个对象都有send/receive 方法 , 实现了进程间的通信
如下例子:
# -*- coding: utf-8 -*- import multiprocessing import os def create_items(pipe): output_pipe , _ = pipe for item in range(10): output_pipe.send(item) output_pipe.close() def multiply_items(pipe_1, pipe_2): close, input_pipe = pipe_1 close.close() output_pipe, _ = pipe_2 try: while True: item = input_pipe.recv() output_pipe.send(item*item) except EOFError as err: output_pipe.close() if __name__ == '__main__': pipe_1 = multiprocessing.Pipe(True) # 创建 通道 process_pipe_1 = multiprocessing.Process(target= create_items, args=(pipe_1, )) #创建进程1 给通道1里载入数据 process_pipe_1.start() pipe_2 = multiprocessing.Pipe(True) # 创建通道2 process_pipe_2 = multiprocessing.Process(target=multiply_items, args=(pipe_1, pipe_2, )) #将通道1里的数据接收之后 放入 通道2 process_pipe_2.start()
pipe_1[0].close()
pipe_2[0].close()
try:
while True:
print(pipe_2[1].recv())
except EOFError as err:
print("end")
进程的同步
当多个进程协同来完成一个任务的时候,多个进程需要共享数据, 按照一定的顺序来访问数据完成任务,需要用到同步原语来控制获取数据的顺序,保证任务的正常进行。
1.Lock:使用acquire和release来管理共享资源的访问
2. 事件:实现了进程间的简单通信, 一个进程发出事件 ,其他一个进程或多个进程响应事件。通过Event对象来实现
3.信号量: 用越共享的公共资源。例如一个资源支持同时访问的的数量。
4.屏障: 使所有的涉及屏障的进程到达某一条件后,在执行,屏障之前和之后的代码不能够并行执行
详细可参见 https://docs.python.org/2/library/multiprocessing.html#multiprocessing.managers.SyncManager
multiprocessing中的managers
这个managers是multiprocessing提供的一种管理器,用来协调多进程之间的共享信息,可以用来实现分布式进程之间的数据共享,协作完成任务。
例如,我们在 一台机器上的任务队列中发布任务, 远程进程获取 任务,执行完任务之后再将输入回放到结果队列, 不仅仅是队列 ,还可以是字典,list等其他的数据结构
下面是一个 队列的例子:
server端:
# -*- coding: utf-8 -*- import multiprocessing from multiprocessing.managers import BaseManager import random, time import queue # 继承自BaseManager 用于注册 下面的两个队列 class QueueManager(BaseManager): pass task_queue = queue.Queue() # 申明两个队列 一个任务队列 一个结果队列 result_queue = queue.Queue() def return_task_queue(): global task_queue return task_queue def return_result_queue(): global result_queue return result_queue if __name__ == '__main__': QueueManager.register('get_task_from_queue', callable=return_task_queue)# 注册两个队列 QueueManager.register('put_result_to_queue', callable=return_result_queue) # 绑定端口以及验证码(window平台下需要写127.0.0.1, linux下可以为空 或者0.0.0.0, authkey 不能直接填字符串 会提示没有正确的编码 可以用b'abc' 的方式 或者'abc'.encode('utf-8')) mng = QueueManager(address=('127.0.0.1', 5000), authkey=b'abc') # 启动 mng.start() task = mng.get_task_from_queue() result = mng.put_result_to_queue() # 在任务队列中放入任务 for i in range(10): n = random.randint(0, 1000) print("had put task into task_queue %s" % str(n)) task.put(n) # 等待 任务完成之后放入 结果队列 取出打印 while True: if result.empty(): print("i am wait for result") time.sleep(1) else: rep = result.get() print("had get result from result_queue %s" % str(rep)) # mng.shutdown()
client端:
# -*- coding: utf-8 -*- import time, sys, queue import math from multiprocessing.managers import BaseManager class QueueManager(BaseManager): pass task_queue = queue.Queue() result_queue = queue.Queue() def return_task_queue(): global task_queue return task_queue def return_result_queue(): global result_queue return result_queue if __name__ == "__main__": server_addr = '127.0.0.1' QueueManager.register('get_task_from_queue', callable=return_task_queue) QueueManager.register('put_result_to_queue', callable=return_result_queue) mng = QueueManager(address=(server_addr, 5000), authkey=b'abc') mng.connect() task_get = mng.get_task_from_queue() result_put = mng.put_result_to_queue() while True: if task_get.empty(): print("wait for task") time.sleep(1) else: m = task_get.get(timeout = 1) rep = math.sqrt(m) time.sleep(1) result_put.put(str(rep))
multiprocessing中的进程池
进程的频繁 创建和销毁时非常耗费资源的 , multiprocessing.Pool提供给用户一个常驻的进程池,当有任务来临时,有空闲的进程则执行任务,没有空闲的任务的时候,等待进程池中有空闲进程后,分配空闲进程给该任务执行
# -*- coding: utf-8 -*- import multiprocessing import time import random def func_square(x): print("process is exc %s", multiprocessing.current_process().name) time.sleep(2) return x*x if __name__ == "__main__": res = [] pool = multiprocessing.Pool(4) for i in range(10): res.append(pool.apply_async(func=func_square, args=(random.randint(0, 100), ))) # 异步执行 pool.close() pool.join() print("result is:") for r in res: print(r.get())
代码中的 Pool.apply_async是并行执行 Pool.apply是阻塞的同步执行, 类似的还有Pool.map, Pool.map_async.
所有评论(0)