当前位置: 首页 > news >正文

PHP多进程系列笔记(三)

本节讲解几个多进程的实例。

多进程实例

Master-Worker结构

下面例子实现了简单的多进程管理:

  • 支持设置最大子进程数
  • Master-Worker结构:Worker挂掉,Master进程会重新创建一个
<?php 

$pids = []; //存储子进程pid
$MAX_PROCESS = 3;//最大进程数

$pid = pcntl_fork();
if($pid <0){
    exit("fork fail\n");
}elseif($pid > 0){
    exit;//父进程退出
}else{
    // 从当前终端分离
    if (posix_setsid() == -1) {
        die("could not detach from terminal");
    }

    $id = getmypid();   
    echo time()." Master process, pid {$id}\n"; 

    for($i=0; $i<$MAX_PROCESS;$i++){
        start_worker_process();
    }

    //Master进程等待子进程退出,必须是死循环
    while(1){
        foreach($pids as $pid){
            if($pid){
                $res = pcntl_waitpid($pid, $status, WNOHANG);
                if ( $res == -1 || $res > 0 ){
                    echo time()." Worker process $pid exit, will start new... \n";
                    start_worker_process();
                    unset($pids[$pid]);
                }
            }
        }
    }
}

/**
 * 创建worker进程
 */
function start_worker_process(){
    global $pids;
    $pid = pcntl_fork();
    if($pid <0){
        exit("fork fail\n");
    }elseif($pid > 0){
        $pids[$pid] = $pid;
        // exit; //此处不可退出,否则Master进程就退出了
    }else{
        //实际代码
        $id = getmypid();   
        $rand = rand(1,3);
        echo time()." Worker process, pid {$id}. run $rand s\n"; 
        while(1){
            sleep($rand);
        }
    }
}

~~~防盗版声明:本文系原创文章,发布于公众号飞鸿影的博客(fhyblog)及博客园,转载需作者同意。~~~

多进程Server

下面我们使用多进程实现一个tcp服务器,支持:

  • 多进程处理客户端连接
  • 子进程退出,Master进程会重新创建一个
  • 支持事件回调
<?php 

class TcpServer{
    const MAX_PROCESS = 3;//最大进程数
    private $pids = []; //存储子进程pid
    private $socket;

    public function __construct(){
        $pid = pcntl_fork();
        if($pid <0){
            exit("fork fail\n");
        }elseif($pid > 0){
            exit;//父进程退出
        } else{
            // 从当前终端分离
            if (posix_setsid() == -1) {
                die("could not detach from terminal");
            }

            umask(0);

            $id = getmypid();   
            echo time()." Master process, pid {$id}\n"; 

            //创建tcp server
            $this->socket = stream_socket_server("tcp://0.0.0.0:9201", $errno, $errstr);
            if(!$this->socket) exit("start server err: $errstr --- $errno");
        }
    }

    public function run(){
        for($i=0; $i<self::MAX_PROCESS;$i++){
            $this->start_worker_process();
        }

        echo "waiting client...\n";

        //Master进程等待子进程退出,必须是死循环
        while(1){
            foreach($this->pids as $k=>$pid){
                if($pid){
                    $res = pcntl_waitpid($pid, $status, WNOHANG);
                    if ( $res == -1 || $res > 0 ){
                        echo time()." Worker process $pid exit, will start new... \n";
                        $this->start_worker_process();
                        unset($this->pids[$k]);
                    }
                }
            }
            sleep(1);//让出1s时间给CPU
        }
    }

    /**
     * 创建worker进程,接受客户端连接
     */
    private function start_worker_process(){
        $pid = pcntl_fork();
        if($pid <0){
            exit("fork fail\n");
        }elseif($pid > 0){
            $this->pids[] = $pid;
            // exit; //此处不可退出,否则Master进程就退出了
        }else{
            $this->acceptClient();
        }
    }

    private function acceptClient()
    {
        //子进程一直等待客户端连接,不能退出
        while(1){
            $conn = stream_socket_accept($this->socket, -1);
            if($this->onConnect) call_user_func($this->onConnect, $conn); //回调连接事件

            //开始循环读取消息
            $recv = ''; //实际收到消息
            $buffer = ''; //缓冲消息
            while(1){
                $buffer = fread($conn, 20);

                //没有收到正常消息
                if($buffer === false || $buffer === ''){
                    if($this->onClose) call_user_func($this->onClose, $conn); //回调断开连接事件
                    break;//结束读取消息,等待下一个客户端连接
                }

                $pos = strpos($buffer, "\n"); //消息结束符
                if($pos === false){
                    $recv .= $buffer;                            
                }else{
                    $recv .= trim(substr($buffer, 0, $pos+1));

                    if($this->onMessage) call_user_func($this->onMessage, $conn, $recv); //回调收到消息事件

                    //客户端强制关闭连接
                    if($recv == "quit"){
                        echo "client close conn\n";
                        fclose($conn);
                        break;
                    }

                    $recv = ''; //清空消息,准备下一次接收
                }
            }
        }
    }

