当前位置: 首页 > news >正文

【FasAPI】使用FastAPI来实现一个基于RBAC(基于角色的访问控制)的用户权限控制系统

使用FastAPI来实现一个基于RBAC(基于角色的访问控制)的用户权限控制系统。以下是一个简单的实现方案,涵盖了用户管理、角色管理和权限管理的基本功能。

架构设计方案

【权限控制】一个通用的用户权限控制架构设计方案,可以适用于大多数应用场景

代码实现

1. 环境准备

首先,安装FastAPI和必要的依赖:

pip install fastapi uvicorn sqlalchemy passlib[bcrypt] pydantic

2. 数据库模型设计

使用SQLAlchemy定义数据库模型。为了简化演示,使用SQLite作为数据库。

# models.pyfrom sqlalchemy import Column, Integer, String, ForeignKey, Table
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationshipBase = declarative_base()# 中间表用于多对多关系:用户与角色
user_role_table = Table('user_role',Base.metadata,Column('user_id', Integer, ForeignKey('users.id')),Column('role_id', Integer, ForeignKey('roles.id'))
)# 中间表用于多对多关系:角色与权限
role_permission_table = Table('role_permission',Base.metadata,Column('role_id', Integer, ForeignKey('roles.id')),Column('permission_id', Integer, ForeignKey('permissions.id'))
)# 用户表
class User(Base):__tablename__ = 'users'id = Column(Integer, primary_key=True, index=True)username = Column(String, unique=True, index=True, nullable=False)password = Column(String, nullable=False)roles = relationship("Role", secondary=user_role_table, back_populates="users")# 角色表
class Role(Base):__tablename__ = 'roles'id = Column(Integer, primary_key=True, index=True)name = Column(String, unique=True, nullable=False)description = Column(String)users = relationship("User", secondary=user_role_table, back_populates="roles")permissions = relationship("Permission", secondary=role_permission_table, back_populates="roles")# 权限表
class Permission(Base):__tablename__ = 'permissions'id = Column(Integer, primary_key=True, index=True)name = Column(String, unique=True, nullable=False)description = Column(String)roles = relationship("Role", secondary=role_permission_table, back_populates="permissions")

3. 数据库连接和依赖注入

配置数据库连接,并创建会话依赖。

# database.pyfrom sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, SessionSQLALCHEMY_DATABASE_URL = "sqlite:///./test.db"engine = create_engine(SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)# 创建数据库表
from models import Base
Base.metadata.create_all(bind=engine)# 数据库会话依赖
def get_db():db = SessionLocal()try:yield dbfinally:db.close()

4. 用户注册和验证

实现用户注册、加密密码存储以及用户身份验证。

# main.pyfrom fastapi import FastAPI, Depends, HTTPException, status
from sqlalchemy.orm import Session
from passlib.context import CryptContextfrom models import User, Role, Permission
from database import get_dbapp = FastAPI()# 密码加密上下文
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")def verify_password(plain_password, hashed_password):return pwd_context.verify(plain_password, hashed_password)def get_password_hash(password):return pwd_context.hash(password)# 用户注册
@app.post("/register")
def register_user(username: str, password: str, db: Session = Depends(get_db)):user = db.query(User).filter(User.username == username).first()if user:raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Username already registered")hashed_password = get_password_hash(password)new_user = User(username=username, password=hashed_password)db.add(new_user)db.commit()db.refresh(new_user)return {"username": new_user.username, "message": "User registered successfully"}# 用户身份验证
@app.post("/login")
def login_user(username: str, password: str, db: Session = Depends(get_db)):user = db.query(User).filter(User.username == username).first()if not user or not verify_password(password, user.password):raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid username or password")return {"username": user.username, "message": "Login successful"}

5. 角色和权限管理

定义角色和权限的增删改查,以及分配角色和权限的接口。

# roles_permissions.pyfrom fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Sessionfrom models import Role, Permission, User
from database import get_dbrouter = APIRouter()# 创建角色
@router.post("/roles")
def create_role(name: str, description: str, db: Session = Depends(get_db)):role = db.query(Role).filter(Role.name == name).first()if role:raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Role already exists")new_role = Role(name=name, description=description)db.add(new_role)db.commit()db.refresh(new_role)return {"name": new_role.name, "message": "Role created successfully"}# 创建权限
@router.post("/permissions")
def create_permission(name: str, description: str, db: Session = Depends(get_db)):permission = db.query(Permission).filter(Permission.name == name).first()if permission:raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Permission already exists")new_permission = Permission(name=name, description=description)db.add(new_permission)db.commit()db.refresh(new_permission)return {"name": new_permission.name, "message": "Permission created successfully"}# 分配角色给用户
@router.post("/users/{user_id}/roles")
def assign_role_to_user(user_id: int, role_id: int, db: Session = Depends(get_db)):user = db.query(User).filter(User.id == user_id).first()role = db.query(Role).filter(Role.id == role_id).first()if not user or not role:raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User or Role not found")user.roles.append(role)db.commit()return {"username": user.username, "roles": [role.name for role in user.roles]}# 分配权限给角色
@router.post("/roles/{role_id}/permissions")
def assign_permission_to_role(role_id: int, permission_id: int, db: Session = Depends(get_db)):role = db.query(Role).filter(Role.id == role_id).first()permission = db.query(Permission).filter(Permission.id == permission_id).first()if not role or not permission:raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role or Permission not found")role.permissions.append(permission)db.commit()return {"role": role.name, "permissions": [perm.name for perm in role.permissions]}

