当前位置: 首页 > news >正文

模拟实现.net中的Task机制:探索异步编程的奥秘

.net中使用Task可以方便地编写异步程序,为了更好地理解Task及其调度机制,接下来模拟Task的实现,目的是搞清楚:

  1. Task是什么
  2. Task是如何被调度的

基本的Task模拟实现

从最基本的Task用法开始

Task.Run(Action action)

这个命令的作用是将action作为一项任务提交给调度器,调度器会安排空闲线程来处理。
我们使用Job来模拟Task

public class Job
{private readonly Action _work;public Job(Action work) => _work = work;public JobStatus Status { get; internal set; }internal protected virtual void Invoke(){Status = JobStatus.Running;_work();Status = JobStatus.Completed;}public void Start(JobScheduler? scheduler = null)=> (scheduler ?? JobScheduler.Current).QueueJob(this);public static Job Run(Action work){var job = new Job(work);job.Start();return job;}
}public enum JobStatus
{Created,Scheduled,Running,Completed
}

这里也定义了同Task一样的静态Run方法,使用方式也与Task类似

Job.Run(() => Console.WriteLine($"Job1, thread:{Thread.CurrentThread.ManagedThreadId}"));

作为对比,使用Task时的写法如下,多了await关键字,后文会讨论。

await Task.Run(()=>() => Console.WriteLine($"Task1, thread:{Thread.CurrentThread.ManagedThreadId}"));

调用Job.Run方法时,会基于给定的Action创建一个Job,然后执行job.Start(), 但Job没有立即开始执行,而是通过QueueJob方法提交给了调度器,由调度器来决定Job何时执行,在Job真正被执行时会调用其Invoke方法,此时给定的Action就会被执行了,同时会对应修改Job的状态,从Running到Completed。简单来说,.net的Task的基本工作过程与这个粗糙的Job一样,由此可见,Task/Job代表一项具有某种状态的操作

基于线程池的调度

但Task/Job的执行依赖与调度器,这里用JobScheduler来模拟,.net默认使用基于线程池的调度策略,我们也模拟实现一个ThreadPoolJobScheduler
首先看下JobScheduler,作为抽象基类,其QueueJob方法将有具体的某个调度器(ThreadPoolJobScheduler)来实现:

public abstract class JobScheduler
{public abstract void QueueJob(Job job);public static JobScheduler Current { get; set; } = new ThreadPoolJobScheduler();
}

ThreadPoolJobScheduler实现的QueueJob如下:

public class ThreadPoolJobScheduler : JobScheduler
{public override void QueueJob(Job job){job.Status = JobStatus.Scheduled;var executionContext = ExecutionContext.Capture();ThreadPool.QueueUserWorkItem(_ => ExecutionContext.Run(executionContext!,_ => job.Invoke(), null));}
}

ThreadPoolJobScheduler会将Job提交给线程池,并将Job状态设置为Scheduled。

使用指定线程进行调度

JobScheduler的Current属性默认设置为基于线程的调度,如果有其它调度器也可以更换,但为什么要更换呢?这要从基于线程的调度的局限说起,对于一些具有较高优先级的任务,采用这个策略可能会无法满足需求,比如当线程都忙的时候,新的任务可能迟迟无法被执行。对于这种情况,.net可以通过设置TaskCreationOptions.LongRunning来解决,解析来先用自定义的调度器来解决这个问题:

public class DedicatedThreadJobScheduler : JobScheduler
{private readonly BlockingCollection<Job> _queues=new();private readonly Thread[] _threads;public DedicatedThreadJobScheduler(int threadCount){_threads=new Thread[threadCount];for(int index=0; index< threadCount; index++){_threads[index] =new Thread(Invoke);}Array.ForEach(_threads, thread=>thread.Start());void Invoke(object? state){while(true){_queues.Take().Invoke();}}}public override void QueueJob(Job job){_queues.Add(job);}
}

