当前位置: 首页 > news >正文

Python解决多个服务线程/进程重复运行定时任务的问题

记录多实例服务定时任务出现运行多次的问题

问题:web项目运行多个实例时,定时任务会被执行多次的问题

举例来说
我使用库APScheduler排定了一个定时任务taskA在每天的晚上9点需要执行一次,我的web服务使用分布式运行了8个实例,于是在每天晚上的9点,我的8个服务都会接收到APScheduler发布的这个定时任务,然后每个服务都会执行一次这个定时任务,这与我预期的只执行1次是不符的,我想要解决这个问题,让定时任务只被执行一次

设想的解决方式:
使用redis分布式锁的方式(redis.setnx(key, value))让实例只能运行一次定时任务,从而解决定时任务会被执行多次的问题

setnx(key, value)方法很简单,就是当key不存在时,setnx会成功,否则会失败

    def setnx(self, name: KeyT, value: EncodableT) -> Awaitable:
        """Set the value of key ``name`` to ``value`` if key doesn't exist"""
        return self.execute_command("SETNX", name, value)

实现方案

  1. setnx(key, value)方法会在key不存在时设置value,当多个线程同时接到排期准备运行同一个任务时,只有第一个线程setnx会成功(返回True),于是第一个setnx成功的线程运行了定时任务,其他线程在setnx时由于key已经存在会失败(返回False),从而让它们跳过定时任务的执行

  2. 仍然存在的问题:定时任务一般会执行多次,在其下一次执行时,setnx相同key的这条记录应该被删除掉,因为这是一次新的任务,否则之后的任务执行都会因setnx时key已存在而失败导致任务无法执行

    i. 第一种方案:在setnx成功的线程1任务执行完成后删除这个key在redis中存储的记录,从而让下一次任务第一次运行时又可以成功setnx(key, value)而执行
    但这种方案存在一定的风险:如果存在线程2因为一些原因阻塞了,在线程1执行完任务才开始接收到运行定时任务的指令,那么线程2会在key被删除后开始尝试setnx,那必然会成功,然后重复了运行任务

    ii. 基于第一种方案的考虑,确定了第二种方案,只需要给每次的定时任务添加唯一标识即可避免第一种方案的问题:设置此次任务运行的唯一key_x,在setnx成功的线程1任务执行完成之后不对这次定时任务的key_x执行删除
    此次定时任务唯一key_x的设置很容易想到的方案是在这次定时任务id上添加运行的排期时间,这样就可以让这一次的定时任务是唯一且可识别了,只要运行了一次其值就永久设置为True,不会在执行第二次(考虑到资源占用,实际应该设置一个较长的过期时间也完全可以避免方案1的风险)

设置有过期时间的方法应该使用redis.set(key, value, nx=True, ex=10)方法,这里nx=True表名使用命令SETNX,而ex=10则是过期的时间,单位为秒

第一种方案的可重复运行的小案例:

# -*- coding: utf-8 -*-
import asyncio
import time

import aioredis
from aioredis import Redis

loop = asyncio.get_event_loop()
redis_coro = aioredis.Redis()


def redis_distributed_lock(cache_key, cache_value="locked"):
    def decorator(func):
        async def wrapper(*args, **kwargs):
            redis_instance = await redis_coro
            # 这里设置了10小时的过期时间,完全可以避免重复运行的风险了
            locked = await redis_instance.set(cache_key, cache_value, nx=True, ex=60 * 60 * 10)
            if locked:  # 第一个线程设置成功值会运行任务,否则不会运行任务
                print("success")
                return await func(*args, **kwargs)
            print(f"failed")

        return wrapper

    return decorator


async def ntasks():
    t_time = ["9点", "10点", "11点"]   # 模拟任务在三个时间点被执行
    redis: Redis = await redis_coro
    t_id = 1

    async def task_func(tid):
        print(f"{tid=}, executing...")
        return tid

    # 为了可重复运行这个示例,先执行删除之前设置的key
    ret = await redis.delete(str(t_id))

    for t_t in t_time:
        redis_key = f"{t_id}"    
        # 这里本来预期是直接放到函数头上装饰,但是不方便控制redis_key参数,所以使用了原始的方式装饰task_func
        task_f = redis_distributed_lock(redis_key)(task_func)
        # 假设启动分布式服务8个,会执行8次定时任务,这里创建了8个任务,按照先执行完先返回的顺序处理
        for f in asyncio.as_completed([task_f(redis_key) for _ in range(8)], loop=loop):
            res = await f
            # print(f"{res=}")
        print(f"=" * 80)
        time.sleep(5)  # 模拟一个定时任务在多个时间点执行,下一次执行时,时间参数(t_t)会发生变化
        # break


if __name__ == '__main__':
    try:
        loop.run_until_complete(ntasks())
    finally:
        loop.stop()
        loop.close()

运行结果

success
tid='1', executing...
failed
failed
failed
failed
failed
failed
failed
================================================================================
failed
failed
failed
failed
failed
failed
failed
failed
================================================================================
failed
failed
failed
failed
failed
failed

只有第一个时间点是按照预期执行,之后的时间点执行都总是失败,因为每个时间点的该任务设置的key都是一样的

第二种方案的可重复运行的小案例:

# -*- coding: utf-8 -*-
import asyncio
import time

import aioredis
from aioredis import Redis

loop = asyncio.get_event_loop()
redis_coro = aioredis.Redis()


