当前位置: 首页 > news >正文

python之异步任务

在 Python 中,异步任务通常通过使用库如 Celery 来实现。Celery 是一个简单、灵活且可靠的分布式系统,用于处理大量消息,同时提供操作控制。

Celery 中,delayapply_async 是两种常用的方法来调度异步任务。

delay 方法

delayCelery 提供的一个快捷方法,用于简化任务的调用。它会自动将任务标记为异步执行。

from celery import Celeryapp = Celery('tasks', broker='pyamqp://guest@localhost//')@app.task
def add(x, y):return x + y# 使用 delay 方法调用任务
result = add.delay(4, 6)

apply_async 方法

apply_async 提供了更多的控制选项,例如可以指定任务的执行时间、重试策略等。

from celery import Celeryapp = Celery('tasks', broker='pyamqp://guest@localhost//')@app.task
def add(x, y):return x + y# 使用 apply_async 方法调用任务
result = add.apply_async((4, 6), countdown=10)  # 任务将在10秒后执行
参数说明
  • args:任务的参数,通常以元组形式传递。
  • kwargs:任务的关键字参数,以字典形式传递。
  • countdown:任务延迟执行的时间(以秒为单位)。
  • eta:任务的预计执行时间(datetime 对象)。
  • expires:任务的过期时间(datetime 对象或秒数)。
  • retry:是否在任务失败时自动重试。
  • retry_policy:重试策略,例如最大重试次数、重试间隔等。

示例1

使用 apply_async 方法来设置任务的各种参数:

from celery import Celery
from datetime import datetime, timedeltaapp = Celery('tasks', broker='pyamqp://guest@localhost//')@app.task(bind=True, max_retries=3)
def add(self, x, y):try:return x + yexcept Exception as exc:raise self.retry(exc=exc, countdown=5)# 使用 apply_async 方法调用任务
eta = datetime.utcnow() + timedelta(seconds=10)
result = add.apply_async((4, 6), eta=eta, expires=60, retry=True, retry_policy={'max_retries': 5,'interval_start': 0,'interval_step': 0.2,'interval_max': 0.2,
})

任务 add 被设置为在10秒后执行,并且在60秒后过期。如果任务失败,它会自动重试最多5次,每次重试间隔0.2秒。

  • delay 方法是 apply_async 的简化版本,适用于简单的异步任务调用。
  • apply_async 方法提供了更多的控制选项,适用于需要更复杂调度和重试策略的任务。

示例2

假设你有一个自定义的任务基类 CallbackTask,你可以这样定义一个任务:

from celery import Celery, Taskapp = Celery('tasks', broker='pyamqp://guest@localhost//')class CallbackTask(Task):def on_success(self, retval, task_id, args, kwargs):print(f'Task {task_id} succeeded with result: {retval}')def on_failure(self, exc, task_id, args, kwargs, einfo):print(f'Task {task_id} failed with exception: {exc}')@app.task(name='my_custom_task', base=CallbackTask, ignore_result=True)
def add(x, y):return x + y# 调用任务
result = add.delay(4, 6)
  1. 自定义任务基类 CallbackTask

    • on_success 方法:当任务成功完成时调用。
    • on_failure 方法:当任务失败时调用。
  2. 任务定义

    • @app.task(name='my_custom_task', base=CallbackTask, ignore_result=True)
      • name='my_custom_task':任务的自定义名称。
      • base=CallbackTask:任务的基类是 CallbackTask
      • ignore_result=True:任务的结果将不会被存储。
  3. 调用任务

    • result = add.delay(4, 6):异步调用任务 add,传递参数 46

Celery 中,任务的参数通常以元组或字典的形式传递,并且 Celery 会自动处理参数的序列化和反序列化。因此,你通常不需要手动将参数 JSON 化。

参数传递

简单参数
from celery import Celeryapp = Celery('tasks', broker='pyamqp://guest@localhost//')@app.task
def add(x, y):return x + y# 使用 delay 方法调用任务,传递参数
result = add.delay(4, 6)

在这个示例中,参数 46 被传递给任务 addCelery 会自动处理这些参数的序列化和反序列化。

复杂参数

如果你需要传递更复杂的参数,例如嵌套的字典或列表,Celery 也能处理这些情况:

from celery import Celeryapp = Celery('tasks', broker='pyamqp://guest@localhost//')@app.task
def process_data(data):# 假设 data 是一个字典return data['key1'] + data['key2']# 使用 apply_async 方法调用任务,传递复杂参数
data = {'key1': 10, 'key2': 20}
result = process_data.apply_async((data,))

在这个示例中,data 是一个字典,Celery 会自动将其序列化并传递给任务 process_data

常见错误

Celery 中,如果尝试传递一个 Django 模型对象作为任务参数,而没有设置适当的序列化和反序列化方法,通常会遇到序列化错误。默认情况下,Celery 使用 JSON 作为序列化格式,而 JSON 不支持直接序列化 Django 模型对象。

如果直接传递一个 Django 模型对象作为任务参数,可能会遇到类似以下的错误:

kombu.exceptions.EncodeError: Object of type <YourModel> is not JSON serializable

这个错误表明 Celery 尝试将 Django 模型对象序列化为 JSON,但失败了,因为 JSON 序列化器不知道如何处理 Django 模型对象。

