Skip to content

threading

使用 Python 线程池ThreadPoolExecutormap方法的完整示例,展示了如何并行处理多个任务并收集结果:

import time
from concurrent.futures import ThreadPoolExecutor

def process_task(task_id):
    """模拟耗时任务,返回任务ID的平方"""
    print(f"处理任务 {task_id}...")
    time.sleep(1)  # 模拟1秒的处理时间
    return task_id ** 2

def main():
    # 创建包含10个工作线程的线程池
    with ThreadPoolExecutor(max_workers=10) as executor:
        # 生成15个任务ID(0-14)
        task_ids = range(15)

        # 使用map方法并行处理所有任务
        # 结果将按照任务提交的顺序返回
        results = executor.map(process_task, task_ids)

        # 打印每个任务的处理结果
        for task_id, result in zip(task_ids, results):
            print(f"任务 {task_id} 的结果: {result}")

if __name__ == "__main__":
    start_time = time.time()
    main()
    print(f"所有任务处理完成,耗时: {time.time() - start_time:.2f} 秒")

这个示例中:

  1. process_task函数模拟了一个耗时 1 秒的任务,返回任务 ID 的平方
  2. 使用ThreadPoolExecutor创建了一个包含 10 个工作线程的线程池
  3. 通过map方法提交了 15 个任务(任务 ID 从 0 到 14)
  4. 主线程会按照任务提交的顺序打印结果
  5. 程序最后输出总耗时(约 1.5 秒,因为 15 个任务由 10 个线程并行处理)

运行这段代码,你将看到任务被并行处理,并且结果按顺序返回。如果要调整并行度,可以修改max_workers参数;如果需要异步获取结果,可以考虑使用submit方法和Future对象。

使用 Python ThreadPoolExecutorsubmit 方法的完整示例,展示了如何异步提交多个任务并处理结果:

import time
from concurrent.futures import ThreadPoolExecutor

def process_task(task_id):
    """模拟耗时任务,返回任务ID的平方"""
    print(f"开始处理任务 {task_id}(线程: {time.threading.get_ident()})")
    time.sleep(1)  # 模拟1秒的处理时间
    print(f"完成任务 {task_id}")
    return task_id ** 2

def main():
    # 创建包含3个工作线程的线程池
    with ThreadPoolExecutor(max_workers=3) as executor:
        # 提交10个任务(任务ID从0到9)
        # submit方法返回Future对象,代表异步执行的任务
        futures = [executor.submit(process_task, i) for i in range(10)]

        print("\n所有任务已提交,等待完成...\n")

        # 按完成顺序处理结果(使用as_completed)
        from concurrent.futures import as_completed
        for future in as_completed(futures):
            try:
                task_id = futures.index(future)  # 获取对应的任务ID
                result = future.result()  # 获取任务结果(如果任务失败会抛出异常)
                print(f"任务 {task_id} 的结果: {result}")
            except Exception as e:
                print(f"任务 {task_id} 失败: {e}")

if __name__ == "__main__":
    start_time = time.time()
    main()
    print(f"\n所有任务处理完成,耗时: {time.time() - start_time:.2f} 秒")

这个示例中:

  1. process_task 函数模拟了一个耗时 1 秒的任务,返回任务 ID 的平方
  2. 使用 ThreadPoolExecutor 创建了一个包含 3 个工作线程的线程池
  3. 通过 submit 方法异步提交了 10 个任务,并收集 Future 对象列表
  4. 使用 as_completed 迭代器按完成顺序获取任务结果
  5. 程序最后输出总耗时(约 3.5 秒,因为 10 个任务由 3 个线程分批次处理)

map 方法的主要区别:

  • submit 支持异步提交不同参数的任务
  • 返回 Future 对象,可以检查任务状态(pending/done)
  • 可以通过 as_completed 按完成顺序获取结果(而非提交顺序)
  • 支持异常处理(通过 future.result() 捕获)

运行这段代码,你将看到任务按批次并行处理,结果按完成顺序输出。如果需要调整并行度,可以修改 max_workers 参数;如果需要按提交顺序获取结果,可以直接遍历 futures 列表并调用 future.result()

在性能上,ThreadPoolExecutor.map()ThreadPoolExecutor.submit() 本身的线程调度开销是相同的,因为它们都基于同一个线程池实现。但在实际使用中,它们的性能表现可能因以下因素而有所不同:

1. 结果获取方式

  • map():按任务提交顺序返回结果,即使某些任务提前完成,也需要等待前面的任务返回后才能处理。如果任务执行时间差异较大,可能导致不必要的等待。
  • submit() + as_completed():按任务完成顺序返回结果,可以立即处理已完成的任务,避免等待。对于执行时间不均匀的任务,这种方式通常更高效。

示例对比: 假设任务 A 耗时 1 秒,任务 B 耗时 5 秒:

  • 使用 map() 时,必须等待 B 完成后才能获取 A 的结果。
  • 使用 submit() + as_completed() 时,A 完成后立即返回结果,无需等待 B。

2. 异常处理

  • map():如果某个任务抛出异常,会立即停止迭代并抛出异常,后续任务可能不会继续执行。
  • submit():异常会被封装在 Future 中,直到调用 future.result() 时才会抛出,不会影响其他任务的执行。

性能影响:如果任务可能抛出异常且需要继续处理其他任务,submit() 更可靠,避免因单个任务失败导致整个流程中断。

3. 参数传递灵活性

  • map():适用于批量处理相同函数、不同参数的任务,但参数必须是可迭代对象(如列表),且需要预先准备好所有参数。
  • submit():可以动态提交不同参数甚至不同函数的任务,适合动态生成任务的场景(如根据网络请求结果提交新任务)。

性能影响:如果任务参数需要动态生成或依赖外部资源,submit() 的灵活性可能减少等待时间。

4. 内存占用

  • map():需要预先准备所有参数,可能占用更多内存(尤其是参数列表很大时)。
  • submit():可以逐个提交任务,内存占用更可控。

性能建议

  1. 优先使用 map() 的场景
  2. 任务执行时间均匀。
  3. 需要按顺序处理结果。
  4. 参数可以预先批量生成。
  5. 优先使用 submit() 的场景
  6. 任务执行时间差异大。
  7. 需要立即处理完成的任务。
  8. 需要动态提交任务或处理异常。
  9. 需要更高的灵活性(如不同任务使用不同函数)。

示例测试

以下是一个对比 map()submit() 性能的简单测试代码:

import time
from concurrent.futures import ThreadPoolExecutor

def task(n):
    time.sleep(n)  # 模拟不同耗时的任务
    return n

def test_map():
    with ThreadPoolExecutor(max_workers=3) as executor:
        results = list(executor.map(task, [1, 3, 2]))
    return results

def test_submit():
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(task, n) for n in [1, 3, 2]]
        results = [future.result() for future in futures]
    return results

# 测试耗时
start_map = time.time()
test_map()
print(f"map() 耗时: {time.time() - start_map:.2f}s")

start_submit = time.time()
test_submit()
print(f"submit() 耗时: {time.time() - start_submit:.2f}s")

在这个测试中,map()submit() 的耗时差异可能不大,但在更复杂的场景(如任务数量多、耗时差异大)下,submit() 可能表现更优。