在启动DedicatedThreadJobScheduler时,会启动指定数量的线程,这些线程会不停地从队列中取出任务并执行。
接下来看看.net的TaskCreationOptions.LongRunning怎么用:

await Task.Factory.StartNew(LongRunningMethod, TaskCreationOptions.LongRunning);static void LongRunningMethod()
{// Simulate a long-running operationConsole.WriteLine("Long-running task started on thread {0}.", Thread.CurrentThread.ManagedThreadId);Thread.Sleep(10000);Console.WriteLine("Long-running task finished on thread {0}.", Thread.CurrentThread.ManagedThreadId);
}

任务顺序的编排

在使用Task时,经常会使用await关键字,来控制多个异步任务之间的顺序,await实际上是语法糖,在了解await之前,先来看看最基本的ContinueWith方法。

var taskA = Task.Run(() => DateTime.Now);
var taskB = taskA.ContinueWith(time => Console.WriteLine(time.Result));
await taskB;

模仿Task,我们给Job也添加ContinueWith方法。

public class Job
{private readonly Action _work;private Job? _continue;public Job(Action work) => _work = work;public JobStatus Status { get; internal set; }internal protected virtual void Invoke(){Status = JobStatus.Running;_work();Status = JobStatus.Completed;_continue?.Start();}public void Start(JobScheduler? scheduler = null)=> (scheduler ?? JobScheduler.Current).QueueJob(this);public static Job Run(Action work){var job = new Job(work);job.Start();return job;}public Job ContinueWith(Action<Job> tobeContinued){if (_continue == null){var job = new Job(() => tobeContinued(this));_continue = job;}else{_continue.ContinueWith(tobeContinued);}return this;}
}

这个ContinueWith方法会将下一个待执行的Job放在_continue,这样多个顺序执行的Job就会构成一个链表。
在当前Job的Invoke方法执行结束时,会触发下一个Job被调度。
使用示例:

Job.Run(() =>
{Thread.Sleep(1000);Console.WriteLine("11");
}).ContinueWith(_ =>
{Thread.Sleep(1000);Console.WriteLine("12");
});

进一步使用await关键字来控制

要像Task一样使用await,需要Job支持有GetAwaiter方法。任何一个类型,只要有了这个GetAwaiter方法,就可以对其使用await关键字了。
c#的Task类中可以找到GetAwaiter

public TaskAwaiter GetAwaiter();

然后TaskAwaiter继承了ICriticalNotifyCompletion接口

public readonly struct TaskAwaiter<TResult> : System.Runtime.CompilerServices.ICriticalNotifyCompletion

照猫画虎,也为Job添加一个最简单的JobAwaiter

public class Job
{...public JobAwaiter GetAwaiter() => new(this);
}

JobAwaiter的定义如下:

public struct JobAwaiter : ICriticalNotifyCompletion
{private readonly Job _job;public readonly bool IsCompleted => _job.Status == JobStatus.Completed;public JobAwaiter(Job job){_job = job;if (job.Status == JobStatus.Created){job.Start();}}public void GetResult() { }public void OnCompleted(Action continuation){_job.ContinueWith(_ => continuation());}public void UnsafeOnCompleted(Action continuation)=> OnCompleted(continuation);
}

添加了await后,前面的代码也可以这样写:

await F1();
await F2();static Job F1() => new Job(() =>
{Thread.Sleep(1000);Console.WriteLine("11");
});static Job F2() => new Job(() =>
{Thread.Sleep(1000);Console.WriteLine("12");
});

总结

回顾开头的两个问题,现在可以尝试给出答案了。

  1. Task是什么,Task是一种有状态的操作(Created,Scheduled,Running,Completed),是对耗时操作的抽象,就像现实中的一项任务一样,它的执行需要相对较长的时间,它也有创建(Created),安排(Scheduled),执行(Running),完成(Completed)的基本过程。任务完成当然需要拿到结果的,这里的Job比较简单,没有模拟具体的结果;
  2. Task是如何被调度的,默认采用基于线程池的调度,即创建好Task后,由线程池中的空闲线程执行,具体什么时候执行、由哪个线程执行,开发者是不用关心的,在具体执行过程中,
    但由于.net全局线程池的局限,对于一些特殊场景无法满足时(比如需要立即执行Task),此时可以通过TaskCreationOptions更改调度行为;

