【FastAPI】使用FastAPI来实现一个基于RBAC(基于角色的访问控制)的用户权限控制系统
使用FastAPI来实现一个基于RBAC(基于角色的访问控制)的用户权限控制系统。以下是一个简单的实现方案,涵盖了用户管理、角色管理和权限管理的基本功能。
架构设计方案
【权限控制】一个通用的用户权限控制架构设计方案,可以适用于大多数应用场景
代码实现
1. 环境准备
首先,安装FastAPI和必要的依赖:
pip install fastapi uvicorn sqlalchemy passlib[bcrypt] pydantic python-jose[cryptography]
2. 数据库模型设计
使用SQLAlchemy定义数据库模型。为了简化演示,使用SQLite作为数据库。
# models.py
from sqlalchemy import Column, Integer, String, ForeignKey, Table
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship

# Declarative base class that every ORM model in this module inherits from.
Base = declarative_base()

# Association table for the many-to-many relationship: users <-> roles
user_role_table = Table(
    "user_role",
    Base.metadata,
    # Each row pairs one user with one role.
    Column("user_id", Integer, ForeignKey("users.id")),
    Column("role_id", Integer, ForeignKey("roles.id")),
)

# Association table for the many-to-many relationship: roles <-> permissions
role_permission_table = Table(
    "role_permission",
    Base.metadata,
    # Each row pairs one role with one permission.
    Column("role_id", Integer, ForeignKey("roles.id")),
    Column("permission_id", Integer, ForeignKey("permissions.id")),
)

# User table
class User(Base):
    """Account record; a user may hold any number of roles."""

    __tablename__ = 'users'

    id = Column(Integer, primary_key=True, index=True)
    # Login name; the unique constraint rejects a second account with the same name.
    username = Column(String, unique=True, index=True, nullable=False)
    # Required credential column; the /register endpoint writes a hash here, not the raw password.
    password = Column(String, nullable=False)
    # Many-to-many link to Role through user_role_table (mirrored by Role.users).
    roles = relationship("Role", secondary=user_role_table, back_populates="users")


# Role table
class Role(Base):
    """Named role that groups permissions and is assigned to users."""

    __tablename__ = 'roles'

    id = Column(Integer, primary_key=True, index=True)
    # Role names are unique and required.
    name = Column(String, unique=True, nullable=False)
    # Optional free-text description.
    description = Column(String)
    # Many-to-many link to User through user_role_table (mirrored by User.roles).
    users = relationship("User", secondary=user_role_table, back_populates="roles")
    # Many-to-many link to Permission through role_permission_table.
    permissions = relationship("Permission", secondary=role_permission_table, back_populates="roles")


# Permission table
class Permission(Base):
    """Named permission that can be granted to any number of roles."""

    __tablename__ = 'permissions'

    id = Column(Integer, primary_key=True, index=True)
    # Permission names are unique and required; has_permission() matches on this value.
    name = Column(String, unique=True, nullable=False)
    # Optional free-text description.
    description = Column(String)
    # Many-to-many link to Role through role_permission_table (mirrored by Role.permissions).
    roles = relationship("Role", secondary=role_permission_table, back_populates="permissions")
3. 数据库连接和依赖注入
配置数据库连接,并创建会话依赖。
# database.py
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session

from models import Base

SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db"

# check_same_thread=False is a SQLite-specific connect argument: it lifts the
# restriction that a connection may only be used by the thread that created it.
engine = create_engine(SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False})

# Session factory bound to the engine; commits and flushes are explicit.
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# Create the database tables for every model registered on Base
Base.metadata.create_all(bind=engine)

# Database session dependency
def get_db():
    """Yield a database session and close it when the consumer is finished.

    Written as a generator so it can be used with FastAPI's ``Depends``:
    the code after ``yield`` runs once the generator is resumed or closed.

    Yields:
        A session created by ``SessionLocal``.
    """
    db = SessionLocal()
    try:
        yield db
    finally:
        # Runs on normal completion and on errors alike, so the session is
        # always released.
        db.close()
4. 用户注册和验证
实现用户注册、加密密码存储以及用户身份验证。
# main.py
from fastapi import FastAPI, Depends, HTTPException, status
from sqlalchemy.orm import Session
from passlib.context import CryptContext

from models import User, Role, Permission
from database import get_db

# Application instance that the route decorators below attach to.
app = FastAPI()

