Swoole 中通过 process 模块实现多进程

简介: Swoole 中通过 process 模块实现多进程

我们都知道,PHP 有它自带的进程控制 pcntl,Swoole 中的 process 提供了更强大的功能,直接截取了官网的一张图。


1668569485000.jpg


下面我们模拟一个 TCP 服务器,演示一下基于 process 的多进程服务。


接下来,先来看我们服务器的代码部分。我们设置子进程数为 3 个,在下面这段代码中,主进程启动之后,会额外启动 3 个子进程,负责处理客户端连接以及请求操作,当子进程退出后,主进程会重新创建新的子进程。如果主进程退出,那么子进程在处理完当前请求之后也会退出。


Server


<?php
class Server
{
    private $mpid; //主进程id
    private $pids = []; //子进程数组
    private $socket; //网络套间字
    const Max_PROCESS = 3; //最大创建进程数
    //主进程逻辑
    public function run()
    {
        $process = new \Swoole\Process(function () {
            //获取主进程id
            $this->mpid = posix_getpid();
            echo time() . " master process pid: {$this->mpid}" . PHP_EOL;
            //创建TCP服务器并获取套间字
            $this->socket = stream_socket_server("tcp://127.0.0.1:9505", $errno, $errstr);
            if (!$this->socket) {
                exit("service start error:$errstr --- $errno");
            }
            //启动子进程
            for ($i = 1; $i <= self::Max_PROCESS; $i++) {
                $this->startWorkerProcess();
            }
            echo "Waiting client connect" . PHP_EOL;
            //主进程等待子进程退出 必须是死循环
            while (1) {
                if(count($this->pids)){
                    //回收结束运行的子进程,默认为阻塞,false 表示非阻塞模式,如果失败返回 false
                    $ret = \Swoole\Process::wait(false);
                    if ($ret) {
                        echo time() . " worker process: {$ret['pid']} exit ,then new process..." . PHP_EOL;
                        //新创建一个子进程
                        $this->startWorkerProcess();
                        //从数组中删除这个已不存在的子进程 pid
                        $index=array_search($ret['pid'],$this->pids);
                        unset($this->pids[$index]);
                    }
                }
                sleep(1);  //
            }
        }, false, false);
        //把当前进程升级为守护进程
        \Swoole\Process::daemon();
        $process->start();
    }
    //创建子进程
    public function startWorkerProcess()
    {
        $process = new \Swoole\Process(function (\Swoole\Process $work) {
            $this->acceptClient($work);
        }, false, false);
        $pid = $process->start();
        $this->pids[] = $pid;
    }
    //接收客户端请求
    public function acceptClient(&$worker)
    {
        //子进程等待客户端连接,不能退出
        while (1) {
            //接收由 stream_socket_server()创建的的套间字连接
            $conn = stream_socket_accept($this->socket, -1);
            //如果定义了连接回调的地址,就调用
            if ($this->onConnect) {
                call_user_func($this->onConnect, $conn);
            }
            //开始循环读取客户端信息
            $recv = ''; //实际接收数据
            $buffer = ''; //缓冲数据
            while (1) {
                //检查主进程是否正常,如果不正常,退出子进程
                $this->checkmPid($worker);
                //读取客户端信息
                $buffer = fread($conn, 20);
                //如果没有消息
                if ($buffer === false || $buffer === '') {
                    //如果设置了了解关闭的回调函数,那么执行关闭回调
                    if ($this->onClose) {
                        call_user_func($this->onClose, $conn);
                    }
                    //等待下一个连接消息
                    break;
                }
                //消息结束的位置
                $pos = strpos($buffer, "\n");
                //没有结束符,说明还没有读完
                if (false === $pos) {
                    $recv .= $buffer; //拼接消息
                } else {              //消息读完了,处理消息
                    $recv .= trim(substr($buffer, 0, $pos + 1));
                    //如果定义了处理消息回调的函数,那就直接调用回调
                    if ($this->onMessage) {
                        call_user_func($this->onMessage, $conn, $recv);
                    }
                    //如果接收到quit 表示退出,那么关闭这个链接,等待下一个客户端连接
                    if ($recv === 'quit') {
                        echo 'client close' . PHP_EOL;
                        fclose($conn);
                        break;
                    }
                }
                //清空消息
                $recv = '';
            }
        }
    }
    public function checkmPid(&$worker)
    {
        //说明主进程不存在,子进程此时是僵尸进程,需要退出
        if (!\Swoole\Process::kill($this->mpid, 0)) {
            $worker->exit();
            echo "worker: {$worker->pid} exit" . PHP_EOL;
        }
    }
}
$server = new Server();
//连接回调
$server->onConnect = function ($conn) {
    echo "onConnect -- accepted " . stream_socket_get_name($conn, true) . PHP_EOL;
};
//接收消息回调
$server->onMessage = function ($conn, $msg) {
    echo "message is ------ $msg" . PHP_EOL;
    fwrite($conn, "received: " . $msg . "\n");
};
//关闭回调
$server->onClose = function ($conn) {
    echo "close---" . stream_socket_get_name($conn, true) . PHP_EOL;
};
$server->run();

