当前位置: 首页 > news >正文

Flask使用线程异步执行耗时任务

1 问题说明

1.1 任务简述

在开发Flask应用中一定会遇到执行耗时任务,但是Flask是轻量级的同步框架,即在单个请求时服务会阻被塞,直到任务完成(注意:当前请求被阻塞不会影响到其他请求)。

解决异步问题有两种思路,一种是借助外部工具实现异步,例如:消息队列(RabbitMQ)、 异步任务队列(Celery+Redis);另一种借助Python中的进程、线程或协程解决异步。我的是小项目选择因此选择了第二种方法。

经过测试,我在Flask中使用协程(gevent)会被阻塞;使用进程(multiprocessing)不会被阻塞,操作数据库出了问题(有可能是我没操作正确的问题);最后选择使用线程(threading)。

注意:使用线程会出现线程安全问题,

1.2 注意的问题

(1)线程安全

"""
!!! 注意
1 由于flask-SQLAlchemy对SQLAlchemy进行了封装,所以是线程安全的,可以在线程中直接使用;
2 原生SQLAlchemy中# 2.1 直接使用Session是线程不安全的,不推荐使用self.session = Session(self.engine)# 2.2 直接使用scoped_session是线程安全的,推荐使用# 获取sessionmakersession_factory = sessionmaker(engine)# scoped_session是线程安全的# 注意,此session不能使用Query对象session = scoped_session(session_factory)!!!
"""

(2)使用数据库

# 注意:高版本的Flask-SQLAlchemy(我的版本Flask-SQLAlchemy==3.1.1,SQLAlchemy==2.0.16)必须使用”with app.app_context()“,本质原因是Flask关联的SQLAlchemy版本太高
# 否则无法插入数据库并报错,错误内容如下(低版本不会出现此问题):
"""
This typically means that you attempted to use functionality that needed
the current application. To solve this, set up an application context
with app.app_context(). See the documentation for more information.
"""

2 工程布局

项目布局如下:
在这里插入图片描述

3 源代码

(1)main.py

from blueprint import init_blueprint
from config_app import app
from config_db import init_mysql_db# 初始化MySQL
init_mysql_db()init_blueprint()if __name__ == '__main__':app.run(host='0.0.0.0', debug=True)

(2)config_app.py

from flask import Flask
from flask_cors import CORSdef create_app():flask_app = Flask(__name__)CORS(flask_app, supports_credentials=True)return flask_appapp = create_app()

(3)config_db.py


from flask_sqlalchemy import SQLAlchemy
from flask_marshmallow import Marshmallow# 添加pymysql驱动,连接MySQL数据库
import pymysqlfrom config_app import apppymysql.install_as_MySQLdb()"""
!!! 注意
1 由于flask-SQLAlchemy对SQLAlchemy进行了封装,所以是线程安全的,可以在线程中直接使用;
2 原生SQLAlchemy中# 2.1 直接使用Session是线程不安全的,不推荐使用self.session = Session(self.engine)# 2.2 直接使用scoped_session是线程安全的,推荐使用# 获取sessionmakersession_factory = sessionmaker(engine)# scoped_session是线程安全的# 注意,此session不能使用Query对象session = scoped_session(session_factory)!!!
"""# 创建MySQL单实例
mysql_db = SQLAlchemy()# 创建Schema
mysql_schema = Marshmallow()# 创建数据库
"""
create database async default character set utf8mb4 collate utf8mb4_unicode_ci;
"""class MysqlConf:acc = "root"pwd = "123456"host = "192.168.108.200"port = 3306db = "async"mysql_conf = MysqlConf()# 初始化MySQL数据库
def init_mysql_db():# 配置MySQL数据库urldb_url = "mysql://" + mysql_conf.acc + ":" + mysql_conf.pwd + "@" + mysql_conf.host + ":" + str(mysql_conf.port) + "/" + mysql_conf.dbapp.config["SQLALCHEMY_DATABASE_URI"] = db_url# 关闭sqlalchemy自动跟踪数据库app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False# 显示底层执行的SQL语句app.config['SQLALCHEMY_ECHO'] = True# 解决‘No application found. Either work inside a view function or push an application context.’app.app_context().push()# 初始化appmysql_db.init_app(app)# 初始化schemamysql_schema.init_app(app)# 初始化table
def init_table():# 删除表mysql_db.drop_all()# 创建表mysql_db.create_all()

(4)dao.py

import datetimefrom config_db import mysql_db as db# Create table of bm_record
class User(db.Model):# Record table__tablename__ = "as_user"id = db.Column("us_id", db.Integer, nullable=False, primary_key=True, autoincrement=True)name = db.Column("us_name", db.String(100))age = db.Column("us_age", db.Integer)create_time = db.Column("us_create_time", db.DateTime, default=datetime.datetime.now)# 插入数据
def insert_record_dao(user: User):# Add datadb.session.add(user)# Commit datadb.session.commit()

(5)blueprint.py

