ruk·si

🐍 Python
Futures

Updated at 2018-11-19 17:59

concurrency.futures module contains tools for writing concurrent applications.

  • Use ThreadPoolExecutor to create parallel tasks utilizing threads e.g. downloading.
  • Use ProcessPoolExecutor to create parallel tasks utilizing processes e.g math.
  • Both instantiate Executor objects.
  • Both take max_workers, and the default is related to the number of CPUs.
  • You should use process over threads if you do something computationally-intensive that doesn't release GIL.

executor.map runs a given function with all parameters in an iterable. map forces all tasks to use the same callable, and the results are returned in the order they were queued, even if a later one completes faster.

from concurrent import futures
import time
import typing

def wait_map(durations: typing.Iterable[int]) -> typing.Generator[str, None, None]:
    with futures.ThreadPoolExecutor(max_workers=2) as executor:
        results = executor.map(wait, durations)
    return results

def wait(duration: int) -> str:
    time.sleep(duration)
    print(f'Waited for {duration}.')
    return 'done'

assert list(wait_map([1, 2, 1])) == ['done', 'done', 'done']
# => Waited for 1.
# => Waited for 2.
# => Waited for 1.

executor.submit returns a future, which works kinda like JavaScript promises. The more complex interface comes with more control. Futures can come from multiple executors, they can have different callables or parameters, and they are processed as they are completed.

from concurrent import futures
import time
import typing

def wait_as_completed(durations: typing.Iterable[int]) -> typing.Iterable[str]:
    results = []
    with futures.ThreadPoolExecutor(max_workers=2) as executor:
        to_do = []
        for duration in durations:
            future = executor.submit(wait, duration)
            to_do.append(future)
        for future in futures.as_completed(to_do):
            result = future.result()
            results.append(result)
    return results

def wait(duration: int) -> str:
    time.sleep(duration)
    print(f'Waited for {duration}.')
    return 'done'

assert wait_as_completed([1, 2, 1]) == ['done', 'done', 'done']
# Waited for 1.
# Waited for 2.
# Waited for 1.

If you are not using with, you should shutdown the executor after use.

import time
from concurrent import futures

def wait_for(seconds):
    time.sleep(seconds)
    return seconds

def print_callback(job):
    if job.cancelled():
        print('cancelled')
    else:
        print(job.result())

start_time = time.time()

executor1 = futures.ProcessPoolExecutor()
executor2 = futures.ProcessPoolExecutor()

jobs = []

for number in [1, 3, 5]:
    job = executor1.submit(wait_for, number)
    job.add_done_callback(print_callback)
    jobs.append(job)

for number in [2, 4, 6]:
    job = executor2.submit(wait_for, number)
    job.add_done_callback(print_callback)
    jobs.append(job)

futures.wait(jobs)
# => 1, 2, 3, 4, 5, 6

executor1.shutdown()
executor2.shutdown()

print('took {} seconds'.format(time.time() - start_time))
# => took 6.0414369106292725 seconds

Sources