Python async/await and timeouts (with examples)

Last updated: July 12, 2023

A timeout is a limit on the amount of time that an operation can take to complete. We need it when using async/await in Python because some operations may be slow, unreliable, or unresponsive, and we don’t want to wait indefinitely for them to finish. By using a timeout, we can cancel the operation and handle the exception if it takes too long.

This concise article walks through two ways (with code examples) to handle timeouts in asynchronous programming in modern Python.

Using asyncio.wait_for()

This approach uses the asyncio.wait_for() function to wait, with a timeout, for a single awaitable object (such as a coroutine or a task) to complete. If the timeout expires, the function cancels the awaitable and raises an asyncio.TimeoutError exception (an alias of the built-in TimeoutError since Python 3.11).

This example defines an async function that simulates a long-running task by sleeping for a random amount of time. It also defines another async function that waits up to 5 seconds for the long-running task to finish. If the task finishes in time, it prints a message. If the task takes longer than 5 seconds, asyncio.wait_for() cancels it and the function prints a different message. The code then runs the second async function using asyncio.run():

import asyncio
import random

# An async function that sleeps for a random amount of time
async def long_running_task():
    print("Starting long-running task")
    await asyncio.sleep(random.randint(1, 10))
    print("Finished long-running task")

# An async function that waits for the long-running task with a timeout
async def wait_with_timeout():
    try:
        # Wait for the long-running task to complete with a 5-second timeout
        await asyncio.wait_for(long_running_task(), timeout=5)
        print("Task completed within timeout")
    except asyncio.TimeoutError:
        # Handle the timeout exception
        print("Task timed out and could not be completed")

# A main async function that runs the wait_with_timeout function
async def main():
    await wait_with_timeout()

# Run the main async function
asyncio.run(main())

Because the sleep duration is random, you will see one of the following two outputs:

Starting long-running task
Task timed out and could not be completed

Or:

Starting long-running task
Finished long-running task
Task completed within timeout

This technique has both advantages and disadvantages:

  • Pros: It is simple, straightforward, and works well for single tasks or coroutines that need to be executed with a timeout.
  • Cons: asyncio.wait_for() wraps a single awaitable, so it is less convenient when several tasks or coroutines must run concurrently under one timeout. It also requires handling the asyncio.TimeoutError exception explicitly.
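
To make the cancellation behavior described in this section visible, here is a minimal sketch. The names slow_job, fast_job, demo, and the events list are hypothetical helpers introduced only for illustration, and the short delays are chosen so the script finishes quickly. It shows that asyncio.wait_for() returns the awaitable's result on success and that the wrapped coroutine receives a cancellation before the timeout error reaches the caller:

```python
import asyncio

events = []  # hypothetical log, used only to record the order of events


async def slow_job():
    try:
        await asyncio.sleep(1)
        return "done"
    except asyncio.CancelledError:
        # wait_for() cancels the wrapped awaitable when the timeout expires
        events.append("cancelled")
        raise  # re-raise so the cancellation completes normally


async def fast_job():
    await asyncio.sleep(0.01)
    return 42


async def demo():
    # On success, wait_for() returns whatever the awaitable returned
    value = await asyncio.wait_for(fast_job(), timeout=1)
    events.append(f"result={value}")

    try:
        await asyncio.wait_for(slow_job(), timeout=0.05)
    except asyncio.TimeoutError:
        events.append("timeout")
    return events


result_events = asyncio.run(demo())
print(result_events)
```

Re-raising asyncio.CancelledError inside slow_job matters here: swallowing it would hide the cancellation from asyncio, so a coroutine that needs to clean up should do its cleanup and then re-raise.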

Using asyncio.wait()

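This approach uses the asyncio.wait() function to wait for multiple tasks or futures to complete concurrently with a timeout. (Passing bare coroutines to asyncio.wait() is no longer allowed as of Python 3.11, so wrap them with asyncio.create_task() first.) The function returns two sets: the tasks that are done and the tasks that are still pending. If the timeout expires, asyncio.wait() neither raises an exception nor cancels anything. Unfinished tasks are simply returned in the second set, and it is up to you to cancel them.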

This example resembles the preceding one, but its core is different. It defines an async function that simulates a long-running task by sleeping for a random amount of time and returning its duration. A second async function creates four tasks from it and waits up to 10 seconds for them to complete. It prints the results of the tasks that finished in time, then cancels any tasks that are still pending and prints a message for each:

import asyncio
import random

# An async function that sleeps for a random amount of time and returns its duration
async def long_running_task():
    print("Starting long-running task")
    duration = random.randint(1, 10)
    await asyncio.sleep(duration)
    print("Finished long-running task")
    return duration

# An async function that creates and waits for multiple tasks with a timeout
async def wait_with_timeout():
    # Create four tasks from the long-running task
    tasks = [asyncio.create_task(long_running_task()) for _ in range(4)]
    
    # Wait for all tasks to complete with a 10-second timeout
    # The function returns two sets of completed and pending tasks
    done, pending = await asyncio.wait(tasks, timeout=10)
    
    # Iterate over the completed tasks and print their results
    for task in done:
        print(f"Task completed with result: {task.result()}")
    
    # Iterate over the pending tasks and cancel them
    for task in pending:
        print("Task timed out and canceled")
        task.cancel()

# A main async function that runs the wait_with_timeout function
async def main():
    await wait_with_timeout()

# Run the main async function
asyncio.run(main())

The output will look like this (yours may differ because the durations are random):

Starting long-running task
Starting long-running task
Starting long-running task
Starting long-running task
Finished long-running task
Finished long-running task
Finished long-running task
Task completed with result: 9
Task completed with result: 9
Task completed with result: 8
Task timed out and canceled

Some thoughts about this approach:

  • Pros: This approach is suitable for multiple tasks that need to run concurrently under a single timeout. Because asyncio.wait() does not raise on timeout, there is no asyncio.TimeoutError to handle.
  • Cons: This approach may be more complex and verbose than using asyncio.wait_for(). It also requires iterating over the sets of completed and pending awaitables to process their results or cancel them manually.
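
The manual cancellation step is easy to overlook, so here is a minimal sketch of it. The names worker and demo and the specific delays are hypothetical and chosen only to keep the run short. It checks that tasks left in the pending set are still running after the timeout, cancels them, and then awaits them with asyncio.gather(..., return_exceptions=True) so the cancellation has finished before the function returns:

```python
import asyncio


async def worker(delay):
    await asyncio.sleep(delay)
    return delay


async def demo():
    tasks = [asyncio.create_task(worker(d)) for d in (0.01, 0.02, 1.0)]
    done, pending = await asyncio.wait(tasks, timeout=0.2)

    # After the timeout, unfinished tasks are still running, not cancelled
    still_running = [not t.done() for t in pending]

    # Request cancellation of every unfinished task
    for t in pending:
        t.cancel()

    # Await the cancelled tasks so the cancellation actually completes;
    # return_exceptions=True keeps CancelledError from propagating here
    await asyncio.gather(*pending, return_exceptions=True)

    results = sorted(t.result() for t in done)
    return results, still_running, [t.cancelled() for t in pending]


results, still_running, cancelled = asyncio.run(demo())
print(results, still_running, cancelled)
```

Awaiting the cancelled tasks is a design choice rather than a requirement: task.cancel() only requests cancellation, and the task does not reach the cancelled state until the event loop runs it again.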
