Python asyncio.Semaphore class (with examples)

Updated: August 7, 2023 By: Khue

This succinct, practice-oriented article is about the asyncio.Semaphore class in Python.

The Fundamentals

The asyncio.Semaphore class implements a semaphore for asyncio tasks. A semaphore is a synchronization primitive that controls access to a shared resource by multiple concurrent tasks. It has an internal counter that is decremented by each acquire() call and incremented by each release() call. The counter can never go below zero; when acquire() finds that it is zero, the calling task is suspended (without blocking the event loop) until some other task calls release().
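To make the waiting behavior concrete, here is a minimal sketch. The names demo_blocking, waiter, and events are made up for this illustration; the code records the order in which things happen when one task has to wait for the semaphore:

```python
import asyncio


async def demo_blocking():
    sem = asyncio.Semaphore(1)
    events = []  # hypothetical log, used only to record the order of steps

    async def waiter():
        events.append("waiter: waiting")
        await sem.acquire()  # the counter is 0 here, so this call suspends
        events.append("waiter: acquired")
        sem.release()

    await sem.acquire()  # counter: 1 -> 0
    task = asyncio.create_task(waiter())
    await asyncio.sleep(0)  # yield once so the waiter runs until it suspends
    events.append("main: releasing")
    sem.release()  # counter goes back up and the waiter is woken
    await task
    return events


if __name__ == "__main__":
    for line in asyncio.run(demo_blocking()):
        print(line)
```

The waiter only gets past acquire() after the main coroutine has called release().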

Syntax:

class asyncio.Semaphore(value=1)

Here, value is an optional argument that specifies the initial value of the internal counter (default is 1). Passing a negative value raises a ValueError.
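As a quick sketch of how the initial value affects a fresh semaphore (the function name inspect_initial_values is just an assumption for this example), you can check locked() right after construction:

```python
import asyncio


async def inspect_initial_values():
    # Default: the counter starts at 1, so the semaphore behaves like a lock
    default_sem = asyncio.Semaphore()
    # A counter of 0 means the very first acquire() would have to wait
    empty_sem = asyncio.Semaphore(0)

    # A negative initial value is rejected with ValueError
    try:
        asyncio.Semaphore(-1)
        negative_rejected = False
    except ValueError:
        negative_rejected = True

    return default_sem.locked(), empty_sem.locked(), negative_rejected


if __name__ == "__main__":
    print(asyncio.run(inspect_initial_values()))
```

A semaphore created with value=0 starts out locked, which can be handy when another task is supposed to call release() first.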

Methods:

  • acquire(): a coroutine that acquires the semaphore. If the counter is greater than zero, it decrements the counter by one and returns True immediately. If the counter is zero, it waits until another task calls release(), then returns True. Each release() call wakes at most one waiting task. In recent CPython versions, waiting tasks are queued and woken in first-in, first-out order.
  • release(): a regular method (not a coroutine) that releases the semaphore. It increments the counter by one and can wake a task that is waiting in acquire(). Unlike asyncio.BoundedSemaphore, a plain Semaphore does not raise an error if release() is called more times than acquire(); the counter simply grows beyond its initial value.
  • locked(): a method that returns True if the semaphore cannot be acquired immediately (the counter is zero) or False otherwise.
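The three methods can be tried out by hand, without async with. The snippet below is only a sketch; manual_usage and over_release are hypothetical helper names. It also shows the difference between Semaphore and asyncio.BoundedSemaphore when release() is called too often:

```python
import asyncio


async def manual_usage():
    sem = asyncio.Semaphore(1)
    observed = []

    acquired = await sem.acquire()  # counter: 1 -> 0, returns True
    observed.append(acquired)
    try:
        observed.append(sem.locked())  # True, because the counter is 0
    finally:
        sem.release()  # counter: 0 -> 1 (regular method, no await)
    observed.append(sem.locked())  # False again

    return observed


async def over_release():
    plain = asyncio.Semaphore(1)
    plain.release()  # allowed: the counter becomes 2
    plain_still_free = not plain.locked()

    bounded = asyncio.BoundedSemaphore(1)
    try:
        bounded.release()  # the counter would exceed the initial value
        bounded_raised = False
    except ValueError:
        bounded_raised = True

    return plain_still_free, bounded_raised


if __name__ == "__main__":
    print(asyncio.run(manual_usage()))
    print(asyncio.run(over_release()))
```

In everyday code, async with sem: is usually the safer choice because the release happens automatically, even if the body raises an exception.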

The asyncio.Semaphore class shines when you need to limit the number of concurrent tasks that can access a shared resource or perform some action. The following examples illustrate this.

Examples

Limit the number of concurrent tasks that print a message

This simple example uses a semaphore object to limit the number of concurrent tasks that print a message:

# SlingAcademy.com
# This code uses Python 3.11.4


import asyncio

# Define a worker coroutine
async def worker(sem, name):
    # Acquire the semaphore
    async with sem:
        # Print a message
        print(f"Worker {name} started")
        # Simulate some work
        await asyncio.sleep(2)
        # Print another message
        print(f"Worker {name} finished")

# Define the main coroutine
async def main():
    # Create a semaphore with a limit of 3
    sem = asyncio.Semaphore(3)

    # Create 10 tasks that use the semaphore
    tasks = [asyncio.create_task(worker(sem, i)) for i in range(10)]

    # Wait for all tasks to complete
    await asyncio.gather(*tasks)


# Run the main function
asyncio.run(main())

Output:

Worker 0 started
Worker 1 started
Worker 2 started
Worker 0 finished
Worker 1 finished
Worker 2 finished
Worker 3 started
Worker 4 started
Worker 5 started
Worker 3 finished
Worker 4 finished
Worker 5 finished
Worker 6 started
Worker 7 started
Worker 8 started
Worker 6 finished
Worker 7 finished
Worker 8 finished
Worker 9 started
Worker 9 finished

At most 3 workers are active at any time. As soon as a worker finishes and releases the semaphore, a waiting worker can start.
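If you would rather verify the limit than read it off the printed output, one possible approach is to count the active workers yourself. This is a sketch under assumptions: tracked_worker, measure_peak, and the stats dictionary are names invented for this example, and the short sleep merely stands in for real work:

```python
import asyncio


async def tracked_worker(sem, stats):
    async with sem:
        # Count this worker as active and remember the highest count seen
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
        await asyncio.sleep(0.01)  # simulate some work
        stats["active"] -= 1


async def measure_peak(limit, n_tasks):
    sem = asyncio.Semaphore(limit)
    stats = {"active": 0, "peak": 0}
    await asyncio.gather(*(tracked_worker(sem, stats) for _ in range(n_tasks)))
    return stats["peak"]


if __name__ == "__main__":
    print(asyncio.run(measure_peak(limit=3, n_tasks=10)))
```

With a limit of 3 and 10 tasks, the recorded peak never exceeds 3, which matches the batches seen in the output above.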

Limit the number of concurrent tasks that write to a file

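This example uses asyncio.Semaphore to limit how many tasks can be in the write-and-wait section at the same time. Note that file.write() itself is a regular synchronous call; the asyncio.sleep(1) stands in for the asynchronous work a real program would do: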

# SlingAcademy.com
# This code uses Python 3.11.4

import asyncio


# Define a coroutine that writes data to a file
async def writer(file, sem, name, data):
    # Acquire the semaphore
    async with sem:
        # Write the worker name and the data to the file
        file.write(f"Writer {name}: {data}\n")
        # Print a message
        print(f"Writer {name} wrote {data}")
        # Simulate some heavy work
        await asyncio.sleep(1)


# Define a main coroutine
async def main():
    # Create a semaphore with a limit of 2
    # This means at most 2 writers can run concurrently
    sem = asyncio.Semaphore(2)

    # Open a file in append mode
    with open("output.txt", "a") as file:
        # Create a list of data to write
        data = ["Sling", "Academy", "Python", "Asyncio", "Semaphore"]

        # Create one task per item; every task shares the semaphore and the file
        tasks = [
            asyncio.create_task(writer(file, sem, i, item))
            for i, item in enumerate(data)
        ]

        # Wait for all tasks to complete
        await asyncio.gather(*tasks)


# Run the main function
asyncio.run(main())

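When the program finishes, a file named output.txt exists in the current working directory (the directory you ran the script from). Because the file is opened in append mode, it is created if it does not exist, and repeated runs add more lines. After the first run, it contains the following: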

Writer 0: Sling
Writer 1: Academy
Writer 2: Python
Writer 3: Asyncio
Writer 4: Semaphore

You also see these messages in your terminal:

Writer 0 wrote Sling
Writer 1 wrote Academy
Writer 2 wrote Python
Writer 3 wrote Asyncio
Writer 4 wrote Semaphore

Afterword

You’ve learned the fundamentals of the asyncio.Semaphore class in Python and seen a couple of examples of using it in practice. If you find something outdated or incorrect, please let me know by leaving a comment. This tutorial ends here. Happy coding & have a nice day!