
Combining Multiprocessing and Asyncio in Python for Performance Boosts | by Peng Qian | May, 2023



Because of the GIL, using multiple threads to perform CPU-bound tasks has never been an option in Python. With the popularity of multicore CPUs, Python offers the multiprocessing module to run CPU-bound tasks in parallel. But there are still some problems with using the multiprocessing-related APIs directly.

Before we start, here is a small piece of code to aid in the demonstration:

The method takes one argument and accumulates the integers from 0 up to that argument. It prints its execution time and returns the result.
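The original listing is not reproduced here, so the following is only a minimal sketch of such a helper. The name sum_to_num and the choice to include the argument itself in the sum are assumptions made for illustration.

```python
import time


def sum_to_num(final_num: int) -> int:
    """Add up the integers from 0 to final_num (inclusive, by assumption)."""
    start = time.monotonic()

    result = 0
    for i in range(final_num + 1):
        result += i

    # Report how long the accumulation took before returning the result.
    elapsed = time.monotonic() - start
    print(f"sum_to_num({final_num}) took {elapsed:.2f} seconds.")
    return result
```

The explicit loop is deliberate: it keeps the CPU busy long enough for the timing differences between tasks to be visible.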

Problems with multiprocessing

As the code shows, we create multiple processes directly and call the start and join methods of each one. However, there are some problems here:

  1. The join method cannot return the result of task execution.
  2. The join method blocks the main process, so the processes are waited on one after another.

This holds even if the later tasks finish faster than the earlier ones, as shown in the following figure:

The screenshot shows the execution sequence of join. Image by Author
Although process_b finishes executing first, it still has to wait for process_a. Image by Author
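The two problems can be reproduced with a sketch like the one below. It is not the author's listing: the process names follow the caption above, while the helper and the workload sizes are assumptions.

```python
import multiprocessing


def sum_to_num(final_num):
    # Hypothetical CPU-bound helper: add up 0..final_num.
    return sum(range(final_num + 1))


def main():
    process_a = multiprocessing.Process(target=sum_to_num, args=(2_000_000,))
    process_b = multiprocessing.Process(target=sum_to_num, args=(1_000,))

    process_a.start()
    process_b.start()

    # join() only waits; it always returns None, so the sum is lost.
    returned = process_a.join()
    # Even if process_b finished long ago, we only reach this line
    # after process_a has been joined.
    process_b.join()
    return returned


if __name__ == "__main__":
    print(main())
```

To get a value back from a bare Process you would have to add your own channel, such as a multiprocessing.Queue, which is part of the hassle described here.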

Problems of using Pool

If we use multiprocessing.Pool, there are also some problems:

As the code shows, Pool’s apply method is synchronous, which means you have to wait for the previous apply task to finish before the next apply task can start executing.

multiprocessing.Pool.apply method is synchronous. Image by Author

Of course, we can use the apply_async method to create the tasks asynchronously. But you then need to call the get method to fetch each result, and get blocks until that result is ready. This brings us back to the problem with the join method:

Although apply_async is asynchronous, get will still block and execute sequentially. Image by Author
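As a rough illustration of both Pool behaviours (a sketch with an assumed helper and assumed workload sizes, not the original code), compare apply with apply_async followed by get:

```python
import multiprocessing


def sum_to_num(final_num):
    # Hypothetical CPU-bound helper: add up 0..final_num.
    return sum(range(final_num + 1))


def main():
    numbers = [2_000_000, 1_000, 100_000]
    with multiprocessing.Pool() as pool:
        # apply() blocks until each call returns, so these run one by one.
        sync_results = [pool.apply(sum_to_num, (n,)) for n in numbers]

        # apply_async() returns an AsyncResult at once; tasks run in parallel.
        handles = [pool.apply_async(sum_to_num, (n,)) for n in numbers]
        # get() blocks per handle, so results are still collected in
        # submission order, not in completion order.
        async_results = [handle.get() for handle in handles]
    return sync_results, async_results


if __name__ == "__main__":
    print(main())
```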

The problem with using ProcessPoolExecutor directly

So, what if we use concurrent.futures.ProcessPoolExecutor to execute our CPU-bound tasks?

As the code shows, everything looks great, and the call looks just like asyncio.as_completed. But look at the results: they are still fetched in startup order. This is not at all the same as asyncio.as_completed, which yields results in the order in which the tasks complete:

Results are fetched in startup order. Image by Author
The result of the iteration still maintains the call order and blocks. Image by Author
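One way to reproduce this behaviour is Executor.map, which yields results in the order of its inputs. Whether the original listing used map is an assumption; the helper and numbers below are illustrative only.

```python
from concurrent.futures import ProcessPoolExecutor


def sum_to_num(final_num):
    # Hypothetical CPU-bound helper: add up 0..final_num.
    return sum(range(final_num + 1))


def main():
    numbers = [2_000_000, 1_000, 100_000]
    with ProcessPoolExecutor() as executor:
        # map() runs the calls in parallel but yields results in input
        # order, so the fast second task waits behind the slow first one.
        for number, result in zip(numbers, executor.map(sum_to_num, numbers)):
            print(f"sum_to_num({number}) = {result}")


if __name__ == "__main__":
    main()
```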

Use asyncio’s run_in_executor to fix it

Fortunately, we can use asyncio to handle IO-bound tasks and the event loop’s run_in_executor method to invoke multi-process tasks in the same awaitable style. This not only unifies the concurrent and parallel APIs but also solves the various problems we encountered above:

Combining asyncio and ProcessPoolExecutor. Image by Author
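A minimal sketch of this combination, assuming the same illustrative helper as before: run_in_executor wraps each process-pool call in an awaitable, and asyncio.as_completed hands the results back as they finish.

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor


def sum_to_num(final_num):
    # Hypothetical CPU-bound helper: add up 0..final_num.
    return sum(range(final_num + 1))


async def main():
    loop = asyncio.get_running_loop()
    results = []
    with ProcessPoolExecutor() as executor:
        # Each call returns an awaitable future tied to the event loop.
        tasks = [
            loop.run_in_executor(executor, sum_to_num, number)
            for number in (2_000_000, 1_000, 100_000)
        ]
        # as_completed yields the futures in the order they finish.
        for finished in asyncio.as_completed(tasks):
            results.append(await finished)
    return results


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Because the event loop is not blocked while the worker processes run, other coroutines (for example, file or network IO) can make progress in the meantime.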

Since the sample code in the previous article only simulated the calls we would make to concurrent tasks, many readers still need help applying it in real code. So, having understood why we need to run CPU-bound parallel tasks from asyncio, today we will use a real-world example to show how asyncio can handle IO-bound and CPU-bound tasks together, and to appreciate the efficiency it brings to our code. Let’s go.

In today’s case, we will deal with two problems:

  1. How to use asyncio to read multiple datasets concurrently, which matters most when the datasets are large or numerous.
  2. How to use asyncio’s run_in_executor method to implement a MapReduce program and process datasets efficiently.

Before we start, I will explain to you how our code is going to be executed using a diagram:

The diagram shows how the entire code works. Image by Author

The yellow part represents our concurrent tasks. Since the CPU can process data from memory faster than IO can read data from disk, we first read all datasets into memory concurrently.

After the initial data merging and slicing, we come to the green part that represents the CPU parallel task. In this part, we will start several processes to map the data.

Finally, we get the intermediate results of all the processes in the main process and then use a reduce program to get the final results.

Data preparation

In this case, we will use the Google Books Ngram Dataset, which counts the frequency of each string combination in various books by year from 1500 to 2012.

The Google Books Ngram dataset is free for any purpose, and today we will use these datasets below:

We aim to compute, for each word, its cumulative count across the whole result set.

Dependency installation

To read the files concurrently, we will use the aiofiles library, which provides file operations that can be awaited from asyncio code.

If you are using pip, you can install it as follows:

$ pip install aiofiles

If you are using Anaconda, you can install it as follows:

$ conda install -c anaconda aiofiles

Since this case is still relatively simple, for the sake of demonstration, we will use a .py script to do the whole thing here.

As an architect, before you start, you should plan your methods according to the flowchart design and try to follow the “single responsibility principle” for each method. That is, each method should do only one thing:

Next, we will implement each method step by step and finally integrate them to run together in the main method.

File reading

Method read_file will implement reading a single file with aiofiles:

Method get_all_file_content will start the file reading task and, after all the files have been read, will merge each line of text into a list and return it.
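The sketch below shows the shape of these two methods using only the standard library. It swaps aiofiles for asyncio.to_thread (Python 3.9+) so that it runs without extra dependencies; with aiofiles, read_file would instead open the file with aiofiles.open and await its read call. The function bodies are assumptions, not the author's code.

```python
import asyncio


def _read_lines(path):
    # Plain blocking read; it is executed in a worker thread below.
    with open(path, encoding="utf-8") as file:
        return file.read().splitlines()


async def read_file(path):
    # Stand-in for the aiofiles version: off-load the blocking read
    # so the event loop stays free for the other files.
    return await asyncio.to_thread(_read_lines, path)


async def get_all_file_content(paths):
    # Start one read per file, wait for all of them, then flatten
    # the per-file line lists into a single list (in input order).
    per_file = await asyncio.gather(*(read_file(path) for path in paths))
    return [line for lines in per_file for line in lines]
```

asyncio.gather returns results in the order of its arguments, so the merged list keeps the files in the order they were passed in.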

Data grouping

Method partition splits the list into smaller lists of at most partition_size items each, and it is written as a generator to make the subsequent iteration easy:
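A generator along these lines could look like the following sketch. The parameter names follow the text; the slicing approach is an assumption.

```python
def partition(data, partition_size):
    """Yield consecutive slices of data, each at most partition_size long."""
    for start in range(0, len(data), partition_size):
        yield data[start:start + partition_size]
```

For example, list(partition([1, 2, 3, 4, 5], 2)) gives [[1, 2], [3, 4], [5]]; the last chunk is simply shorter when the list does not divide evenly.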

Map processing data

Method map_resource is the actual map method. It reads each line of data from the list, uses the word as the key and the sum of the frequencies as the value, and finally returns a dict.
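As a sketch of the map step, the code below assumes each line is tab-separated as word, year, match count, volume count. That layout is an assumption about the downloaded files and should be checked against the data you actually use.

```python
def map_resource(lines):
    """Sum the match counts per word for one chunk of lines."""
    result = {}
    for line in lines:
        # Assumed layout: word <TAB> year <TAB> match_count <TAB> volume_count
        word, _year, match_count, _volume_count = line.rstrip("\n").split("\t")
        result[word] = result.get(word, 0) + int(match_count)
    return result
```

The function takes and returns only plain lists, strings, and dicts, which keeps it easy to send to and from worker processes.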

Integrating asyncio with multiprocessing

Method map_with_process calls the event loop’s run_in_executor method on a process pool sized according to the number of CPU cores and executes the map method in parallel. The results are then collected into a list by asyncio.gather.
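The idea can be sketched as follows. The count_words function is a hypothetical stand-in for the real map method so that the example stays self-contained; ProcessPoolExecutor with no arguments sizes the pool from the machine's available processors.

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor


def count_words(lines):
    # Hypothetical stand-in for the map method: count words in a chunk.
    counts = {}
    for line in lines:
        for word in line.split():
            counts[word] = counts.get(word, 0) + 1
    return counts


async def map_with_process(chunks):
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as executor:
        # Submit one map call per chunk to the process pool.
        tasks = [
            loop.run_in_executor(executor, count_words, chunk)
            for chunk in chunks
        ]
        # gather waits for all of them and returns a list of dicts.
        return await asyncio.gather(*tasks)


if __name__ == "__main__":
    print(asyncio.run(map_with_process([["a b a"], ["b c"]])))
```

Each chunk is pickled and sent to a worker process, so very small chunks can cost more in transfer overhead than they save in parallelism.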

Reducing the merged data

Since the previous map process ends up with a list of word frequencies processed by multiple processes, we also need to use a reduce method to merge numerous dicts into a single final result, recording the final frequency of each word. Here we first write the method implementation of the reduce process.

Then we call the functools.reduce method directly to merge the data.
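A minimal sketch of the reduce step, with hypothetical function names: a two-argument merge function plus a thin wrapper around functools.reduce.

```python
import functools


def merge_counts(first, second):
    """Merge two word-count dicts without modifying either input."""
    merged = dict(first)
    for word, count in second.items():
        merged[word] = merged.get(word, 0) + count
    return merged


def reduce_counts(partial_results):
    # Fold the list of per-process dicts into one final dict.
    # The empty dict is the initial value, so an empty list is fine too.
    return functools.reduce(merge_counts, partial_results, {})
```

For example, reduce_counts([{"a": 1}, {"a": 2, "b": 1}]) returns {"a": 3, "b": 1}.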

Finally, implement the main method

Finally, we integrate all the methods into the main method and call it.

Great! We get the sum of the frequencies of the word Aardvark in all the datasets. Task complete.

Using tqdm to indicate progress

In the previous article, we explained how to use tqdm to indicate the progress of asyncio tasks.

Since in the real world, data processing of large datasets often takes a long time, during which we need to track the progress of code execution, we also need to add tqdm progress bars in the right places.

It looks much more professional now.

The resulting screenshot after adding the tqdm APIs. Image by Author

In today’s article, we explored some of the problems with multi-process code, such as the hassle of getting the results of each process and the inability to get the results in the order in which the tasks complete.

We also explored the feasibility of integrating asyncio with ProcessPoolExecutor and the advantages that such integration brings to us. For example, it unifies the API for concurrent and parallel programming, simplifies our programming process, and allows us to obtain execution results in order of completion.

Finally, through a real-world case study, we explained how to alternate between concurrent and parallel programming techniques to execute our code efficiently in data science tasks.

Since my own ability is limited, there are inevitably some flaws in this case study, so I welcome your comments and corrections so that we can learn and progress together.