# 构建蓝本
import threading
import time
from concurrent.futures import ThreadPoolExecutorfrom flask import Blueprint, jsonifyfrom config_app import app
from config_db import init_table
from dao import insert_record_dao, Useruser = Blueprint("user", __name__)# 注册蓝本
def init_blueprint():app.register_blueprint(user, url_prefix='/user')@user.route("/initdb")
def init_db():init_table()return jsonify("success")@user.route("/add")
def add_user():user: User = User()user.name = "zhangsan"user.age = 12time.sleep(10)insert_record_dao(user)return jsonify(user.id)executor = ThreadPoolExecutor(3)
@user.route("/thread_pool")
def run_task_thread_pool():executor.submit(_run_thread_pool)return jsonify("success")# 执行线程
def _run_thread_pool():print("Run thread pool")time.sleep(2)user: User = User()user.name = "thread pool"user.age = 12# 注意:必须使用”with app.app_context()“,否则无法插入数据库,并且不会报错with app.app_context():insert_record_dao(user)print("End thread pool")pass@user.route("/thread")
def run_task_thread():task = threading.Thread(target=_run_thread, name='thread-01')task.start()return jsonify("success")# 执行线程
def _run_thread():print("Run thread")time.sleep(2)user: User = User()user.name = "thread"user.age = 12# 注意:高版本的Flask-SQLAlchemy(我的版本Flask-SQLAlchemy==3.1.1,SQLAlchemy==2.0.16)必须使用”with app.app_context()“,本质原因是Flask关联的SQLAlchemy版本太高# 否则无法插入数据库并报错,错误内容如下(低版本不会出现此问题):"""This typically means that you attempted to use functionality that neededthe current application. To solve this, set up an application contextwith app.app_context(). See the documentation for more information."""with app.app_context():insert_record_dao(user)print("End thread")pass

4 请求截图

(1)初始化数据库

在这里插入图片描述

(2)添加用户

在这里插入图片描述

(3)使用线程池

在这里插入图片描述

(4)使用线程

在这里插入图片描述

相关文章:

  • 【keepalived】高可用神器,实现应用的自动主备切换
  • 【网络奇缘】- 计算机网络|分层结构|深入学习ISO模型
  • 怎么检测电脑电源?电脑电源检测系统软件如何助力?
  • 云轴科技ZStack助力龙芯打造IT产业新生态
  • DAPP开发【11】IPFS星际文件管理系统
  • 分享一个Python网络爬虫数据采集利器
  • InST论文复现
  • python进行文件批量命名
  • Istio可观测性
  • leetcode 1658. 将 x 减到 0 的最小操作数(优质解法)
  • Mac卸载、安装Python
  • 【带头学C++】----- 九、类和对象 ---- 9.1 类和对象的基本概念----(9.1.4---9.1.6)
  • Nginx实现(动静分离)
  • python实现一个计算器
  • Linux--程序地址空间
  • Create React App 使用
  • js数组之filter
  • spring + angular 实现导出excel
  • SQLServer之索引简介
  • 不发不行!Netty集成文字图片聊天室外加TCP/IP软硬件通信
  • 道格拉斯-普克 抽稀算法 附javascript实现
  • 高度不固定时垂直居中
  • 力扣(LeetCode)22
  • 跳前端坑前,先看看这个!!
  • 小程序button引导用户授权
  • 携程小程序初体验
  • 智能合约Solidity教程-事件和日志(一)
  • - 转 Ext2.0 form使用实例
  • 白色的风信子
  • gunicorn工作原理
  • zabbix3.2监控linux磁盘IO
  • 继 XDL 之后,阿里妈妈开源大规模分布式图表征学习框架 Euler ...
  • 摩拜创始人胡玮炜也彻底离开了,共享单车行业还有未来吗? ...
  • ​2021半年盘点,不想你错过的重磅新书
  • ​Base64转换成图片,android studio build乱码,找不到okio.ByteString接腾讯人脸识别
  • ​Linux Ubuntu环境下使用docker构建spark运行环境(超级详细)
  • # 日期待t_最值得等的SUV奥迪Q9:空间比MPV还大,或搭4.0T,香
  • #控制台大学课堂点名问题_课堂随机点名
  • (003)SlickEdit Unity的补全
  • (AtCoder Beginner Contest 340) -- F - S = 1 -- 题解
  • (附源码)计算机毕业设计ssm-Java网名推荐系统
  • (附源码)计算机毕业设计ssm电影分享网站
  • (十一)JAVA springboot ssm b2b2c多用户商城系统源码:服务网关Zuul高级篇
  • (四)Android布局类型(线性布局LinearLayout)
  • (提供数据集下载)基于大语言模型LangChain与ChatGLM3-6B本地知识库调优:数据集优化、参数调整、Prompt提示词优化实战
  • (五)大数据实战——使用模板虚拟机实现hadoop集群虚拟机克隆及网络相关配置
  • (一)使用IDEA创建Maven项目和Maven使用入门(配图详解)
  • .axf 转化 .bin文件 的方法
  • .NET / MSBuild 扩展编译时什么时候用 BeforeTargets / AfterTargets 什么时候用 DependsOnTargets?
  • .net core Swagger 过滤部分Api
  • .net redis定时_一场由fork引发的超时,让我们重新探讨了Redis的抖动问题
  • .NET命名规范和开发约定
  • /bin、/sbin、/usr/bin、/usr/sbin
  • /etc/skel 目录作用
  • @Builder用法