当前位置: 首页 > news >正文

Rust编程中的线程间通信

1.消息传递

为了实现消息传递并发,Rust 标准库提供了一个 信道channel)实现。信道是一个通用编程概念,表示数据从一个线程发送到另一个线程。

可以将编程中的信道想象为一个水流的渠道,比如河流或小溪。如果你将诸如橡皮鸭或小船之类的东西放入其中,它们会顺流而下到达下游。编程中的信息渠道(信道)有两部分组成,一个发送者(transmitter)和一个接收者(receiver)。发送者位于上游位置,在这里可以将橡皮鸭放入河中,接收者则位于下游,橡皮鸭最终会漂流至此。代码中的一部分调用发送者的方法以及希望发送的数据,另一部分则检查接收端收到的消息。当发送者或接收者任一被丢弃时可以认为信道被 关闭closed)了。

下面的代码我们创建了一个信道但没有做任何事。注意这还不能编译,因为 Rust 不知道想要在信道中发送什么类型:

use std::sync::mpsc;
​
fn main() {let (tx, rx) = mpsc::channel();
}

这里使用 mpsc::channel 函数创建一个新的信道;mpsc多个生产者,单个消费者multiple producer, single consumer)的缩写。简而言之,Rust 标准库实现信道的方式意味着一个信道可以有多个产生值的 发送sending)端,但只能有一个消费这些值的 接收receiving)端。想象一下多条小河小溪最终汇聚成大河:所有通过这些小河发出的东西最后都会来到下游的大河。目前我们以单个生产者开始,但是当示例可以工作后会增加多个生产者。

mpsc::channel 函数返回一个元组:第一个元素是发送端 -- 发送者,而第二个元素是接收端 -- 接收者。由于历史原因,txrx 通常作为 发送者transmitter)和 接收者receiver)的缩写,所以这就是我们将用来绑定这两端变量的名字。这里使用了一个 let 语句和模式来解构了此元组;

现在将发送端移动到一个新建线程中并发送一个字符串,这样新建线程就可以和主线程通讯了,代码如下:

use std::sync::mpsc;
use std::thread;
​
fn main() {let (tx, rx) = mpsc::channel();
​thread::spawn(move || {let val = String::from("hi");tx.send(val).unwrap();});
}

这里再次使用 thread::spawn 来创建一个新线程并使用 movetx 移动到闭包中这样新建线程就拥有 tx 了。新建线程需要拥有信道的发送端以便能向信道发送消息。信道的发送端有一个 send 方法用来获取需要放入信道的值。send 方法返回一个 Result<T, E> 类型,所以如果接收端已经被丢弃了,将没有发送值的目标,所以发送操作会返回错误。在这个例子中,出错的时候调用 unwrap 产生 panic。

我们在主线程中从信道的接收者获取值。这类似于在河的下游捞起橡皮鸭或接收聊天信息,代码如下:

use std::sync::mpsc;
use std::thread;
​
fn main() {let (tx, rx) = mpsc::channel();
​thread::spawn(move || {let val = String::from("hi");tx.send(val).unwrap();});
​let received = rx.recv().unwrap();println!("Got: {}", received);
}

信道的接收者有两个有用的方法:recvtry_recv。这里,我们使用了 recv,它是 receive 的缩写。这个方法会阻塞主线程执行直到从信道中接收一个值。一旦发送了一个值,recv 会在一个 Result<T, E> 中返回它。当信道发送端关闭,recv 会返回一个错误表明不会再有新的值到来了。

try_recv 不会阻塞,相反它立刻返回一个 Result<T, E>Ok 值包含可用的信息,而 Err 值代表此时没有任何消息。如果线程在等待消息过程中还有其他工作时使用 try_recv 很有用:可以编写一个循环来频繁调用 try_recv,在有可用消息时进行处理,其余时候则处理一会其他工作直到再次检查。

出于简单的考虑,这个例子使用了 recv;主线程中除了等待消息之外没有任何其他工作,所以阻塞主线程是合适的。

我们将会看到主线程打印出这个值:

2.信道与所有权转移

所有权规则在消息传递中扮演了重要角色,其有助于我们编写安全的并发代码。防止并发编程中的错误是在 Rust 程序中考虑所有权的一大优势。现在让我们做一个试验来看看信道与所有权如何一同协作以避免产生问题:我们将尝试在新建线程中的信道中发送完 val之后 再使用它。

看下面的代码:

use std::sync::mpsc;
use std::thread;
​
fn main() {let (tx, rx) = mpsc::channel();
​thread::spawn(move || {let val = String::from("hi");tx.send(val).unwrap();println!("val is {}", val);});
​let received = rx.recv().unwrap();println!("Got: {}", received);
}

这里尝试在通过 tx.send 发送 val 到信道中之后将其打印出来。但这么做有个隐患:一旦将值发送到另一个线程后,那个线程可能会在我们再次使用它之前就将其修改或者丢弃。其他线程对值可能的修改会由于不一致或不存在的数据而导致错误或意外的结果。

当编译这段代码时,Rust给出一个错误:

上面的隐患会造成一个编译时错误。send 函数获取其参数的所有权并移动这个值归接收者所有。这可以防止在发送后再次意外地使用这个值;所有权系统检查一切是否合乎规则。

3.发送多个值并观察接收者的等待

先看一段代码:

