Python asyncio: How to run a function in a separate thread

Updated: February 12, 2024 By: Guest Contributor Post a comment

Overview

Asynchronous programming in Python has gained popularity for efficiently handling I/O bound and high-level structured network code. With the asyncio library, Python provides a powerful framework for writing single-threaded concurrent code using coroutines. However, there are scenarios where integrating asyncio with traditional synchronous code or running tasks in separate threads becomes necessary. This guide will explore how to use asyncio to execute functions in separate threads, enabling the seamless integration of synchronous and asynchronous codes.

Understanding Asyncio and Threads

asyncio is an asynchronous I/O framework that uses event loops to manage its execution. An event loop runs in a single thread and executes all tasks in its queue without preemptive task switching. Unlike asyncio, threads allow for actual concurrency in Python, albeit with some overhead and complexity due to context switching and the Global Interpreter Lock (GIL).

To run synchronous functions that may block the event loop, asyncio provides the run_in_executor method, thereby avoiding the blocking of the event loop and ensuring the application remains responsive.

Before diving into code examples, ensure you have a recent version of Python (3.7 or newer) installed, as asyncio has undergone significant improvements in recent versions.

Basic Usage of run_in_executor

The run_in_executor method of the event loop enables running a function in a separate thread. Here’s a basic example:

import asyncio
import time

def synchronous_task():
    time.sleep(5)
    return 'Task complete'

async def main():
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(None, synchronous_task)
    print(result)

asyncio.run(main())

In this example, synchronous_task simulates a time-consuming synchronous function. By using run_in_executor, it is run in a default executor (a thread pool) without blocking the asyncio event loop.

Customizing Executor

By default, run_in_executor uses the concurrent.futures.ThreadPoolExecutor, but you can specify a different executor if needed. This is useful for controlling the number of threads or for using a ProcessPoolExecutor. Here’s how to use a custom ThreadPoolExecutor:

from concurrent.futures import ThreadPoolExecutor
import asyncio

def synchronous_task():
    print('Task running')
    time.sleep(3)
    print('Task done')

async def main():
    executor = ThreadPoolExecutor(max_workers=3)
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(executor, synchronous_task)
    executor.shutdown()

asyncio.run(main())

Integrating Asyncio with Synchronous Code


Integrating synchronous code into an asynchronous asyncio application without blocking the event loop can be achieved using run_in_executor. This method allows you to run synchronous functions in a separate thread or process pool, thus avoiding blocking the async event loop. Here’s how you can use run_in_executor to execute a synchronous function asynchronously within an asyncio application.

Example:

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

# This is a synchronous function that simulates a time-consuming task.
def synchronous_task(x):
    print(f"Starting a synchronous task with {x}")
    time.sleep(2)  # Simulate a blocking operation, e.g., file IO or network request.
    print("Synchronous task completed")
    return x * 2

# This is an asynchronous wrapper function to run the synchronous function in an executor.
async def run_sync_in_async(x):
    loop = asyncio.get_running_loop()
    # Run the synchronous function in a default ThreadPoolExecutor.
    result = await loop.run_in_executor(None, synchronous_task, x)
    print(f"Result from synchronous task: {result}")

# Main coroutine to demonstrate running synchronous code in async manner.
async def main():
    # Schedule three synchronous tasks to run concurrently.
    await asyncio.gather(
        run_sync_in_async(1),
        run_sync_in_async(2),
        run_sync_in_async(3)
    )

# Run the main coroutine
asyncio.run(main())

Explantions:

  • Synchronous Task: The synchronous_task function represents a blocking operation. It’s a regular function that performs a task synchronously, such as accessing a database or reading a file.
  • Async Wrapper: The run_sync_in_async async function serves as a wrapper around the synchronous function, allowing it to be called within the async application. It uses asyncio.get_running_loop().run_in_executor() to run the synchronous function in a separate thread.
  • ThreadPoolExecutor: By passing None as the first argument to run_in_executor, it uses the default ThreadPoolExecutor. You can also create a custom ThreadPoolExecutor or ProcessPoolExecutor for greater control over the execution environment.
  • asyncio.gather: This is used to run multiple instances of the async wrapper concurrently, demonstrating how multiple synchronous tasks can be handled without blocking the async event loop.

This example illustrates a practical approach to integrating synchronous operations into an asynchronous asyncio application, maintaining the non-blocking nature of the event loop while handling operations that are not natively asynchronous.

Error Handling

Error handling in asynchronous tasks running in separate threads requires attention. Exceptions thrown in the thread will be propagated to the asyncio task, allowing you to use try-except blocks around your await statement:

async def main():
    loop = asyncio.get_running_loop()
    try:
        result = await loop.run_in_executor(None, synchronous_task)
    except Exception as e:
        print(f'Error: {e}')

Best Practices

When working with asyncio and threads:

  • Minimize the use of threads as much as possible to avoid GIL limitations and potential performance issues.
  • Consider the lifecycle of your threads and executor services to prevent memory leaks or orphaned threads.
  • Properly handle exceptions from synchronous tasks to maintain stability.
  • Use the asyncio.Lock or asyncio.Semaphore for thread-safe operations on shared resources.

Conclusion

Asyncio’s run_in_executor is a powerful tool for integrating synchronous operations into the asyncio environment. By following the practices outlined in this guide and understanding the examples, you can optimize the responsiveness and efficiency of your Python applications. Embrace the asynchronous programming paradigms, and don’t hesitate to integrate synchronous code when necessary using the strategies discussed.