
Python Concurrency — concurrent.futures | by Diego Barba | Sep, 2022



Part 3 of the Python Concurrency series. Interface simplicity brought to multi-threading and multi-processing.

Image by author.

previous stories:

After successive headaches dealing with multi-threaded and multi-process code, a dream begins to take shape: is there a way to do this in a simpler way? Is there a way to hide the creation of threads, processes, queues, and pipes? Is there a way to offload the computation elsewhere and get the result back?

It turns out that there is such a way.

concurrent.futures implements a simple, intuitive, and frankly great API for dealing with threads and processes. By now, we know our way around multi-process and multi-threaded code. We know how to create processes and threads, but sometimes we need something simpler.

There may be cases when we genuinely want to manage separate processes or multiple threads in our code. However, for a data scientist, processes and threads are usually a means of either achieving parallelism or offloading computation away from the program’s main thread. This is where the concurrent.futures module shines. It hides most of the complexity of multi-threaded and multi-process code and lets us focus on our actual work: retrieving, processing, and applying CPU-hungry models to data.

Story Structure

  • Future
  • map generator
  • Reusable PoolExecutor
  • ProcessPool and the Executor class
  • multi-threading -> multi-process
  • multi-process -> multi-threading
  • Moral of the story

Future

The concept of a future is the essence behind the simplicity of the concurrent.futures module. A future is a proxy for a result that does not exist yet but will exist in the future. A task is submitted to an executor, and the executor gives us back a future. We can think of it as a sort of receipt that we can come back with later to collect the result of our task. The executor takes care of all the thread and process management for us.

Under the hood, the executor is a pool of processes or threads and a task scheduler.

As an example, we will use a ThreadPoolExecutor. We will submit a task (do_work()) that sleeps for sleep_secs seconds and then returns a string. The submit() method of the executor returns an instance of the Future class.

We will explore two different implementations:

  • wait_for_future(): we call the result() method immediately after creating the future. This call blocks until the result is ready.
  • get_future_after(): we call the result() method after doing some other important things (sleeping). In this case, we wait longer than the task takes; hence, the result is already available (no blocking) when we make the call.

In both implementations, we will keep track of time and introduce some print statements to shed light on the behavior of the future.
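
A minimal sketch of how this could look, assuming a five-second task and a ten-second wait in get_future_after() (both inferred from the timings below):

import time
from concurrent.futures import ThreadPoolExecutor as PoolExecutor


def do_work(sleep_secs):
    # simulate work by sleeping, then return a result
    time.sleep(sleep_secs)
    return "foo"


def wait_for_future():
    start = time.time()
    with PoolExecutor() as executor:
        future = executor.submit(do_work, 5)  # submit() does not block
        print(f"future created | {time.time() - start}")
        print(f"waiting for future... | {time.time() - start}")
        # result() blocks until the task is done
        print(f"future result: {future.result()} | {time.time() - start}")


def get_future_after():
    start = time.time()
    with PoolExecutor() as executor:
        future = executor.submit(do_work, 5)
        print(f"future created | {time.time() - start}")
        print(f"doing other things... | {time.time() - start}")
        time.sleep(10)  # longer than the task itself
        # the task is already done, so result() returns immediately
        print(f"future result: {future.result()} | {time.time() - start}")


if __name__ == "__main__":
    print("-" * 8 + " Wait for future " + "-" * 8)
    wait_for_future()
    print("-" * 8 + " Get future after " + "-" * 8)
    get_future_after()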

-------- Wait for future --------
future created | 0.00046324729919433594
waiting for future... | 0.0004940032958984375
future result: foo | 5.002425193786621
-------- Get future after --------
future created | 0.00028586387634277344
doing other things... | 0.00033402442932128906
future result: foo | 10.005676031112671

As you can see from the print statements, the submit() method does not block. The result() method blocks only if the task is not done by the time it is called.

map generator

The executor has another method for scheduling tasks: the map() method (directly analogous to Python’s built-in map()). This method executes a given function using the elements of an iterable as arguments. It is essentially a more compact way of writing a for loop that submits the tasks and stores the futures. In the upcoming sections, we will see why this method is one of my favorites for parallel computing.

map() returns a generator; hence, the call does not block. However, popping elements from the generator may block if the corresponding task is not done.

Here is a simple example of the map() method in use. The function passed to map() is expected to take a single argument: an element of the iterable. If our function takes additional arguments, as is the case here, we can fix them with a partial function from the functools module:
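
A minimal sketch, assuming nine tasks on a single-worker pool sleeping roughly a second each (chosen to loosely match the timings below; they will vary with the pool size and sleep duration):

import time
from functools import partial
from concurrent.futures import ThreadPoolExecutor as PoolExecutor


def do_work(sleep_secs, i):
    # sleep, then return a labeled string
    time.sleep(sleep_secs)
    return f"foo-{i}"


start = time.time()
with PoolExecutor(max_workers=1) as executor:
    # partial() fixes sleep_secs, leaving a single-argument function for map()
    results_gen = executor.map(partial(do_work, 1.1), range(1, 10))
    print(f"map generator created | {time.time() - start}")  # does not block
    print(f"waiting for map results... | {time.time() - start}")
    # consuming the generator blocks until each task is done
    print(f"map results: {list(results_gen)} | {time.time() - start}")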

map generator created | 0.001207113265991211
waiting for map results... | 0.0012679100036621094
map results: ['foo-1', 'foo-2', 'foo-3', 'foo-4', 'foo-5', 'foo-6', 'foo-7', 'foo-8', 'foo-9'] | 9.99380612373352

Reusable PoolExecutor

In the previous section, we used the ThreadPoolExecutor as a context manager (with statement). While this approach is preferred because it takes care of terminating the pool, it has a significant drawback: the executor cannot be reused.

We often need continuous access to the pool and want to avoid the performance hit of pool creation and termination. In such cases, we can create an instance of the executor class, use it where we see fit, and terminate it manually using the shutdown() method.

The following example illustrates how we can reuse the executor:
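
A minimal sketch, reusing the do_work() helper from the previous sketch:

import time
from functools import partial
from concurrent.futures import ThreadPoolExecutor as PoolExecutor


def do_work(sleep_secs, i):
    time.sleep(sleep_secs)
    return f"foo-{i}"


# no with statement: the pool stays alive until we shut it down
executor = PoolExecutor()

results = executor.map(partial(do_work, 1), range(1, 10))
print(f"some map results: {list(results)}")

# the same pool can be reused for further work
results = executor.map(partial(do_work, 1), range(10, 20))
print(f"more map results: {list(results)}")

# terminate the pool manually once we are done with it
executor.shutdown()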

some map results: ['foo-1', 'foo-2', 'foo-3', 'foo-4', 'foo-5', 'foo-6', 'foo-7', 'foo-8', 'foo-9']
more map results: ['foo-10', 'foo-11', 'foo-12', 'foo-13', 'foo-14', 'foo-15', 'foo-16', 'foo-17', 'foo-18', 'foo-19']

ProcessPool and the Executor class

If you haven’t noticed, the code presented so far uses an alias for the executor import, PoolExecutor. There is a very good reason for this: both the thread pool (ThreadPoolExecutor) and the process pool (ProcessPoolExecutor) implement the same interface. They both inherit from the Executor class and implement the same three methods:

  • submit()
  • map()
  • shutdown()

That is one of my favorite features of the concurrent.futures API: we can switch from threads to processes with minimal code refactoring. The following example shows how easy it is to use the ProcessPoolExecutor instead:
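
A minimal sketch; thanks to the aliased import, only the import line differs from the thread-pool version (the five-second sleep is an assumption; actual timings depend on the pool size and the machine):

import time
from functools import partial
from concurrent.futures import ProcessPoolExecutor as PoolExecutor  # only change


def do_work(sleep_secs, i):
    time.sleep(sleep_secs)
    return f"foo-{i}"


if __name__ == "__main__":  # required: worker processes re-import this module
    start = time.time()
    with PoolExecutor() as executor:
        results = executor.map(partial(do_work, 5), range(1, 10))
        print(f"map results: {list(results)} | {time.time() - start}")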

map results: ['foo-1', 'foo-2', 'foo-3', 'foo-4', 'foo-5', 'foo-6', 'foo-7', 'foo-8', 'foo-9'] | 6.141733169555664

When computing multiple tasks in the process pool, via submit() or via map(), they will be computed in parallel up to the number of processes in the pool. If the pool has eight processes and there are more tasks than that, at most eight tasks will be executing in parallel at any given time.

Using a process pool, the map() method can be used to parallelize for loops that only depend on the iterable value.
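
For instance, a toy sketch (transform() here is a hypothetical stand-in for real per-element work):

from concurrent.futures import ProcessPoolExecutor


def transform(x):
    # hypothetical stand-in for a CPU-bound computation on one element
    return x * x


if __name__ == "__main__":
    data = range(10)

    # sequential for loop
    sequential = [transform(x) for x in data]

    # parallel version: each element is handled by a pool process
    with ProcessPoolExecutor() as executor:
        parallel = list(executor.map(transform, data))

    assert sequential == parallel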

multi-threading -> multi-process

Using multi-threading and multi-processing in the same program may not be widespread, yet I often find myself in such a situation. Perhaps too often.

The logic is the following:

  1. An event triggers a thread; it could be a client request or some sensor reading.
  2. Within this new thread, we need to do some CPU-heavy lifting, and of course, we want it to use more than one core. So we do some parallel computing using multiple processes.

While the logic sounds very straightforward, this approach can be cumbersome to implement using bare processes and threads. However, the concurrent.futures API makes it very simple. The following example illustrates this point. In the example, a CPU-intensive function (do_CPU_bound_work()) is called in parallel from another function (do_parallel_work()). This flow is carried out concurrently from two threads:
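
A minimal sketch of this flow, with an assumed toy computation inside do_CPU_bound_work():

import threading
from concurrent.futures import ProcessPoolExecutor


def do_CPU_bound_work(x):
    # toy CPU-bound computation (assumed; a sum of squares)
    return sum(i * i for i in range(x))


def do_parallel_work(name):
    print(f"{name} thread, doing parallel_work...")
    # a process pool created inside the thread parallelizes the heavy work
    with ProcessPoolExecutor() as executor:
        list(executor.map(do_CPU_bound_work, [10**6] * 4))
    print(f"{name} thread, done")


if __name__ == "__main__":
    threads = [
        threading.Thread(target=do_parallel_work, args=(name,))
        for name in ("A", "B")
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()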

A thread, doing parallel_work...
B thread, doing parallel_work...
A thread, done
B thread, done

We could simplify it further if we use the ThreadPoolExecutor instead of bare threads:
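
A sketch of the same flow, reusing the helpers from the previous sketch, with a thread pool replacing the bare threads:

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def do_CPU_bound_work(x):
    # toy CPU-bound computation (assumed)
    return sum(i * i for i in range(x))


def do_parallel_work(name):
    print(f"{name} thread, doing parallel_work...")
    with ProcessPoolExecutor() as executor:
        list(executor.map(do_CPU_bound_work, [10**6] * 4))
    print(f"{name} thread, done")


if __name__ == "__main__":
    # a thread pool replaces the bare threads
    with ThreadPoolExecutor() as thread_pool:
        list(thread_pool.map(do_parallel_work, ["A", "B"]))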

A thread, doing parallel_work...
B thread, doing parallel_work...
A thread, done
B thread, done

Another great feature is that we can get the computation results from a future or the map() generator without requiring queues, pipes, or funky shared variables.

It is incredible how great interface design can simplify our lives to such an extent.

multi-process -> multi-threading

What if, instead of spawning processes from multiple threads, we want to spawn threads from multiple processes? No problem, we can do it with the utmost simplicity. The following example shows how we can create multiple threads to handle concurrent I/O from multiple processes:
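
A minimal sketch, with hypothetical helpers (do_IO_bound_work() simulates blocking I/O with a sleep):

import multiprocessing
import time
from concurrent.futures import ThreadPoolExecutor


def do_IO_bound_work(sleep_secs):
    # hypothetical stand-in for a blocking I/O call
    time.sleep(sleep_secs)
    return sleep_secs


def do_parallel_io(name):
    print(f"{name} proc, doing parallel I/O...")
    # a thread pool created inside the process handles the concurrent I/O
    with ThreadPoolExecutor() as executor:
        list(executor.map(do_IO_bound_work, [1] * 4))
    print(f"{name} proc, done")


if __name__ == "__main__":
    procs = [
        multiprocessing.Process(target=do_parallel_io, args=(name,))
        for name in ("A", "B")
    ]
    for p in procs:
        p.start()
    for p in procs:
        p.join()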

A proc, doing parallel I/O...
B proc, doing parallel I/O...
B proc, done
A proc, done

We can also create the processes from the concurrent.futures API:
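
A sketch with the same hypothetical helpers, swapping the bare processes for a process pool:

import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def do_IO_bound_work(sleep_secs):
    # hypothetical stand-in for a blocking I/O call
    time.sleep(sleep_secs)
    return sleep_secs


def do_parallel_io(name):
    print(f"{name} proc, doing parallel I/O...")
    with ThreadPoolExecutor() as executor:
        list(executor.map(do_IO_bound_work, [1] * 4))
    print(f"{name} proc, done")


if __name__ == "__main__":
    # a process pool replaces the bare processes
    with ProcessPoolExecutor() as process_pool:
        list(process_pool.map(do_parallel_io, ["A", "B"]))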

A proc, doing parallel I/O...
B proc, doing parallel I/O...
A proc, done
B proc, done

Moral of the story

It is difficult not to love the interface implemented by concurrent.futures. Personally, it is my first choice when dealing with concurrency, whether that means using threads for I/O or processes for parallel computing.

If you are not using it by now, you definitely should!

Everything should be made as simple as possible, but not simpler

— Albert Einstein

