Multiprocessing是一个类似于threading模块的生成多进程的包,这个包提供了本地和远程的进程并发。使用multiprocessing能够有效的解决python因为在GIL(全局解释锁)下在CPU密集型任务中的瓶颈问题,允许使用多核处理器来运行python脚本程序。官方介绍https://docs.python.org/2/library/multiprocessing.html。

  multiprocessing生成进程大致是一下的流程:

  1.通过 multiprocessing.Process生成进程对象

  2.调用start()方法启动进程

  3.调用join()方法,阻塞主进程知道 子进程执行完毕 

下面实例代码展示如何通过multiprocessing创建子进程执行任务

 

# -*- coding: utf-8 -*-

import multiprocessing
import os


def func(m):
    print("called function in process : %s , process id is %s" % (str(m), str(os.getpid())))


if __name__ == '__main__':
    ProcessJob = []
    for i in range(5): # 依次开启5个进程
        p = multiprocessing.Process(target=func, args=(i,))
        ProcessJob.append(p)
        p.start() # 启动进程
        p.join() # 阻塞进程直至 当前进程中的任务完成

结果:    

  

进程的命名

对与每个进程在创建的时候,可以通过name参数对进程名称进行命名 例如

# -*- coding: utf-8 -*-

import multiprocessing
import os


def func(m):
    name = multiprocessing.current_process().name
    print("current process name is " + name)
    print("called function in process : %s , process id is %s" % (str(m), str(os.getpid())))


if __name__ == '__main__':
    ProcessJob = []
    for i in range(5): # 依次开启5个进程
        p = multiprocessing.Process(target=func,name='PROCESS_NAME_' + str(i), args=(i,))
        ProcessJob.append(p)
        p.start() # 启动进程
        p.join() # 阻塞进程直至 当前进程中的任务完成

 

 

在子类中使用进程 

  在子类使用进程,可以使类继承自multiprocessing.Process达到效果

# -*- coding: utf-8 -*-

import multiprocessing
import os


class MyProcess(multiprocessing.Process):
    def run(self): # 重写 run函数
        print("called rub method in process: %s, process id is : %s" % (multiprocessing.current_process().name, os.getpid()))


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = MyProcess() # 创建对象
        jobs.append(p)
        p.start() # 启动进程对象
        p.join() # 阻塞进程对象

结果:

called rub method in process: MyProcess-1, process id is : 1524
called rub method in process: MyProcess-2, process id is : 1525
called rub method in process: MyProcess-3, process id is : 1526
called rub method in process: MyProcess-4, process id is : 1527
called rub method in process: MyProcess-5, process id is : 1528

 

 

进程间如何交换数据对象

多进程间进行交换数据, multiprocessing提供了两个数据通道,一个是队列,一个是通道。

1. 通过队列进行数据交换 

创建一个进程共享的队列,这个共享队列是线程安全与进程安全的 在操作该共享队列的时候不需要通过锁来保持访问的安全性

通过multiprocessing.Queue()来创建共享队列。

2.通过管道进行数据交换

管道创建之后会返回一对连接对象, 每个对象都有send/receive 方法 , 实现了进程间的通信

如下例子:

# -*- coding: utf-8 -*-

import multiprocessing
import os


def create_items(pipe):
    output_pipe , _ = pipe
    for item in range(10):
        output_pipe.send(item)
    output_pipe.close()


def multiply_items(pipe_1, pipe_2):
    close, input_pipe = pipe_1
    close.close()
    output_pipe, _ = pipe_2
    try:
        while True:
            item = input_pipe.recv()
            output_pipe.send(item*item)
    except EOFError as err:
        output_pipe.close()

if __name__ == '__main__':
    pipe_1 = multiprocessing.Pipe(True) # 创建 通道
    process_pipe_1 = multiprocessing.Process(target= create_items, args=(pipe_1, )) #创建进程1 给通道1里载入数据

    process_pipe_1.start()

    pipe_2 = multiprocessing.Pipe(True) # 创建通道2

    process_pipe_2 = multiprocessing.Process(target=multiply_items, args=(pipe_1, pipe_2, )) #将通道1里的数据接收之后 放入 通道2

    process_pipe_2.start()
  pipe_1[0].close()
  pipe_2[0].close()
  try: 
    while True:
      print(pipe_2[1].recv())
    except EOFError as err:
      print("end")

 

进程的同步

当多个进程协同来完成一个任务的时候,多个进程需要共享数据, 按照一定的顺序来访问数据完成任务,需要用到同步原语来控制获取数据的顺序,保证任务的正常进行。

