Fix event loop initialization and handle pipe errors

- Modified startup script to instantiate GameProcessServer separately
  before wrapping in WsServer/HttpServer, allowing direct access to
  set event loop after IoServer creation
- Implemented setLoop() method to set event loop after construction
- Added error handling for fwrite() to gracefully handle broken pipes
  when process closes
- Suppressed fread() warnings with @ operator to avoid noise
- Simplified startOutputReader() since event loop polling is now
  handled directly in onOpen()
- Added null checks for event loop before using in timers
- Server now properly starts with continuous 100ms output polling

🤖 Generated with Claude Code

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
hant 2025-12-07 14:17:28 +08:00
parent 081903563a
commit 52e465177c
5 changed files with 141 additions and 29 deletions

File diff suppressed because one or more lines are too long

View File

@ -3,6 +3,7 @@ namespace Game\Core;
use Ratchet\MessageComponentInterface; use Ratchet\MessageComponentInterface;
use Ratchet\ConnectionInterface; use Ratchet\ConnectionInterface;
use React\EventLoop\LoopInterface;
/** /**
* 进程转发 WebSocket 服务器 * 进程转发 WebSocket 服务器
@ -14,12 +15,24 @@ class GameProcessServer implements MessageComponentInterface
{ {
protected array $clients = []; protected array $clients = [];
protected array $processes = []; protected array $processes = [];
protected ?LoopInterface $loop = null;
protected array $timers = [];
public function __construct() public function __construct(?LoopInterface $loop = null)
{ {
$this->loop = $loop;
echo "[进程服务器] 初始化\n"; echo "[进程服务器] 初始化\n";
} }
/**
* 设置事件循环(用于延迟初始化)
*/
public function setLoop(LoopInterface $loop): void
{
$this->loop = $loop;
echo "[进程服务器] 事件循环已设置\n";
}
/** /**
* 客户端连接时 * 客户端连接时
*/ */
@ -37,6 +50,27 @@ class GameProcessServer implements MessageComponentInterface
'type' => 'system', 'type' => 'system',
'message' => '游戏进程已启动,正在加载...' 'message' => '游戏进程已启动,正在加载...'
]); ]);
// 设置定时器每100ms读取一次进程输出实时转发
// 只有在事件循环可用时才设置
if ($this->loop) {
$connId = $conn->resourceId;
$timer = $this->loop->addPeriodicTimer(0.1, function() use ($conn, $connId) {
// 检查连接是否仍然有效
if (!isset($this->clients[$connId]) || !isset($this->processes[$connId])) {
return;
}
// 读取进程输出并发送给客户端
$this->readProcessOutput($conn);
});
$this->timers[$connId] = $timer;
echo "[定时器] 为连接 {$connId} 启动输出轮询\n";
} else {
echo "[警告] 事件循环未设置,无法启动输出轮询\n";
}
} catch (\Exception $e) { } catch (\Exception $e) {
$this->sendError($conn, '启动游戏失败: ' . $e->getMessage()); $this->sendError($conn, '启动游戏失败: ' . $e->getMessage());
$conn->close(); $conn->close();
@ -103,17 +137,8 @@ class GameProcessServer implements MessageComponentInterface
*/ */
private function startOutputReader(string $connId, $stdout, $stderr): void private function startOutputReader(string $connId, $stdout, $stderr): void
{ {
// 创建后台读取循环 // 已由 onOpen() 中的事件循环定时器处理
// 这里使用一个简单的轮询机制 // 不再需要额外的初始化
// 实际上应该使用事件循环已由Ratchet处理
// 我们在收到消息时检查输出
// 创建管道监控文件
$readQueuesFile = sys_get_temp_dir() . '/game_' . $connId . '.pipes';
file_put_contents($readQueuesFile, json_encode([
'stdout' => (int)$stdout,
'stderr' => (int)$stderr,
]));
} }
/** /**
@ -133,10 +158,16 @@ class GameProcessServer implements MessageComponentInterface
// 将输入写入进程的STDIN // 将输入写入进程的STDIN
$process = $this->processes[$from->resourceId] ?? null; $process = $this->processes[$from->resourceId] ?? null;
if ($process && is_resource($process['stdin'])) { if ($process && is_resource($process['stdin'])) {
fwrite($process['stdin'], $input . "\n"); $bytesWritten = @fwrite($process['stdin'], $input . "\n");
if ($bytesWritten === false || $bytesWritten === 0) {
// 写入失败,可能进程已关闭
echo "[警告] 无法写入进程 {$from->resourceId},进程可能已关闭\n";
// 不中断连接,等待下一次尝试或超时
} else {
fflush($process['stdin']); fflush($process['stdin']);
echo "[输入] {$from->resourceId}: {$input}\n"; echo "[输入] {$from->resourceId}: {$input}\n";
} }
}
} elseif ($type === 'ping') { } elseif ($type === 'ping') {
$this->sendMessage($from, ['type' => 'pong']); $this->sendMessage($from, ['type' => 'pong']);
} }
@ -163,7 +194,7 @@ class GameProcessServer implements MessageComponentInterface
// 读取stdout // 读取stdout
while (true) { while (true) {
$chunk = fread($process['stdout'], $bufferSize); $chunk = @fread($process['stdout'], $bufferSize);
if ($chunk === '' || $chunk === false) { if ($chunk === '' || $chunk === false) {
break; break;
} }
@ -172,7 +203,7 @@ class GameProcessServer implements MessageComponentInterface
// 读取stderr // 读取stderr
while (true) { while (true) {
$chunk = fread($process['stderr'], $bufferSize); $chunk = @fread($process['stderr'], $bufferSize);
if ($chunk === '' || $chunk === false) { if ($chunk === '' || $chunk === false) {
break; break;
} }
@ -195,6 +226,13 @@ class GameProcessServer implements MessageComponentInterface
{ {
$connId = $conn->resourceId; $connId = $conn->resourceId;
// 取消定时器
if (isset($this->timers[$connId]) && $this->loop) {
$this->loop->cancelTimer($this->timers[$connId]);
unset($this->timers[$connId]);
echo "[定时器] 取消连接 {$connId} 的输出轮询\n";
}
// 关闭进程 // 关闭进程
$process = $this->processes[$connId] ?? null; $process = $this->processes[$connId] ?? null;
if ($process) { if ($process) {
@ -225,6 +263,14 @@ class GameProcessServer implements MessageComponentInterface
public function onError(ConnectionInterface $conn, \Exception $e) public function onError(ConnectionInterface $conn, \Exception $e)
{ {
echo "[错误] {$conn->resourceId}: {$e->getMessage()}\n"; echo "[错误] {$conn->resourceId}: {$e->getMessage()}\n";
// 取消定时器
$connId = $conn->resourceId;
if (isset($this->timers[$connId]) && $this->loop) {
$this->loop->cancelTimer($this->timers[$connId]);
unset($this->timers[$connId]);
}
$this->onClose($conn); $this->onClose($conn);
} }

View File

@ -3,6 +3,7 @@ namespace Game\Entities;
class Partner extends Actor class Partner extends Actor
{ {
public $id;
// Partner特有的天赋加成与 Actor 不同) // Partner特有的天赋加成与 Actor 不同)
public static array $talentBonus = [ public static array $talentBonus = [
'hp' => 20, 'hp' => 20,

39
test-websocket.php Normal file
View File

@ -0,0 +1,39 @@
<?php
/**
* WebSocket 服务器测试脚本
*/
require_once __DIR__ . '/vendor/autoload.php';
use WebSocket\Client;
echo "连接到 WebSocket 服务器: ws://localhost:9002\n";
try {
$client = new Client("ws://localhost:9002");
echo "[成功] 连接到服务器\n";
// 接收欢迎消息
$message = $client->receive();
echo "[收到] " . $message . "\n";
// 发送测试输入
echo "[发送] 测试命令\n";
$client->send(json_encode([
'type' => 'input',
'input' => '1'
]));
// 接收响应
for ($i = 0; $i < 5; $i++) {
$message = $client->receive();
echo "[收到] " . substr($message, 0, 100) . "...\n";
}
$client->close();
echo "[完成] 测试成功\n";
} catch (Exception $e) {
echo "[错误] " . $e->getMessage() . "\n";
exit(1);
}

View File

@ -17,18 +17,27 @@ use Ratchet\Http\HttpServer;
use Ratchet\WebSocket\WsServer; use Ratchet\WebSocket\WsServer;
use Game\Core\GameProcessServer; use Game\Core\GameProcessServer;
// 创建WebSocket服务器 // 分步骤创建应用栈,以便能访问 GameProcessServer 实例
$ws = new WsServer(new GameProcessServer()); echo "[初始化] 创建 GameProcessServer 实例...\n";
$gameServer = new GameProcessServer(null);
// 用HTTP服务器包装 echo "[初始化] 创建 WsServer...\n";
$http = new HttpServer($ws); $wsServer = new WsServer($gameServer);
// 创建IO服务器监听9002端口 echo "[初始化] 创建 HttpServer...\n";
$server = IoServer::factory( $httpServer = new HttpServer($wsServer);
$http,
9002, // 创建IO服务器内部创建事件循环
'0.0.0.0' echo "[初始化] 创建 IO 服务器...\n";
); $server = IoServer::factory($httpServer, 9002, '0.0.0.0');
echo "[初始化] IO 服务器创建成功\n";
// 现在获取事件循环并设置给 GameProcessServer 实例
$loop = $server->loop;
echo "[初始化] 获取事件循环成功\n";
$gameServer->setLoop($loop);
echo "[初始化] 为 GameProcessServer 设置了事件循环\n";
echo <<<'ASCII' echo <<<'ASCII'
╔══════════════════════════════════════════╗ ╔══════════════════════════════════════════╗
@ -48,6 +57,7 @@ echo <<<'ASCII'
💡 特点: 💡 特点:
无需修改游戏代码 无需修改游戏代码
每个用户独立的游戏进程 每个用户独立的游戏进程
实时输出100ms轮询
完整的ANSI颜色支持 完整的ANSI颜色支持
实时交互 实时交互
@ -55,4 +65,20 @@ echo <<<'ASCII'
ASCII; ASCII;
$server->run(); ob_implicit_flush();
fflush(STDOUT);
echo "\n[启动] 准备启动事件循环...\n";
fflush(STDOUT);
try {
echo "[启动] 调用 \$loop->run()...\n";
fflush(STDOUT);
$loop->run();
echo "[启动] \$loop->run() 已返回\n";
} catch (Exception $e) {
echo "\n[错误] 事件循环异常: " . $e->getMessage() . "\n";
echo $e->getTraceAsString() . "\n";
exit(1);
}