FastAPI技巧
文章目录
- 依赖注入
- 路径操作中的标签和摘要
- 后台任务
- 自定义异常处理
- 中间件(Middleware)
- 使用 Pydantic 进行数据验证:
- 异步编程
- 使用 WebSockets
- 使用 FastAPI 进行测试
- 使用缓存优化性能
- 使用 `Response` 对象自定义响应
- 版本控制
- 自定义路由类
- 异步数据库操作
- 事件处理器(Event Handlers)
- 动态路径参数
- OpenAPI Schema 扩展
- 安全性增强
- 使用 Sub-Applications
- 使用 `FastAPI` 和 `GraphQL`
- 动态添加路由
- 使用 `contextvars` 实现上下文变量
- 优化路径操作顺序
- 使用 FastAPI 实现依赖注入中的单例模式
- 热重载与自动重启
- 集成任务队列
- 多种身份验证方案
- SQLAlchemy 与 FastAPI 的异步支持
- 多应用整合
- 响应数据的分块传输
- 集成 Prometheus 监控
- API 网关模式
- 性能监控和调优
- 基于角色的访问控制(RBAC)
- 自动生成 API 客户端
- 支持多种内容类型的请求
- 请求限流
- 多线程与多进程执行
- 事件驱动的扩展机制
- 基于类视图的路径操作
- 基于 JWT 的安全认证和刷新机制
- 多语言支持与国际化
- 文件上传的高级处理
- 使用 Sentry 或 New Relic 进行错误监控与性能分析
- 使用 `Redis` 实现分布式锁
- 通过 FastAPI 实现基于事件的系统设计
- 使用 `Tortoise ORM` 实现异步数据库访问
- 使用 Asyncpg 进行高效的 PostgreSQL 操作
- 缓存依赖项结果
- 使用 `gRPC` 实现微服务之间的高效通信
- 使用 `Locust` 进行性能测试
依赖注入
使用 FastAPI 的依赖注入系统保持代码模块化和整洁。将可重用的组件(如数据库连接、认证机制或通用工具)定义为依赖项。
from fastapi import Dependsdef get_db():db = SessionLocal()try:yield dbfinally:db.close()@app.get("/items/")
def read_items(db: Session = Depends(get_db)):return db.query(Item).all()
将配置管理作为依赖项,通过依赖注入确保应用的配置可以灵活地在不同环境中适配。
from pydantic import BaseSettingsclass Settings(BaseSettings):database_url: strsecret_key: strclass Config:env_file = ".env"def get_settings() -> Settings:return Settings()@app.get("/config/")
def get_config(settings: Settings = Depends(get_settings)):return {"database_url": settings.database_url}
路径操作中的标签和摘要
使用标签来组织 API 端点,并添加摘要和描述,使 API 文档更加清晰。
@app.get("/items/{item_id}", tags=["items"], summary="通过ID检索物品")
def read_item(item_id: int):return {"item_id": item_id}
后台任务
FastAPI 提供了一种简单的方法来处理后台任务,这些任务可以在响应发送给客户端后运行。
from fastapi import BackgroundTasksdef write_log(message: str):with open("log.txt", "a") as log:log.write(message)@app.post("/send-notification/{email}")
def send_notification(email: str, background_tasks: BackgroundTasks):background_tasks.add_task(write_log, f"Notification sent to {email}")return {"message": "通知将在后台发送"}
自定义异常处理
创建自定义异常处理程序,以管理特定的错误或创建更友好的错误响应。
from fastapi import HTTPException@app.exception_handler(CustomException)
async def custom_exception_handler(request, exc):return JSONResponse(status_code=400, content={"message": str(exc)})@app.get("/resource/{id}")
async def get_resource(id: int):if id not found:raise CustomException("未找到资源")
中间件(Middleware)
使用中间件处理诸如日志记录、跨域请求(CORS)或请求验证等横切关注点。
from fastapi.middleware.cors import CORSMiddlewareapp.add_middleware(CORSMiddleware,allow_origins=["*"],allow_credentials=True,allow_methods=["*"],allow_headers=["*"],
)
在全局中间件中捕获异常,进行统一的异常处理,减少重复代码。
from fastapi.middleware.trustedhost import TrustedHostMiddlewareapp.add_middleware(TrustedHostMiddleware, allowed_hosts=["example.com", "*.example.com"])@app.middleware("http")
async def add_process_time_header(request: Request, call_next):try:response = await call_next(request)response.headers["X-Process-Time"] = str(time.time())return responseexcept Exception as e:return JSONResponse(status_code=500, content={"message": "An error occurred"})
使用 Pydantic 进行数据验证:
利用 Pydantic 模型进行请求和响应的验证,这确保了传入的数据会被自动验证和转换。
from pydantic import BaseModelclass Item(BaseModel):name: strprice: floatdescription: Optional[str] = None@app.post("/items/")
def create_item(item: Item):return item
异步编程
利用 FastAPI 对异步代码的支持,以更高效地处理 I/O 密集型操作。
@app.get("/async-items/")
async def get_items():items = await fetch_items_from_db()return items
使用 WebSockets
FastAPI 内置对 WebSockets 的支持,可以轻松创建实时应用程序。
from fastapi import WebSocket@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):await websocket.accept()while True:data = await websocket.receive_text()await websocket.send_text(f"接收到的信息: {data}")
为 WebSocket 连接添加中间件,以便进行认证、日志记录或其他预处理。
from fastapi import WebSocket, WebSocketDisconnect@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):await websocket.accept()try:while True:data = await websocket.receive_text()await websocket.send_text(f"Message: {data}")except WebSocketDisconnect:print("WebSocket disconnected")
使用 FastAPI 进行测试
FastAPI 与
pytest
集成良好,可用于测试你的 API。你可以使用TestClient
模拟请求并验证端点。
from fastapi.testclient import TestClientclient = TestClient(app)def test_read_main():response = client.get("/")assert response.status_code == 200assert response.json() == {"message": "Hello World"}
使用缓存优化性能
对耗时操作实现缓存策略,以提高性能,特别是对于频繁访问的端点。
from fastapi_cache import FastAPICache
from fastapi_cache.backends.inmemory import InMemoryBackend
from fastapi_cache.decorator import cache@app.on_event("startup")
async def startup():FastAPICache.init(InMemoryBackend())@app.get("/cached-items/")
@cache(expire=60)
async def get_cached_items():return await fetch_items_from_db()
使用 Response
对象自定义响应
FastAPI 提供了
Response
对象,允许你自定义 HTTP 响应,控制状态码、响应头和响应体。
from fastapi import Response@app.get("/custom-response")
def custom_response():return Response(content="Custom response", media_type="text/plain", status_code=200)
版本控制
通过路径前缀或子应用程序来管理 API 的版本,以保持向后兼容性。
from fastapi import FastAPIapp_v1 = FastAPI()
app_v2 = FastAPI()@app_v1.get("/items/")
def get_items_v1():return {"version": "v1"}@app_v2.get("/items/")
def get_items_v2():return {"version": "v2"}main_app = FastAPI()
main_app.mount("/v1", app_v1)
main_app.mount("/v2", app_v2)
自定义路由类
通过继承并扩展 FastAPI 的 APIRouter 来自定义路由类,以便在添加新路由时应用统一的配置或逻辑。
from fastapi import APIRouterclass CustomRouter(APIRouter):def api_route(self, path: str, **kwargs):if "tags" not in kwargs:kwargs["tags"] = ["default"]return super().api_route(path, **kwargs)router = CustomRouter()@router.get("/custom-route/")
def custom_route():return {"message": "This is a custom route"}
异步数据库操作
异步数据库驱动程序(如
databases
或SQLAlchemy
的异步模式),确保数据库操作是异步的,以避免阻塞事件循环
from databases import Databasedatabase = Database("sqlite:///test.db")@app.on_event("startup")
async def startup():await database.connect()@app.on_event("shutdown")
async def shutdown():await database.disconnect()@app.get("/async-db/")
async def read_data():query = "SELECT * FROM my_table"return await database.fetch_all(query)
事件处理器(Event Handlers)
FastAPI 提供了
on_event
装饰器,可以用来定义在应用程序启动和关闭时执行的代码。这在需要执行一些启动初始化或清理操作时非常有用。
@app.on_event("startup")
async def startup_event():# 执行一些启动时的操作,如连接数据库、启动后台任务等print("Application startup")@app.on_event("shutdown")
async def shutdown_event():# 执行一些关闭时的操作,如断开数据库连接、清理资源等print("Application shutdown")
动态路径参数
使用正则表达式在路径中捕获和验证动态参数,适用于需要更复杂路径匹配的情况。
from fastapi import Path@app.get("/files/{file_path:path}")
def read_file(file_path: str):return {"file_path": file_path}
在路径参数中使用自定义的转换器,以便对传入的参数进行验证和格式化。
from fastapi import Path, HTTPExceptionclass CustomPathConverter:regex = '[0-9]+'def to_python(self, value: str):return int(value)def to_url(self, value: int):return str(value)@app.get("/items/{item_id}", path_params={"item_id": CustomPathConverter()})
def read_item(item_id: int):return {"item_id": item_id}
OpenAPI Schema 扩展
扩展 FastAPI 生成的 OpenAPI 文档,为 API 增加额外的描述或文档内容。
app = FastAPI(title="My API",description="This is a custom API",version="1.0.0",openapi_tags=[{"name": "items","description": "Operations with items"}]
)
安全性增强
通过 OAuth2 或 JWT 实现 API 认证和授权,确保你的 API 安全可靠。
from fastapi.security import OAuth2PasswordBeareroauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")@app.get("/secure-endpoint/")
def secure_endpoint(token: str = Depends(oauth2_scheme)):return {"token": token}
使用 Sub-Applications
将大型应用拆分为多个子应用,进行模块化管理,并在主应用中挂载这些子应用。
from fastapi import FastAPIsub_app = FastAPI()@sub_app.get("/sub-app-endpoint")
def sub_app_endpoint():return {"message": "This is a sub-application"}main_app = FastAPI()main_app.mount("/sub", sub_app)
使用 FastAPI
和 GraphQL
FastAPI
与GraphQL
(通过Graphene
等库)集成,可以轻松实现基于图形的 API。
from fastapi import FastAPI
from starlette.graphql import GraphQLApp
import grapheneclass Query(graphene.ObjectType):hello = graphene.String(name=graphene.String(default_value="stranger"))def resolve_hello(self, info, name):return f'Hello {name}!'app = FastAPI()
app.add_route("/graphql", GraphQLApp(schema=graphene.Schema(query=Query)))
动态添加路由
根据条件或配置动态添加路由。
def add_dynamic_route(app, path: str):@app.get(path)def dynamic_route():return {"message": f"This is a dynamic route for {path}"}add_dynamic_route(app, "/dynamic-path")
使用 contextvars
实现上下文变量
在异步环境中,
contextvars
可以用于实现上下文变量的安全传递,适用于日志记录或跟踪。
import contextvarsrequest_id = contextvars.ContextVar("request_id")@app.middleware("http")
async def add_request_id(request, call_next):request_id.set(str(uuid.uuid4()))response = await call_next(request)return response@app.get("/items/")
def read_items():return {"request_id": request_id.get()}
优化路径操作顺序
在定义路径时,FastAPI 会按顺序匹配路径,因此将更具体的路径放在前面有助于提高匹配效率。
@app.get("/items/{item_id}/subitems/{subitem_id}")
def read_subitem(item_id: int, subitem_id: int):return {"item_id": item_id, "subitem_id": subitem_id}@app.get("/items/{item_id}")
def read_item(item_id: int):return {"item_id": item_id}
使用 FastAPI 实现依赖注入中的单例模式
在依赖项中使用单例模式,确保依赖的实例在应用的生命周期内仅被创建一次。
class SingletonDependency:def __init__(self):self.value = "Singleton Value"singleton_dependency = SingletonDependency()@app.get("/singleton/")
def get_singleton_dependency(dep: SingletonDependency = Depends(lambda: singleton_dependency)):return {"value": dep.value}
热重载与自动重启
使用
uvicorn
的--reload
选项实现开发时的热重载与自动重启。
uvicorn main:app --reload
集成任务队列
将 FastAPI 与任务队列(如 Celery)集成,实现任务的异步处理。
from celery import Celery
from fastapi import BackgroundTaskscelery = Celery(__name__, broker='redis://localhost:6379/0')@celery.task
def celery_task(data):return data@app.post("/process/")
async def process_data(data: dict, background_tasks: BackgroundTasks):background_tasks.add_task(celery_task.delay, data)return {"message": "Task is being processed"}
多种身份验证方案
为不同的端点提供多种身份验证方案,例如 Basic Auth、OAuth2、JWT 等。
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, HTTPBasicoauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
basic_auth = HTTPBasic()@app.get("/secure-endpoint/", dependencies=[Depends(oauth2_scheme)])
def secure_endpoint(token: str = Depends(oauth2_scheme)):return {"message": "This is secured with OAuth2"}@app.get("/basic-secure-endpoint/", dependencies=[Depends(basic_auth)])
def basic_secure_endpoint(credentials: HTTPBasicCredentials = Depends(basic_auth)):return {"message": f"Hello, {credentials.username}"}
SQLAlchemy 与 FastAPI 的异步支持
通过使用 SQLAlchemy 的异步特性,处理更高效的数据库操作。
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmakerDATABASE_URL = "postgresql+asyncpg://user:password@localhost/testdb"engine = create_async_engine(DATABASE_URL)
AsyncSessionLocal = sessionmaker(bind=engine, class_=AsyncSession, expire_on_commit=False
)async def get_async_db():async with AsyncSessionLocal() as session:yield session@app.get("/async-items/")
async def read_async_items(db=Depends(get_async_db)):result = await db.execute("SELECT * FROM items")return result.fetchall()
多应用整合
将多个 FastAPI 应用整合到一起,以便于大型应用的模块化开发和部署。
from fastapi import FastAPIapp1 = FastAPI()
app2 = FastAPI()@app1.get("/app1/")
def app1_route():return {"message": "App 1"}@app2.get("/app2/")
def app2_route():return {"message": "App 2"}main_app = FastAPI()
main_app.mount("/app1", app1)
main_app.mount("/app2", app2)
响应数据的分块传输
对于大型数据集或长时间运行的任务,可以使用分块传输技术逐步发送响应数据。
from fastapi.responses import StreamingResponseasync def data_generator():for i in range(100):yield f"data: {i}\n"@app.get("/stream-data/")
def stream_data():return StreamingResponse(data_generator(), media_type="text/event-stream")
集成 Prometheus 监控
通过集成 Prometheus 监控,可以实时监控 FastAPI 应用的性能指标。
from prometheus_fastapi_instrumentator import InstrumentatorInstrumentator().instrument(app).expose(app)
API 网关模式
将 FastAPI 应用作为 API 网关,用于管理和路由多个后端服务。
from fastapi import APIRouterrouter = APIRouter()@router.get("/service1/")
def service1_route():return {"message": "Service 1"}@router.get("/service2/")
def service2_route():return {"message": "Service 2"}app.include_router(router, prefix="/api")
性能监控和调优
使用
ASGI
服务器的性能监控工具(如uvicorn-gunicorn
),优化 FastAPI 应用的性能。
uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4 --log-level info
基于角色的访问控制(RBAC)
实现基于角色的访问控制,确保不同权限的用户只能访问其有权限的资源。
from fastapi import Depends, HTTPException, statusdef get_current_user_role():# 假设从 token 中获取用户角色return "admin"def admin_role_dependency(role: str = Depends(get_current_user_role)):if role != "admin":raise HTTPException(status_code=status.HTTP_403_FORBIDDEN,detail="You do not have enough privileges")@app.get("/admin/")
def read_admin_data(role=Depends(admin_role_dependency)):return {"admin_data": "This is admin data"}
自动生成 API 客户端
使用
FastAPI
生成的 OpenAPI 模式,自动生成 API 客户端,以简化与前端或其他服务的集成。
# 使用 openapi-generator-cli 从 OpenAPI 模式生成客户端代码
openapi-generator-cli generate -i http://localhost:8000/openapi.json -g python
支持多种内容类型的请求
配置端点以支持不同的内容类型(如 JSON、表单数据、XML),从而提高 API 的灵活性。
from fastapi import File, Form@app.post("/upload/")
async def upload_file(file: bytes = File(...), description: str = Form(...)):return {"file_size": len(file), "description": description}
请求限流
实现请求限流机制,防止 API 被滥用,可以通过中间件或第三方库(如
slowapi
)来实现。
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.middleware import SlowAPIMiddleware
from slowapi.util import get_remote_addresslimiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(429, _rate_limit_exceeded_handler)
app.add_middleware(SlowAPIMiddleware)@app.get("/limited/")
@limiter.limit("5/minute")
def limited():return {"message": "This endpoint is rate-limited"}
多线程与多进程执行
对于 CPU 密集型任务,可以通过多线程或多进程实现并行处理,以充分利用系统资源。
import concurrent.futures@app.get("/process/")
def process_data():with concurrent.futures.ProcessPoolExecutor() as executor:result = executor.map(some_cpu_intensive_function, data_list)return {"result": list(result)}
事件驱动的扩展机制
利用 FastAPI 的事件系统,实现插件或模块化扩展,允许在特定事件发生时触发自定义逻辑。
from fastapi import FastAPIapp = FastAPI()@app.on_event("startup")
async def startup_event():# 扩展或插件初始化逻辑print("Application startup")@app.on_event("shutdown")
async def shutdown_event():# 清理或关闭逻辑print("Application shutdown")
基于类视图的路径操作
使用类视图(CBV)来组织路径操作,以便更好地管理共享的状态或依赖关系。
from fastapi import APIRouter
from fastapi_utils.cbv import cbvrouter = APIRouter()@cbv(router)
class ItemView:def __init__(self):self.data = []@router.get("/items/")def get_items(self):return self.data@router.post("/items/")def add_item(self, item: dict):self.data.append(item)return {"message": "Item added"}
基于 JWT 的安全认证和刷新机制
实现基于 JWT 的认证和刷新机制,确保用户的身份验证和会话管理。
from fastapi_jwt_auth import AuthJWT
from pydantic import BaseModelclass Settings(BaseModel):authjwt_secret_key: str = "secret"@AuthJWT.load_config
def get_config():return Settings()@app.post("/login/")
def login(user: User, Authorize: AuthJWT = Depends()):access_token = Authorize.create_access_token(subject=user.username)refresh_token = Authorize.create_refresh_token(subject=user.username)return {"access_token": access_token, "refresh_token": refresh_token}@app.post("/refresh/")
def refresh(Authorize: AuthJWT = Depends()):Authorize.jwt_refresh_token_required()current_user = Authorize.get_jwt_subject()access_token = Authorize.create_access_token(subject=current_user)return {"access_token": access_token}
多语言支持与国际化
通过集成
Babel
或其他国际化工具,支持多语言的内容管理和响应。
from fastapi import FastAPI
from flask_babel import Babel, gettextapp = FastAPI()
babel = Babel(app)@app.get("/hello/")
def say_hello():return {"message": gettext("Hello World")}
文件上传的高级处理
处理大文件上传或多文件上传,并通过临时存储机制避免内存占用过大。
from fastapi import UploadFile, File
import shutil@app.post("/uploadfile/")
async def create_upload_file(file: UploadFile = File(...)):with open(f"/tmp/{file.filename}", "wb") as buffer:shutil.copyfileobj(file.file, buffer)return {"filename": file.filename}
使用 Sentry 或 New Relic 进行错误监控与性能分析
集成 Sentry 或 New Relic 等第三方服务进行错误监控和性能分析,及时发现并解决生产环境中的问题。
import sentry_sdk
from fastapi import FastAPI
from sentry_sdk.integrations.asgi import SentryAsgiMiddlewaresentry_sdk.init(dsn="your-dsn-url")app = FastAPI()
app.add_middleware(SentryAsgiMiddleware)
使用 Redis
实现分布式锁
使用 Redis 实现分布式锁,确保在分布式环境下数据操作的原子性和一致性。
import aioredis
import asyncioredis = aioredis.from_url("redis://localhost")async def lock_resource(resource_name: str):lock = await redis.lock(resource_name)await lock.acquire()try:# 操作资源passfinally:await lock.release()@app.post("/process/")
async def process_data():await lock_resource("resource_name")return {"status": "processed"}
通过 FastAPI 实现基于事件的系统设计
设计基于事件的系统,将应用逻辑解耦为不同的事件处理器,提高系统的扩展性和灵活性。
from fastapi import FastAPI
from pydantic import BaseModelapp = FastAPI()
events = {}def register_event(event_name: str, handler):if event_name not in events:events[event_name] = []events[event_name].append(handler)def trigger_event(event_name: str, payload: dict):if event_name in events:for handler in events[event_name]:handler(payload)@app.post("/event/")
async def handle_event(event: str, data: dict):trigger_event(event, data)return {"status": "event triggered"}# 注册事件处理器
register_event("user_signup", lambda payload: print(f"User signed up: {payload}"))
使用 Tortoise ORM
实现异步数据库访问
使用 Tortoise ORM 进行异步数据库访问,简化复杂的数据库操作,并支持多种数据库类型。
from tortoise import Tortoise, fields, models
from tortoise.contrib.fastapi import register_tortoiseclass User(models.Model):id = fields.IntField(pk=True)name = fields.CharField(max_length=50)app = FastAPI()register_tortoise(app,db_url='sqlite://db.sqlite3',modules={'models': ['__main__']},generate_schemas=True,add_exception_handlers=True,
)@app.get("/users/")
async def get_users():return await User.all()
使用 Asyncpg 进行高效的 PostgreSQL 操作
利用
asyncpg
提供的高性能异步 PostgreSQL 操作,可以显著提升数据库操作的效率。
import asyncpg
from fastapi import FastAPIapp = FastAPI()async def get_db():conn = await asyncpg.connect(user='user', password='password', database='db', host='127.0.0.1')try:yield connfinally:await conn.close()@app.get("/items/")
async def read_items(conn=Depends(get_db)):items = await conn.fetch("SELECT * FROM items")return items
缓存依赖项结果
使用
lru_cache
装饰器缓存依赖项的结果,避免重复计算和提高性能。
from functools import lru_cache
from fastapi import Depends@lru_cache()
def get_settings():return Settings()@app.get("/config/")
def read_config(settings: Settings = Depends(get_settings)):return settings
使用 gRPC
实现微服务之间的高效通信
将
FastAPI
与gRPC
结合,实现微服务架构下的高效通信。
from fastapi import FastAPI
import grpcapp = FastAPI()async def grpc_call():async with grpc.aio.insecure_channel('localhost:50051') as channel:stub = helloworld_pb2_grpc.GreeterStub(channel)response = await stub.SayHello(helloworld_pb2.HelloRequest(name='you'))return response@app.get("/grpc/")
async def call_grpc():response = await grpc_call()return {"message": response.message}
使用 Locust
进行性能测试
集成
Locust
进行负载测试和性能测试,确保 API 的可扩展性和稳定性。
from locust import HttpUser, TaskSet, taskclass UserBehavior(TaskSet):@taskdef index(self):self.client.get("/")class WebsiteUser(HttpUser):tasks = [UserBehavior]min_wait = 5000max_wait = 9000