🐍 Python - Futures
Updated at 2018-11-19 17:59
The `concurrent.futures` module contains tools for writing concurrent applications.
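- Use `ThreadPoolExecutor` to create parallel tasks utilizing threads, e.g. for downloading.
- Use `ProcessPoolExecutor` to create parallel tasks utilizing processes, e.g. for math.
- Both are subclasses of `Executor`, so they share its interface (`submit`, `map`, `shutdown`).
- Both take `max_workers`, and the default is related to the number of CPUs (`ProcessPoolExecutor` uses `os.cpu_count()`; since Python 3.8, `ThreadPoolExecutor` uses `min(32, os.cpu_count() + 4)`).
- Use processes instead of threads if you do something computationally intensive that doesn't release the GIL.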
`executor.map` runs a given function once for each item of an iterable. `map` forces all tasks to use the same callable, and the results are returned in the order the tasks were submitted, even if a later one completes sooner.
from concurrent import futures
import time
import typing
def wait_map(
    durations: typing.Iterable[int],
    *,
    max_workers: int = 2,
) -> typing.Generator[str, None, None]:
    """Run ``wait`` once per duration on a thread pool via ``executor.map``.

    Results come back in the order of ``durations`` (submission order), not
    in the order the tasks finish.

    Args:
        durations: Seconds to sleep; one task is queued per item.
        max_workers: Number of pool threads (defaults to 2).

    Returns:
        The lazy result iterator produced by ``executor.map``. Leaving the
        ``with`` block shuts the pool down and waits for every task, so the
        iterator can still be consumed after this function returns. If a
        task raised, that exception is re-raised when its result is reached.
    """
    with futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        results = executor.map(wait, durations)
    return results
def wait(duration: int) -> str:
    """Sleep ``duration`` seconds, print a note about it, then return ``'done'``."""
    time.sleep(duration)
    note = f'Waited for {duration}.'
    print(note)
    status = 'done'
    return status
# Consume the lazy map iterator; results arrive in submission order.
# With two workers the first 1s task ends at ~1s; the 2s task and the second
# 1s task (started at ~1s) both end at ~2s, so the last two printed lines may
# appear in either order.
assert ['done'] * 3 == list(wait_map([1, 2, 1]))
# => Waited for 1.
# => Waited for 2.
# => Waited for 1.
`executor.submit` returns a `Future`, which works somewhat like a JavaScript promise. This more complex interface gives you more control: futures can come from multiple executors, they can use different callables or arguments, and with `futures.as_completed` they can be processed in the order they complete.
from concurrent import futures
import time
import typing
def wait_as_completed(
    durations: typing.Iterable[int],
    *,
    max_workers: int = 2,
) -> typing.List[str]:
    """Run ``wait`` once per duration and collect results as tasks finish.

    Unlike ``executor.map``, ``futures.as_completed`` yields each future as
    soon as it is done, so results are gathered in completion order rather
    than submission order.

    Args:
        durations: Seconds to sleep; one task is submitted per item.
        max_workers: Number of pool threads (defaults to 2).

    Returns:
        A list with one ``wait`` result per duration, in completion order.
    """
    with futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        to_do = [executor.submit(wait, duration) for duration in durations]
        # result() re-raises any exception raised inside the task.
        results = [future.result() for future in futures.as_completed(to_do)]
    return results
def wait(duration: int) -> str:
    """Block for ``duration`` seconds, report it on stdout, and return ``'done'``."""
    pause = duration
    time.sleep(pause)
    print(f'Waited for {pause}.')
    return 'done'
# as_completed hands futures back in completion order; every task returns
# 'done', so the collected list looks the same either way.
assert ['done'] * 3 == wait_as_completed([1, 2, 1])
# => Waited for 1.
# => Waited for 2.
# => Waited for 1.
If you are not using `with`, you should call the executor's `shutdown()` method after use.
import time
from concurrent import futures
def wait_for(seconds):
    """Sleep for ``seconds`` seconds and hand the same value back."""
    delay = seconds
    time.sleep(delay)
    return delay
def print_callback(job: futures.Future) -> None:
    """Done-callback that prints the outcome of a finished future.

    Prints ``'cancelled'`` for a cancelled future, ``failed: <exception repr>``
    for a future whose callable raised, and otherwise the future's result.

    Args:
        job: The finished future the callback was attached to.
    """
    if job.cancelled():
        # Checked first: exception()/result() raise CancelledError here.
        print('cancelled')
        return
    # Exceptions raised inside a done-callback are logged and ignored by
    # concurrent.futures, so calling result() on a failed future would only
    # produce a logged traceback; report the failure explicitly instead.
    error = job.exception()
    if error is not None:
        print(f'failed: {error!r}')
    else:
        print(job.result())
# ProcessPoolExecutor workers may re-import this module (the "spawn" start
# method is the default on Windows and macOS), so the driver code must only
# run when the file is executed as a script, not when it is imported.
if __name__ == '__main__':
    start_time = time.perf_counter()  # monotonic clock, suited to elapsed time
    executor1 = futures.ProcessPoolExecutor()
    executor2 = futures.ProcessPoolExecutor()
    try:
        jobs = []
        for number in [1, 3, 5]:
            job = executor1.submit(wait_for, number)
            job.add_done_callback(print_callback)
            jobs.append(job)
        for number in [2, 4, 6]:
            job = executor2.submit(wait_for, number)
            job.add_done_callback(print_callback)
            jobs.append(job)
        futures.wait(jobs)
        # => 1, 2, 3, 4, 5, 6
    finally:
        # Without a `with` block, shut both executors down explicitly -- even
        # if submitting or waiting raised -- so the worker processes are released.
        executor1.shutdown()
        executor2.shutdown()
    print('took {} seconds'.format(time.perf_counter() - start_time))
    # => took 6.0414369106292725 seconds
Sources
- Python Docs - concurrent.futures
- Dalke Scientific - Python's concurrent.futures
- Fluent Python, Luciano Ramalho