
Exception Handling in Methods of the Multiprocessing Pool Class in Python | by Паша Дубовик | Jul, 2022



Working with the map, imap, and imap_unordered methods

Photo by Marek Piwnicki on Unsplash

Introduction

When working with big data, it is often necessary to parallelize calculations. In Python, the standard multiprocessing module is usually used for tasks that require a lot of computing resources. In data science we constantly face problems that are easy to parallelize: bootstrapping, batch model prediction, data preprocessing, and so on.

In this article, I would like to cover a few interesting and important things to keep in mind when working with the multiprocessing Pool class in Python:

  • exception handling in methods of the Pool class
  • handling of hanging functions in Python
  • limiting the memory used by a process (Unix systems only)

I will be using Python 3.9 on Ubuntu 20.04.

So let’s get started!

Exception handling in methods of the Pool class

In my practice, I often have to work with the multiprocessing module. It’s nice to use all the power of your computer and squeeze every last drop out of the processor, isn’t it? Imagine that you have written some incredibly complex code, and your calculations are so big that you decided to run them overnight, hoping to wake up and see the wonderful result of your work. So, here is our beautiful function (let’s pretend we forgot that you can’t divide by zero; it happens to the best of us):
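The code in the original post was embedded as a gist and is missing from this copy. Here is a minimal reconstruction; the function name, the `map` call, and the task count are taken from the tracebacks below, while the function body and pool size are assumptions:

```python
import time
from multiprocessing import Pool

def my_awesome_foo(n):
    # the "forgotten" bug: task 0 divides by zero
    if n == 0:
        1 / 0
    time.sleep(1)  # pretend this is an hour of heavy work
    return n

if __name__ == '__main__':
    tasks = range(10)
    with Pool(4) as p:
        try:
            result = p.map(my_awesome_foo, tasks)
        except Exception as err:
            # map() collects all results first, so the ZeroDivisionError
            # from task 0 surfaces only here, after the whole run;
            # in the article it is left unhandled and crashes the program
            print(type(err).__name__, err)
```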

And what will you see in the morning? I think you will be terribly upset, because you will see the following traceback:

multiprocessing.pool.RemoteTraceback: 
"""
Traceback (most recent call last):
File "/usr/lib/python3.9/multiprocessing/pool.py", line 125, in worker
result = (True, func(*args, **kwds))
File "/usr/lib/python3.9/multiprocessing/pool.py", line 48, in mapstar
return list(map(*args))
File "/home/PycharmProjects/myproject/main.py", line 9, in my_awesome_foo
1 / 0
ZeroDivisionError: division by zero
"""
The above exception was the direct cause of the following exception:

Traceback (most recent call last):
File "/home/PycharmProjects/myproject/main.py", line 19, in <module>
result = p.map(my_awesome_foo, tasks)
File "/usr/lib/python3.9/multiprocessing/pool.py", line 364, in map
return self._map_async(func, iterable, mapstar, chunksize).get()
File "/usr/lib/python3.9/multiprocessing/pool.py", line 771, in get
raise self._value
ZeroDivisionError: division by zero

Someone will say that this is not surprising, that this is exactly what should happen, and they will be absolutely right. But let’s change our code a little and try to find out in more detail what happens inside the pool when an exception occurs in one of the processes. We will add messages to our function reporting that a process has started and finished working on a task. The process name can be obtained via current_process().name from the multiprocessing module.
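Again, the code itself is not shown in this copy; a hedged sketch of the modified function, reconstructed from the log output that follows:

```python
import time
from multiprocessing import Pool, current_process

def my_awesome_foo(n):
    print(f'Process {current_process().name} started working on task {n}')
    if n == 0:
        1 / 0  # still broken on the very first task
    time.sleep(1)
    print(f'Process {current_process().name} ended working on task {n}')
    return n

if __name__ == '__main__':
    with Pool(4) as p:
        try:
            result = p.map(my_awesome_foo, range(10))
        except Exception as err:
            # surfaces only after every task has already run
            print(type(err).__name__, err)
```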

Process ForkPoolWorker-1 started working on task 0
Process ForkPoolWorker-2 started working on task 1
Process ForkPoolWorker-4 started working on task 3
Process ForkPoolWorker-3 started working on task 2
Process ForkPoolWorker-1 started working on task 4
Process ForkPoolWorker-2 ended working on task 1
Process ForkPoolWorker-2 started working on task 5
Process ForkPoolWorker-4 ended working on task 3
Process ForkPoolWorker-4 started working on task 6
Process ForkPoolWorker-2 ended working on task 5
Process ForkPoolWorker-2 started working on task 7
Process ForkPoolWorker-1 ended working on task 4
Process ForkPoolWorker-1 started working on task 8
Process ForkPoolWorker-4 ended working on task 6
Process ForkPoolWorker-4 started working on task 9
Process ForkPoolWorker-3 ended working on task 2
Process ForkPoolWorker-1 ended working on task 8
Process ForkPoolWorker-4 ended working on task 9
Process ForkPoolWorker-2 ended working on task 7
multiprocessing.pool.RemoteTraceback:
"""
Traceback (most recent call last):
File "/usr/lib/python3.9/multiprocessing/pool.py", line 125, in worker
result = (True, func(*args, **kwds))
File "/usr/lib/python3.9/multiprocessing/pool.py", line 48, in mapstar
return list(map(*args))
File "/home/PycharmProjects/myproject/main.py", line 9, in my_awesome_foo
1 / 0
ZeroDivisionError: division by zero
"""
The above exception was the direct cause of the following exception:

Traceback (most recent call last):
File "/home/PycharmProjects/myproject/main.py", line 19, in <module>
result = p.map(my_awesome_foo, tasks)
File "/usr/lib/python3.9/multiprocessing/pool.py", line 364, in map
return self._map_async(func, iterable, mapstar, chunksize).get()
File "/usr/lib/python3.9/multiprocessing/pool.py", line 771, in get
raise self._value
ZeroDivisionError: division by zero
Process finished with exit code 1

Bam!

So our function raises an exception on the very first task, but what do we see? All processes start and successfully complete their work before we learn that something went wrong. In practice, this means your program really would run all night, only to end with an error, and you would get no result at all. A shame, isn’t it?

This example clearly shows how important it is to handle exceptions when using the map method of the Pool class. What about the imap and imap_unordered methods? Here we see more predictable behavior:
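The corresponding call is not shown in this copy; a sketch with the same assumed function, with map swapped for imap (the `result = list(p.imap(...))` line matches the traceback below):

```python
import time
from multiprocessing import Pool, current_process

def my_awesome_foo(n):
    print(f'Process {current_process().name} started working on task {n}')
    if n == 0:
        1 / 0
    time.sleep(1)
    print(f'Process {current_process().name} ended working on task {n}')
    return n

if __name__ == '__main__':
    with Pool(4) as p:
        try:
            # imap is lazy: iterating re-raises the worker's error as soon
            # as the failed result is reached, not after the whole run
            result = list(p.imap(my_awesome_foo, range(10)))
        except Exception as err:
            print(type(err).__name__, err)
```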

Process ForkPoolWorker-1 started working on task 0
Process ForkPoolWorker-3 started working on task 2
Process ForkPoolWorker-2 started working on task 1
Process ForkPoolWorker-4 started working on task 3
Process ForkPoolWorker-1 started working on task 4
multiprocessing.pool.RemoteTraceback:
"""
Traceback (most recent call last):
File "/usr/lib/python3.9/multiprocessing/pool.py", line 125, in worker
result = (True, func(*args, **kwds))
File "/home/PycharmProjects/myproject/main.py", line 8, in my_awesome_foo
1 / 0
ZeroDivisionError: division by zero
"""
The above exception was the direct cause of the following exception:

Traceback (most recent call last):
File "/home/PycharmProjects/myproject/main.py", line 21, in <module>
result = list(p.imap(my_awesome_foo, tasks))
File "/usr/lib/python3.9/multiprocessing/pool.py", line 870, in next
raise value
ZeroDivisionError: division by zero
Process finished with exit code 1

Unfortunately, proper handling of exceptions raised in the map method is beyond the scope of this article. There are libraries, such as pebble, that allow you to do this.

Here is an example of one way to handle exceptions with the imap method (it also works for imap_unordered):
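The code block is missing from this copy; a reconstruction consistent with the output and the traceback shown below (the `result.append(next(iterator))` line appears in that traceback, the rest is an assumption):

```python
import time
from multiprocessing import Pool, current_process

def my_awesome_foo(n):
    print(f'Process {current_process().name} started working on task {n}')
    if n == 0:
        1 / 0
    time.sleep(1)
    print(f'Process {current_process().name} ended working on task {n}')
    return n

if __name__ == '__main__':
    start = time.monotonic()
    result = []
    with Pool(4) as p:
        iterator = p.imap(my_awesome_foo, range(10))
        while True:
            try:
                result.append(next(iterator))
            except StopIteration:
                break
            except Exception as err:
                # store the exception object in place of the missing result,
                # so the pool keeps going instead of crashing
                result.append(err)
    print(f'time took: {time.monotonic() - start:.1f}')
    print(result)
```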

Process ForkPoolWorker-1 started working on task 0
Process ForkPoolWorker-2 started working on task 1
Process ForkPoolWorker-4 started working on task 3
Process ForkPoolWorker-3 started working on task 2
Process ForkPoolWorker-1 started working on task 4
Process ForkPoolWorker-4 ended working on task 3
Process ForkPoolWorker-4 started working on task 5
Process ForkPoolWorker-2 ended working on task 1
Process ForkPoolWorker-3 ended working on task 2
Process ForkPoolWorker-2 started working on task 6
Process ForkPoolWorker-3 started working on task 7
Process ForkPoolWorker-1 ended working on task 4
Process ForkPoolWorker-1 started working on task 8
Process ForkPoolWorker-4 ended working on task 5
Process ForkPoolWorker-4 started working on task 9
Process ForkPoolWorker-2 ended working on task 6
Process ForkPoolWorker-3 ended working on task 7
Process ForkPoolWorker-1 ended working on task 8
Process ForkPoolWorker-4 ended working on task 9
time took: 3.0
[ZeroDivisionError('division by zero'), 1, 2, 3, 4, 5, 6, 7, 8, 9]
Process finished with exit code 0

Then you can print out the full traceback to see what went wrong:

multiprocessing.pool.RemoteTraceback:
"""
Traceback (most recent call last):
File "/usr/lib/python3.9/multiprocessing/pool.py", line 125, in worker
result = (True, func(*args, **kwds))
File "/home/PycharmProjects/myproject/main.py", line 9, in my_awesome_foo
1 / 0
ZeroDivisionError: division by zero
"""
The above exception was the direct cause of the following exception:

Traceback (most recent call last):
File "/home/PycharmProjects/myproject/main.py", line 23, in <module>
result.append(next(iterator))
File "/usr/lib/python3.9/multiprocessing/pool.py", line 870, in next
raise value
ZeroDivisionError: division by zero

Thus, we successfully caught the exception, and our pool completed its work and gave us the result. Moreover, we can print the entire exception stack and see where the error occurred in our code.

Handling of hanging functions in Python

Let’s change our beautiful function:
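The modified function is not shown in this copy; a sketch matching the description in the next paragraph:

```python
import time
from multiprocessing import current_process

def my_awesome_foo(n):
    print(f'Process {current_process().name} started working on task {n}')
    if n == 0:
        time.sleep(5)  # imagine 5 hours here, or an infinite loop
    else:
        time.sleep(1)
    print(f'Process {current_process().name} ended working on task {n}')
    return n
```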

For n=0, our function sleeps for 5 seconds, and for all other n it sleeps for 1 second. Now imagine that it’s not 5 seconds but 5 hours, for example. Or even worse: for some input data your function falls into an infinite loop. We wouldn’t want to wait forever, would we? So what should we do in such a situation? Here is an excerpt from the Python documentation for the imap method:

Also if chunksize is 1 then the next() method of the iterator returned by the imap() method has an optional timeout parameter: next(timeout) will raise multiprocessing.TimeoutError if the result cannot be returned within timeout seconds.

So let’s try using the iterator’s next() method with the timeout argument, as described in the documentation. In the previous section we learned how to handle errors, so in theory we should handle the TimeoutError correctly:
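A hedged sketch of what this could look like. Note that the timeout goes through the iterator's own `next(timeout=...)` method, not the `next()` builtin, and that `multiprocessing.TimeoutError` is a different class from the built-in `TimeoutError`:

```python
import time
from multiprocessing import Pool, TimeoutError, current_process

def my_awesome_foo(n):
    print(f'Process {current_process().name} started working on task {n}')
    time.sleep(5 if n == 0 else 1)
    print(f'Process {current_process().name} ended working on task {n}')
    return n

if __name__ == '__main__':
    start = time.monotonic()
    result = []
    with Pool(4) as p:
        iterator = p.imap(my_awesome_foo, range(10))
        while True:
            try:
                # wait at most 1.5 s for the next result
                result.append(iterator.next(timeout=1.5))
            except StopIteration:
                break
            except TimeoutError as err:
                # the result is late; record the timeout and keep polling
                result.append(err)
    print(f'time took: {time.monotonic() - start:.1f}')
    print(result)
```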

What should we see this time?

Process ForkPoolWorker-1 started working on task 0
Process ForkPoolWorker-2 started working on task 1
Process ForkPoolWorker-3 started working on task 2
Process ForkPoolWorker-4 started working on task 3
Process ForkPoolWorker-2 ended working on task 1
Process ForkPoolWorker-3 ended working on task 2
Process ForkPoolWorker-4 ended working on task 3
Process ForkPoolWorker-2 started working on task 4
Process ForkPoolWorker-3 started working on task 5
Process ForkPoolWorker-4 started working on task 6
Process ForkPoolWorker-2 ended working on task 4
Process ForkPoolWorker-3 ended working on task 5
Process ForkPoolWorker-2 started working on task 7
Process ForkPoolWorker-4 ended working on task 6
Process ForkPoolWorker-3 started working on task 8
Process ForkPoolWorker-4 started working on task 9
Process ForkPoolWorker-2 ended working on task 7
Process ForkPoolWorker-4 ended working on task 9
Process ForkPoolWorker-3 ended working on task 8
Process ForkPoolWorker-1 ended working on task 0
time took: 6.0
[TimeoutError(), TimeoutError(), TimeoutError(), TimeoutError(), 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Process finished with exit code 0

Double Bam!

We caught the TimeoutError exception 4 times and handled it, while the task with n=0 kept running. That is, the ForkPoolWorker-1 process itself never stopped: it waited out the full 5 seconds, and every 1.5 seconds a TimeoutError was raised in the main process, which we intercepted. Then ForkPoolWorker-1 completed its work and returned the value 0. That’s not what we wanted at all, is it?

What should we do in this situation? How to forcibly terminate the process after the timeout expires?

Let’s think about how you can interrupt the execution of a function at all. As many of you probably know, this can be done from the keyboard with the shortcut Ctrl+C. And how can you force such an interrupt in Python? We need to send an interrupt signal to our process. Let’s look at the documentation for the kill function of the os module:

os.kill(pid, sig)

Send signal sig to the process pid. Constants for the specific signals available on the host platform are defined in the signal module.

If we go to the documentation of the signal module, we see that SIGINT is responsible for the keyboard interrupt (the default action is to raise KeyboardInterrupt).

Note: this approach only works on Unix systems. A little later I will describe how to do the same on Windows.

class threading.Timer(interval, function, args=None, kwargs=None)

Create a timer that will run function with arguments args and keyword arguments kwargs, after interval seconds have passed. If args is None (the default) then an empty list will be used. If kwargs is None (the default) then an empty dict will be used.

Great, just what we need: we will create a function that simulates a keyboard interrupt, and run it on a timer equal to the timeout. If the function finishes before the timer fires, we simply cancel the timer. Let’s implement this idea as a decorator:
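The decorator itself is missing from this copy; below is a sketch of the idea (the name `stopit_after_timeout` and the `raise_exception` flag are assumptions, inferred from the message and result list shown in the outputs below). A threading.Timer fires os.kill with SIGINT after the timeout, which raises KeyboardInterrupt in the main thread of the (worker) process; we catch it and either raise a TimeoutError or return an error message. Unix only:

```python
import functools
import os
import signal
import threading
import time
from multiprocessing import current_process

def stopit_after_timeout(s, raise_exception=True):
    """Interrupt the wrapped function if it runs longer than s seconds."""
    def actual_decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # simulate Ctrl+C after s seconds; on Windows,
            # _thread.interrupt_main() can be used instead of os.kill
            timer = threading.Timer(s, os.kill,
                                    args=(os.getpid(), signal.SIGINT))
            timer.start()
            try:
                result = func(*args, **kwargs)
            except KeyboardInterrupt:
                msg = f'function {func.__name__} took longer than {s} s.'
                if raise_exception:
                    raise TimeoutError(msg) from None
                result = msg  # return the message instead of raising
            finally:
                timer.cancel()  # finished in time: disarm the timer
            return result
        return wrapper
    return actual_decorator

@stopit_after_timeout(1.5, raise_exception=False)
def my_awesome_foo(n):
    print(f'Process {current_process().name} started working on task {n}')
    time.sleep(5 if n == 0 else 1)
    print(f'Process {current_process().name} ended working on task {n}')
    return n

print(my_awesome_foo(0))  # interrupted after 1.5 s; prints the message
```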

Let’s check how it works for our function:

Process MainProcess started working on task 0
function my_awesome_foo took longer than 1.5 s.
time took: 1.5
Process finished with exit code 0

Everything worked out the way we wanted! On a Windows system you can use _thread.interrupt_main() instead of os.kill(); I tested this on Windows 11 and everything worked fine. Now let’s see how our decorated function works with the imap method of the Pool class:
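The pool-side code is again missing; a self-contained sketch under the same assumptions as the decorator above (with raise_exception=False, the timed-out task contributes its error message to the result list instead of crashing the pool):

```python
import functools
import os
import signal
import threading
import time
from multiprocessing import Pool, current_process

def stopit_after_timeout(s, raise_exception=True):
    def actual_decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # fire a simulated Ctrl+C inside the worker after s seconds
            timer = threading.Timer(s, os.kill,
                                    args=(os.getpid(), signal.SIGINT))
            timer.start()
            try:
                result = func(*args, **kwargs)
            except KeyboardInterrupt:
                msg = f'function {func.__name__} took longer than {s} s.'
                if raise_exception:
                    raise TimeoutError(msg) from None
                result = msg
            finally:
                timer.cancel()
            return result
        return wrapper
    return actual_decorator

@stopit_after_timeout(1.5, raise_exception=False)
def my_awesome_foo(n):
    print(f'Process {current_process().name} started working on task {n}')
    time.sleep(5 if n == 0 else 1)
    print(f'Process {current_process().name} ended working on task {n}')
    return n

if __name__ == '__main__':
    start = time.monotonic()
    with Pool(4) as p:
        result = list(p.imap(my_awesome_foo, range(10)))
    print(f'time took: {time.monotonic() - start:.1f}')
    print(result)
```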

Process ForkPoolWorker-2 started working on task 1
Process ForkPoolWorker-1 started working on task 0
Process ForkPoolWorker-4 started working on task 3
Process ForkPoolWorker-3 started working on task 2
Process ForkPoolWorker-2 ended working on task 1
Process ForkPoolWorker-4 ended working on task 3
Process ForkPoolWorker-3 ended working on task 2
Process ForkPoolWorker-4 started working on task 4
Process ForkPoolWorker-2 started working on task 6
Process ForkPoolWorker-3 started working on task 5
Process ForkPoolWorker-1 started working on task 7
Process ForkPoolWorker-3 ended working on task 5
Process ForkPoolWorker-4 ended working on task 4
Process ForkPoolWorker-2 ended working on task 6
Process ForkPoolWorker-4 started working on task 9
Process ForkPoolWorker-3 started working on task 8
Process ForkPoolWorker-1 ended working on task 7
Process ForkPoolWorker-4 ended working on task 9
Process ForkPoolWorker-3 ended working on task 8
time took: 3.0
['function my_awesome_foo took longer than 1.5 s.', 1, 2, 3, 4, 5, 6, 7, 8, 9]
Process finished with exit code 0

This is what we wanted!

Limitation of the memory used by the process (only for Unix systems)

Now let’s imagine a situation where you want to limit the amount of memory a process can use. On Unix systems this is easy to do with the resource module.
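The code is missing from this copy; a hedged sketch of the approach (the helper name, the limit value, and the oversized allocation in task 0 are assumptions, chosen to reproduce the MemoryError in the output below). The limit is installed in each worker via the pool's initializer:

```python
import resource
import time
from multiprocessing import Pool, current_process

def set_memory_limit(limit_mb):
    # cap the address space of the calling (worker) process; any
    # allocation past the cap then raises MemoryError (Unix only)
    limit = limit_mb * 1024 ** 2
    resource.setrlimit(resource.RLIMIT_AS, (limit, limit))

def my_awesome_foo(n):
    print(f'Process {current_process().name} started working on task {n}')
    if n == 0:
        data = bytearray(10 ** 9)  # ~1 GB, far beyond the limit
    time.sleep(1)
    print(f'Process {current_process().name} ended working on task {n}')
    return n

if __name__ == '__main__':
    start = time.monotonic()
    result = []
    with Pool(4, initializer=set_memory_limit, initargs=(512,)) as p:
        iterator = p.imap(my_awesome_foo, range(10))
        while True:
            try:
                result.append(next(iterator))
            except StopIteration:
                break
            except MemoryError as err:
                result.append(err)  # keep the pool running
    print(f'time took: {time.monotonic() - start:.1f}')
    print(result)
```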

Process ForkPoolWorker-1 started working on task 0
Process ForkPoolWorker-2 started working on task 1
Process ForkPoolWorker-3 started working on task 2
Process ForkPoolWorker-4 started working on task 3
Process ForkPoolWorker-1 started working on task 4
Process ForkPoolWorker-4 ended working on task 3
Process ForkPoolWorker-2 ended working on task 1
Process ForkPoolWorker-3 ended working on task 2
Process ForkPoolWorker-2 started working on task 5
Process ForkPoolWorker-3 started working on task 6
Process ForkPoolWorker-4 started working on task 7
Process ForkPoolWorker-1 ended working on task 4
Process ForkPoolWorker-1 started working on task 8
Process ForkPoolWorker-2 ended working on task 5
Process ForkPoolWorker-3 ended working on task 6
Process ForkPoolWorker-4 ended working on task 7
Process ForkPoolWorker-2 started working on task 9
Process ForkPoolWorker-1 ended working on task 8
Process ForkPoolWorker-2 ended working on task 9
time took: 3.0
[MemoryError(), 1, 2, 3, 4, 5, 6, 7, 8, 9]
Process finished with exit code 0

Finally, as the cherry on top, let’s collect all the examples into one and see how everything we have talked about here can be done with a single command using the parallelbar library:


and the result is:

time took: 8.2
[MemoryError(), 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, TimeoutError('function my_awesome_foo took longer than 1.5 s.'), 21, 22, 23, 24, 25, 26, 27, 28, 29]
Process finished with exit code 0

Thus, thanks to the progress bar, we could estimate how much time remained until the end of the run, and it also showed us that errors had been intercepted.

You can learn more about parallelbar in my earlier article, or check its documentation.

Conclusion

  • in this article, we briefly reviewed multiprocessing in Python using the Pool class of the multiprocessing module as an example
  • we saw how exceptions can be handled in a process pool using the imap method
  • we implemented a decorator that interrupts the execution of a function after a specified timeout
  • we looked at how to limit the resources used by processes in a pool, using a memory cap as an example
  • we walked through a small example of using the parallelbar library to handle exceptions and limit process resources in one go

I hope this article was useful for you!


