
Introduction to asyncio. Managing I/O bound concurrency with… | by Oliver S | Mar, 2023



Managing I/O bound concurrency with Python

Concurrency and parallelism both describe a program’s ability to work on several operations at overlapping times, but they are not the same thing: parallelism means that operations literally run at the same instant (for example on several CPU cores), whereas concurrency only means that several operations are in progress at once and may take turns. One commonly distinguishes multi-processing (parallelism), which runs multiple processes, from multi-threading (concurrency), which spawns multiple threads within the same process. In standard CPython, the Global Interpreter Lock (GIL) allows only one thread to execute Python bytecode at a time, so a multi-threaded application effectively uses a single core. Still, for I/O bound programs (programs that spend most of their time waiting for input or output, so that waiting rather than computation is the bottleneck), concurrency makes sense in Python too. In this post we introduce asyncio, an elegant and easy way of achieving it.

Photo by Gabriel Gusmao on Unsplash

We will dedicate a future post to the differences between multi-processing and multi-threading, and to how asyncio fits in. Still, we want to motivate its usage here. As hinted above, concurrency is particularly useful for I/O bound, i.e. non-CPU bound, applications. asyncio is one way of achieving it, and many developers prefer it for some use cases.

asyncio excels for I/O bound programs for which we want to write simple, clearly structured, and error-resistant code. asyncio runs on a single thread and leaves it to the programmer to decide when the currently executing piece of code yields control, typically while waiting on external input, so that a different piece of code can run. This often makes the code easier to read and gives more control over what is executed when, while reducing the risk of deadlocks, race conditions, and similar problems.

Let’s begin with an introductory example. Consider the following code:

import time

def sleepy_function():
    print("Before sleeping.")
    time.sleep(1)
    print("After sleeping.")

def main():
    for _ in range(3):
        sleepy_function()

main()

In it, we call sleepy_function() three times; each call prints a message, sleeps for one second, and prints a second message. In total, this program takes about 3 seconds to execute. But of course it is wasteful to have the CPU wait for a second without doing any other work. The sleep stands in for waiting on I/O, which makes this a toy version of an I/O bound program, and it is natural to run the three calls concurrently.

With asyncio, the code above can be made concurrent like this:

import asyncio

async def sleepy_function():
    print("Before sleeping.")
    await asyncio.sleep(1)
    print("After sleeping.")

async def main():
    await asyncio.gather(*[sleepy_function() for _ in range(3)])

asyncio.run(main())
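To make the difference measurable, here is a minimal timing sketch. It is not part of the original example: the helper names (timed, run_sequentially, run_concurrently) and the shortened DELAY of 0.2 seconds are assumptions chosen purely for illustration.

```python
import asyncio
import time

DELAY = 0.2  # assumed, shortened delay so the comparison runs quickly


async def sleepy_function():
    await asyncio.sleep(DELAY)


async def run_sequentially():
    # Each await finishes before the next call starts: roughly 3 * DELAY.
    for _ in range(3):
        await sleepy_function()


async def run_concurrently():
    # All three coroutines wait at the same time: roughly 1 * DELAY.
    await asyncio.gather(*[sleepy_function() for _ in range(3)])


def timed(coroutine_function):
    """Run a coroutine function to completion and return the elapsed seconds."""
    start = time.perf_counter()
    asyncio.run(coroutine_function())
    return time.perf_counter() - start


sequential_time = timed(run_sequentially)
concurrent_time = timed(run_concurrently)
print(f"sequential: {sequential_time:.2f}s, concurrent: {concurrent_time:.2f}s")
```

On a typical machine the sequential variant should take roughly three times as long as the concurrent one, although the exact numbers will vary.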

Note that asyncio has been part of the standard library since Python 3.4. The async / await syntax used here requires Python 3.5 or later, and asyncio.run requires Python 3.7 or later.

In the following, we take a closer look at the syntax used and at asyncio in general.

The core concepts of asyncio are coroutines and tasks. Coroutines are essentially functions that can suspend themselves and hand control back to the asyncio event loop (e.g. while waiting for input) without losing their state, a bit like generators. Thus, in this form of concurrency the programmer is in full control of when switches between coroutines / tasks happen, as opposed to classical multi-threading, where the operating system decides when to switch threads. Because of this, and because asyncio uses only a single thread, asyncio code is much less error prone and we (normally) don’t need locks or mutexes.
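The single-thread claim is easy to check. The following sketch (the name report_thread is made up for this illustration) lets several coroutines report the thread they run on; all of them should report the thread that called asyncio.run.

```python
import asyncio
import threading


async def report_thread():
    # Yield to the event loop once, then report the thread we resumed on.
    await asyncio.sleep(0.01)
    return threading.get_ident()


async def main():
    return await asyncio.gather(*[report_thread() for _ in range(3)])


thread_ids = asyncio.run(main())
print(f"distinct threads used: {len(set(thread_ids))}")
```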

A coroutine is defined using the keyword async:

async def sample_coroutine():
    print("Inside sample_coroutine")

Calling this like a regular function via sample_coroutine() is not enough: the call only creates a coroutine object and does not run the function body. Instead, we have to ask asyncio to execute it. We can do so via:

asyncio.run(sample_coroutine())
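A small sketch of what happens when a coroutine function is merely called. The return value "done" and the variable names are additions for illustration only; inspect.iscoroutine is a standard library helper.

```python
import asyncio
import inspect


async def sample_coroutine():
    print("Inside sample_coroutine")
    return "done"  # illustrative return value, not in the original example


# Calling the function does not run its body; it only builds a coroutine object.
coroutine_object = sample_coroutine()
is_coroutine = inspect.iscoroutine(coroutine_object)
print(f"is a coroutine object: {is_coroutine}")

# Only now is the body executed, and asyncio.run hands back its return value.
result = asyncio.run(coroutine_object)
print(f"result: {result}")
```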

The next important keyword is await. It signals to asyncio that we want to wait for some result, and that it can execute another part of the code in the meantime. Anything we can await is called an awaitable; coroutines and tasks are two examples:

import asyncio

async def sample_coroutine_2():
    print("Inside sample_coroutine_2")

async def sample_coroutine():
    print("Inside sample_coroutine")
    await asyncio.sleep(1)
    await sample_coroutine_2()

asyncio.run(sample_coroutine())

In the above example, we first wait for 1s using asyncio.sleep and then await sample_coroutine_2. Note that the execution time of this program is still about 1s, since nothing else runs while we wait; shortening it is not the point of this simple example.
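One consequence of await being the only point where control is handed over: a blocking call such as time.sleep inside a coroutine never yields, so nothing else can run in the meantime. The sketch below contrasts the two; the function names and the 0.1 second delay are assumptions for illustration.

```python
import asyncio
import time

DELAY = 0.1  # assumed delay, kept short for illustration


async def blocking_sleep():
    # time.sleep does not yield to the event loop; other coroutines must wait.
    time.sleep(DELAY)


async def cooperative_sleep():
    # await hands control back to the event loop while sleeping.
    await asyncio.sleep(DELAY)


async def measure(coroutine_function):
    """Run three instances via gather and return the elapsed seconds."""
    start = time.perf_counter()
    await asyncio.gather(*(coroutine_function() for _ in range(3)))
    return time.perf_counter() - start


blocking_time = asyncio.run(measure(blocking_sleep))
cooperative_time = asyncio.run(measure(cooperative_sleep))
print(f"blocking: {blocking_time:.2f}s, cooperative: {cooperative_time:.2f}s")
```

The blocking variant should take roughly three times the delay even though it goes through asyncio.gather, because the three calls cannot overlap.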

Tasks

If, however, we want to run different pieces of code concurrently, one thing we can use is tasks. asyncio.create_task wraps a coroutine in a task and schedules it to run on the event loop right away; awaiting the task then waits for it to finish. This way, we can run multiple coroutines concurrently:

import asyncio

async def sleepy_function():
    print("Before sleeping.")
    await asyncio.sleep(1)
    print("After sleeping.")

async def main():
    task1 = asyncio.create_task(sleepy_function())
    task2 = asyncio.create_task(sleepy_function())
    task3 = asyncio.create_task(sleepy_function())

    await task1
    await task2
    await task3

asyncio.run(main())
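A detail worth seeing in action is that a task is scheduled as soon as it is created and starts running the next time the creating coroutine yields, even before the task itself is awaited. The following sketch records the order of events in a list; the worker coroutine and the events list are hypothetical names used only here, and the sketch assumes the default (non-eager) task behaviour.

```python
import asyncio

events = []  # records the order in which things happen


async def worker(name, delay):
    events.append(f"{name} start")
    await asyncio.sleep(delay)
    events.append(f"{name} done")
    return name


async def main():
    task = asyncio.create_task(worker("a", 0.05))  # scheduled, not yet running
    events.append("main continues")
    await asyncio.sleep(0)  # yield once; the task gets its first turn here
    events.append("main after yield")
    result = await task  # wait for the task to finish and fetch its result
    events.append("main got result")
    return result


result = asyncio.run(main())
print(events)
# ['main continues', 'a start', 'main after yield', 'a done', 'main got result']
```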

gather

However, I usually find it more convenient and intuitive to use asyncio.gather, which takes arbitrarily many awaitables as input and runs them concurrently:

import asyncio

async def sleepy_function():
    print("Before sleeping.")
    await asyncio.sleep(1)
    print("After sleeping.")

async def main():
    await asyncio.gather(
        sleepy_function(), sleepy_function(), sleepy_function()
    )

asyncio.run(main())

We can shorten the above code using the asterisk (*) operator, which unpacks the given iterable into individual arguments (and thus brings us back to the introductory example):

import asyncio

async def sleepy_function():
    print("Before sleeping.")
    await asyncio.sleep(1)
    print("After sleeping.")

async def main():
    await asyncio.gather(*[sleepy_function() for _ in range(3)])

asyncio.run(main())
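asyncio.gather also collects the return values of the awaitables it is given, in the order they were passed in rather than the order in which they finish. A minimal sketch, with delayed_square as a made-up coroutine for illustration:

```python
import asyncio


async def delayed_square(value, delay):
    # Sleep for the given delay, then return the square of the value.
    await asyncio.sleep(delay)
    return value * value


async def main():
    # The first coroutine finishes last, yet its result still comes first.
    return await asyncio.gather(
        delayed_square(3, 0.03),
        delayed_square(2, 0.02),
        delayed_square(1, 0.01),
    )


results = asyncio.run(main())
print(results)  # [9, 4, 1]
```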

Queue

Let’s end this post by introducing asyncio.Queue, a queue designed for passing data between coroutines (it is not thread-safe, but it does not need to be, since all coroutines run on one thread), and a helpful tool for many real-world use cases. We use it for a slightly more complex, more realistic example to cap off this post.

Our example consists of one producer coroutine and N consumer coroutines. The producer guesses ISBNs and puts them on the queue. The consumers remove these from the queue and send a GET request to a public endpoint that returns information about the book with the given ISBN. If a guessed ISBN turns out to be valid, the consumer prints the returned book information.

To execute the requests asynchronously, we use aiohttp. Web requests are a common use case for asyncio: often, a web server handles different requests concurrently, renders / queries different things, and then returns the resulting web page. Since this is not CPU bound, and most of the time is (probably) spent waiting for request results, asyncio is a good fit here.

Let’s look at the code:

import asyncio
from random import randint

import aiohttp

NUM_CONSUMERS = 8

async def producer(queue: asyncio.Queue) -> None:
    while True:
        # Guess a random 10-digit ISBN and put it on the queue.
        random_isbn = "".join(str(randint(0, 9)) for _ in range(10))
        queue.put_nowait(random_isbn)
        await asyncio.sleep(0.05)

async def consumer(consumer_idx: int, queue: asyncio.Queue) -> None:
    while True:
        # Wait until an ISBN is available, then look it up.
        isbn = await queue.get()
        async with aiohttp.ClientSession() as session:
            async with session.get(
                "https://openlibrary.org/api/books?bibkeys=ISBN:"
                + isbn
                + "&format=json"
            ) as resp:
                book_descriptor = await resp.json()
                # An empty JSON object means no book matched the guess.
                if book_descriptor != {}:
                    print(
                        f"Consumer {consumer_idx} found valid ISBN. Current queue size: {queue.qsize()}. Discovered book: {book_descriptor}"
                    )
        queue.task_done()

async def main():
    queue = asyncio.Queue()
    # Run the producer and all consumers concurrently (this never returns).
    await asyncio.gather(
        *([producer(queue)] + [consumer(idx, queue) for idx in range(NUM_CONSUMERS)])
    )

if __name__ == "__main__":
    asyncio.run(main())

Apart from what was said above, it is worth calling out how the async queue is used. As we can see, we place items on the queue via put_nowait(). The _nowait suffix means that the call does not wait when the queue is full and raises asyncio.QueueFull instead. This does not concern us here, as we don’t put a limit on the queue size.
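To see what put_nowait() does on a full queue, here is a small sketch with a bounded queue. The maxsize of 2 and the item values are arbitrary choices for illustration; the program above uses an unbounded queue.

```python
import asyncio


async def main():
    queue = asyncio.Queue(maxsize=2)  # bounded queue, unlike the example above
    queue.put_nowait("a")
    queue.put_nowait("b")
    try:
        queue.put_nowait("c")  # no free slot left, so this raises immediately
        overflowed = False
    except asyncio.QueueFull:
        overflowed = True
    return overflowed, queue.qsize()


overflowed, size = asyncio.run(main())
print(f"overflowed: {overflowed}, items on queue: {size}")
```

With a bounded queue, `await queue.put(item)` would instead suspend the producer until a consumer frees a slot, which gives a simple form of backpressure.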

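Items are removed from the queue via get(), and each one should be marked as complete by a corresponding task_done() call. This matters as soon as something waits on queue.join(), which returns only once every item has been marked as done.

Note that I tuned the number of consumers and the sleep time between generating ISBNs so that, on my laptop, the queue size stays relatively constant.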
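The pairing of get() and task_done() is what makes queue.join() usable for a clean shutdown. The following offline sketch is not the ISBN program: it assumes a finite number of items, replaces the web request with a short sleep, and uses made-up names throughout.

```python
import asyncio


async def producer(queue, n_items):
    for item in range(n_items):
        await queue.put(item)


async def consumer(queue, results):
    while True:
        item = await queue.get()
        try:
            await asyncio.sleep(0.01)  # stand-in for the real I/O work
            results.append(item * 2)
        finally:
            queue.task_done()  # exactly one task_done() per successful get()


async def main():
    queue = asyncio.Queue()
    results = []
    consumers = [asyncio.create_task(consumer(queue, results)) for _ in range(3)]
    await producer(queue, 10)
    await queue.join()  # returns once every item has been marked done
    for task in consumers:
        task.cancel()  # the consumers loop forever, so stop them explicitly
    await asyncio.gather(*consumers, return_exceptions=True)
    return sorted(results)


results = asyncio.run(main())
print(results)
```

Putting task_done() in a finally block is a design choice: it keeps join() from hanging if processing an item raises.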

This brings us to the end of this tutorial about asyncio. I hope this was an interesting read, thanks for stopping by!

PS: let me know if you happened to discover any cool reads using the last sample program!