开启服务之后,你可以通过命令查看进程 pstree -p 主进程id 或者通过命令查看运行的更多信息 ps -ef | grep Server.php, 可以看到对应进程的 id。


1668569519338.jpg


现在让我们创建一个简单的客户端连接服务器。

Client
<?php
go(function () {
    $client = new \Swoole\Coroutine\Client(SWOOLE_SOCK_TCP);
    // 尝试与指定 TCP 服务端建立连接(IP和端口号需要与服务端保持一致,超时时间为0.5秒)
    if ($client->connect("127.0.0.1", 9505, 0.5)) {
        // 建立连接后发送内容
        $client->send("hello world\n");
        // 打印接收到的消息
        echo $client->recv().PHP_EOL;
        sleep(2);
        // 关闭连接
        $client->close();
    } else {
        echo "connect failed.";
    }
});

创建一个协程客户端请求,服务端接收数据,服务端响应数据,客户端接收数据的整个过程。整理客户端 sleep 两秒之后才关闭连接。

1668569537915.jpg

然后我们来很粗糙的手动 kill 掉其中的一个子进程。父进程又重新创建一个子进程。

1668569549648.jpg

和预期一样,主进程又重新创建了一个新的子进程。

相关文章
|
4月前
|
应用服务中间件 Windows
129. SAP ABAP Update Process(更新进程)的概念和设计动机解析
129. SAP ABAP Update Process(更新进程)的概念和设计动机解析
|
4月前
|
人工智能 自然语言处理 Linux
进程(process) vs 线程(Thread)
本文主要介绍了进程和线程的基本概念、区别以及操作系统如何调度线程的方式。同时,还介绍了线程锁的核心原理和实现方式。在多线程编程中,理解进程和线程的概念以及线程锁的使用,对于保证程序的安全性和性能非常重要。
97 0
|
11月前
|
消息中间件
每日一博 - 图解进程(Process)和线程(Thread)区别联系
每日一博 - 图解进程(Process)和线程(Thread)区别联系
55 0
|
12月前
|
存储 安全 Windows
徒手帮 process explorer 找回丢失的进程列
徒手帮 process explorer 找回丢失的进程列
|
30天前
|
数据采集 并行计算 安全
Python并发编程:多进程(multiprocessing模块)
在处理CPU密集型任务时,Python的全局解释器锁(GIL)可能会成为瓶颈。为了充分利用多核CPU的性能,可以使用Python的multiprocessing模块来实现多进程编程。与多线程不同,多进程可以绕过GIL,使得每个进程在自己的独立内存空间中运行,从而实现真正的并行计算。
|
1月前
|
Python
python Process 多进程编程
python Process 多进程编程
27 1
|
1月前
|
存储 安全 Python
[python]使用标准库logging实现多进程安全的日志模块
[python]使用标准库logging实现多进程安全的日志模块
|
1月前
|
JavaScript 前端开发
nodejs process进程
nodejs process进程
22 0
|
2月前
|
Unix Linux Python
`subprocess`模块是Python中用于生成新进程、连接到它们的输入/输出/错误管道,并获取它们的返回(退出)代码的模块。
`subprocess`模块是Python中用于生成新进程、连接到它们的输入/输出/错误管道,并获取它们的返回(退出)代码的模块。
|
2月前
|
Python
在Python中,`multiprocessing`模块提供了一种在多个进程之间共享数据和同步的机制。
在Python中,`multiprocessing`模块提供了一种在多个进程之间共享数据和同步的机制。

相关实验场景

更多