1.Lock:使用acquire和release来管理共享资源的访问

2. 事件:实现了进程间的简单通信, 一个进程发出事件 ,其他一个进程或多个进程响应事件。通过Event对象来实现

3.信号量: 用越共享的公共资源。例如一个资源支持同时访问的的数量。

4.屏障: 使所有的涉及屏障的进程到达某一条件后,在执行,屏障之前和之后的代码不能够并行执行

详细可参见 https://docs.python.org/2/library/multiprocessing.html#multiprocessing.managers.SyncManager

 

multiprocessing中的managers

这个managers是multiprocessing提供的一种管理器,用来协调多进程之间的共享信息,可以用来实现分布式进程之间的数据共享,协作完成任务。

例如,我们在 一台机器上的任务队列中发布任务, 远程进程获取 任务,执行完任务之后再将输入回放到结果队列, 不仅仅是队列 ,还可以是字典,list等其他的数据结构 

下面是一个 队列的例子:

server端:

# -*- coding: utf-8 -*-

import multiprocessing
from multiprocessing.managers import BaseManager
import random, time
import queue

# 继承自BaseManager 用于注册 下面的两个队列
class QueueManager(BaseManager):
    pass


task_queue = queue.Queue() # 申明两个队列 一个任务队列 一个结果队列
result_queue = queue.Queue()


def return_task_queue():
    global task_queue
    return task_queue


def return_result_queue():
    global result_queue
    return result_queue


if __name__ == '__main__':
    QueueManager.register('get_task_from_queue', callable=return_task_queue)#  注册两个队列
    QueueManager.register('put_result_to_queue', callable=return_result_queue)
    # 绑定端口以及验证码(window平台下需要写127.0.0.1, linux下可以为空 或者0.0.0.0, authkey 不能直接填字符串 会提示没有正确的编码 可以用b'abc' 的方式 或者'abc'.encode('utf-8'))
    mng = QueueManager(address=('127.0.0.1', 5000), authkey=b'abc')
    # 启动
    mng.start()

    task = mng.get_task_from_queue()
    result = mng.put_result_to_queue()

    # 在任务队列中放入任务
    for i in range(10):
        n = random.randint(0, 1000)
        print("had put task into task_queue %s" % str(n))
        task.put(n)

    # 等待 任务完成之后放入 结果队列 取出打印
    while True:
        if result.empty():
            print("i am wait for result")
            time.sleep(1)
        else:
            rep = result.get()
            print("had get result from result_queue %s" % str(rep))


    # mng.shutdown()

client端:

# -*- coding: utf-8 -*-

import time, sys, queue
import math
from multiprocessing.managers import BaseManager


class QueueManager(BaseManager):
    pass


task_queue = queue.Queue()
result_queue = queue.Queue()


def return_task_queue():
    global task_queue
    return task_queue


def return_result_queue():
    global result_queue
    return result_queue

if __name__ == "__main__":
    server_addr = '127.0.0.1'

    QueueManager.register('get_task_from_queue', callable=return_task_queue)
    QueueManager.register('put_result_to_queue', callable=return_result_queue)

    mng = QueueManager(address=(server_addr, 5000), authkey=b'abc')
    mng.connect()

    task_get = mng.get_task_from_queue()
    result_put = mng.put_result_to_queue()

    while True:
        if task_get.empty():
            print("wait for task")
            time.sleep(1)
        else:
            m = task_get.get(timeout = 1)
            rep = math.sqrt(m)
            time.sleep(1)
            result_put.put(str(rep))

 

multiprocessing中的进程池

进程的频繁 创建和销毁时非常耗费资源的 , multiprocessing.Pool提供给用户一个常驻的进程池,当有任务来临时,有空闲的进程则执行任务,没有空闲的任务的时候,等待进程池中有空闲进程后,分配空闲进程给该任务执行

# -*- coding: utf-8 -*-

import multiprocessing
import time
import random


def func_square(x):
    print("process is exc %s", multiprocessing.current_process().name)
    time.sleep(2)
    return x*x


if __name__ == "__main__":
    res = []
    pool = multiprocessing.Pool(4)

    for i in range(10):
        res.append(pool.apply_async(func=func_square, args=(random.randint(0, 100), ))) # 异步执行 

    pool.close()
    pool.join()

    print("result is:")
    for r in res:
        print(r.get())

代码中的 Pool.apply_async是并行执行 Pool.apply是阻塞的同步执行, 类似的还有Pool.map, Pool.map_async.

 

转载于:https://www.cnblogs.com/FMS-Shaw/p/9751838.html

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