# Password hashing context
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def verify_password(plain_password, hashed_password):
    """Return whether ``plain_password`` matches the stored ``hashed_password``."""
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password):
    """Return the hash of ``password`` produced by ``pwd_context``."""
    return pwd_context.hash(password)


# User registration
@app.post("/register")
def register_user(username: str, password: str, db: Session = Depends(get_db)):
    """Create a user account, storing only the hash of the password.

    Args:
        username: Desired login name; must not be in use already.
        password: Plain-text password; hashed before it is stored.
        db: Database session injected by ``get_db``.

    Returns:
        A dict with the new username and a success message.

    Raises:
        HTTPException: 400 when ``username`` is already registered.
    """
    # NOTE(review): plain ``str`` parameters are read from the query string by
    # FastAPI, so the password may end up in access logs; consider moving the
    # credentials into a request body.
    existing = db.query(User).filter(User.username == username).first()
    if existing:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Username already registered",
        )

    hashed_password = get_password_hash(password)
    new_user = User(username=username, password=hashed_password)
    db.add(new_user)
    db.commit()
    # Reload the row so database-generated fields are populated.
    db.refresh(new_user)
    return {"username": new_user.username, "message": "User registered successfully"}


# User authentication
@app.post("/login")
def login_user(username: str, password: str, db: Session = Depends(get_db)):
    """Check a username/password pair against the stored hash.

    Args:
        username: Login name to look up.
        password: Plain-text password to verify.
        db: Database session injected by ``get_db``.

    Returns:
        A dict with the username and a success message. No token is issued.

    Raises:
        HTTPException: 401 when the user does not exist or the password does
            not match. Both cases share one message so the response does not
            reveal which usernames exist.
    """
    user = db.query(User).filter(User.username == username).first()
    if not user or not verify_password(password, user.password):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid username or password",
        )
    return {"username": user.username, "message": "Login successful"}
5. 角色和权限管理
定义角色和权限的增删改查,以及分配角色和权限的接口。
# roles_permissions.py
# `status` is imported because every handler in this module builds its error
# responses from the status.HTTP_* constants.
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session

from models import Role, Permission, User
from database import get_db

# Router holding the role and permission management endpoints.
router = APIRouter()

# Create a role
@router.post("/roles")
def create_role(name: str, description: str, db: Session = Depends(get_db)):
    """Create a role with a unique name.

    Args:
        name: Role name; must not exist yet.
        description: Free-text description stored with the role.
        db: Database session injected by ``get_db``.

    Returns:
        A dict with the role name and a success message.

    Raises:
        HTTPException: 400 when a role called ``name`` already exists.
    """
    existing = db.query(Role).filter(Role.name == name).first()
    if existing:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Role already exists",
        )

    new_role = Role(name=name, description=description)
    db.add(new_role)
    db.commit()
    db.refresh(new_role)
    return {"name": new_role.name, "message": "Role created successfully"}


# Create a permission
@router.post("/permissions")
def create_permission(name: str, description: str, db: Session = Depends(get_db)):
    """Create a permission with a unique name.

    Args:
        name: Permission name; must not exist yet.
        description: Free-text description stored with the permission.
        db: Database session injected by ``get_db``.

    Returns:
        A dict with the permission name and a success message.

    Raises:
        HTTPException: 400 when a permission called ``name`` already exists.
    """
    existing = db.query(Permission).filter(Permission.name == name).first()
    if existing:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Permission already exists",
        )

    new_permission = Permission(name=name, description=description)
    db.add(new_permission)
    db.commit()
    db.refresh(new_permission)
    return {"name": new_permission.name, "message": "Permission created successfully"}


# Assign a role to a user
@router.post("/users/{user_id}/roles")
def assign_role_to_user(user_id: int, role_id: int, db: Session = Depends(get_db)):
    """Grant a role to a user; repeating the call has no further effect.

    Args:
        user_id: Primary key of the user (path parameter).
        role_id: Primary key of the role to grant.
        db: Database session injected by ``get_db``.

    Returns:
        A dict with the username and the names of all roles the user holds.

    Raises:
        HTTPException: 404 when the user or the role does not exist.
    """
    user = db.query(User).filter(User.id == user_id).first()
    role = db.query(Role).filter(Role.id == role_id).first()
    if not user or not role:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User or Role not found",
        )

    # Appending a role the user already holds would add a duplicate link, so
    # only append when it is missing.
    if role not in user.roles:
        user.roles.append(role)
        db.commit()
    return {"username": user.username, "roles": [assigned.name for assigned in user.roles]}


