一、简介
FastAPI中你可以使用任何关系型数据库,可以通过SQLAlchemy将其轻松的适应于任何的数据库,比如:
PostgreSQL
MySQL
SQLite
Oracle
Microsoft SQL Server
...
SQLAlchemy是一个ORM(object-relational mapping)的框架。在ORM中,你创建一个类就会通过SQLAlchemy将其自动转成一张表,在类中的每一个属性就会将其转成表中的字段。
这里有一些实例,假如有一个大的项目,里面包含一个子包叫做sql_app:
复制代码
.
└── sql_app
├── __init__.py
├── crud.py
├── database.py
├── main.py
├── models.py
└── schemas.py
复制代码
__init__.py 是一个空文件,但是说明sql_app是一个package
database.py 数据库配置相关
models.py 数据库模型表
schemas.py 模型验证
crud.py 数据库操作相关
main.py 主文件
二、简单实例
该实例以MySQL为例,SQLAlchemy需要借助于pymysql连接数据库,所以需要进行安装这两个工具包:
pip install sqlalchemy
pip install pymysql
1、database.py
复制代码
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
SQLALCHEMY_DATABASE_URL = "mysql+pymysql://root:123456@127.0.0.1:3306/test"
echo=True表示引擎将用repr()函数记录所有语句及其参数列表到日志
engine = create_engine(
SQLALCHEMY_DATABASE_URL, encoding='utf8', echo=True
)
SQLAlchemy中,CRUD是通过会话进行管理的,所以需要先创建会话,
每一个SessionLocal实例就是一个数据库session
flush指发送到数据库语句到数据库,但数据库不一定执行写入磁盘
commit是指提交事务,将变更保存到数据库文件中
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
创建基本映射类
Base = declarative_base()
复制代码
在数据库相关的配置文件中,首先创建一个SQLAlchemy的"engine",然后创建SessionLocal实例进行会话,最后创建模型类的基类。
2、models.py
复制代码
from sqlalchemy import Boolean, Column, Integer, String
from database import Base
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
email = Column(String(32), unique=True, index=True)
hashed_password = Column(String(32))
is_active = Column(Boolean, default=True)
复制代码
通过数据库配置文件中的基类来创建模型类。
3、schemas.py
复制代码
from pydantic import BaseModel
class UserBase(BaseModel):
email: str
class UserCreate(UserBase):
"""
请求模型验证:
email:
password:
"""
password: str
class User(UserBase):
"""
响应模型:
id:
email:
is_active
并且设置orm_mode与之兼容
"""
id: int
is_active: bool
class Config:
orm_mode = True
复制代码
定义请求参数模型验证与响应模型验证的Pydantic模型,其中响应模型中设置orm_mode=True参数,表示与ORM模型兼容,因为后续中返回的数据库查询是orm模型,通过设置这个参数可以将orm模型通过pydantic模型进行验证。
4、crud.py
复制代码
from sqlalchemy.orm import Session
import models, schemas
通过id查询用户
def get_user(db: Session, user_id: int):
return db.query(models.User).filter(models.User.id == user_id).first()
新建用户
def db_create_user(db: Session, user: schemas.UserCreate):
fake_hashed_password = user.password + "notreallyhashed"
db_user = models.User(email=user.email, hashed_password=fake_hashed_password)
db.add(db_user)
db.commit() # 提交保存到数据库中
db.refresh(db_user) # 刷新
return db_user
复制代码
通过传入数据库连接以及参数等进行数据库操作,包括创建用户、查询用户等,返回的是orm模型对象。
5、main.py
复制代码
from fastapi import FastAPI, Depends, HTTPException
import crud, schemas
from database import SessionLocal, engine, Base
from sqlalchemy.orm import Session
import uvicorn
Base.metadata.create_all(bind=engine) #数据库初始化,如果没有库或者表,会自动创建
app = FastAPI()
Dependency
def get_db():
"""
每一个请求处理完毕后会关闭当前连接,不同的请求使用不同的连接
:return:
"""
db = SessionLocal()
try:
yield db
finally:
db.close()
新建用户
@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
return crud.db_create_user(db=db, user=user)
通过id查询用户
@app.get("/user/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
db_user = crud.get_user(db, user_id=user_id)
if not db_user:
raise HTTPException(status_code=404, detail="User not found")
return db_user
if name == '__main__':
uvicorn.run(app=app, host="127.0.0.1", port=8000)
复制代码
主文件进行数据库初始化、FastAPI实例创建以及处理各种请求。
进入到交互文档查看:
http://127.0.0.1:8000/users/
复制代码
请求
{
"email": "hhh@example113.com",
"password": "ss123456"
}
响应
{
"email": "hhh@example113.com",
"id": 7,
"is_active": true
}
复制代码
http://127.0.0.1:8000/user/7
复制代码
响应
{
"email": "hhh@example113.com",
"id": 7,
"is_active": true
}
复制代码
三、复杂实例
在之前的基础上再加一个模型类Item,User与之是一对多的关系。
1、models.py
复制代码
from sqlalchemy import Boolean, Column, Integer, String, ForeignKey
from sqlalchemy.orm import relationship
from database import Base
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
email = Column(String(32), unique=True, index=True)
hashed_password = Column(String(32))
is_active = Column(Boolean, default=True)
items = relationship("Item", back_populates="owner")
class Item(Base):
__tablename__ = "items"
id = Column(Integer, primary_key=True, index=True)
title = Column(String(32), index=True)
description = Column(String(32), index=True)
owner_id = Column(Integer, ForeignKey("users.id"))
owner = relationship("User", back_populates="items")
复制代码
2、schemas.py
复制代码
from typing import Optional,List
from pydantic import BaseModel
class ItemBase(BaseModel):
title: str
description: Optional[str] = None
class ItemCreate(ItemBase):
pass
class Item(ItemBase):
id: int
owner_id: int
class Config:
orm_mode = True
class UserBase(BaseModel):
email: str
class UserCreate(UserBase):
"""
请求模型验证:
email:
password:
"""
password: str
class User(UserBase):
"""
响应模型:
id:
email:
is_active
并且设置orm_mode与之兼容
"""
id: int
is_active: bool
items: List[Item] = []
class Config:
orm_mode = True
复制代码
3、crud.py
复制代码
from sqlalchemy.orm import Session
import models, schemas
通过id查询用户
def get_user(db: Session, user_id: int):
return db.query(models.User).filter(models.User.id == user_id).first()
新建用户
def db_create_user(db: Session, user: schemas.UserCreate):
fake_hashed_password = user.password + "notreallyhashed"
db_user = models.User(email=user.email, hashed_password=fake_hashed_password)
db.add(db_user)
db.commit() # 提交保存到数据库中
db.refresh(db_user) # 刷新
return db_user
获取用户拥有的item
def get_item(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.Item).offset(skip).limit(limit).all()
新建用户的item
def create_user_item(db: Session, item: schemas.ItemCreate, user_id: int):
db_item = models.Item(**item.dict(), owner_id=user_id)
db.add(db_item)
db.commit()
db.refresh(db_item)
return db_item
复制代码
4、main.py
复制代码
from typing import List
from fastapi import FastAPI, Depends, HTTPException
import crud, schemas
from database import SessionLocal, engine, Base
from sqlalchemy.orm import Session
import uvicorn
Base.metadata.create_all(bind=engine)
app = FastAPI()
Dependency
def get_db():
"""
每一个请求处理完毕后会关闭当前连接,不同的请求使用不同的连接
:return:
"""
db = SessionLocal()
try:
yield db
finally:
db.close()
新建用户
@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
return crud.db_create_user(db=db, user=user)
通过id查询用户
@app.get("/user/{user_id}", response_model=schemas.User)
def read_user(user_id: int, db: Session = Depends(get_db)):
db_user = crud.get_user(db, user_id=user_id)
if not db_user:
raise HTTPException(status_code=404, detail="User not found")
return db_user
读取用户拥有的item
@app.get("/items/", response_model=List[schemas.Item])
def read_items(skip: int = 0, limit: int = 0, db: Session = Depends(get_db)):
items = crud.get_item(db=db, skip=skip, limit=limit)
return items
创建用户的item
@app.post("/users/{user_id}/items", response_model=schemas.Item)
def create_item_user(user_id: int, item: schemas.ItemCreate, db: Session = Depends(get_db)):
return crud.create_user_item(db=db, item=item, user_id=user_id)
if name == '__main__':
uvicorn.run(app=app, host="127.0.0.1", port=8000)
复制代码
当启动项目后,会生成新的Item数据表,以及与User表之间建立关系:
复制代码
User表
create table users
(
id int auto_increment
primary key,
email varchar(32) null,
hashed_password varchar(32) null,
is_active tinyint(1) null,
constraint ix_users_email
unique (email)
);
create index ix_users_id
on users (id);
Item表
create table items
(
id int auto_increment
primary key,
title varchar(32) null,
description varchar(32) null,
owner_id int null,
constraint items_ibfk_1
foreign key (owner_id) references users (id)
);
create index ix_items_description
on items (description);
create index ix_items_id
on items (id);
create index ix_items_title
on items (title);
create index owner_id
on items (owner_id);
复制代码
最后进入交互文档进行测试。