Python Concurrency — concurrent.futures | by Diego Barba | Sep, 2022
Part 3 of the Python Concurrency series. Interface simplicity brought to multi-threading and multi-processing.
After successive headaches dealing with multi-threaded and multi-process code, a dream begins to take shape: is there a way to do this in a simpler way? Is there a way to hide the creation of threads, processes, queues, and pipes? Is there a way to offload the computation elsewhere and get the result back?
It turns out that there is such a way.
concurrent.futures implements a simple, intuitive, and, frankly, great API for dealing with threads and processes. By now, we know our way around multi-process and multi-threaded code. We know how to create processes and threads, but sometimes we need something simpler.
There may be cases when we genuinely want to manage separate processes or multiple threads in our code. However, for a data scientist, processes and threads are usually just a means of achieving parallelism or of offloading computation away from the program's main thread. Here is where the concurrent.futures module shines. It hides most of the complexity of multi-threaded/multi-process code and lets us focus on our thing: retrieving, processing, and applying CPU-hungry models to data.
Story Structure
- Future
- map generator
- Reusable PoolExecutor
- ProcessPool and the Executor class
- multi-threading -> multi-process
- multi-process -> multi-threading
- Moral of the story
Future
The concept of a future is the essence behind the simplicity of the concurrent.futures module. A future is a proxy for a result that does not exist yet but will exist in the future. A task is submitted to an executor, and the executor gives us back a future. We can think of it as a kind of receipt that we come back with later to collect the result of our task. The executor takes care of all the thread and process management for us.
Under the hood, the executor is a pool of processes or threads and a task scheduler.
As an example, we will use a ThreadPoolExecutor. We will submit a task (do_work()) that sleeps for sleep_secs seconds and then returns a string. The submit() method of the executor class returns an instance of the Future class.
We will explore two different implementations:
- wait_for_future(): we call the result() method immediately after creating the future. This call blocks until the result is ready.
- get_future_after(): we call the result() method after doing some other important things (sleeping). In this case, we wait longer than the task takes; hence, the result is ready (no blocking) when we make the call.
In both implementations, we will keep track of time and introduce some print statements to shed light on the behavior of the future.
---- Wait for future ----
future created | 0.00046324729919433594
waiting for future… | 0.0004940032958984375
future result: foo | 5.002425193786621
---- Get future after ----
future created | 0.00028586387634277344
doing other things… | 0.00033402442932128906
future result: foo | 10.005676031112671
As you can see from the print statements, the submit() method does not block. The result() method blocks only if the task is not yet done when it is called.
map generator
The executor has another method for scheduling tasks: the map() method (entirely analogous to Python's built-in map()). This method executes a given function using the elements of an iterable as arguments. It is essentially a more compact way of writing a for loop and storing the futures. In the upcoming sections, we will see why this method is one of my favorites for parallel computing.
map() returns a generator; hence, the call itself does not block. However, popping elements from the generator may block if the corresponding task is not done.
Here is a simple example of the map() method's usage. The function passed to map() is expected to take a single argument, an element of the iterable. If our function has a different signature, as is the case here, we can use partial from the functools module:
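A sketch of such a map() call, assuming a two-argument do_work() that sleeps and returns a tagged string; the helper name run_map, the worker count, and the sleep duration are assumptions:

```python
import time
from concurrent.futures import ThreadPoolExecutor
from functools import partial


def do_work(sleep_secs: float, n: int) -> str:
    """Two-argument task: map() alone cannot call it, partial() fixes sleep_secs."""
    time.sleep(sleep_secs)
    return f"foo-{n}"


def run_map(sleep_secs: float = 5.0) -> list:
    t0 = time.time()
    with ThreadPoolExecutor(max_workers=4) as executor:
        # partial() turns do_work into a one-argument function of n
        gen = executor.map(partial(do_work, sleep_secs), range(1, 10))
        print("map generator created |", time.time() - t0)
        print("waiting for map results... |", time.time() - t0)
        results = list(gen)  # consuming the generator may block
        print("map results:", results, "|", time.time() - t0)
    return results


if __name__ == "__main__":
    run_map()
```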
map generator created | 0.001207113265991211
waiting for map results… | 0.0012679100036621094
map results: ['foo-1', 'foo-2', 'foo-3', 'foo-4', 'foo-5', 'foo-6', 'foo-7', 'foo-8', 'foo-9'] | 9.99380612373352
Reusable PoolExecutor
In the previous section, we used the ThreadPoolExecutor as a context manager (with statement). While this approach is preferred because it takes care of terminating the pool, it has a significant drawback: the executor cannot be reused.
We often need continuous access to the pool and want to avoid the performance hit of repeated pool creation and termination. In such cases, we can create an instance of the executor class, use it wherever we see fit, and terminate it manually using the shutdown() method.
The following example illustrates how we can reuse the executor:
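A sketch of a reusable executor, assuming the same two-argument do_work() as in the previous example; the worker count and sleep time are assumptions:

```python
import time
from concurrent.futures import ThreadPoolExecutor
from functools import partial


def do_work(sleep_secs: float, n: int) -> str:
    time.sleep(sleep_secs)
    return f"foo-{n}"


# Create the executor once, outside any with-block, so it can be reused
executor = ThreadPoolExecutor(max_workers=4)

some_results = list(executor.map(partial(do_work, 0.1), range(1, 10)))
print("some map results:", some_results)

# The same pool serves a second batch: no creation/termination overhead
more_results = list(executor.map(partial(do_work, 0.1), range(10, 20)))
print("more map results:", more_results)

# Terminate the pool manually once we are done with it
executor.shutdown()
```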
some map results: ['foo-1', 'foo-2', 'foo-3', 'foo-4', 'foo-5', 'foo-6', 'foo-7', 'foo-8', 'foo-9']
more map results: ['foo-10', 'foo-11', 'foo-12', 'foo-13', 'foo-14', 'foo-15', 'foo-16', 'foo-17', 'foo-18', 'foo-19']
ProcessPool and the Executor class
If you haven't noticed, the code presented so far uses an alias for the executor import, PoolExecutor. There is a very good reason for this: both the thread pool (ThreadPoolExecutor) and the process pool (ProcessPoolExecutor) implement the same interface. They both inherit from the Executor class and implement the same three methods: submit(), map(), and shutdown().
That is one of my favorite features of the concurrent.futures API: we can switch from threads to processes with minimal code refactoring. The following example shows how easy it is to use the ProcessPoolExecutor instead:
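A sketch of the swap, reusing the same do_work() and the aliased-import style the text mentions; only the alias target changes, the rest of the code is untouched:

```python
import time
from concurrent.futures import ProcessPoolExecutor as PoolExecutor
from functools import partial


def do_work(sleep_secs: float, n: int) -> str:
    time.sleep(sleep_secs)
    return f"foo-{n}"


def run_map(sleep_secs: float = 5.0) -> list:
    t0 = time.time()
    # Identical usage to the thread pool: only the import alias changed
    with PoolExecutor() as executor:
        results = list(executor.map(partial(do_work, sleep_secs), range(1, 10)))
        print("map results:", results, "|", time.time() - t0)
    return results


if __name__ == "__main__":
    run_map()
```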
map results: ['foo-1', 'foo-2', 'foo-3', 'foo-4', 'foo-5', 'foo-6', 'foo-7', 'foo-8', 'foo-9'] | 6.141733169555664
When multiple tasks are scheduled on the process pool, whether via submit() or via map(), they are computed in parallel, up to the number of processes in the pool. If the pool has eight processes and there are more than eight tasks, at most eight tasks will be executing in parallel at any given time.
With a process pool, the map() method can be used to parallelize for loops whose iterations depend only on the value taken from the iterable.
multi-threading -> multi-process
Using multi-threading and multi-processing in the same program may not be widespread, yet I often find myself in such a situation. Perhaps too often.
The logic is the following:
- An event triggers a thread; it could be a client request or some sensor reading.
- Within this new thread, we need to do some CPU-heavy lifting, and of course, we want it to use more than one core. So we do some parallel computing using multiple processes.
While the logic sounds very straightforward, this approach can be cumbersome to implement using bare processes and threads. However, the concurrent.futures API makes it very simple. The following example illustrates this point. In the example, a CPU-intensive function (do_CPU_bound_work()) is called in parallel from another function (do_parallel_work()). This flow is carried out concurrently from two threads:
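A sketch of this flow, using bare threads on the outside and a ProcessPoolExecutor on the inside. The sum-of-squares workload, the pool size, and the results dict used to collect per-thread output are all assumptions; only the names do_CPU_bound_work and do_parallel_work come from the text:

```python
import threading
from concurrent.futures import ProcessPoolExecutor


def do_CPU_bound_work(n: int) -> int:
    # Stand-in for a genuinely CPU-intensive computation
    return sum(i * i for i in range(n))


def do_parallel_work(name: str, results: dict) -> None:
    print(f"{name} thread, doing parallel_work...")
    # Each thread opens its own process pool for the CPU-bound part
    with ProcessPoolExecutor(max_workers=2) as executor:
        results[name] = list(executor.map(do_CPU_bound_work, [10_000] * 4))
    print(f"{name} thread, done")


def main() -> dict:
    results: dict = {}
    # Two bare threads, each triggering multi-process computation
    threads = [
        threading.Thread(target=do_parallel_work, args=(name, results))
        for name in ("A", "B")
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results


if __name__ == "__main__":
    main()
```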
A thread, doing parallel_work…
B thread, doing parallel_work…
A thread, done
B thread, done
We could simplify it further by using a ThreadPoolExecutor instead of bare threads:
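The same flow, sketched with a ThreadPoolExecutor supplying the outer threads; workload and pool sizes remain assumptions:

```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def do_CPU_bound_work(n: int) -> int:
    return sum(i * i for i in range(n))


def do_parallel_work(name: str) -> list:
    print(f"{name} thread, doing parallel_work...")
    # The inner process pool does the multi-core heavy lifting
    with ProcessPoolExecutor(max_workers=2) as executor:
        results = list(executor.map(do_CPU_bound_work, [10_000] * 4))
    print(f"{name} thread, done")
    return results


def main() -> dict:
    # The outer threads now come from a ThreadPoolExecutor, and the
    # results come back through futures instead of a shared dict
    with ThreadPoolExecutor(max_workers=2) as thread_pool:
        futures = {name: thread_pool.submit(do_parallel_work, name)
                   for name in ("A", "B")}
        return {name: f.result() for name, f in futures.items()}


if __name__ == "__main__":
    main()
```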
A thread, doing parallel_work…
B thread, doing parallel_work…
A thread, done
B thread, done
Another great feature is that we can get the computation results from a future or from the map() generator without needing queues, pipes, or funky shared variables.
It is incredible how great interface design can simplify our lives to such an extent.
multi-process -> multi-threading
What if, instead of spawning processes from multiple threads, we want to spawn threads from multiple processes? No problem, we can do it with the utmost simplicity. The following example shows how we can create multiple threads to handle concurrent I/O from multiple processes:
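A sketch of the inverted flow: bare processes on the outside, each opening a thread pool for I/O. The function names do_parallel_io and do_io_bound_work, the sleep-based I/O stand-in, and the pool sizes are assumptions:

```python
import multiprocessing as mp
import time
from concurrent.futures import ThreadPoolExecutor


def do_io_bound_work(sleep_secs: float) -> float:
    # Stand-in for an I/O-bound task (e.g. a network request)
    time.sleep(sleep_secs)
    return sleep_secs


def do_parallel_io(name: str) -> None:
    print(f"{name} proc, doing parallel I/O...")
    # Each process opens its own thread pool for concurrent I/O
    with ThreadPoolExecutor(max_workers=4) as executor:
        list(executor.map(do_io_bound_work, [0.5] * 4))
    print(f"{name} proc, done")


def main() -> list:
    # Two bare processes, each running concurrent I/O on threads
    procs = [mp.Process(target=do_parallel_io, args=(name,))
             for name in ("A", "B")]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return [p.exitcode for p in procs]


if __name__ == "__main__":
    main()
```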
A proc, doing parallel I/O…
B proc, doing parallel I/O…
B proc, done
A proc, done
We can also create the processes through the concurrent.futures API:
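The same idea, sketched with a ProcessPoolExecutor supplying the outer processes; the helper names and pool sizes are again assumptions:

```python
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def do_io_bound_work(sleep_secs: float) -> float:
    time.sleep(sleep_secs)
    return sleep_secs


def do_parallel_io(name: str) -> str:
    print(f"{name} proc, doing parallel I/O...")
    # The inner thread pool handles the concurrent I/O
    with ThreadPoolExecutor(max_workers=4) as executor:
        list(executor.map(do_io_bound_work, [0.5] * 4))
    print(f"{name} proc, done")
    return name


def main() -> list:
    # The outer processes now come from a ProcessPoolExecutor
    with ProcessPoolExecutor(max_workers=2) as proc_pool:
        return list(proc_pool.map(do_parallel_io, ["A", "B"]))


if __name__ == "__main__":
    main()
```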
A proc, doing parallel I/O…
B proc, doing parallel I/O…
A proc, done
B proc, done
Moral of the story
It is difficult not to love the interface implemented by concurrent.futures. Personally, it is my first choice when dealing with concurrency, whether that means using threads for I/O or processes for parallel computing.
If you are not using it by now, you definitely should!
Everything should be made as simple as possible, but not simpler
— Albert Einstein