    function __destruct() {
        @fclose($this->socket);
    }
}

$server =  new TcpServer();

$server->onConnect = function($conn){
    echo "onConnect -- accepted " . stream_socket_get_name($conn,true) . "\n";
    fwrite($conn,"conn success\n");
};

$server->onMessage = function($conn,$msg){
    echo "onMessage --" . $msg . "\n";
    fwrite($conn,"received ".$msg."\n");
};

$server->onClose = function($conn){
    echo "onClose --" . stream_socket_get_name($conn,true) . "\n";
    fwrite($conn,"onClose "."\n");
};

$server->run();

运行:

$ php process_multi.server.php 
1528734803 Master process, pid 9110
waiting client...

此时服务端已经变成守护进程了。新开终端,我们使用ps命令查看进程:

$ ps -ef | grep php
yjc       9110     1  0 00:33 ?        00:00:00 php process_multi.server.php
yjc       9111  9110  0 00:33 ?        00:00:00 php process_multi.server.php
yjc       9112  9110  0 00:33 ?        00:00:00 php process_multi.server.php
yjc       9113  9110  0 00:33 ?        00:00:00 php process_multi.server.php
yjc       9134  8589  0 00:35 pts/1    00:00:00 grep php

可以看到4个进程:1个主进程,3个子进程。使用kill命令结束子进程,主进程会重新拉起一个新的子进程。

然后我们使用telnet测试连接:

$ telnet 127.0.0.1 9201
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
conn success
hello server!
received hello server!
quit
received quit
Connection closed by foreign host.

转载于:https://www.cnblogs.com/52fhy/p/9211505.html

相关文章:

  • Netty基础
  • 探讨.NET Core的未来
  • 前端每日实战:61# 视频演示如何用纯 CSS 创作一只咖啡壶
  • Mantis 1.3.13报表显示问题
  • SQL Server 与 DSN
  • 关于怎么在手机端实现一个拖拽的操作
  • 每日笔记之2018-06-26之小总结
  • emoji web端处理
  • libvirt-qemu-虚拟机设备热插拔
  • 【刷算法】从上往下打印二叉树
  • 温故之.NET 中的并行并发概念解析
  • 命名实体识别从数据集到算法实现
  • 过滤器的功能实现
  • ps:建立规则选区
  • 巴克莱银行聚焦于业务产出的做法
  • [原]深入对比数据科学工具箱:Python和R 非结构化数据的结构化
  • 【译】React性能工程(下) -- 深入研究React性能调试
  • 2018以太坊智能合约编程语言solidity的最佳IDEs
  • Android Volley源码解析
  • bootstrap创建登录注册页面
  • C# 免费离线人脸识别 2.0 Demo
  • CSS中外联样式表代表的含义
  • ES6 ...操作符
  • IE报vuex requires a Promise polyfill in this browser问题解决
  • Java 网络编程(2):UDP 的使用
  • Perseus-BERT——业内性能极致优化的BERT训练方案
  • sublime配置文件
  • vue中实现单选
  • 分布式任务队列Celery
  • 如何利用MongoDB打造TOP榜小程序
  • 使用前端开发工具包WijmoJS - 创建自定义DropDownTree控件(包含源代码)
  • 为物联网而生:高性能时间序列数据库HiTSDB商业化首发!
  • 在weex里面使用chart图表
  • Nginx惊现漏洞 百万网站面临“拖库”风险
  • Nginx实现动静分离
  • 扩展资源服务器解决oauth2 性能瓶颈
  • 曜石科技宣布获得千万级天使轮投资,全方面布局电竞产业链 ...
  • ​​​​​​​GitLab 之 GitLab-Runner 安装,配置与问题汇总
  • ​​​​​​​Installing ROS on the Raspberry Pi
  • ​Java并发新构件之Exchanger
  • ​Linux Ubuntu环境下使用docker构建spark运行环境(超级详细)
  • ​Z时代时尚SUV新宠:起亚赛图斯值不值得年轻人买?
  • ​第20课 在Android Native开发中加入新的C++类
  • ​渐进式Web应用PWA的未来
  • ​你们这样子,耽误我的工作进度怎么办?
  • $HTTP_POST_VARS['']和$_POST['']的区别
  • (poj1.3.2)1791(构造法模拟)
  • (zt)最盛行的警世狂言(爆笑)
  • (顶刊)一个基于分类代理模型的超多目标优化算法
  • (三)mysql_MYSQL(三)
  • (算法设计与分析)第一章算法概述-习题
  • (转)linux自定义开机启动服务和chkconfig使用方法
  • .NET 5.0正式发布,有什么功能特性(翻译)
  • .NET CF命令行调试器MDbg入门(三) 进程控制
  • .NET Core WebAPI中使用swagger版本控制,添加注释