php把运行时重构成多进程 + 每进程内协程 的混合模式完整流程==swoole
·
Swoole 是什么模式9 )
一句话说清楚
Swoole 是 多进程 + 每进程内协程 的混合模式。
---
进程结构图
php server.php 启动后,内存里长这样:
┌─────────────────────────────────────────┐
│ Master进程(1个) │
│ 只负责:监听端口、管理子进程 │
│ 自己不处理任何请求 │
└──────────────┬──────────────────────────┘
│
▼
┌─────────────────────────────────────────┐
│ Manager进程(1个) │
│ 只负责:盯着Worker,死了就重启 │
└──────┬───────────────────┬──────────────┘
│ │
▼ ▼
┌─────────────┐ ┌─────────────┐
│ Worker进程1 │ │ Worker进程2 │ ← 有几个CPU核就开几个
│ │ │ │
│ 协程A ←──┐ │ │ 协程A │
│ 协程B │ │ │ 协程B │
│ 协程C │ │ │ 协程C │
│ ... │ │ │ ... │
│ (可以几万个)│ │ │
└─────────────┘ └─────────────┘
↑
│
每来一个请求,Swoole自动创建一个协程处理它
---
三层模式分别是什么
第一层:多进程(进程间完全隔离)
Worker1 崩了 → 不影响 Worker2
Worker1 的内存 → Worker2 完全看不到
Worker1 的DB连接池 → 和 Worker2 各自独立
好处:一个Worker出问题,其他Worker继续跑
坏处:进程间通信麻烦,要用共享内存或消息队列
第二层:协程(进程内并发)
协程不是线程,没有真正的并行
是"主动让出CPU"的切换方式:
协程A:我要查数据库,要等200ms,我先让出去
协程B:我上!我要查Redis,要等5ms,我先让出去
协程C:我上!我在做纯计算,算完了
协程B:Redis回来了,我继续
协程A:数据库回来了,我继续
本质:用"等IO的时间"去处理其他请求
IO密集型应用效果极好
CPU密集型应用没什么用
第三层:事件驱动(底层机制)
Swoole底层用的是 epoll(Linux)
epoll 就是操作系统提供的"同时监听N个IO事件"的机制
类比:
传统方式 = 派1个人盯1口锅
epoll = 1个人同时盯1000口锅,哪口开了处理哪口
---
和其他模式的对比
PHP-FPM:
模式:多进程,每进程处理1个请求,处理完进程回收
并发:靠进程数量堆,100并发=100个进程
内存:每个请求独立,用完释放
Node.js:
模式:单进程,单线程,事件循环+协程(async/await)
并发:1个进程搞定所有请求
缺点:1个进程,CPU密集任务会卡死所有请求
Go:
模式:多线程,每个goroutine极轻量
并发:真正的并行(多核同时跑)
内存:goroutine只需2KB栈,可以开百万个
Swoole:
模式:多进程(利用多核)+ 进程内协程(处理IO等待)
并发:进程数×协程数,兼顾多核和高并发
定位:PHP版的"Node.js思路 + 多进程保障"
---
一句话记住
▎ Swoole = 多进程榨干多核CPU + 协程榨干每个进程的IO等待时间
两个维度都不浪费,这就是它快的原因。
---
0. 你要的“混合模式”到底是什么
─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
一句话:
- 多进程:吃满多核 CPU(4核就开4个 Worker)
- 每进程内协程:一个 Worker 同时处理很多 IO 请求(查库、调HTTP、Redis)
所以是两层并发:
1. 进程级并发(Worker1/2/3/4 同时干活)
2. 协程级并发(每个 Worker 里几千协程同时挂起/恢复)
---
1. 项目结构(直接照着建)
hybrid-runtime/
├── server.php
├── src/
│ ├── App.php
│ ├── Router.php
│ ├── DbPool.php
│ └── Controller/
│ └── UserController.php
└── composer.json
---
2. 依赖和环境
pecl install swoole
php.ini 加:
extension=swoole
验证:
php -m | grep swoole
---
3. 完整代码
3.1 src/DbPool.php(每个 Worker 自己的连接池)
<?php
use Swoole\Database\PDOConfig;
use Swoole\Database\PDOPool;
class DbPool
{
private static ?PDOPool $pool = null;
public static function init(array $cfg): void
{
if (self::$pool !== null) {
return;
}
self::$pool = new PDOPool(
(new PDOConfig())
->withHost($cfg['host'])
->withPort($cfg['port'])
->withDbName($cfg['database'])
->withUsername($cfg['username'])
->withPassword($cfg['password'])
->withCharset('utf8mb4'),
$cfg['pool_size']
);
}
public static function run(callable $fn): mixed
{
if (self::$pool === null) {
throw new RuntimeException('DbPool not initialized');
}
$pdo = self::$pool->get();
try {
return $fn($pdo);
} finally {
self::$pool->put($pdo);
}
}
}
---
3.2 src/Router.php(简单路由,支持 /users/{id})
<?php
class Router
{
private array $routes = [];
public function get(string $path, array $handler): void
{
$this->routes['GET'][$path] = $handler;
}
public function post(string $path, array $handler): void
{
$this->routes['POST'][$path] = $handler;
}
public function dispatch(string $method, string $path): ?array
{
if (isset($this->routes[$method][$path])) {
return ['handler' => $this->routes[$method][$path], 'params' => []];
}
foreach ($this->routes[$method] ?? [] as $pattern => $handler) {
$regex = preg_replace('/\{(\w+)\}/', '(?P<$1>[^/]+)', $pattern);
$regex = '#^' . $regex . '$#';
if (preg_match($regex, $path, $m)) {
$params = [];
foreach ($m as $k => $v) {
if (is_string($k)) $params[$k] = $v;
}
return ['handler' => $handler, 'params' => $params];
}
}
return null;
}
}
---
3.3 src/Controller/UserController.php(演示协程并发)
<?php
use Swoole\Coroutine;
use Swoole\Coroutine\WaitGroup;
class UserController
{
public function index(array $params, array $query): array
{
$page = max(1, (int)($query['page'] ?? 1));
$size = min(100, max(1, (int)($query['size'] ?? 10)));
$offset = ($page - 1) * $size;
$rows = DbPool::run(function (PDO $pdo) use ($size, $offset) {
$stmt = $pdo->prepare('SELECT id, name, email, created_at FROM users ORDER BY id DESC LIMIT :lim OFFSET
:off');
$stmt->bindValue(':lim', $size, PDO::PARAM_INT);
$stmt->bindValue(':off', $offset, PDO::PARAM_INT);
$stmt->execute();
return $stmt->fetchAll(PDO::FETCH_ASSOC);
});
return ['code' => 0, 'data' => $rows, 'page' => $page, 'size' => $size];
}
public function show(array $params, array $query): array
{
$id = (int)$params['id'];
$row = DbPool::run(function (PDO $pdo) use ($id) {
$stmt = $pdo->prepare('SELECT id, name, email, created_at FROM users WHERE id = :id');
$stmt->execute([':id' => $id]);
return $stmt->fetch(PDO::FETCH_ASSOC);
});
if (!$row) return ['code' => 404, 'message' => 'not found'];
return ['code' => 0, 'data' => $row];
}
public function store(array $params, array $body): array
{
$name = trim((string)($body['name'] ?? ''));
$email = trim((string)($body['email'] ?? ''));
if ($name === '' || $email === '') return ['code' => 400, 'message' => 'name/email required'];
if (!filter_var($email, FILTER_VALIDATE_EMAIL)) return ['code' => 400, 'message' => 'bad email'];
// 协程并发:检查邮箱 + 模拟外部审计日志
$wg = new WaitGroup();
$exists = false;
$auditOk = false;
$wg->add();
Coroutine::create(function () use (&$exists, $email, $wg) {
$exists = DbPool::run(function (PDO $pdo) use ($email) {
$stmt = $pdo->prepare('SELECT COUNT(*) FROM users WHERE email = :email');
$stmt->execute([':email' => $email]);
return (int)$stmt->fetchColumn() > 0;
});
$wg->done();
});
$wg->add();
Coroutine::create(function () use (&$auditOk, $wg) {
Coroutine::sleep(0.005); // 模拟HTTP审计服务
$auditOk = true;
$wg->done();
});
$wg->wait();
if ($exists) return ['code' => 409, 'message' => 'email exists'];
if (!$auditOk) return ['code' => 500, 'message' => 'audit failed'];
$id = DbPool::run(function (PDO $pdo) use ($name, $email) {
$stmt = $pdo->prepare('INSERT INTO users(name,email,created_at) VALUES(:name,:email,NOW())');
$stmt->execute([':name' => $name, ':email' => $email]);
return (int)$pdo->lastInsertId();
});
return ['code' => 0, 'data' => ['id' => $id]];
}
}
---
3.4 src/App.php
<?php
class App
{
private Router $router;
public function __construct()
{
$this->router = new Router();
$this->router->get('/users', [UserController::class, 'index']);
$this->router->get('/users/{id}', [UserController::class, 'show']);
$this->router->post('/users', [UserController::class, 'store']);
}
public function handle(Swoole\Http\Request $req, Swoole\Http\Response $res): void
{
$method = strtoupper($req->server['request_method'] ?? 'GET');
$uri = $req->server['request_uri'] ?? '/';
$path = strtok($uri, '?') ?: '/';
$hit = $this->router->dispatch($method, $path);
if (!$hit) {
$this->json($res, 404, ['code' => 404, 'message' => 'route not found']);
return;
}
[$class, $action] = $hit['handler'];
$params = $hit['params'];
$input = [];
if ($method === 'GET') {
$input = $req->get ?? [];
} else {
$raw = $req->rawContent();
$input = $raw ? (json_decode($raw, true) ?: []) : [];
}
try {
$controller = new $class();
$ret = $controller->$action($params, $input);
$status = ($ret['code'] ?? 500) === 0 ? 200 : ($ret['code'] ?? 500);
$this->json($res, $status, $ret);
} catch (Throwable $e) {
$this->json($res, 500, ['code' => 500, 'message' => 'internal error']);
}
}
private function json(Swoole\Http\Response $res, int $status, array $data): void
{
$res->status($status);
$res->header('Content-Type', 'application/json; charset=utf-8');
$res->end(json_encode($data, JSON_UNESCAPED_UNICODE));
}
}
---
3.5 server.php(核心:多进程 + 协程)
<?php
require_once __DIR__ . '/src/DbPool.php';
require_once __DIR__ . '/src/Router.php';
require_once __DIR__ . '/src/App.php';
require_once __DIR__ . '/src/Controller/UserController.php';
$host = '0.0.0.0';
$port = 9501;
$server = new Swoole\Http\Server($host, $port);
$workerNum = swoole_cpu_num(); // 多进程数 = CPU核数(常见默认)
$poolSizePerWorker = 20; // 每个Worker内的DB池大小
$server->set([
'worker_num' => $workerNum,
'max_coroutine' => 100000,
'enable_coroutine' => true,
'max_requests' => 100000, // 防止潜在内存碎片长期累积
'daemonize' => false,
'log_file' => '/tmp/swoole_hybrid.log',
]);
// 跨Worker共享计数器(原子计数),演示“进程间共享最小状态”
$counter = new Swoole\Atomic(0);
$server->on('Start', function () use ($host, $port, $workerNum, $poolSizePerWorker) {
echo "Master started: http://{$host}:{$port}\n";
echo "worker_num={$workerNum}, db_pool_per_worker={$poolSizePerWorker}\n";
});
$server->on('WorkerStart', function (Swoole\Http\Server $server, int $workerId) use ($poolSizePerWorker) {
// 每个Worker进程启动时做一次初始化(常驻内存)
DbPool::init([
'host' => '127.0.0.1',
'port' => 3306,
'database' => 'demo',
'username' => 'root',
'password' => 'root',
'pool_size' => $poolSizePerWorker,
]);
// 每个Worker一个App实例,长期驻留内存
$server->app = new App();
echo "Worker#{$workerId} ready\n";
});
$server->on('Request', function (Swoole\Http\Request $req, Swoole\Http\Response $res) use ($server, $counter) {
// 每个请求都会在协程里执行
$reqId = $counter->add(1);
$res->header('X-Req-Id', (string)$reqId);
$res->header('X-Worker-Pid', (string)getmypid());
/** @var App $app */
$app = $server->app;
$app->handle($req, $res);
});
$server->on('WorkerStop', function (Swoole\Http\Server $server, int $workerId) {
echo "Worker#{$workerId} stopping\n";
});
$server->start();
---
4. 建表 SQL
CREATE DATABASE IF NOT EXISTS demo DEFAULT CHARSET utf8mb4;
USE demo;
CREATE TABLE IF NOT EXISTS users (
id INT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(100) NOT NULL,
email VARCHAR(200) NOT NULL UNIQUE,
created_at DATETIME NOT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
---
5. 启动 + 压测
启动:
php server.php
测试:
curl -s http://127.0.0.1:9501/users
curl -s -X POST http://127.0.0.1:9501/users -H "Content-Type: application/json" -d
'{"name":"alice","email":"alice@example.com"}'
curl -s http://127.0.0.1:9501/users/1
并发压测(示例):
wrk -t4 -c200 -d30s http://127.0.0.1:9501/users
---
6. 运行时完整流程(大白话)
1. php server.php 启动 Master
2. Master 拉起多个 Worker(比如4个)
3. 每个 Worker 启动时执行 WorkerStart:
- 初始化自己的 DB 连接池
- 初始化自己的 App/路由
4. 请求来了:内核把连接分发给某个 Worker
5. Worker 给这个请求开一个协程
6. 协程执行业务;遇到查库/网络IO就“先挂起”
7. Worker 立刻去跑别的协程,不傻等
8. IO回来了,协程恢复继续跑
9. 返回响应
10. 进程不退出,继续等下一批请求(这就是常驻内存)
---
7. 你最容易踩的坑
- 坑1:把请求数据放到静态变量/全局变量
会被同Worker其他协程串数据。
请求级数据只放函数参数或协程上下文。
- 坑2:用 sleep()
会卡住整个 Worker。
要用 Swoole\Coroutine::sleep()。
- 坑3:连接借了不还
连接池会耗尽。
必须 try/finally 归还(上面的 DbPool::run 已解决)。
- 坑4:误以为 Worker 之间共享普通内存
不共享。
进程间共享要用 Atomic/Table/IPC/Redis。
---
8. 怎么判断你重构成功了
你看到这几个现象就对了:
- 启动日志里有多个 Worker PID
- 同时请求时,不同响应头出现不同 X-Worker-Pid
- 单个 Worker 在高并发下不阻塞(IO期间还能处理别的请求)
- 框架和连接池只在 WorkerStart 初始化一次,不是每请求初始化
---
这套就是你要的“多进程 + 每进程内协程”完整落地骨架。上生产前再加:配置中心、优雅重启、超时熔断、指标上报、链路追踪。
这是swoole的模式嘛?
* Precipitating… ●P对,这就是 Swoole 的典型模式。 )
你上面那套本质上就是:Master/Manager + 多个 Worker 进程 + 每个 Worker 内大量协程。 也就是常说的 "多进程事件驱动 + 协程并发"(混合并发模型)。
再压缩成一句话:
进程负责吃多核,协程负责吃高并发 IO。
更多推荐

所有评论(0)