threading
使用 Python 线程池ThreadPoolExecutor
的map
方法的完整示例,展示了如何并行处理多个任务并收集结果:¶
import time
from concurrent.futures import ThreadPoolExecutor
def process_task(task_id):
"""模拟耗时任务,返回任务ID的平方"""
print(f"处理任务 {task_id}...")
time.sleep(1) # 模拟1秒的处理时间
return task_id ** 2
def main():
# 创建包含10个工作线程的线程池
with ThreadPoolExecutor(max_workers=10) as executor:
# 生成15个任务ID(0-14)
task_ids = range(15)
# 使用map方法并行处理所有任务
# 结果将按照任务提交的顺序返回
results = executor.map(process_task, task_ids)
# 打印每个任务的处理结果
for task_id, result in zip(task_ids, results):
print(f"任务 {task_id} 的结果: {result}")
if __name__ == "__main__":
start_time = time.time()
main()
print(f"所有任务处理完成,耗时: {time.time() - start_time:.2f} 秒")
这个示例中:
process_task
函数模拟了一个耗时 1 秒的任务,返回任务 ID 的平方- 使用
ThreadPoolExecutor
创建了一个包含 10 个工作线程的线程池 - 通过
map
方法提交了 15 个任务(任务 ID 从 0 到 14) - 主线程会按照任务提交的顺序打印结果
- 程序最后输出总耗时(约 1.5 秒,因为 15 个任务由 10 个线程并行处理)
运行这段代码,你将看到任务被并行处理,并且结果按顺序返回。如果要调整并行度,可以修改max_workers
参数;如果需要异步获取结果,可以考虑使用submit
方法和Future
对象。
使用 Python ThreadPoolExecutor
的 submit
方法的完整示例,展示了如何异步提交多个任务并处理结果:¶
import time
from concurrent.futures import ThreadPoolExecutor
def process_task(task_id):
"""模拟耗时任务,返回任务ID的平方"""
print(f"开始处理任务 {task_id}(线程: {time.threading.get_ident()})")
time.sleep(1) # 模拟1秒的处理时间
print(f"完成任务 {task_id}")
return task_id ** 2
def main():
# 创建包含3个工作线程的线程池
with ThreadPoolExecutor(max_workers=3) as executor:
# 提交10个任务(任务ID从0到9)
# submit方法返回Future对象,代表异步执行的任务
futures = [executor.submit(process_task, i) for i in range(10)]
print("\n所有任务已提交,等待完成...\n")
# 按完成顺序处理结果(使用as_completed)
from concurrent.futures import as_completed
for future in as_completed(futures):
try:
task_id = futures.index(future) # 获取对应的任务ID
result = future.result() # 获取任务结果(如果任务失败会抛出异常)
print(f"任务 {task_id} 的结果: {result}")
except Exception as e:
print(f"任务 {task_id} 失败: {e}")
if __name__ == "__main__":
start_time = time.time()
main()
print(f"\n所有任务处理完成,耗时: {time.time() - start_time:.2f} 秒")
这个示例中:
process_task
函数模拟了一个耗时 1 秒的任务,返回任务 ID 的平方- 使用
ThreadPoolExecutor
创建了一个包含 3 个工作线程的线程池 - 通过
submit
方法异步提交了 10 个任务,并收集Future
对象列表 - 使用
as_completed
迭代器按完成顺序获取任务结果 - 程序最后输出总耗时(约 3.5 秒,因为 10 个任务由 3 个线程分批次处理)
与 map
方法的主要区别:
submit
支持异步提交不同参数的任务- 返回
Future
对象,可以检查任务状态(pending/done) - 可以通过
as_completed
按完成顺序获取结果(而非提交顺序) - 支持异常处理(通过
future.result()
捕获)
运行这段代码,你将看到任务按批次并行处理,结果按完成顺序输出。如果需要调整并行度,可以修改 max_workers
参数;如果需要按提交顺序获取结果,可以直接遍历 futures
列表并调用 future.result()
。
在性能上,ThreadPoolExecutor.map()
和 ThreadPoolExecutor.submit()
本身的线程调度开销是相同的,因为它们都基于同一个线程池实现。但在实际使用中,它们的性能表现可能因以下因素而有所不同:¶
1. 结果获取方式¶
map()
:按任务提交顺序返回结果,即使某些任务提前完成,也需要等待前面的任务返回后才能处理。如果任务执行时间差异较大,可能导致不必要的等待。submit()
+as_completed()
:按任务完成顺序返回结果,可以立即处理已完成的任务,避免等待。对于执行时间不均匀的任务,这种方式通常更高效。
示例对比: 假设任务 A 耗时 1 秒,任务 B 耗时 5 秒:
- 使用
map()
时,必须等待 B 完成后才能获取 A 的结果。 - 使用
submit()
+as_completed()
时,A 完成后立即返回结果,无需等待 B。
2. 异常处理¶
map()
:如果某个任务抛出异常,会立即停止迭代并抛出异常,后续任务可能不会继续执行。submit()
:异常会被封装在Future
中,直到调用future.result()
时才会抛出,不会影响其他任务的执行。
性能影响:如果任务可能抛出异常且需要继续处理其他任务,submit()
更可靠,避免因单个任务失败导致整个流程中断。
3. 参数传递灵活性¶
map()
:适用于批量处理相同函数、不同参数的任务,但参数必须是可迭代对象(如列表),且需要预先准备好所有参数。submit()
:可以动态提交不同参数甚至不同函数的任务,适合动态生成任务的场景(如根据网络请求结果提交新任务)。
性能影响:如果任务参数需要动态生成或依赖外部资源,submit()
的灵活性可能减少等待时间。
4. 内存占用¶
map()
:需要预先准备所有参数,可能占用更多内存(尤其是参数列表很大时)。submit()
:可以逐个提交任务,内存占用更可控。
性能建议¶
- 优先使用
map()
的场景: - 任务执行时间均匀。
- 需要按顺序处理结果。
- 参数可以预先批量生成。
- 优先使用
submit()
的场景: - 任务执行时间差异大。
- 需要立即处理完成的任务。
- 需要动态提交任务或处理异常。
- 需要更高的灵活性(如不同任务使用不同函数)。
示例测试¶
以下是一个对比 map()
和 submit()
性能的简单测试代码:
import time
from concurrent.futures import ThreadPoolExecutor
def task(n):
time.sleep(n) # 模拟不同耗时的任务
return n
def test_map():
with ThreadPoolExecutor(max_workers=3) as executor:
results = list(executor.map(task, [1, 3, 2]))
return results
def test_submit():
with ThreadPoolExecutor(max_workers=3) as executor:
futures = [executor.submit(task, n) for n in [1, 3, 2]]
results = [future.result() for future in futures]
return results
# 测试耗时
start_map = time.time()
test_map()
print(f"map() 耗时: {time.time() - start_map:.2f}s")
start_submit = time.time()
test_submit()
print(f"submit() 耗时: {time.time() - start_submit:.2f}s")
在这个测试中,map()
和 submit()
的耗时差异可能不大,但在更复杂的场景(如任务数量多、耗时差异大)下,submit()
可能表现更优。