SQLAlchemy 由 SQL Toolkit 和 Object Relational Mapper(ORM,对象关系映射器)两部分组成,是一套用于在 Python 中操作数据库的综合工具。
SQLAlchemy各组件依赖关系图:
安装
使用
数据表模型声明
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
from typing import List
from typing import Optional
from sqlalchemy import ForeignKey
from sqlalchemy import String
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column
from sqlalchemy.orm import relationship
class Base(DeclarativeBase):
    """Declarative base class that the ORM models in this example inherit from."""
class User(Base):
    """ORM model mapped to the ``user_account`` table."""

    __tablename__ = "user_account"

    # Primary key column.
    id: Mapped[int] = mapped_column(primary_key=True)
    # String column declared with length 30.
    name: Mapped[str] = mapped_column(String(30))
    # Optional column; relies on the annotation alone (no mapped_column()).
    fullname: Mapped[Optional[str]]

    # Address objects belonging to this user; paired with Address.user.
    addresses: Mapped[List["Address"]] = relationship(
        cascade="all, delete-orphan",
        back_populates="user",
    )

    def __repr__(self) -> str:
        """Return a debug string showing id, name and fullname."""
        text = f"User(id={self.id!r}, name={self.name!r}, fullname={self.fullname!r})"
        return text
class Address(Base):
    """ORM model mapped to the ``address`` table."""

    __tablename__ = "address"

    # Primary key column.
    id: Mapped[int] = mapped_column(primary_key=True)
    # Required column; relies on the annotation alone (no mapped_column()).
    email_address: Mapped[str]
    # Foreign key referencing user_account.id.
    user_id: Mapped[int] = mapped_column(ForeignKey("user_account.id"))

    # The owning User; paired with User.addresses.
    user: Mapped["User"] = relationship(back_populates="addresses")

    def __repr__(self) -> str:
        """Return a debug string showing id and email_address."""
        text = f"Address(id={self.id!r}, email_address={self.email_address!r})"
        return text
|
创建数据引擎,并初始化所有数据表模型,将表模型转换成DDL操作写入数据库
创建SQLite引擎
1
2
|
from sqlalchemy import create_engine
# SQLite engine with statement echoing switched on.
engine = create_engine(
    "sqlite://",
    echo=True,
)
|
创建引擎连接池配置
1
2
3
|
# PostgreSQL engine (psycopg2 driver) with explicit connection pool sizing.
engine = create_engine(
    "postgresql+psycopg2://me@localhost/mydb",
    max_overflow=0,
    pool_size=20,
)
|
根据实际需要设置更多连接池调优参数。
使用NullPool禁用连接池
1
2
3
4
5
|
from sqlalchemy.pool import NullPool
# Pass NullPool as the pool class so the engine does not pool connections.
engine = create_engine(
    "postgresql+psycopg2://scott:tiger@localhost/test",
    poolclass=NullPool,
)
|
Engine and Connection Use
创建session连接
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
from sqlalchemy.orm import sessionmaker
SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db"

engine = create_engine(
    SQLALCHEMY_DATABASE_URL,
    echo=True,
    # Only needed for SQLite.
    connect_args={"check_same_thread": False},
    # Connection pool settings.
    pool_size=10,
    max_overflow=100,
    pool_timeout=30,
    pool_recycle=3600,
)

# Session factory bound to the engine above, with autocommit and autoflush off.
SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False)
# Dependency
def get_db():
    """Yield a session created by ``SessionLocal`` and close it afterwards."""
    session = SessionLocal()
    try:
        yield session
    finally:
        # Runs even if the consumer raises, so the session is always closed.
        session.close()
|
创建异步session连接
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
from typing import AsyncGenerator

from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.orm import DeclarativeBase
# Database URL: SQLite file ./test.db, using the aiosqlite driver.
DATABASE_URL = "sqlite+aiosqlite:///./test.db"
class Base(DeclarativeBase):
    """Declarative base whose metadata ``create_db_and_tables`` uses."""
# Async engine for DATABASE_URL, plus a session factory bound to it.
engine = create_async_engine(DATABASE_URL)
async_session_maker = async_sessionmaker(
    engine,
    expire_on_commit=False,
)
async def create_db_and_tables():
    """Run ``Base.metadata.create_all`` through a connection of the async engine."""
    async with engine.begin() as connection:
        # create_all is handed to run_sync() instead of being called directly.
        await connection.run_sync(Base.metadata.create_all)
async def get_async_session() -> AsyncGenerator[AsyncSession, None]:
    """Yield an ``AsyncSession`` obtained from ``async_session_maker``.

    The session is opened with ``async with``, so its context manager is
    exited once the consumer is done with the yielded session.
    """
    async with async_session_maker() as db_session:
        yield db_session
|
使用上下文管理器来获取session连接
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
class MySuperContextManager:
    """Context manager that opens a session on creation and closes it on exit."""

    def __init__(self):
        # Use the SessionLocal factory defined earlier in this file; the name
        # used before, DBSession, is not defined anywhere here.
        self.db = SessionLocal()

    def __enter__(self):
        """Return the session opened in ``__init__``."""
        return self.db

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the session; returns None, so exceptions are not suppressed."""
        self.db.close()
async def get_db():
    """Yield the session managed by ``MySuperContextManager``."""
    with MySuperContextManager() as session:
        yield session
|
执行创建表DDL
1
|
# Emit CREATE TABLE statements for every model registered on Base.
Base.metadata.create_all(bind=engine)
|
执行的DDL SQL如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
BEGIN (implicit)
PRAGMA main.table_...info("user_account")
...
PRAGMA main.table_...info("address")
...
CREATE TABLE user_account (
id INTEGER NOT NULL,
name VARCHAR(30) NOT NULL,
fullname VARCHAR,
PRIMARY KEY (id)
)
...
CREATE TABLE address (
id INTEGER NOT NULL,
email_address VARCHAR NOT NULL,
user_id INTEGER NOT NULL,
PRIMARY KEY (id),
FOREIGN KEY(user_id) REFERENCES user_account (id)
)
...
COMMIT
|
执行select操作示例:
1
2
3
4
5
6
7
8
|
from sqlalchemy import select
from sqlalchemy.orm import Session

# Build the statement first: select User rows whose name is in the given list.
stmt = select(User).where(User.name.in_(["spongebob", "sandy"]))

# Session was previously used without being imported, and the session was
# never closed. The context manager closes it when the block exits.
with Session(engine) as session:
    for user in session.scalars(stmt):
        print(user)
|
执行select操作转换实际执行的SQL:
1
2
3
4
5
|
BEGIN (implicit)
SELECT user_account.id, user_account.name, user_account.fullname
FROM user_account
WHERE user_account.name IN (?, ?)
[...] ('spongebob', 'sandy')
|
Basics of Using a Session
数据库结构修改
使用 Alembic 执行DDL操作,更新数据库表结构,示例如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
from alembic import op
def upgrade():
    """Add the ``salt`` column to ``user_account`` using Alembic operations."""
    # Column was referenced without being imported; import it (and String)
    # here so the function does not rely on names defined elsewhere.
    from sqlalchemy import Column, String

    op.add_column('user_account', Column('salt', String))
def downgrade():
    """Drop the ``salt`` column from ``user_account`` using Alembic operations."""
    table, column = 'user_account', 'salt'
    op.drop_column(table, column)
|
参考