use std::sync::mpsc;
use std::thread;
use std::time::Duration;
​
fn main() {let (tx, rx) = mpsc::channel();
​thread::spawn(move || {let vals = vec![String::from("hi"),String::from("from"),String::from("the"),String::from("thread"),];
​for val in vals {tx.send(val).unwrap();thread::sleep(Duration::from_secs(1));}});
​for received in rx {println!("Got: {}", received);}
}

这一次,在新建线程中有一个字符串 vector 希望发送到主线程。我们遍历它们,单独的发送每一个字符串并通过一个 Duration 值调用 thread::sleep 函数来暂停一秒。

在主线程中,不再显式调用 recv 函数:而是将 rx 当作一个迭代器。对于每一个接收到的值,我们将其打印出来。当信道被关闭时,迭代器也将结束。编译这段代码,结果如下:

因为主线程中的 for 循环里并没有任何暂停或等待的代码,所以可以说主线程是在等待从新建线程中接收值。

4.通过克隆发送者创建多个生产者

之前我们提到了mpscmultiple producer, single consumer 的缩写。可以运用 mpsc 来扩展创建向同一接收者发送值的多个线程。这可以通过克隆发送者来做到, 看下面的代码:

let (tx, rx) = mpsc::channel();
​let tx1 = tx.clone();thread::spawn(move || {let vals = vec![String::from("hi"),String::from("from"),String::from("the"),String::from("thread"),];
​for val in vals {tx1.send(val).unwrap();thread::sleep(Duration::from_secs(1));}});
​thread::spawn(move || {let vals = vec![String::from("more"),String::from("messages"),String::from("for"),String::from("you"),];
​for val in vals {tx.send(val).unwrap();thread::sleep(Duration::from_secs(1));}});
​for received in rx {println!("Got: {}", received);}

这一次,在创建新线程之前,对发送者调用了 clone 方法。这会给我们一个可以传递给第一个新建线程的发送端句柄。我们会将原始的信道发送端传递给第二个新建线程。这样就会有两个线程,每个线程将向信道的接收端发送不同的消息。编译代码执行结果如下:

不同的系统可能会看到这些值以不同的顺序出现;这也就是并发既有趣又困难的原因。如果通过 thread::sleep 做实验,在不同的线程中提供不同的值,就会发现它们的运行更加不确定,且每次都会产生不同的输出。

相关文章:

  • 行业洞察:分布式云如何助力媒体与娱乐业实现创新与增长?
  • 【PC】开发者日志:竞技比赛验证系统强化
  • 14. 机器学习——kNN
  • NOIP2023模拟16联测37 小猫吃火龙果
  • PHP7使用C++扩展开发
  • Java通过JNI技术调用C++动态链接库的helloword测试
  • 考研数学笔记:线性代数中抽象矩阵性质汇总
  • 33、Flink 的Table API 和 SQL 中的时区
  • 免root修改手机imei的技术原理是什么?如何实现的?hook吗
  • sam altman推荐的最好的AI工具
  • 网络运维Day14
  • 【计算机网络笔记】IP子网划分与子网掩码
  • 牛客网刷题笔记131111 Python实现LRU+二叉树先中后序打印+SQL并列排序
  • ◢Django 自写分页与使用
  • php和java对比
  • [nginx文档翻译系列] 控制nginx
  • 【159天】尚学堂高琪Java300集视频精华笔记(128)
  • Docker入门(二) - Dockerfile
  • dva中组件的懒加载
  • github指令
  • iOS仿今日头条、壁纸应用、筛选分类、三方微博、颜色填充等源码
  • isset在php5.6-和php7.0+的一些差异
  • Linux CTF 逆向入门
  • Node + FFmpeg 实现Canvas动画导出视频
  • node入门
  • python大佬养成计划----difflib模块
  • Python利用正则抓取网页内容保存到本地
  • Python实现BT种子转化为磁力链接【实战】
  • Sass 快速入门教程
  • vuex 学习笔记 01
  • Vue组件定义
  • WordPress 获取当前文章下的所有附件/获取指定ID文章的附件(图片、文件、视频)...
  • 半理解系列--Promise的进化史
  • 讲清楚之javascript作用域
  • 警报:线上事故之CountDownLatch的威力
  • 前端 CSS : 5# 纯 CSS 实现24小时超市
  • 使用Gradle第一次构建Java程序
  • 问:在指定的JSON数据中(最外层是数组)根据指定条件拿到匹配到的结果
  • 学习JavaScript数据结构与算法 — 树
  • ​什么是bug?bug的源头在哪里?
  • #NOIP 2014# day.1 生活大爆炸版 石头剪刀布
  • #微信小程序:微信小程序常见的配置传旨
  • #我与Java虚拟机的故事#连载06:收获颇多的经典之作
  • (Matalb时序预测)WOA-BP鲸鱼算法优化BP神经网络的多维时序回归预测
  • (zt)基于Facebook和Flash平台的应用架构解析
  • (二)WCF的Binding模型
  • (分布式缓存)Redis持久化
  • (附源码)php新闻发布平台 毕业设计 141646
  • (附源码)spring boot基于Java的电影院售票与管理系统毕业设计 011449
  • (蓝桥杯每日一题)love
  • (顺序)容器的好伴侣 --- 容器适配器
  • (一)C语言之入门:使用Visual Studio Community 2022运行hello world
  • (转)Scala的“=”符号简介
  • .desktop 桌面快捷_Linux桌面环境那么多,这几款优秀的任你选
  • .NET Remoting Basic(10)-创建不同宿主的客户端与服务器端