解决方法

  1. 传递模型对象的主键

    • 传递模型对象的主键(或其他简单类型)作为任务参数,然后在任务内部重新获取模型对象。
    from celery import Celery
    from myapp.models import MyModelapp = Celery('tasks', broker='pyamqp://guest@localhost//')@app.task
    def process_model_object(model_id):obj = MyModel.objects.get(id=model_id)# 处理对象print(obj)# 调用任务,传递模型对象的主键
    obj = MyModel.objects.first()
    process_model_object.delay(obj.id)
    
  2. 自定义序列化和反序列化

    • 自定义任务参数的序列化和反序列化方法,将模型对象转换为可序列化的格式(如字典)。
    from celery import Celery
    from myapp.models import MyModelapp = Celery('tasks', broker='pyamqp://guest@localhost//')@app.task
    def process_model_object(model_data):# 反序列化模型对象obj = MyModel(**model_data)# 处理对象print(obj)# 调用任务,传递模型对象的字典表示
    obj = MyModel.objects.first()
    model_data = {'id': obj.id,'field1': obj.field1,'field2': obj.field2,# 其他字段
    }
    process_model_object.delay(model_data)
    
  3. 使用 Pickle 序列化器

    • Celery 支持多种序列化器,包括 Pickle。Pickle 可以序列化几乎所有 Python 对象,但它有安全风险,不建议在不受信任的环境中使用。
    from celery import Celery
    from myapp.models import MyModelapp = Celery('tasks', broker='pyamqp://guest@localhost//')
    app.conf.update(task_serializer='pickle',accept_content=['pickle'],  # Ignore other contentresult_serializer='pickle',
    )@app.task
    def process_model_object(obj):# 处理对象print(obj)# 调用任务,传递模型对象
    obj = MyModel.objects.first()
    process_model_object.delay(obj)
    

总结

  • 传递模型对象的主键:这是最常见和推荐的方法,因为它简单且安全。
  • 自定义序列化和反序列化:适用于需要传递复杂对象的情况。
  • 使用 Pickle 序列化器:虽然方便,但有安全风险,不建议在不受信任的环境中使用。

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 数据结构与算法 第12天(排序)
  • python之对象间的关系
  • 14.2 k8s中我们都需要监控哪些组件
  • C#/.NET/.NET Core推荐学习路线文档文章
  • Linux新建虚拟机Ubuntu详解
  • Linux:软硬连接和动静态库
  • PhotoZoom9怎么样?图片模糊怎么办?
  • navigator.mediaDevices.getUserMedia检查用户的摄像头是否可用,虚拟摄像头问题
  • 基于MinerU的PDF解析API
  • AUC真的什么情形下都适合吗
  • COD论文笔记 BiRefNet
  • Spark MLlib模型训练—聚类算法 PIC(Power Iteration Clustering)
  • Python | Leetcode Python题解之第386题字典序排数
  • 图文解析保姆级教程:Postman专业接口测试工具的安装和基本使用
  • ChatGPT 3.5/4.0使用手册:解锁人工智能的无限潜能
  • docker容器内的网络抓包
  • EOS是什么
  • Joomla 2.x, 3.x useful code cheatsheet
  • Mysql5.6主从复制
  • Python学习笔记 字符串拼接
  • SAP云平台运行环境Cloud Foundry和Neo的区别
  • vue学习系列(二)vue-cli
  • 如何用vue打造一个移动端音乐播放器
  • 微信小程序设置上一页数据
  • 一个普通的 5 年iOS开发者的自我总结,以及5年开发经历和感想!
  • 用Visual Studio开发以太坊智能合约
  • No resource identifier found for attribute,RxJava之zip操作符
  • 7行Python代码的人脸识别
  • 支付宝花15年解决的这个问题,顶得上做出十个支付宝 ...
  • # Java NIO(一)FileChannel
  • #《AI中文版》V3 第 1 章 概述
  • (3)选择元素——(17)练习(Exercises)
  • (php伪随机数生成)[GWCTF 2019]枯燥的抽奖
  • (二)linux使用docker容器运行mysql
  • (附源码)计算机毕业设计ssm电影分享网站
  • (回溯) LeetCode 78. 子集
  • (力扣记录)235. 二叉搜索树的最近公共祖先
  • (力扣题库)跳跃游戏II(c++)
  • (七)理解angular中的module和injector,即依赖注入
  • (十七)devops持续集成开发——使用jenkins流水线pipeline方式发布一个微服务项目
  • (续)使用Django搭建一个完整的项目(Centos7+Nginx)
  • (转)全文检索技术学习(三)——Lucene支持中文分词
  • (转)树状数组
  • .NET C# 操作Neo4j图数据库
  • .net core 的缓存方案
  • .NET Core、DNX、DNU、DNVM、MVC6学习资料
  • .Net Framework 4.x 程序到底运行在哪个 CLR 版本之上
  • .NET 使用 ILRepack 合并多个程序集(替代 ILMerge),避免引入额外的依赖
  • .NET与 java通用的3DES加密解密方法
  • .vue文件怎么使用_我在项目中是这样配置Vue的
  • 。。。。。
  • /usr/local/nginx/logs/nginx.pid failed (2: No such file or directory)
  • @31省区市高考时间表来了,祝考试成功
  • [ 云计算 | AWS 实践 ] Java 如何重命名 Amazon S3 中的文件和文件夹
  • [2013AAA]On a fractional nonlinear hyperbolic equation arising from relative theory