# Assign a permission to a role
@router.post("/roles/{role_id}/permissions")
def assign_permission_to_role(role_id: int, permission_id: int, db: Session = Depends(get_db)):
    """Grant a permission to a role; repeating the call has no further effect.

    Args:
        role_id: Primary key of the role (path parameter).
        permission_id: Primary key of the permission to grant.
        db: Database session injected by ``get_db``.

    Returns:
        A dict with the role name and the names of all its permissions.

    Raises:
        HTTPException: 404 when the role or the permission does not exist.
    """
    role = db.query(Role).filter(Role.id == role_id).first()
    permission = db.query(Permission).filter(Permission.id == permission_id).first()
    if not role or not permission:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Role or Permission not found",
        )

    # Appending a permission the role already has would add a duplicate link,
    # so only append when it is missing.
    if permission not in role.permissions:
        role.permissions.append(permission)
        db.commit()
    return {"role": role.name, "permissions": [perm.name for perm in role.permissions]}
6. 权限验证
通过依赖注入和 FastAPI 的 Depends 机制来实现权限验证。
# dependencies.py
import os

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.orm import Session
from jose import JWTError, jwt

from models import User
from database import get_db

# Key used to verify JWT signatures. The literal fallback is a demo
# placeholder; set the SECRET_KEY environment variable in a real deployment.
SECRET_KEY = os.getenv("SECRET_KEY", "your_secret_key")
ALGORITHM = "HS256"

# Reads the bearer token from the Authorization header; get_current_user
# takes it as a dependency. tokenUrl names the endpoint that issues tokens.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="login")

# Get the current user
def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
    """Resolve a bearer token to the ``User`` it identifies.

    Args:
        token: Encoded JWT supplied by the ``oauth2_scheme`` dependency.
        db: Database session injected by ``get_db``.

    Returns:
        The ``User`` whose id equals the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when the token cannot be decoded or has no usable
            ``sub`` claim; 404 when no user with that id exists.
    """
    credentials_error = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
    )

    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise credentials_error

    subject = payload.get("sub")
    if subject is None:
        raise credentials_error
    try:
        # RFC 7519 defines "sub" as a string, while User.id is an integer
        # column, so convert before querying.
        user_id = int(subject)
    except (TypeError, ValueError):
        raise credentials_error

    user = db.query(User).filter(User.id == user_id).first()
    if user is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")
    return user


# Check the user's permissions
def has_permission(permission_name: str, user: User = Depends(get_current_user)):
    """Raise 403 unless one of the user's roles grants ``permission_name``.

    Args:
        permission_name: Name of the required permission.
        user: The authenticated user.

    Raises:
        HTTPException: 403 when none of the user's roles carries the permission.
    """
    # NOTE(review): if this function is passed to Depends() directly, FastAPI
    # reads ``permission_name`` from the request query string, so the client
    # would pick the permission being checked. Use require_permission() on
    # routes instead.
    granted = {perm.name for role in user.roles for perm in role.permissions}
    if permission_name not in granted:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Permission denied")


def require_permission(permission_name: str):
    """Build a dependency that enforces ``permission_name`` on a route.

    Usage: ``Depends(require_permission("article:delete"))``. The permission
    name is fixed in code when the route is declared.

    Returns:
        A dependency callable that returns the current user when the check
        passes and raises 403 otherwise.
    """

    def _checker(user: User = Depends(get_current_user)):
        has_permission(permission_name, user)
        return user

    return _checker
7. 将所有部分集成
将所有路由和依赖注入到主应用中:
# main.py (continued: add to the main.py from step 4)
from fastapi import FastAPI

from roles_permissions import router as roles_permissions_router

# Reuse the `app` instance already created in main.py. Calling FastAPI()
# again here would rebind `app` and drop the /register and /login routes
# that were attached to the first instance.

# Register the routes
app.include_router(roles_permissions_router, prefix="/api", tags=["Roles & Permissions"])
8. 启动应用
启动FastAPI应用:
uvicorn main:app --reload
这样,您将拥有一个基本的用户权限控制系统的FastAPI实现。可以在此基础上进行扩展和优化,比如完善JWT认证(目前 /login 接口尚未签发令牌,而 get_current_user 已按 JWT 方式解码令牌)、前端集成等。