SQLAlchemy: How to Connect to Multiple Databases Concurrently

Updated: January 3, 2024 By: Guest Contributor Post a comment

Introduction

SQLAlchemy, a powerful ORM for Python, facilitates database interactions and supports connections to multiple databases out of the box. This versatility enables complex data operations spanning diverse DBMSs.

Setting up the Environment

Before delving into multi-database connections, ensure you have the latest SQLAlchemy installed:

pip install sqlalchemy

For asynchronous capabilities, you’ll also need asyncio and asyncpg or similar libraries compatible with your databases.

Understanding Engine Creation

In SQLAlchemy, an engine maintains the database connections. Create one engine per database:

from sqlalchemy import create_engine

# For a PostgreSQL database
engine1 = create_engine('postgresql+psycopg2://user:password@host/dbname')

# For a MySQL database
engine2 = create_engine('mysql+pymysql://user:password@host/dbname')

Session Management

Sessions are used to interact with databases. Use the sessionmaker factory to generate Session classes for each engine:

from sqlalchemy.orm import sessionmaker

Session1 = sessionmaker(bind=engine1)
session1 = Session1()

Session2 = sessionmaker(bind=engine2)
session2 = Session2()

Each session object can now be used independently to interact with its respective database.

Working with Models

Define models for each database or map existing tables leveraging the ORM capabilities:

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String

Base1 = declarative_base()
Base2 = declarative_base()

class User(Base1):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String)

class Product(Base2):
    __tablename__ = 'products'
    id = Column(Integer, primary_key=True)
    name = Column(String)

Concurrent Operations

To perform concurrent operations, one can use threading or asyncio’s async/await syntax.

Using Threading

Python’s threading library allows creating threads to operate concurrently:

import threading


def work_with_db1(session):
    # perform some database operations
    pass


def work_with_db2(session):
    # perform some different database operations
    pass

thread1 = threading.Thread(target=work_with_db1, args=(session1,))
thread2 = threading.Thread(target=work_with_db2, args=(session2,))

thread1.start()
thread2.start()

thread1.join()
thread2.join()

Using Asyncio

For asynchronous operations, use async and await with SQLAlchemy 1.4 or newer:

import asyncio


def work_with_db1(session):
    # await some database operations
    pass


def work_with_db2(session):
    # await some different database operations
    pass

async def main():
    task1 = asyncio.create_task(work_with_db1(session1))
    task2 = asyncio.create_task(work_with_db2(session2))

    await asyncio.gather(task1, task2)

asyncio.run(main())

Advanced Usage: Using Bindparam

To execute raw SQL with access to multiple databases concurrently, SQLAlchemy’s bindparam is especially useful. Assign engines to specific statements:

from sqlalchemy.sql import text

with engine1.connect() as conn1, engine2.connect() as conn2:
    result1 = conn1.execute(text("SELECT * FROM users").execution_options(bind=engine1))
    result2 = conn2.execute(text("SELECT * FROM products").execution_options(bind=engine2))

    for user in result1:
        print(user)

    for product in result2:
        print(product)

Using bindparam, one can target specific databases in transactions, ensuring separation and consistency.

Conclusion

Concurrent connections to multiple databases in SQLAlchemy unlock powerful data manipulation capabilities. By understanding engines, sessions, and concurrency patterns, you can deftly manage connections in a multi-database environment.