Python asyncio: How to limit the number of concurrent tasks

Updated: February 12, 2024 By: Guest Contributor

Introduction

Handling I/O-bound work such as network requests can be challenging. Python’s asyncio module lets you write concurrent code with the async/await syntax. However, you often need to cap the concurrency level so that you use resources sensibly and avoid overwhelming servers or APIs with too many simultaneous requests. This tutorial shows how to limit the number of concurrent tasks in asyncio, from basic to advanced examples.

Getting Started

Before diving into concurrency limits, it’s essential to understand the basics of asyncio and its event loop. The asyncio library is used for writing single-threaded concurrent code. The event loop schedules asynchronous tasks and callbacks, allowing I/O operations to be non-blocking.

Here is a simple example of an asynchronous function and how to run it:

import asyncio

async def say_hello():
    print("Hello, asyncio!")

asyncio.run(say_hello())

Executing Multiple Tasks Concurrently

To run multiple coroutines concurrently, you usually pass them to asyncio.gather(), which schedules all of them at once and waits until they have all finished:

import asyncio

async def task(name, seconds):
    print(f'Task {name} started')
    await asyncio.sleep(seconds)
    print(f'Task {name} completed')

async def main():
    await asyncio.gather(
        task('A', 1),
        task('B', 2),
        task('C', 3)
    )

asyncio.run(main())
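
To see why a limit is sometimes needed, it helps to count how many tasks are in flight at once. The sketch below is a rough illustration rather than a benchmark: tracked_task, measure_peak and the state dictionary are hypothetical names introduced here, and asyncio.sleep() stands in for real I/O.

```python
import asyncio

async def tracked_task(state, seconds):
    # `state` is a hypothetical dict used only to count tasks in flight.
    state['running'] += 1
    state['peak'] = max(state['peak'], state['running'])
    try:
        await asyncio.sleep(seconds)  # stand-in for a network call
    finally:
        state['running'] -= 1

async def measure_peak(count):
    state = {'running': 0, 'peak': 0}
    # gather() schedules every coroutine immediately, with no cap
    await asyncio.gather(*(tracked_task(state, 0.01) for _ in range(count)))
    return state['peak']

peak = asyncio.run(measure_peak(50))
print(f'Peak concurrency: {peak}')
```

Because every task starts before any of them finishes sleeping, the peak equals the number of coroutines passed in. With real requests, that would mean all of them hitting the server at the same moment.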

Limiting the Number of Concurrent Tasks

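To limit concurrency, asyncio provides asyncio.Semaphore, which controls how many tasks can access a resource at the same time. A semaphore holds an internal counter that you set to the number of tasks allowed to run concurrently. Each task that enters the async with block decrements the counter, and once it reaches zero, further tasks wait until a running task leaves the block.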

Here’s how you can use a semaphore to limit the number of concurrent tasks:

import asyncio

async def limited_task(name, semaphore):
    async with semaphore:
        print(f'Task {name} is running')
        await asyncio.sleep(2)
        print(f'Task {name} completed')

async def main():
    semaphore = asyncio.Semaphore(2) # Limit to 2 concurrent tasks
    await asyncio.gather(
        limited_task('A', semaphore),
        limited_task('B', semaphore),
        limited_task('C', semaphore),
        limited_task('D', semaphore)
    )

asyncio.run(main())
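
If you would rather not pass the semaphore into every coroutine, the same idea can be wrapped in a small helper. This is a minimal sketch under stated assumptions, not part of asyncio: gather_with_limit, run_one and square are hypothetical names, and the state dictionary exists only to check that the limit holds.

```python
import asyncio

async def gather_with_limit(limit, *coros):
    # Hypothetical helper: behaves like asyncio.gather(), but at most
    # `limit` coroutines are inside the semaphore at any moment.
    semaphore = asyncio.Semaphore(limit)

    async def run_one(coro):
        async with semaphore:
            return await coro

    return await asyncio.gather(*(run_one(c) for c in coros))

async def square(state, n):
    # Track how many calls are running to verify the limit.
    state['running'] += 1
    state['peak'] = max(state['peak'], state['running'])
    await asyncio.sleep(0.01)  # stand-in for real I/O
    state['running'] -= 1
    return n * n

async def main():
    state = {'running': 0, 'peak': 0}
    results = await gather_with_limit(3, *(square(state, n) for n in range(10)))
    return results, state['peak']

results, peak = asyncio.run(main())
print(results, peak)
```

Results come back in the order the coroutines were passed in, as with a plain asyncio.gather() call. Note that one wrapper task is still created per coroutine up front; only the work inside the semaphore is limited.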

Advanced Usage: Dynamic Task Queues

For more fine-grained control over task execution and concurrency, you can feed work through an asyncio.Queue. A fixed number of worker tasks pull items from the queue, so the number of workers is the concurrency limit, and new work can be added while the workers are running.

Here is an example illustrating how to create and manage a dynamic task queue:

import asyncio

async def worker(name, task_queue):
    while True:
        task = await task_queue.get()
        print(f'Worker {name}: Starting {task}')
        await asyncio.sleep(1)
        print(f'Worker {name}: Completed {task}')
        task_queue.task_done()

async def main():
    task_queue = asyncio.Queue()

    for task in range(10):
        await task_queue.put(f'Task {task}')

    workers = [asyncio.create_task(worker(f'W{index}', task_queue)) for index in range(3)]

    await task_queue.join()

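    # Cancel the idle workers. The loop variable must not be named `worker`,
    # or it would shadow the worker() coroutine inside main().
    for w in workers:
        w.cancel()
    # Wait until the cancelled workers have actually stopped
    await asyncio.gather(*workers, return_exceptions=True)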

asyncio.run(main())
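
The worker pattern can be extended to collect results and to apply backpressure on the producer. The following is a sketch under assumptions rather than a definitive implementation: run_pool, the results list, and the doubling "work" are hypothetical, while the maxsize argument of asyncio.Queue is what makes put() wait when the queue is full.

```python
import asyncio

async def worker(queue, results):
    while True:
        item = await queue.get()
        try:
            await asyncio.sleep(0.01)  # stand-in for real I/O
            results.append(item * 2)   # hypothetical "work": double the item
        finally:
            # Always mark the item as done so queue.join() can return
            queue.task_done()

async def run_pool(items, num_workers=3, maxsize=5):
    # A bounded queue: put() waits whenever `maxsize` items are pending
    queue = asyncio.Queue(maxsize=maxsize)
    results = []
    workers = [asyncio.create_task(worker(queue, results))
               for _ in range(num_workers)]

    for item in items:
        await queue.put(item)

    await queue.join()  # wait until every item has been processed

    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return results

results = asyncio.run(run_pool(range(20)))
print(sorted(results))
```

Results arrive in completion order rather than submission order, which is why the example sorts them before printing.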

Conclusion

Limiting the number of concurrent tasks in asyncio helps you use resources sensibly and avoid overloading servers. This tutorial covered two approaches: a semaphore that caps how many tasks run at once, and a queue served by a fixed number of workers. Applying these techniques in your asynchronous Python applications keeps the load on external services and on your own process predictable.