当前位置: 首页 > news >正文

用go实现一个任务调度类 (泛型)

用go实现一个任务调度类 (泛型)

源码地址:
https://github.com/robinfoxnan/BirdTalkServer/blob/main/server/core/workmanager.go

1.概述

实现了一个简单的任务管理系统,允许用户定义任务和工作者,并将任务分配给工作者进行处理。这个系统旨在提供一个灵活的任务管理框架,可以根据需要动态地添加和移除工作者,以及处理任务。

2.主要功能

  1. 定义了 Task 接口和 Worker 接口,用于表示任务和工作者;
  2. 提供了基础的任务类型 BaseTask 和基础的工作者类型 BaseWorker,用户可以基于这些基础类型来实现自定义的任务和工作者。需要在 BaseTask结构上继承一个新的结构,并实现Process方法;
  3. 实现了一个泛型任务管理器 Manager,用于管理工作者并分配任务给工作者。根据最大工作者个数和任务队列长度,动态地添加工作者。提供了停止所有工作者的方法,提供了方法来等待所有工作者完成任务。

3.类型和接口

3.1Task 任务接口

type Task interface {Process()
}

任务接口定义了一个 Process() 方法,用于执行任务的处理逻辑。

3.2Worker 接口

type Worker interface {Init(id int64, taskChan chan Task, wg *sync.WaitGroup, f WorkerCleanF)Start()Stop()
}

工作者接口定义了三个方法:

  • Init() 方法用于初始化工作者。创建后,设置工作者ID,任务通道,同步组,以及一个析构函数类似的清理函数;
  • Start() 方法用于启动工作者协程,开始处理任务;
  • Stop() 方法用于停止工作者;(关闭通道)

3.3BaseTask 结构体

这是一个最基础的示例,后续自定义结构可以包含这个结构:

type BaseTask struct {Id int64
}

基础任务结构体包含一个任务 ID,实现了 Task 接口的 Process() 方法,用于执行任务的处理逻辑。

3.4BaseWorker 结构体

type BaseWorker struct {Id       int64waitGrp  *sync.WaitGrouptaskChan chan TaskcleanFun WorkerCleanFquitChan chan struct{}
}

基础工作者结构体包含工作者 ID、等待组、任务通道、清理函数和退出通道,实现了 Worker 接口的 Init()Start()Stop() 方法,用于初始化工作者、启动工作者和停止工作者。

4. Manager 结构体

type Manager[T Task, W Worker] struct {workers       map[int64]W    // 使用一个map管理各个协程maxWorkers    int64          // 最大协程数量workerCounter int64          // 使用原子方式计数taskChan      chan Task      // 任务通道lock          sync.Mutex     // map用的锁wg            sync.WaitGroup // 同步组newWorkerFunc func() W       // 用于创建泛型中工作者结构的函数exiting       int32          // 退出状态标记,防止停止过程中加入任务workerIdSeq   int64          // 协程序号,可以用雪花算法代替,一般应该够用
}

任务管理器结构体包含了一个工作者映射、最大工作者数量、工作者计数器、任务通道、互斥锁、等待组、新建工作者函数、退出标志和工作者 ID 序列,提供了方法来添加任务、移除工作者、等待所有工作者完成任务和停止所有工作者。

5. 使用示例

最简单的一个测试示例

    manager := NewManager[Task, *BaseWorker](20, NewBaseWorker)// 添加示例任务到管理器go func() {for i := 0; i < 10; i++ {var t = &BaseTask{Id: int64(i)}manager.AddTask(t)}}()time.Sleep(time.Minute * 1)manager.StopAll()// 等待所有工作者完成任务manager.Wait()

我们需要重新定义一个结构用于表示任务,通常需要更多的字段

type CustomTask struct {BaseTaskAdditionalInfo string// 这里添加更多的字段
}// 实现 Task 接口的 Run 方法,
// 必须要实现这个函数,这是任务调度的功能入口,在协程中运行
func (t *CustomTask) Process() {fmt.Printf("CustomTask with additional info '%s' is running\n", t.AdditionalInfo)// 调用父类的 Process 方法//t.BaseTask.Process()
}

重写测试:


func TestWorkers(t *testing.T) {manager := NewManager[Task, *BaseWorker](20, NewBaseWorker)// 添加示例工作者到管理器// 添加示例任务到管理器go func() {for i := 0; i < 10; i++ {var t = &BaseTask{Id: int64(i)}manager.AddTask(t)}for i := 10; i < 16; i++ {var t = &CustomTask{BaseTask: BaseTask{Id: int64(i)}, AdditionalInfo: "Custom Info"}manager.AddTask(t)}}()time.Sleep(time.Minute * 1)manager.StopAll()// 等待所有工作者完成任务manager.Wait()
}

结论

各个语言实现的这个轮子基本都差不多。

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 回归预测 | Matlab基于SAO-LSTM雪消融算法优化长短期记忆神经网络的数据多输入单输出回归预测
  • springboot项目
  • OpenCV4.9.0开源计算机视觉库安装教程
  • SQL注入四-PHP应用SQL二次注入堆叠执行DNS带外功能点黑白盒条件
  • 第 1 章 信息化和信息系统 -4
  • 栈内存和堆内存
  • HCIP-Datacom(H12-821)题库补充(3/26)
  • NTP服务搭建
  • 这回轮到鸿蒙禁用安卓了!!!
  • 【剑指offer】顺时针打印矩阵
  • 1.Git快速入门
  • 数据结构/C++:位图 布隆过滤器
  • 测试缺陷定位的基本方法
  • MATLAB下载+安装教程
  • 如何应对Android面试官->进程通信如何注册与获取服务
  • [译]前端离线指南(上)
  • echarts的各种常用效果展示
  • ES2017异步函数现已正式可用
  • iOS动画编程-View动画[ 1 ] 基础View动画
  • java中的hashCode
  • JSDuck 与 AngularJS 融合技巧
  • Lucene解析 - 基本概念
  • miniui datagrid 的客户端分页解决方案 - CS结合
  • SAP云平台里Global Account和Sub Account的关系
  • SpingCloudBus整合RabbitMQ
  • Swoft 源码剖析 - 代码自动更新机制
  • text-decoration与color属性
  • tweak 支持第三方库
  • v-if和v-for连用出现的问题
  • vuex 学习笔记 01
  • 软件开发学习的5大技巧,你知道吗?
  • 设计模式 开闭原则
  • 延迟脚本的方式
  • 支付宝花15年解决的这个问题,顶得上做出十个支付宝 ...
  • ​【数据结构与算法】冒泡排序:简单易懂的排序算法解析
  • ​猴子吃桃问题:每天都吃了前一天剩下的一半多一个。
  • ​业务双活的数据切换思路设计(下)
  • #Datawhale AI夏令营第4期#AIGC方向 文生图 Task2
  • (17)Hive ——MR任务的map与reduce个数由什么决定?
  • (4)Elastix图像配准:3D图像
  • (zt)最盛行的警世狂言(爆笑)
  • (附源码)springboot 个人网页的网站 毕业设计031623
  • (黑马出品_高级篇_01)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式
  • (算法)硬币问题
  • (杂交版)植物大战僵尸
  • (转)Linq学习笔记
  • .DFS.
  • .md即markdown文件的基本常用编写语法
  • .NET Core WebAPI中封装Swagger配置
  • .net MySql
  • .net 简单实现MD5
  • .NET 依赖注入和配置系统
  • .NET框架
  • .Net语言中的StringBuilder:入门到精通
  • [].shift.call( arguments ) 和 [].slice.call( arguments )