Method 1: Using threading

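The threading module does not provide a built-in way to get a thread's return value. We can work around this with a database or a global variable. The example here uses a global variable (a module-level queue).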

from threading import Thread
from concurrent.futures import ThreadPoolExecutor, as_completed  # used by Methods 2 and 3
import queue  # this module is named Queue in Python 2

# Module-level queue shared by all threads to collect results
q = queue.Queue()


def thread_function(age):
    # Put each incremented value on the shared queue
    for i in age:
        q.put({'age': i + 1})


def run_threading(target, args, count):
    """
    :param target: target function
    :param args: function arguments
    :param count: number of threads
    """
    ts = [Thread(target=target, args=args) for _ in range(count)]
    for t in ts:
        t.start()
    for t in ts:
        t.join()


if __name__ == '__main__':
    ages = [1, 3, 4]
    run_threading(thread_function, (ages,), 1)
    output = []
    # All threads have been joined, so draining until empty is safe
    while not q.empty():
        output.append(q.get())
    print(output)

Output:

[{'age': 2}, {'age': 4}, {'age': 5}]

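As a variation on the global-queue approach, the queue can also be passed to each thread explicitly. The following is only a minimal sketch: the names `worker` and `collect_results` are hypothetical and not part of the original code. Each thread tags its result with its index, because the queue only reflects the order in which threads happened to finish.

```python
import queue
from threading import Thread


def worker(index, age, results):
    # Store (index, value) so the caller can restore the input order later.
    results.put((index, age + 1))


def collect_results(ages):
    results = queue.Queue()
    threads = [Thread(target=worker, args=(i, age, results))
               for i, age in enumerate(ages)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    # Every thread has been joined, so draining until empty is safe here.
    items = []
    while not results.empty():
        items.append(results.get())
    # Sort by index to get the values back in input order.
    return [value for _, value in sorted(items)]


if __name__ == '__main__':
    print(collect_results([1, 3, 4]))  # prints [2, 4, 5]
```

Passing the queue as an argument keeps the worker free of global state, which makes it easier to reuse and to test.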

Method 2: Using ThreadPoolExecutor's submit

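Starting with Python 3.2, the standard library provides the concurrent.futures module. It offers two classes, ThreadPoolExecutor and ProcessPoolExecutor, which are a higher-level abstraction over threading and multiprocessing. The focus here is on the thread pool, which not only schedules threads for us but also makes the following possible: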

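  1. The main thread can query the status of a given thread (or task), as well as its return value.
  2. When a thread finishes, the main thread can know about it immediately.
  3. The coding interface is the same for multithreading and multiprocessing.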

from concurrent.futures import ThreadPoolExecutor, as_completed


def thread_function(age):
    return age + 1


def run_thread_pool_sub(target, args, max_work_count=3):
    with ThreadPoolExecutor(max_workers=max_work_count) as t:
        # One future per element of args
        res = [t.submit(target, i) for i in args]
        return res


if __name__ == '__main__':
    ages = [1, 3, 4]
    res = run_thread_pool_sub(thread_function, ages)
    for future in as_completed(res):
        data = future.result()
        print(data)

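as_completed() is a generator. It blocks while no task has finished. As soon as a task finishes, it yields that task's future so the body of the for loop can run, then it blocks again, and so on until all tasks are done. Tasks that finish earlier are handed to the main thread earlier.

Output: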

4
2
5

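This shows that results collected from submit through as_completed do not come back in submission order. Note that in this example the with block waits for every task before run_thread_pool_sub returns, so all futures are already finished when as_completed is called, and the order printed is arbitrary rather than a true completion order.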
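To observe completion order more directly, one option is to iterate over as_completed while the pool is still running. The sketch below assumes a hypothetical `slow_increment` task with an artificial delay; neither that helper nor `results_in_completion_order` appears in the original code.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def slow_increment(age, delay):
    # Hypothetical task: sleep to simulate work of different durations.
    time.sleep(delay)
    return age + 1


def results_in_completion_order(jobs, max_workers=3):
    """jobs is a list of (age, delay) pairs."""
    done = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(slow_increment, age, delay)
                   for age, delay in jobs]
        # Iterate inside the with block, while tasks may still be running.
        for future in as_completed(futures):
            done.append(future.result())
    return done


if __name__ == '__main__':
    print(results_in_completion_order([(1, 0.3), (3, 0.1), (4, 0.2)]))
```

With these delays the task submitted second is expected to come out first, but the exact order depends on thread scheduling, so it should not be relied on.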

Here is the source code of submit:

    def submit(self, fn, *args, **kwargs):
        with self._shutdown_lock:
            if self._shutdown:
                raise RuntimeError('cannot schedule new futures after shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._work_queue.put(w)
            self._adjust_thread_count()
            return f
    submit.__doc__ = _base.Executor.submit.__doc__

Note that submit schedules a single call to the given function and returns a Future object (see the official documentation for details).
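A Future can be queried for its state, asked for its result, and given a callback that runs when the task finishes. The following is a minimal sketch using `Future.done()`, `Future.result()` and `Future.add_done_callback()` from the standard library; the function names `increment` and `submit_with_callback` are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor


def increment(age):
    return age + 1


def submit_with_callback(age):
    notified = []

    def on_done(future):
        # Runs when the future finishes, or at once if it already has.
        notified.append(future.result())

    with ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(increment, age)
        future.add_done_callback(on_done)
        value = future.result()  # blocks until the task has finished
    # Leaving the with block waits for the worker thread, so the
    # callback has run by the time we get here.
    return value, future.done(), notified


if __name__ == '__main__':
    print(submit_with_callback(1))  # prints (2, True, [2])
```

The callback is one way for the main program to be told as soon as a task completes, without polling `done()` in a loop.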

Method 3: Using ThreadPoolExecutor's map

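from concurrent.futures import ThreadPoolExecutor


def thread_function(age):
    # Return a plain value so the work runs in the pool thread
    # (a generator body would only run when iterated by the caller)
    return age + 1


def run_thread_pool(target, args, max_work_count=6):
    with ThreadPoolExecutor(max_workers=max_work_count) as t:
        # map calls target once per element of args
        res = t.map(target, args)
        return res


if __name__ == '__main__':
    ages = [1, 3, 4]
    res = run_thread_pool(target=thread_function, args=ages)
    for i in res:
        print(i)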

Output:

2
4
5


This shows that map returns its results in order, matching the order of the inputs.
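The ordering holds even when later inputs finish first. The sketch below illustrates this with a hypothetical `slow_increment` task whose delay is chosen so that smaller inputs finish last; both helper names are assumptions made for this example.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_increment(age):
    # Hypothetical delay: smaller inputs sleep longer, so they finish last.
    time.sleep((5 - age) * 0.05)
    return age + 1


def mapped_in_input_order(ages, max_workers=3):
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # list() consumes the iterator; results follow the order of ages.
        return list(pool.map(slow_increment, ages))


if __name__ == '__main__':
    print(mapped_in_input_order([1, 3, 4]))  # prints [2, 4, 5]
```

The trade-off is that the caller waits for the first input's result before seeing any later one, even if the later ones are already done.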

Here is the source code of map:

    def map(self, fn, *iterables, **kwargs):
        """Returns an iterator equivalent to map(fn, iter).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.

        Returns:
            An iterator equivalent to: map(func, *iterables) but the calls may
            be evaluated out-of-order.

        Raises:
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            Exception: If fn(*args) raises for any values.
        """
        timeout = kwargs.get('timeout')
        if timeout is not None:
            end_time = timeout + time.time()

        fs = [self.submit(fn, *args) for args in itertools.izip(*iterables)]

        # Yield must be hidden in closure so that the futures are submitted
        # before the first iterator value is required.
        def result_iterator():
            try:
                # reverse to keep finishing order
                fs.reverse()
                while fs:
                    # Careful not to keep a reference to the popped future
                    if timeout is None:
                        yield fs.pop().result()
                    else:
                        yield fs.pop().result(end_time - time.time())
            finally:
                for future in fs:
                    future.cancel()
        return result_iterator()

Its arguments are iterables, so when the inputs are a list, a dict, and so on, a single call to map is enough. Its return value is also an iterable.
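When the target function takes several parameters, map accepts one iterable per parameter and pairs them up element by element. The sketch below feeds the keys and values of a dict to a two-argument function; `describe` and `map_over_dict` are hypothetical names used only for illustration.

```python
from concurrent.futures import ThreadPoolExecutor


def describe(name, age):
    return '%s is %d' % (name, age + 1)


def map_over_dict(people):
    with ThreadPoolExecutor(max_workers=2) as pool:
        # keys() and values() iterate in the same order, so each name
        # is paired with its own age.
        return list(pool.map(describe, people.keys(), people.values()))


if __name__ == '__main__':
    print(map_over_dict({'Ann': 1, 'Bob': 3}))
```

Passing a dict directly would iterate over its keys only, which is why the values are supplied as a second iterable here.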
That's all.

Reference: [python] ThreadPoolExecutor thread pool