另外,await是语法糖,它背后的实现是基于GetAwaiter,由其返回ICriticalNotifyCompletion接口的实现,并对ContinueWith做了封装。

相关文章:

  • java JUC并发编程 第十章 Synchronized与锁升级
  • 【数据结构】 二叉树理论概念!一文了解二叉树!
  • Java锁常见面试题
  • 力扣218.天际线问题 线段树解法
  • Java零基础手把手保姆级教程_类和对象(超详细)
  • Leetcode48旋转图像
  • R语言环境下使用curl库做的爬虫代码示例
  • Android 图片翻面动画
  • 零日漏洞预防
  • IO学习系列之阻塞IO
  • Linux安装配置awscli命令行接口工具及其从aws上传下载数据
  • GPT技术的崛起:改变生活与挑战未来
  • ch0_OSI 七层网络协议介绍
  • 亚马逊云科技大语言模型下的六大创新应用功能
  • acwing算法基础之数据结构--堆算法
  • ----------
  • -------------------- 第二讲-------- 第一节------在此给出链表的基本操作
  • 《网管员必读——网络组建》(第2版)电子课件下载
  • Android单元测试 - 几个重要问题
  • js算法-归并排序(merge_sort)
  • mysql外键的使用
  • php ci框架整合银盛支付
  • Python socket服务器端、客户端传送信息
  • 笨办法学C 练习34:动态数组
  • 持续集成与持续部署宝典Part 2:创建持续集成流水线
  • 从零搭建Koa2 Server
  • 分享自己折腾多时的一套 vue 组件 --we-vue
  • 理解 C# 泛型接口中的协变与逆变(抗变)
  • 使用阿里云发布分布式网站,开发时候应该注意什么?
  • 适配iPhoneX、iPhoneXs、iPhoneXs Max、iPhoneXr 屏幕尺寸及安全区域
  • 译有关态射的一切
  • 再次简单明了总结flex布局,一看就懂...
  • 责任链模式的两种实现
  • 智能网联汽车信息安全
  • 组复制官方翻译九、Group Replication Technical Details
  • ​html.parser --- 简单的 HTML 和 XHTML 解析器​
  • #android不同版本废弃api,新api。
  • #NOIP 2014# day.2 T2 寻找道路
  • #我与Java虚拟机的故事#连载10: 如何在阿里、腾讯、百度、及字节跳动等公司面试中脱颖而出...
  • (3)STL算法之搜索
  • (4)通过调用hadoop的java api实现本地文件上传到hadoop文件系统上
  • (9)YOLO-Pose:使用对象关键点相似性损失增强多人姿态估计的增强版YOLO
  • (第27天)Oracle 数据泵转换分区表
  • (二)Eureka服务搭建,服务注册,服务发现
  • (附源码)springboot家庭财务分析系统 毕业设计641323
  • (附源码)计算机毕业设计SSM在线影视购票系统
  • (六)库存超卖案例实战——使用mysql分布式锁解决“超卖”问题
  • (排序详解之 堆排序)
  • (四)TensorRT | 基于 GPU 端的 Python 推理
  • (转)JVM内存分配 -Xms128m -Xmx512m -XX:PermSize=128m -XX:MaxPermSize=512m
  • (转)linux自定义开机启动服务和chkconfig使用方法
  • (转)mysql使用Navicat 导出和导入数据库
  • (转)一些感悟
  • .net 提取注释生成API文档 帮助文档
  • .NET 中各种混淆(Obfuscation)的含义、原理、实际效果和不同级别的差异(使用 SmartAssembly)