当前位置: 首页 > news >正文

匿名管道在进程池中的应用案例

目录

  • 1. 管理多个管道文件
  • 2. 初始化创建出来的管道
  • 3. 子进程的工作
  • 4. 模拟任务模块
  • 5. 父进程分发任务
  • 6. 清理释放子进程
  • 7. 优化

本案例是基于 【进程间通信(一)】【管道通信(下)】 中对于管道的应用场景做的一个代码案例,可以先观看管道通信相关的文章,以得到该案例更好的观感体验。

1. 管理多个管道文件

父进程创建了一批子进程,每个子进程与父进程要实现进程通信,就要创建一个管道文件,作为双方通信的资源地。操作系统需要对系统中存在的大量进程、打开的文件等内核结构进行管理,因此我们创建出来的管道,也要进行管理。

const int processNum = 10;// 管理通信信道 --- 先描述
class channel
{
public:channel(int cmdfd, pid_t slaverId, const std::string processName): _cmdfd(cmdfd), _slaverId(slaverId), _processName(processName){}int _cmdfd;               // 发送任务的文件描述符pid_t _slaverId;          // 子进程 pidstd::string _processName; // 子进程名字
};// 管理通信信道 --- 后组织// father -> w, child -> rstd::vector<channel> channels;

2. 初始化创建出来的管道

使用循环结构 + fork() 创建多个子进程。父进程对管道做写入,子进程读取,以此来实现父进程通过管道通信对子进程派发任务的案例。

因此创建完子进程后,子进程要把写端关闭,只保留管道的读端。同样的,父进程则关闭对管道的读端。