6. 权限验证

通过依赖注入和FastAPI的 Depends 机制来实现权限验证。

# dependencies.pyfrom fastapi import Depends, HTTPException, status
from sqlalchemy.orm import Session
from jose import JWTError, jwtfrom models import User
from database import get_dbSECRET_KEY = "your_secret_key"
ALGORITHM = "HS256"# 获取当前用户
def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):try:payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])user_id: int = payload.get("sub")if user_id is None:raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials")except JWTError:raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,detail="Could not validate credentials")user = db.query(User).filter(User.id == user_id).first()if user is None:raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User not found")return user# 验证用户权限
def has_permission(permission_name: str, user: User = Depends(get_current_user)):user_permissions = [perm.name for role in user.roles for perm in role.permissions]if permission_name not in user_permissions:raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Permission denied")

7. 将所有部分集成

将所有路由和依赖注入到主应用中:

# main.pyfrom fastapi import FastAPIfrom roles_permissions import router as roles_permissions_routerapp = FastAPI()# 注册路由
app.include_router(roles_permissions_router, prefix="/api", tags=["Roles & Permissions"])

8. 启动应用

启动FastAPI应用:

uvicorn main:app --reload

这样,您将拥有一个基本的用户权限控制系统的FastAPI实现。可以在此基础上进行扩展和优化,比如增加JWT认证、前端集成等。

相关文章:

  • 【算法精解】逆序对受限的方案数
  • 银河麒麟桌面操作系统如何添加WPS字体
  • ControllerAdvice定义统一异常处理
  • 【力扣 | SQL题 | 每日三题】力扣175, 176, 181
  • CloudMusic:免费听歌
  • python功能测试
  • Tableau|一入门
  • error -- unsupported GNU version gcc later than 10 are not supported;(gcc、g++)
  • 网络编程(6)——发送的时序性,全双工通信
  • 一个 Java 语言简化处理 PDF 的框架,提供了一套简单易用的 API 接口,满足多样化需求又能简化开发流程的处理方案(附教程)
  • 【AD那些事 10 】焊盘如何修改为自己想要的形状!!!!! 焊盘设计规则如何更改??????
  • 【架构设计】同步与异步:应用场景与选择指南
  • cpu路、核、线程、主频、缓存
  • 相似度度量方法有哪些?
  • 数据结构--单链表
  • JS 中的深拷贝与浅拷贝
  • 30秒的PHP代码片段(1)数组 - Array
  • CentOS 7 防火墙操作
  • Golang-长连接-状态推送
  • Gradle 5.0 正式版发布
  • js对象的深浅拷贝
  • October CMS - 快速入门 9 Images And Galleries
  • pdf文件如何在线转换为jpg图片
  • scrapy学习之路4(itemloder的使用)
  • 彻底搞懂浏览器Event-loop
  • 记录:CentOS7.2配置LNMP环境记录
  • 网络应用优化——时延与带宽
  • 微服务框架lagom
  • 一些基于React、Vue、Node.js、MongoDB技术栈的实践项目
  • 在 Chrome DevTools 中调试 JavaScript 入门
  • 在electron中实现跨域请求,无需更改服务器端设置
  • 终端用户监控:真实用户监控还是模拟监控?
  • 主流的CSS水平和垂直居中技术大全
  • 容器镜像
  • ​ubuntu下安装kvm虚拟机
  • ###C语言程序设计-----C语言学习(6)#
  • $con= MySQL有关填空题_2015年计算机二级考试《MySQL》提高练习题(10)
  • (1)(1.8) MSP(MultiWii 串行协议)(4.1 版)
  • (3) cmake编译多个cpp文件
  • (ctrl.obj) : error LNK2038: 检测到“RuntimeLibrary”的不匹配项: 值“MDd_DynamicDebug”不匹配值“
  • (poj1.3.2)1791(构造法模拟)
  • (TOJ2804)Even? Odd?
  • (力扣记录)1448. 统计二叉树中好节点的数目
  • (论文阅读22/100)Learning a Deep Compact Image Representation for Visual Tracking
  • (学习日记)2024.03.12:UCOSIII第十四节:时基列表
  • (循环依赖问题)学习spring的第九天
  • (一)Neo4j下载安装以及初次使用
  • (转) 深度模型优化性能 调参
  • (转)负载均衡,回话保持,cookie
  • (转)自己动手搭建Nginx+memcache+xdebug+php运行环境绿色版 For windows版
  • (最简单,详细,直接上手)uniapp/vue中英文多语言切换
  • .bat批处理(二):%0 %1——给批处理脚本传递参数
  • .gitignore不生效的解决方案
  • .NET : 在VS2008中计算代码度量值
  • .net core Redis 使用有序集合实现延迟队列