Python asyncio.Queue class (with 3 examples)

Updated: August 18, 2023 By: Goodman

The fundamentals

In Python, asyncio queues are a type of data structure that can store and retrieve items in a first-in, first-out (FIFO) order. They are designed to be used with coroutines, which are functions that can be paused and resumed asynchronously.

To create asyncio queues, you need to use the asyncio.Queue class. You can import the asyncio module and create an instance of the asyncio.Queue class with an optional maxsize argument, which specifies the maximum number of items that the queue can hold, like this:

import asyncio

# create a queue with a capacity of 10 items
queue = asyncio.Queue(maxsize=10)
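
As a small sketch of how maxsize behaves, the snippet below compares a bounded queue with one created without the argument. It relies on the documented default of maxsize=0, which means the queue has no size limit. The function name show_maxsize is only an illustration and not part of asyncio.

```python
import asyncio


async def show_maxsize():
    # A bounded queue: at most 10 items at a time
    bounded = asyncio.Queue(maxsize=10)

    # No maxsize given: it defaults to 0, which means "no limit"
    unbounded = asyncio.Queue()

    # Put far more than 10 items into the unbounded queue
    for i in range(100):
        unbounded.put_nowait(i)

    # An unbounded queue never reports itself as full
    return bounded.maxsize, unbounded.maxsize, unbounded.full()


print(asyncio.run(show_maxsize()))  # (10, 0, False)
```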

You can then call the coroutine methods on the queue object to perform asynchronous operations, such as:

  • await queue.put(item) to put an item into the queue. If the queue is full, this method will wait until a free slot is available.
  • await queue.get() to get an item from the queue. If the queue is empty, this method will wait until an item is available.
  • queue.task_done() to indicate that a previously retrieved item has been processed. This method should be called by the consumer coroutines after they finish working on an item.
  • await queue.join() to wait until every item that has been put into the queue has been retrieved and marked as done with task_done(). It is typically awaited by the code that coordinates the workers, after the producers have finished putting items into the queue.
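
The waiting behavior described in the list above can be made visible with a queue of size 1. This is a minimal sketch, not a recommended pattern: it wraps the second put() in asyncio.wait_for() with a short timeout purely so that the demo does not hang, and the name demo_waiting is made up for illustration.

```python
import asyncio


async def demo_waiting():
    queue = asyncio.Queue(maxsize=1)

    # The queue has a free slot, so this returns right away
    await queue.put("first")

    # The queue is now full, so a second put() has to wait.
    # wait_for() cancels it after 0.1 seconds to keep the demo short.
    try:
        await asyncio.wait_for(queue.put("second"), timeout=0.1)
        put_had_to_wait = False
    except asyncio.TimeoutError:
        put_had_to_wait = True

    # Take the item out and mark it as processed
    item = await queue.get()
    queue.task_done()

    # Nothing is left unfinished, so join() returns immediately
    await queue.join()

    return put_had_to_wait, item, queue.qsize()


print(asyncio.run(demo_waiting()))  # (True, 'first', 0)
```

Because the timed-out put() was cancelled, "second" never entered the queue, which is why the final size is 0.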

You can also use some non-coroutine methods of the queue, such as:

  • queue.put_nowait(item) to put an item into the queue without waiting. If the queue is full, this method raises an asyncio.QueueFull exception.
  • queue.get_nowait() to get an item from the queue without waiting. If the queue is empty, this method raises an asyncio.QueueEmpty exception.
  • queue.qsize() to get the number of items in the queue.
  • queue.empty() to check if the queue is empty.
  • queue.full() to check if the queue is full.
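
Here is a short sketch of the non-coroutine methods listed above, using a queue of size 2 so that both exceptions are easy to trigger. The function name demo_nowait and the events list are only there to collect what happens.

```python
import asyncio


async def demo_nowait():
    queue = asyncio.Queue(maxsize=2)
    events = []

    # Fill the queue without awaiting anything
    queue.put_nowait("a")
    queue.put_nowait("b")
    events.append(("full", queue.full()))

    # A third item does not fit, so put_nowait() raises instead of waiting
    try:
        queue.put_nowait("c")
    except asyncio.QueueFull:
        events.append("QueueFull")

    # Drain the queue; items come out in the order they went in
    events.append(queue.get_nowait())
    events.append(queue.get_nowait())
    events.append(("empty", queue.empty()))

    # Nothing is left, so get_nowait() raises instead of waiting
    try:
        queue.get_nowait()
    except asyncio.QueueEmpty:
        events.append("QueueEmpty")

    return events


print(asyncio.run(demo_nowait()))
# [('full', True), 'QueueFull', 'a', 'b', ('empty', True), 'QueueEmpty']
```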

Asyncio queues are useful for creating concurrent programs that involve producer-consumer patterns, where one or more coroutines produce items and put them into a queue, and one or more coroutines consume items and process them from the queue. For example, asyncio queues can be used for:

  • Implementing web crawlers that fetch and parse web pages concurrently.
  • Building message brokers that distribute tasks among multiple workers.
  • Developing chat applications that send and receive messages asynchronously.

It’s hard and boring to learn programming with text and concepts only. It’s time to get our hands dirty by writing some code.

Examples

The simplest asyncio.Queue example

In this basic example, we’ll create a queue, put an item into it with the put() method, and then get an item from it using the get() method. We’ll also use the qsize() method twice to count the items in the queue before and after calling the get() method.

# SlingAcademy.com
# This code uses Python 3.11.4

import asyncio

async def main():
    # Create a queue that can hold up to 10 items
    queue = asyncio.Queue(maxsize=10)

    # Put an item into the queue
    await queue.put("Welcome to Sling Academy!")

    # Count the number of items in the queue
    count = queue.qsize()
    print(f"There are currently {count} items in the queue.")

    # Remove and return an item from the queue
    item_from_queue = await queue.get()

    # Print the item
    print(f"Item from queue: {item_from_queue}")

    # Indicate that the item has been processed
    queue.task_done()

    # Count the number of items in the queue
    count = queue.qsize()
    print(f"There are currently {count} items in the queue.")


# Run the main coroutine
asyncio.run(main())

Output:

There are currently 1 items in the queue.
Item from queue: Welcome to Sling Academy!
There are currently 0 items in the queue.
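
The example above handles a single item, so the first-in, first-out order is not visible yet. The following sketch puts three items into a queue and reads them back. The name fifo_demo is only an illustration.

```python
import asyncio


async def fifo_demo():
    queue = asyncio.Queue()

    # Put three items into the queue, in this order
    for item in ["first", "second", "third"]:
        await queue.put(item)

    # Take everything out again and record the order
    received = []
    while not queue.empty():
        received.append(await queue.get())
        queue.task_done()

    return received


print(asyncio.run(fifo_demo()))  # ['first', 'second', 'third']
```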

Producer-Consumer example (intermediate)

This example shows how to use a queue to implement the producer-consumer pattern, where one or more coroutines produce items and put them into the queue, and one or more coroutines take items from the queue and process them. Each producer sleeps for a random 1 to 3 seconds before adding an item, and each consumer sleeps for a random fraction of a second to simulate processing time. The main coroutine creates the queue, starts the producer and consumer tasks, waits for the producers to finish with asyncio.gather(), waits for the queue to drain with queue.join(), and finally cancels the consumers, which would otherwise loop forever:

# SlingAcademy.com
# This code uses Python 3.11.4

import asyncio
import random

# set a random seed for reproducibility
random.seed(2023)

# Create a coroutine that produces items and puts them into a queue
async def producer(name, queue):
    # Produce 10 items and put them into the queue
    for i in range(10):
        # Simulate some delay
        await asyncio.sleep(random.randint(1, 3))
        item = f"{name}-{i}"
        # Put the item into the queue
        await queue.put(item)
        print(f"Producer {name} added {item} to queue")

# Create a coroutine that consumes items from a queue
async def consumer(name, queue):
    while True:
        # Get an item from the queue
        item = await queue.get()
        # Simulate some delay
        await asyncio.sleep(random.random())
        print(f"Consumer {name} got {item} from queue")
        # Indicate that the item has been processed
        queue.task_done()


async def main():
    # Create a queue that can hold up to 20 items
    queue = asyncio.Queue(maxsize=20)

    # Create 3 producer and 5 consumer coroutines
    producers = [asyncio.create_task(producer(n, queue)) for n in range(3)]
    consumers = [asyncio.create_task(consumer(n, queue)) for n in range(5)]

    # Wait for all producers to finish
    await asyncio.gather(*producers)
    print("All producers finished")

    # Wait for all items in the queue to be processed
    await queue.join()
    print("All items in the queue have been processed")

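    # Cancel all consumers, then wait until the cancellations have taken effect
    for c in consumers:
        c.cancel()
    await asyncio.gather(*consumers, return_exceptions=True)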


# Run the main coroutine
asyncio.run(main())

Output (it’s a bit long, and the exact interleaving may differ on your machine):

Producer 0 added 0-0 to queue
Producer 2 added 2-0 to queue
Consumer 0 got 0-0 from queue
Consumer 1 got 2-0 from queue
Producer 1 added 1-0 to queue
Consumer 2 got 1-0 from queue
Producer 0 added 0-1 to queue
Producer 2 added 2-1 to queue
Consumer 3 got 0-1 from queue
Consumer 4 got 2-1 from queue
Producer 0 added 0-2 to queue
Consumer 0 got 0-2 from queue
Producer 1 added 1-1 to queue
Consumer 1 got 1-1 from queue
Producer 2 added 2-2 to queue
Producer 1 added 1-2 to queue
Consumer 2 got 2-2 from queue
Consumer 3 got 1-2 from queue
Producer 2 added 2-3 to queue
Producer 0 added 0-3 to queue
Consumer 4 got 2-3 from queue
Consumer 0 got 0-3 from queue
Producer 0 added 0-4 to queue
Consumer 1 got 0-4 from queue
Producer 1 added 1-3 to queue
Producer 0 added 0-5 to queue
Consumer 3 got 0-5 from queue
Consumer 2 got 1-3 from queue
Producer 2 added 2-4 to queue
Consumer 4 got 2-4 from queue
Producer 1 added 1-4 to queue
Producer 2 added 2-5 to queue
Consumer 0 got 1-4 from queue
Consumer 1 got 2-5 from queue
Producer 2 added 2-6 to queue
Producer 0 added 0-6 to queue
Consumer 3 got 2-6 from queue
Consumer 2 got 0-6 from queue
Producer 1 added 1-5 to queue
Consumer 4 got 1-5 from queue
Producer 2 added 2-7 to queue
Producer 0 added 0-7 to queue
Consumer 1 got 0-7 from queue
Consumer 0 got 2-7 from queue
Producer 1 added 1-6 to queue
Producer 2 added 2-8 to queue
Consumer 3 got 1-6 from queue
Consumer 2 got 2-8 from queue
Producer 0 added 0-8 to queue
Producer 2 added 2-9 to queue
Consumer 1 got 2-9 from queue
Consumer 4 got 0-8 from queue
Producer 1 added 1-7 to queue
Producer 0 added 0-9 to queue
Consumer 3 got 0-9 from queue
Consumer 0 got 1-7 from queue
Producer 1 added 1-8 to queue
Consumer 2 got 1-8 from queue
Producer 1 added 1-9 to queue
All producers finished
Consumer 1 got 1-9 from queue
All items in the queue have been processed
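
The example above stops its consumers by cancelling them. Another approach that is sometimes used is to send each consumer a sentinel value that tells it to exit on its own. The following is only a sketch of that idea, not a replacement for the code above: STOP, run_with_sentinels, and the results list are made-up names, and the "processing" is just a multiplication.

```python
import asyncio

# Hypothetical sentinel: a unique object that can never be a real item
STOP = object()


async def producer(queue, n_items):
    # Put the numbers 0 .. n_items-1 into the queue
    for i in range(n_items):
        await queue.put(i)


async def consumer(queue, results):
    while True:
        item = await queue.get()
        if item is STOP:
            # Acknowledge the sentinel too, then leave the loop
            queue.task_done()
            break
        results.append(item * 10)
        queue.task_done()


async def run_with_sentinels(n_items=5, n_consumers=2):
    queue = asyncio.Queue(maxsize=3)
    results = []

    consumers = [
        asyncio.create_task(consumer(queue, results))
        for _ in range(n_consumers)
    ]

    await producer(queue, n_items)

    # One sentinel per consumer, so that every consumer sees exactly one
    for _ in range(n_consumers):
        await queue.put(STOP)

    # The consumers now finish by themselves; no cancel() is needed
    await asyncio.gather(*consumers)
    return sorted(results)


print(asyncio.run(run_with_sentinels()))  # [0, 10, 20, 30, 40]
```

With this design the consumers return normally, so asyncio.gather() can wait for them directly. The trade-off is that the coordinating code has to know how many consumers exist.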

Chaining example (advanced)

This example shows how to use two queues to chain coroutines together, where each stage takes its input from one queue and puts its output into the next. The first coroutine generates some numbers and puts them into the first queue. The second coroutine takes numbers from the first queue, squares them, and puts the results into the second queue. The third coroutine takes the squared numbers from the second queue and doubles them.

# SlingAcademy.com
# This code uses Python 3.11.4

import asyncio
import random

# set a random seed
random.seed(2023)

# Create a coroutine that generates random numbers
async def generate_numbers(out_queue):
    # Generate 10 random numbers and put them into the first queue
    for _ in range(10):
        # Generate a random number
        n = random.randint(1, 10)
        # Put the number into the first queue
        await out_queue.put(n)
        # mimic a delay
        await asyncio.sleep(1)
        # Print the generated number
        print(f"Generated number {n}")

# Create a coroutine that squares numbers
async def square_numbers(in_queue, out_queue):
    while True:
        # Get a number from the first queue
        n = await in_queue.get()
        # Square the number and put it into the second queue
        squared_n = n * n
        await out_queue.put(squared_n)
        # mimic a delay
        await asyncio.sleep(1)
        print(f"Squared number {squared_n}")
        # Indicate that the number has been processed
        in_queue.task_done()

# Create a coroutine that doubles numbers
async def double_numbers(in_queue):
    while True:
        # Get a squared number from in_queue
        n = await in_queue.get()
        # mimic a delay
        await asyncio.sleep(1)
        # Double the squared number
        n *= 2
        print(f"Doubled squared-number {n}")
        # Indicate that the number has been processed
        in_queue.task_done()


async def main():
    # Create the first queue
    queue_1 = asyncio.Queue()
    # Create the second queue
    queue_2 = asyncio.Queue()

    # Create three tasks
    generator = asyncio.create_task(generate_numbers(queue_1))
    squarer = asyncio.create_task(square_numbers(queue_1, queue_2))
    doubler = asyncio.create_task(double_numbers(queue_2))

    # Wait for the generator to finish
    await generator
    print("Generator finished")

    # Wait for all items in the first queue to be processed
    await queue_1.join()
    print("All numbers in the first queue have been processed")

    # Wait for all items in the second queue to be processed
    await queue_2.join()
    print("All numbers in the second queue have been processed")

    # Cancel the squarer and the doubler
    squarer.cancel()
    doubler.cancel()


# Run the main coroutine
asyncio.run(main())

Output:

Generated number 7
Squared number 49
Doubled squared-number 98
Generated number 8
Squared number 64
Doubled squared-number 128
Generated number 7
Squared number 49
Doubled squared-number 98
Generated number 6
Squared number 36
Doubled squared-number 72
Generated number 10
Squared number 100
Doubled squared-number 200
Generated number 10
Squared number 100
Doubled squared-number 200
Generated number 6
Squared number 36
Doubled squared-number 72
Generated number 2
Squared number 4
Doubled squared-number 8
Generated number 2
Squared number 4
Doubled squared-number 8
Generated number 5
Squared number 25
Doubled squared-number 50
Generator finished
All numbers in the first queue have been processed
All numbers in the second queue have been processed

Conclusion

You’ve learned the fundamentals of the asyncio.Queue class and examined several code examples of using it in practice. From now on, you’ll have a powerful tool to develop modern projects that involve producer-consumer patterns. This tutorial ends here. Happy coding with Python & good luck!