// version-1(有点小bug,后面优化)
void InitProcessPool(std::vector<channel> *channels)
{for (int i = 0; i < processNum; ++i){int pipefd[2];      int n = pipe(pipefd);       // 提前创建管道assert(n == 0);(void)n;pid_t id = fork();  // child processif (id == 0){close(pipefd[1]);       // 关闭子进程对管道的写端dup2(pipefd[0], 0);     // 子进程对管道的读端做重定向到标准输入,弱化管道的概念,read 直接读 0 fd即可close(pipefd[0]);       // 这一步可做可不做slaver();     std::cout << "child process(" << getpid() << ") quit!\n";      exit(0);            }// father processclose(pipefd[0]);       // 关闭父进程对管道的读端// 添加 channel 字段std::string name = "process-" + std::to_string(i);channels->push_back(channel(pipefd[1], id, name));}
}

3. 子进程的工作

因为在初始化中,我们对所有子进程的读端做了重定向,因此在子进程的操作中,弱化了管道的概念,不需要传参指明文件描述符,直接对重定向后的 fd 做读取即可。这里人为规定好,子进程一次读4 bytes。

void slaver()
{while (true){int cmdCode = 0;// 规定通信协议:一次读 4 bytesint n = read(0, &cmdCode, sizeof(int));     // 因为对管道的读端做了重定向到标准输入,因此都是从 0 fd中读取if (n == sizeof(int)){std::cout << "[slaver process-" << getpid() << "] get a command, cmdcode: " << cmdCode << std::endl;if (cmdCode >= 0 && cmdCode < tasks.size()) tasks[cmdCode]();}if (n == 0) break;       // 读到为结尾的本质是父进程不再往管道写入数据了,因此子进程可以退出了,没有必要继续读取了。}
}

4. 模拟任务模块

#pragma once#include <iostream>
#include <vector>typedef void(*task_t)();void task1()
{std::cout << "task1" << "\n";
}
void task2()
{std::cout << "task2" << "\n";
}
void task3()
{std::cout << "task3" << "\n";
}
void task4()
{std::cout << "task4" << "\n";
}void LoadTask(std::vector<task_t> *tasks)
{tasks->push_back(task1);tasks->push_back(task2);tasks->push_back(task3);tasks->push_back(task4);
}

5. 父进程分发任务

分发任务需要解决几个问题

  • 选择哪个任务 ---- 随机数做选择,后续可以优化为用户交互来选择任务。
  • 选择哪个子进程 ----- 可以是随机数,也可以轮询分配,只要保证负载均衡即可(即不要一直在某几个子进程执行即可,要尽可能的 “公平” )
  • 最后发送任务,即与子进程进行通信,写入文件的系统接口是 write,管道文件也是文件,因此直接 write 写入数据即可。这里人为规定好,父进程一次写 4 bytes。
void Menu()
{std::cout << " -------------------------------------" << std::endl;std::cout << "|   1. task1             2. task2     |" << std::endl;std::cout << "|   3. task3             4. task4     |" << std::endl;std::cout << "|             0. exit                 |" << std::endl;std::cout << " ------------------------------------- " << std::endl;
}void ContralSlaver(const std::vector<channel> &channels)
{int which = 0;      while (true){Menu();int input = 0;std::cout << "Please Enter@ ";std::cin >> input;if(input <= 0 || input > tasks.size()) break;// 1. 选择任务// int cmdCode = rand() % tasks.size();int cmdCode = input - 1;// 2. 选择进程(这一步需要保证负载平衡,可以采用随机数或者轮询的策略)// int processId = rand() % channels.size();std::cout << "[father process]# " << "cmdcode: " << cmdCode << " is already send to [" << channels[which]._slaverId << "] process name: "<< channels[which]._processName << std::endl;// 3. 发送任务// 规定通信协议:一次写 4 byteswrite(channels[which]._cmdfd, &cmdCode, sizeof(cmdCode));++which;which %= channels.size();       // 在选择进程执行任务时,采用轮询的策略// sleep(1);}
}

6. 清理释放子进程

void QuitProcess(const std::vector<channel> &channels)
{// for (const auto &c : channels)// {// 	   close(c._cmdfd);// 	   waitpid(c._slaverId, nullptr, 0);// }for (const auto &c : channels) close(c._cmdfd);sleep(5);for (const auto &c : channels) waitpid(c._slaverId, nullptr, 0);
}

这个功能模块着重需要注意,如果以上面的代码为基准的话,那么这里不能在一个 for 循环内执行关闭 fd 和 waitpid 的操作。在上面初始化的模块中,是有点问题存在的。子进程创建时,会继承父进程的很多东西,包括父进程的 pcb 中的部分字段,其中就包括文件描述符表,因此父进程每创建一个子进程,就要创建一个管道文件,就会多打开一个文件描述符,然后指向新建的管道。而后续 fork 创建子进程时,子进程直接就继承了父进程这张文件描述符啊。也就是说,越到后面,子进程继承下来的文件描述符表中的写端就越来越多。

举个例子,假如创建10个子进程,每创建一个子进程的同时会创建一个管道文件,那么父进程就有一个 fd 指向该管道文件的写端。第 1 个子进程创建时,父进程有了指向管道写端的第一个 fd;第二个子进程创建时,继承父进程的文件描述符表,同时创建管道文件,因此第二个子进程有两个 fd 写端打开着,一个指向上一个创建的管道文件的写端,一个指向与该子进程相关的管道文件的写端。。。。以此类推,到创建第10个子进程时,该子进程的文件描述符表中,会多了 9 个 fd 指向与别的子进程相关的管道的写端。

所以在 close 和 waitpid 时,如果直接一个循环解决,那么父进程等待子进程时就会一直陷入阻塞状态。因为关闭了指向第一个管道的写端,还有剩下 9 个子进程指向该管道的写端!只要还要文件描述符指向该写端,那么子进程就无法退出!子进程无法退出,父进程就等不到子进程,就要一直阻塞!

弄清楚了原理,我们自然就能够知道最简单粗暴的解决方式。要把该管道的所有写端全部关闭完了,这个管道的读端才可以不需要继续做读取,相关的进程才能够退出。而最后创建的那个管道的写端,是只有一个子进程指向它的(即最后创建的子进程),因此倒序遍历 channel 就能够解决该问题。

还是举个例子方便大家理解,第一个子进程要退出,需要关闭第一个管道文件的写端,而第一个管道的写端,所有子进程都有,因为剩下的九个进程继承了父进程的文件描述符,它们的表中都有 fd 指向这个管道的写端。而第二个子进程要退出,需要关闭第二个管道的写端,那么就需要关闭后续8个进程的写端。所以倒过来退出子进程的话,关闭倒数第二个管道文件的写端(倒数第一个可以直接关闭,没有其它进程的 fd_array[ ] 指向它了),并且倒数第二个子进程退出后,该子进程指向其它的管道的写端,自然就被关闭了,这样就能够关闭倒数第三个管道文件的写端,然后退出子进程,就这样以此类推。。。

因此除了先把全部的写端关闭了,还可以这样写:

void QuitProcess(const std::vector<channel> &channels)
{// version1 for(int i = channels.size() - 1; i >= 0; --i){close(channels[i]._cmdfd);waitpid(channels[i]._slaverId, nullptr, 0);}
}

7. 优化

在初始化模块时就解决这个问题,记录所有的管道写端 fd,然后每创建出一个子进程,将其继承下来的所有不属于该进程的 fd 全部关闭,这样后续 QuitProcess 清理释放子进程时,就可以一个循环解决,也不需要倒叙了。

void InitProcessPool(std::vector<channel> *channels)
{// version 2: 确保每一个子进程都只有一个写端std::vector<int> oldfds;    // 记录父进程打开的所有fd,即管道写端for (int i = 0; i < processNum; ++i){...pid_t id = fork();  if (id == 0){// 关闭之前所有管道的写端for(auto fd : oldfds) close(fd);  ...}...oldfds.push_back(pipefd[1]);}
}void QuitProcess(const std::vector<channel> &channels)
{// version2for (const auto &c : channels){close(c._cmdfd);waitpid(c._slaverId, nullptr, 0);}
}

如果感觉该篇文章给你带来了收获,可以 点赞👍 + 收藏⭐️ + 关注➕ 支持一下!

感谢各位观看!

相关文章:

  • 【学习笔记】MIPI
  • Linux驱动开发(速记版)--平台总线
  • Java NIO 全面详解:掌握 `Path` 和 `Files` 的一切
  • C语言 | Leetcode C语言题解之第435题无重叠区间
  • go语言 常用的web框架
  • MySQL优化相关(持续积累...)
  • 电影票接口api对接有哪些优势?
  • 无源码实现免登录功能
  • 如何用ChatGPT制作一款手机游戏应用
  • 10.1 刷题
  • 大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
  • unity一键注释日志和反注释日志
  • Linux启动mysql报错
  • 字符和Ascll码表
  • android compose ScrollableTabRow indicator 指示器设置宽度
  • Angular 2 DI - IoC DI - 1
  • ECMAScript 6 学习之路 ( 四 ) String 字符串扩展
  • ECMAScript6(0):ES6简明参考手册
  • ECMAScript入门(七)--Module语法
  • Java 实战开发之spring、logback配置及chrome开发神器(六)
  • java8 Stream Pipelines 浅析
  • JS基础之数据类型、对象、原型、原型链、继承
  • storm drpc实例
  • 从零搭建Koa2 Server
  • 浅谈web中前端模板引擎的使用
  • 数组大概知多少
  • 微信小程序实战练习(仿五洲到家微信版)
  • 支付宝花15年解决的这个问题,顶得上做出十个支付宝 ...
  • ​IAR全面支持国科环宇AS32X系列RISC-V车规MCU
  • ​浅谈 Linux 中的 core dump 分析方法
  • #includecmath
  • #数据结构 笔记三
  • $jQuery 重写Alert样式方法
  • (02)Unity使用在线AI大模型(调用Python)
  • (12)目标检测_SSD基于pytorch搭建代码
  • (3)医疗图像处理:MRI磁共振成像-快速采集--(杨正汉)
  • (C)一些题4
  • (CPU/GPU)粒子继承贴图颜色发射
  • (Matalb时序预测)PSO-BP粒子群算法优化BP神经网络的多维时序回归预测
  • (二)Eureka服务搭建,服务注册,服务发现
  • (论文阅读22/100)Learning a Deep Compact Image Representation for Visual Tracking
  • (论文阅读31/100)Stacked hourglass networks for human pose estimation
  • (免费领源码)Java#Springboot#mysql农产品销售管理系统47627-计算机毕业设计项目选题推荐
  • (亲测成功)在centos7.5上安装kvm,通过VNC远程连接并创建多台ubuntu虚拟机(ubuntu server版本)...
  • (五)网络优化与超参数选择--九五小庞
  • (转)GCC在C语言中内嵌汇编 asm __volatile__
  • *上位机的定义
  • .bat批处理出现中文乱码的情况
  • .desktop 桌面快捷_Linux桌面环境那么多,这几款优秀的任你选
  • .gitignore文件忽略的内容不生效问题解决
  • .NET 8 编写 LiteDB vs SQLite 数据库 CRUD 接口性能测试(准备篇)
  • .NET DevOps 接入指南 | 1. GitLab 安装
  • .NET设计模式(2):单件模式(Singleton Pattern)
  • @NotNull、@NotEmpty 和 @NotBlank 区别
  • [ vulhub漏洞复现篇 ] Django SQL注入漏洞复现 CVE-2021-35042