python 实现多线程并返回函数返回值的三种方法
方法一:使用threading在threading中,并没有实现返回值的方法,我们可以用数据库或者是全局变量来实现返回值的获取。这里使用的是全局变量。def thread_function(age):for i in age:i += 1q.put({'age': i...
方法一:使用threading
在threading中,并没有实现返回值的方法,我们可以用数据库或者是全局变量来实现返回值的获取。这里使用的是全局变量。
from threading import Thread
from concurrent.futures import ThreadPoolExecutor, as_completed
import Queue
import time
q = Queue.Queue()
def thread_function(age):
for i in age:
i += 1
q.put(
{
'age': i
}
)
def run_threading(target, args, count):
"""
:param target: 目标函数
:param args: 函数参数
:param count: 线程数量
"""
ts = []
for i in range(count):
t = Thread(target=target, args=args)
ts.append(t)
[i.start() for i in ts]
[i.join() for i in ts]
if __name__ == '__main__':
ages = [1, 3, 4]
# 1111
run_threading(thread_function, (ages,), 1)
output = []
while not q.empty():
output.append(q.get())
print output
输出:
[{'age': 2}, {'age': 4}, {'age': 5}]
Process finished with exit code 0
方法二:使用ThreadPoolExecutor的submit
从Python3.2开始,标准库为我们提供了concurrent.futures
模块,它提供了ThreadPoolExecutor
和ProcessPoolExecutor
两个类,实现了对threading
和multiprocessing
的进一步抽象。这里主要关注线程池,不仅可以帮我们自动调度线程,还可以做到:
- 主线程可以获取某一个线程(或者任务的)的状态,以及返回值。
- 当一个线程完成的时候,主线程能够立即知道。
- 让多线程和多进程的编码接口一致。
def thread_function(age):
return age+1
def run_thread_pool_sub(target, args, max_work_count=3):
with ThreadPoolExecutor(max_workers=max_work_count) as t:
res = [t.submit(target, i) for i in args]
return res
if __name__ == '__main__':
ages = [1, 3, 4]
res = run_thread_pool_sub(thread_function, ages)
for future in as_completed(res):
data = future.result()
print data
as_completed()
方法是一个生成器,在没有任务完成的时候,会阻塞,在有某个任务完成的时候,会yield这个任务,就能执行for循环下面的语句,然后继续阻塞住,循环到所有的任务结束。从结果也可以看出,先完成的任务会先通知主线程
输出:
4
2
5
Process finished with exit code 0
这里可以看出submit的返回是无序的
这里看下源码:
def submit(self, fn, *args, **kwargs):
with self._shutdown_lock:
if self._shutdown:
raise RuntimeError('cannot schedule new futures after shutdown')
f = _base.Future()
w = _WorkItem(f, fn, args, kwargs)
self._work_queue.put(w)
self._adjust_thread_count()
return f
submit.__doc__ = _base.Executor.submit.__doc__
注意的是,它就是执行一个单独的函数,并且返回的是future对象(具体请看官方文档)。
方法三:使用ThreadPoolExecutor的map
def thread_function(age):
for i in age:
yield i+1
def run_thread_pool(target, args, max_work_count=6):
with ThreadPoolExecutor(max_workers=max_work_count) as t:
res = t.map(target, args)
return res
if __name__ == '__main__':
ages = [1, 3, 4]
# 2222
res = run_thread_pool(target=thread_function, args=(ages,))
for j in res:
for i in j:
print i
输出:
2
4
5
Process finished with exit code 0
这里看出map的输出是有序的
这里看下map的源码:
def map(self, fn, *iterables, **kwargs):
"""Returns an iterator equivalent to map(fn, iter).
Args:
fn: A callable that will take as many arguments as there are
passed iterables.
timeout: The maximum number of seconds to wait. If None, then there
is no limit on the wait time.
Returns:
An iterator equivalent to: map(func, *iterables) but the calls may
be evaluated out-of-order.
Raises:
TimeoutError: If the entire result iterator could not be generated
before the given timeout.
Exception: If fn(*args) raises for any values.
"""
timeout = kwargs.get('timeout')
if timeout is not None:
end_time = timeout + time.time()
fs = [self.submit(fn, *args) for args in itertools.izip(*iterables)]
# Yield must be hidden in closure so that the futures are submitted
# before the first iterator value is required.
def result_iterator():
try:
# reverse to keep finishing order
fs.reverse()
while fs:
# Careful not to keep a reference to the popped future
if timeout is None:
yield fs.pop().result()
else:
yield fs.pop().result(end_time - time.time())
finally:
for future in fs:
future.cancel()
return result_iterator()
它的参数是个iterables,所以当参数为列表字典等时,只需要写一个map函数就行了,而且它的返回值也是个iterable。
以上~
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)