def redis_distributed_lock(cache_key, cache_value="locked"):
    def decorator(func):
        async def wrapper(*args, **kwargs):
            redis_instance = await redis_coro
            # 这里设置了10小时的过期时间,完全可以避免重复运行的风险了
            locked = await redis_instance.set(cache_key, cache_value, nx=True, ex=60 * 60 * 10)
            if locked:  # 第一个线程设置成功值会运行任务,否则不会运行任务
                print("success")
                return await func(*args, **kwargs)
            print(f"failed")

        return wrapper

    return decorator


async def ntasks():
    t_time = ["9点", "10点", "11点"]   # 模拟任务在三个时间点被执行
    redis: Redis = await redis_coro
    t_id = 1

    async def task_func(tid):
        print(f"{tid=}, executing...")
        return tid

    for t_t in t_time:
        redis_key = f"{t_id}_{t_t}"     # set的key用定时任务的id+时间点来作为此次定时任务的唯一标识
        # 为了可重复运行这个示例,先执行删除之前设置的key
        ret = await redis.delete(redis_key)
        print(f"key deleted? {ret}")
        # 这里本来预期是直接放到函数头上装饰,但是不方便控制redis_key参数,所以使用了原始的方式装饰task_func
        task_f = redis_distributed_lock(redis_key)(task_func)
        # 假设启动分布式服务8个,会执行8次定时任务,这里创建了8个任务,按照先执行完先返回的顺序处理
        for f in asyncio.as_completed([task_f(redis_key) for _ in range(8)], loop=loop):
            res = await f
            # print(f"{res=}")
        print(f"=" * 80)
        time.sleep(5)  # 模拟一个定时任务在多个时间点执行,下一次执行时,时间参数(t_t)会发生变化
        # break


if __name__ == '__main__':
    try:
        loop.run_until_complete(ntasks())
    finally:
        loop.stop()
        loop.close()

输出

key deleted? 1
success
tid='1_9点', executing...
failed
failed
failed
failed
failed
failed
failed
================================================================================
key deleted? 1
success
tid='1_10点', executing...
failed
failed
failed
failed
failed
failed
failed
================================================================================
key deleted? 1
success
tid='1_11点', executing...
failed
failed
failed
failed
failed
failed
failed
================================================================================

可以看到task_func任务的每个时间点的执行都只有一次成功,而且不会出现只有第一个时间点执行成功而之后的时间点执行都全是失败的情况

有用的参考:
起初总是尝试在协程方法中使用多线程threading.Thread老是碰壁,看了这个回答后读了这篇文章,感觉豁然开朗如醍醐灌顶

相关文章:

  • webpack学习笔记
  • 01人机交互/打开CMD/常见CMD命令/CMD打开QQ并设置环境变量
  • QT汽车客运公司售票系统(改良版)
  • 初始Cpp之 六、内存分配
  • 【算法】重载sort的cmp的题
  • STC15单片机-LED闪烁(定时器)
  • Android 移动安全攻防实战 第一章
  • Docker 容器技术
  • 基于springboot的少儿识字系统
  • Hyperledger Besu环境搭建(Linux)
  • java毕业设计参考文献基于S2SH的仓库管理系统[包运行成功]
  • 基于51单片机8位竞赛抢答器_倒计时可调+LED跑马灯
  • 【教3妹学mysql】一条慢sql如何排查优化
  • 【JavaEE】进程与线程的创建
  • 计算机毕业设计 SSM校园二手交流系统 大学二手交易平台系统 二手物品交易平台Java Vue MySQL数据库 远程调试 代码讲解
  • JavaScript-如何实现克隆(clone)函数
  • 03Go 类型总结
  • Android优雅地处理按钮重复点击
  • nginx 负载服务器优化
  • node和express搭建代理服务器(源码)
  • Spring Boot快速入门(一):Hello Spring Boot
  • 前端性能优化--懒加载和预加载
  • 腾讯大梁:DevOps最后一棒,有效构建海量运营的持续反馈能力
  • 携程小程序初体验
  • 验证码识别技术——15分钟带你突破各种复杂不定长验证码
  • 远离DoS攻击 Windows Server 2016发布DNS政策
  • gunicorn工作原理
  • shell使用lftp连接ftp和sftp,并可以指定私钥
  • (14)学习笔记:动手深度学习(Pytorch神经网络基础)
  • (3)Dubbo启动时qos-server can not bind localhost22222错误解决
  • (9)目标检测_SSD的原理
  • (bean配置类的注解开发)学习Spring的第十三天
  • (第9篇)大数据的的超级应用——数据挖掘-推荐系统
  • (算法)Travel Information Center
  • (算法设计与分析)第一章算法概述-习题
  • .NET 5种线程安全集合
  • .NET 线程 Thread 进程 Process、线程池 pool、Invoke、begininvoke、异步回调
  • 。Net下Windows服务程序开发疑惑
  • /var/spool/postfix/maildrop 下有大量文件
  • []sim300 GPRS数据收发程序
  • [Android]一个简单使用Handler做Timer的例子
  • [flask]http请求//获取请求头信息+客户端信息
  • [Godot] 3D拾取
  • [hdu 3065] 病毒侵袭持续中 [AC自动机] [病毒特征码匹配]
  • [HNOI2008]Cards
  • [IE编程] WebBrowser控件中设置页面的缩放
  • [INSTALL_FAILED_TEST_ONLY],Android开发出现应用未安装
  • [iOS]iOS获取设备信息经常用法
  • [Linux]Ubuntu noVNC使用
  • [Linux]进程间通信(进程间通信介绍 | 匿名管道 | 命名管道)
  • [POJ - 2386]
  • [python] dataclass 快速创建数据类
  • [python] logging输出到控制台(标准输出)
  • [Spring Boot 2]整合持久层技术
  • [ThinkPHP]源码阅读:Model的获取器