Python asyncio: How to limit the number of concurrent tasks

Last updated: February 12, 2024

Introduction

Handling I/O-bound work and structured network code can be challenging. Python’s asyncio module lets you write concurrent code with the async/await syntax, but unbounded concurrency can exhaust local resources or overwhelm servers and APIs with too many simultaneous requests. This tutorial shows how to limit the number of concurrent tasks in asyncio, from basic to advanced examples.

Getting Started

Before diving into concurrency limits, it’s essential to understand the basics of asyncio and its event loop. The asyncio library is used for writing single-threaded concurrent code. The event loop schedules coroutines, tasks, and callbacks on one thread and switches between them whenever one of them awaits, so waiting on I/O does not block the rest of the program.

Here is a simple example of an asynchronous function and how to run it:

import asyncio

async def say_hello():
    print("Hello, asyncio!")

asyncio.run(say_hello())
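
To make the single-threaded scheduling described above more concrete, here is a minimal sketch. The coroutine name tick and the shared log list are illustrative assumptions, not part of asyncio. Each coroutine records the current thread and then yields to the event loop with asyncio.sleep(0), which lets the other coroutine take a turn.

```python
import asyncio
import threading

async def tick(name, log):
    # `tick` is a hypothetical coroutine used only for this illustration.
    for step in range(2):
        log.append((name, step, threading.get_ident()))
        await asyncio.sleep(0)  # hand control back to the event loop

async def main():
    log = []
    await asyncio.gather(tick('A', log), tick('B', log))
    return log

log = asyncio.run(main())
print([(name, step) for name, step, _ in log])
print('Threads used:', len({thread_id for _, _, thread_id in log}))
```

The two coroutines take turns, yet every entry in the log carries the same thread identifier: the concurrency comes from switching at await points, not from extra threads.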

Executing Multiple Tasks Concurrently

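To run multiple coroutines concurrently, you usually pass them to asyncio.gather(), which schedules all of them at once and waits until every one has finished. In the example below, all three tasks start immediately, so the whole run takes about 3 seconds (the longest sleep) rather than 6: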

import asyncio

async def task(name, seconds):
    print(f'Task {name} started')
    await asyncio.sleep(seconds)
    print(f'Task {name} completed')

async def main():
    await asyncio.gather(
        task('A', 1),
        task('B', 2),
        task('C', 3)
    )

asyncio.run(main())
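
As a rough way to check that gather() really runs everything at once, the sketch below times the call and collects return values. The coroutine name fetch and the shortened sleep durations are assumptions made for illustration only.

```python
import asyncio
import time

async def fetch(name, seconds):
    # Hypothetical stand-in for an I/O-bound call such as an HTTP request.
    await asyncio.sleep(seconds)
    return f'{name} done'

async def main():
    start = time.perf_counter()
    # All three coroutines are scheduled together; nothing limits concurrency here.
    results = await asyncio.gather(
        fetch('A', 0.1),
        fetch('B', 0.2),
        fetch('C', 0.3),
    )
    elapsed = time.perf_counter() - start
    print(results)  # results keep the order of the arguments
    print(f'Elapsed: {elapsed:.2f}s')  # close to the longest sleep, not the sum
    return results, elapsed

asyncio.run(main())
```

Because nothing caps how many coroutines are in flight, passing hundreds of real network calls to gather() this way would start them all at once, which is the problem the next section addresses.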

Limiting the Number of Concurrent Tasks

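To limit concurrency, asyncio provides asyncio.Semaphore, which controls how many tasks may access a resource at the same time. A semaphore holds an internal counter initialized to the number of tasks allowed to run concurrently. Entering an async with semaphore block decrements the counter, leaving the block increments it again, and a task that arrives when the counter is zero waits until another task releases the semaphore.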

Here’s how you can use a semaphore to limit the number of concurrent tasks:

import asyncio

async def limited_task(name, semaphore):
    async with semaphore:
        print(f'Task {name} is running')
        await asyncio.sleep(2)
        print(f'Task {name} completed')

async def main():
    semaphore = asyncio.Semaphore(2) # Limit to 2 concurrent tasks
    await asyncio.gather(
        limited_task('A', semaphore),
        limited_task('B', semaphore),
        limited_task('C', semaphore),
        limited_task('D', semaphore)
    )

asyncio.run(main())
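
If you would rather not pass the semaphore into every coroutine, one option is to wrap the coroutines instead. The helper gather_with_limit below is a hypothetical name, not an asyncio function, and the stats dictionary exists only to measure how many jobs overlap. Treat it as a sketch under those assumptions.

```python
import asyncio

async def gather_with_limit(limit, *coros):
    # Hypothetical helper: like asyncio.gather(), but at most `limit`
    # coroutines are inside the guarded section at any moment.
    semaphore = asyncio.Semaphore(limit)

    async def guarded(coro):
        async with semaphore:
            return await coro

    return await asyncio.gather(*(guarded(coro) for coro in coros))

# Bookkeeping used only to observe the peak number of overlapping jobs.
stats = {'running': 0, 'peak': 0}

async def job(n):
    stats['running'] += 1
    stats['peak'] = max(stats['peak'], stats['running'])
    await asyncio.sleep(0.05)  # stand-in for real I/O
    stats['running'] -= 1
    return n * n

async def main():
    results = await gather_with_limit(2, *(job(n) for n in range(6)))
    print(results, 'peak concurrency:', stats['peak'])
    return results

asyncio.run(main())
```

Note that this still creates one task per coroutine up front; the semaphore only limits how many of them do work at the same time. For a very large number of items, a queue with a fixed number of workers avoids creating all those tasks at once.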

Advanced Usage: Dynamic Task Queues

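For more fine-grained control over task execution and concurrency, you can use a task queue served by a fixed pool of workers. Items are placed on an asyncio.Queue, and the number of worker tasks pulling from it sets the concurrency limit. Because items can be added while the workers are running, the amount of queued work can change dynamically based on available resources or external conditions.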

Here is an example illustrating how to create and manage a dynamic task queue:

import asyncio

async def worker(name, task_queue):
    while True:
        task = await task_queue.get()
        print(f'Worker {name}: Starting {task}')
        await asyncio.sleep(1)
        print(f'Worker {name}: Completed {task}')
        task_queue.task_done()

async def main():
    task_queue = asyncio.Queue()

    for task in range(10):
        await task_queue.put(f'Task {task}')

    workers = [asyncio.create_task(worker(f'W{index}', task_queue)) for index in range(3)]

    await task_queue.join()

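    # Use a distinct loop variable: reusing the name "worker" would shadow the
    # worker() coroutine inside main() and make the line that creates the workers fail.
    for worker_task in workers:
        worker_task.cancel()
    # Wait for the cancelled workers to finish shutting down.
    await asyncio.gather(*workers, return_exceptions=True)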

asyncio.run(main())
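
A variation on the worker pool above collects results and uses a bounded queue, so the producer pauses whenever the workers fall behind. The function name run_pool and the doubling "work" are illustrative assumptions; this is a sketch, not the only way to structure a pool.

```python
import asyncio

async def worker(queue, results):
    while True:
        item = await queue.get()
        try:
            await asyncio.sleep(0.01)  # stand-in for real I/O
            results.append(item * 2)
        finally:
            # Mark the item as done even if processing raised.
            queue.task_done()

async def run_pool(items, num_workers=3):
    # Bounded queue: put() waits while the queue already holds num_workers items.
    queue = asyncio.Queue(maxsize=num_workers)
    results = []
    workers = [
        asyncio.create_task(worker(queue, results))
        for _ in range(num_workers)
    ]

    for item in items:
        await queue.put(item)
    await queue.join()  # wait until every item has been marked done

    for worker_task in workers:
        worker_task.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return results

print(sorted(asyncio.run(run_pool(range(10)))))
```

Results arrive in completion order, which is why the example sorts them before printing. The maxsize argument keeps memory use flat when the producer is much faster than the workers.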

Conclusion

Limiting the number of concurrent tasks in asyncio keeps resource usage predictable and prevents overloading servers and APIs. This tutorial covered two approaches: asyncio.Semaphore, which caps how many tasks run a section of code at once, and asyncio.Queue with a fixed pool of workers, where the number of workers sets the limit. Applying these techniques in your asynchronous Python applications helps keep them efficient and robust.
