当前位置: 首页 > news >正文

浅谈C#之ConcurrentQueue

一、基本介绍

ConcurrentQueue<T> 是一个线程安全的队列,它允许多个线程同时对队列进行操作而不会相互干扰。它是 System.Collections.Concurrent 命名空间下的一个类,提供了基本的队列操作,如 Enqueue(入队)、TryDequeue(尝试出队)、TryPeek(尝试查看队首元素)等,并且是线程安全的。

二、关键特性

线程安全:不需要额外的同步机制,就可以在多线程环境中安全地使用。

无锁:内部使用原子操作来保证线程安全,通常比使用锁有更好的性能。

阻塞操作:虽然 ConcurrentQueue<T> 本身不提供阻塞操作,但可以与其他同步原语(如 SemaphoreSlim 或 CancellationToken)结合使用来实现阻塞行为

三、简单示例

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;class Program
{static void Main(){ConcurrentQueue<int> queue = new ConcurrentQueue<int>();CancellationTokenSource cts = new CancellationTokenSource();// 生产者线程Task producer = Task.Run(() =>{for (int i = 0; i < 10; i++){queue.Enqueue(i);Console.WriteLine($"Produced: {i}");Thread.Sleep(100); // 模拟工作}}, cts.Token);// 消费者线程Task consumer = Task.Run(() =>{while (!cts.Token.IsCancellationRequested){if (queue.TryDequeue(out int item)){Console.WriteLine($"Consumed: {item}");}else{Thread.Yield(); // 让出 CPU 时间片}}}, cts.Token);// 等待一段时间,然后取消任务Thread.Sleep(1500);cts.Cancel();Task.WaitAll(producer, consumer);}
}

四、完整示例

1.与 BlockingCollection<T> 结合使用

BlockingCollection<T> 是一个线程安全的集合,提供了数据结构和同步原语的组合,可以与 ConcurrentQueue<T> 结合使用来实现生产者-消费者模式。

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;class Program
{static void Main(){BlockingCollection<int> blockingCollection = new BlockingCollection<int>(new ConcurrentQueue<int>(), 10);Task producer = Task.Run(() =>{for (int i = 0; i < 20; i++){blockingCollection.Add(i);Console.WriteLine($"Produced: {i}");Thread.Sleep(100);}blockingCollection.CompleteAdding();});Task consumer = Task.Run(() =>{foreach (var item in blockingCollection.GetConsumingEnumerable()){Console.WriteLine($"Consumed: {item}");Thread.Sleep(150);}});Task.WaitAll(producer, consumer);}
}

2. 使用 CancellationToken 实现优雅的取消

ConcurrentQueue<T> 可以与 CancellationToken 结合使用,以实现任务的优雅取消。

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;class Program
{static void Main(){ConcurrentQueue<int> queue = new ConcurrentQueue<int>();CancellationTokenSource cts = new CancellationTokenSource();Task producer = Task.Run(() =>{for (int i = 0; i < 20; i++){if (cts.Token.IsCancellationRequested){Console.WriteLine("Cancellation requested");return;}queue.Enqueue(i);Console.WriteLine($"Produced: {i}");Thread.Sleep(100);}}, cts.Token);Task consumer = Task.Run(() =>{while (!cts.Token.IsCancellationRequested){if (queue.TryDequeue(out int item)){Console.WriteLine($"Consumed: {item}");}else{Thread.Yield();}}}, cts.Token);Thread.Sleep(1500);cts.Cancel();Task.WaitAll(producer, consumer);}
}

与 SemaphoreSlim 实现并发控制

SemaphoreSlim 可以与 ConcurrentQueue<T> 结合使用,以控制同时访问资源的线程数量。

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;class Program
{static void Main(){ConcurrentQueue<int> queue = new ConcurrentQueue<int>();SemaphoreSlim semaphore = new SemaphoreSlim(3);Task producer = Task.Run(() =>{for (int i = 0; i < 20; i++){semaphore.Wait();queue.Enqueue(i);Console.WriteLine($"Produced: {i}");Thread.Sleep(100);semaphore.Release();}});Task consumer = Task.Run(() =>{while (!queue.IsEmpty){semaphore.Wait();if (queue.TryDequeue(out int item)){Console.WriteLine($"Consumed: {item}");Thread.Sleep(150);}semaphore.Release();}});Task.WaitAll(producer, consumer);}
}

 使用 IProducerConsumerCollection<T> 接口

ConcurrentQueue<T> 实现了 IProducerConsumerCollection<T> 接口,这使得它可以与任何需要这种接口的 API 一起使用。

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;class Program
{static void Main(){IProducerConsumerCollection<int> collection = new ConcurrentQueue<int>();Task producer = Task.Run(() =>{for (int i = 0; i < 20; i++){collection.TryAdd(i);Console.WriteLine($"Produced: {i}");Thread.Sleep(100);}});Task consumer = Task.Run(() =>{while (collection.TryTake(out int item)){Console.WriteLine($"Consumed: {item}");Thread.Sleep(150);}});Task.WaitAll(producer, consumer);}
}

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 表情迁移大法,LivePortrait 帮你快速处理图片!
  • LabVIEW制系统开发流程介绍
  • Ubuntu报错:正在等待缓存锁:无法获得锁 /var/lib/dpkg/lock-frontend 锁正由进程 7647
  • TypeScript 在前端开发中的规范化应用
  • 一键快速制作和印刷样本册,推荐一个优质网站:FLBOOK
  • 安卓玩机工具-----适合安卓机型的“搞机工具箱” 功能齐全 玩机推荐
  • Leangoo敏捷工具在缺陷跟踪(BUG)管理中的高效应用
  • 软考基础知识之性能指标
  • 【转载】golang内存分配
  • 《JavaEE进阶》----10.<SpringMVC应用分层:【三层架构】>
  • StarRocks 培训课程重磅上线!专家出品,助你升级打怪不走弯路!
  • A Tutorial on Near-Field XL-MIMO Communications Towards 6G【论文阅读笔记】
  • 前端正确设置资源上下文路径ContextPath(发布目录outDir 、公共基础路径),保证打包部署后站点能正常加载资源。
  • Session、Cookies 和 Token 的关系详解
  • 跨国公司研发战略调整与中国IT产业的未来
  • 网络传输文件的问题
  • 【划重点】MySQL技术内幕:InnoDB存储引擎
  • CAP理论的例子讲解
  • Debian下无root权限使用Python访问Oracle
  • ES6系列(二)变量的解构赋值
  • go append函数以及写入
  • gulp 教程
  • idea + plantuml 画流程图
  • JavaScript异步流程控制的前世今生
  • nfs客户端进程变D,延伸linux的lock
  • Python实现BT种子转化为磁力链接【实战】
  • Vue.js-Day01
  • Vue.js源码(2):初探List Rendering
  • windows下如何用phpstorm同步测试服务器
  • 百度小程序遇到的问题
  • 分布式熔断降级平台aegis
  • 老板让我十分钟上手nx-admin
  • 如何实现 font-size 的响应式
  • 时间复杂度与空间复杂度分析
  • PostgreSQL之连接数修改
  • 浅谈sql中的in与not in,exists与not exists的区别
  • ​​​​​​​​​​​​​​汽车网络信息安全分析方法论
  • # 数仓建模:如何构建主题宽表模型?
  • #、%和$符号在OGNL表达式中经常出现
  • #70结构体案例1(导师,学生,成绩)
  • #define、const、typedef的差别
  • (16)Reactor的测试——响应式Spring的道法术器
  • (3)Dubbo启动时qos-server can not bind localhost22222错误解决
  • (3)选择元素——(17)练习(Exercises)
  • (分享)一个图片添加水印的小demo的页面,可自定义样式
  • (附源码)ssm本科教学合格评估管理系统 毕业设计 180916
  • (附源码)计算机毕业设计SSM疫情下的学生出入管理系统
  • (解决办法)ASP.NET导出Excel,打开时提示“您尝试打开文件'XXX.xls'的格式与文件扩展名指定文件不一致
  • (十三)Maven插件解析运行机制
  • (四)模仿学习-完成后台管理页面查询
  • (学习日记)2024.01.09
  • (一)utf8mb4_general_ci 和 utf8mb4_unicode_ci 适用排序和比较规则场景
  • (转)PlayerPrefs在Windows下存到哪里去了?
  • .NET delegate 委托 、 Event 事件
  • .Net IOC框架入门之一 Unity