diff --git a/Test/Config/component/queue.php b/Test/Config/component/queue.php index c4eaace5..dfb696fa 100644 --- a/Test/Config/component/queue.php +++ b/Test/Config/component/queue.php @@ -22,5 +22,14 @@ 'queue' => function() { $redis = Application::getApp()->get('redis')->getObject(); return new \Common\Library\Queues\Queue($redis,\Test\Process\ListProcess\RedisList::queue_order_list); + }, + + 'delayQueue' => function() { + $redis = Application::getApp()->get('redis')->getObject(); + return new \Common\Library\Queues\RedisDelayQueue($redis,\Test\Process\QueueProcess\Queue::queue_order_list); + +// $predis = Application::getApp()->get('predis')->getObject(); +// return new \Common\Library\Queues\PredisDelayQueue($predis,\Test\Process\QueueProcess\Queue::queue_order_list); } + ]; \ No newline at end of file diff --git a/Test/Event.php b/Test/Event.php index ac1eaa6a..c09d10ba 100644 --- a/Test/Event.php +++ b/Test/Event.php @@ -31,7 +31,7 @@ public function onInit() { // ProcessManager::getInstance()->addProcess('tick', \Test\Process\TickProcess\Tick::class); // 测试cron自定义进程 - ProcessManager::getInstance()->addProcess('cron', \Test\Process\CronProcess\Cron::class); + // ProcessManager::getInstance()->addProcess('cron', \Test\Process\CronProcess\Cron::class); // 这里为什么获取不到pid,那是应为process需要server执行start后才会创建,而在这里只是创建实例,server还没正式启动 //$pid = ProcessManager::getInstance()->getProcessByName('cron')->getPid(); @@ -40,6 +40,10 @@ public function onInit() { // redis的队列消费 // ProcessManager::getInstance()->addProcess('redis_list_test', \Test\Process\ListProcess\RedisList::class,true, [], null, true); + // redis的延迟队列消费 + ProcessManager::getInstance()->addProcess('redis_delay_list_test', \Test\Process\QueueProcess\Queue::class,true, [], null, true); + + // amqp-direct 生产队 //ProcessManager::getInstance()->addProcess('amqp-publish', \Test\Process\AmqpProcess\AmqpPublish::class); // amqp-direct 消费队列 diff --git a/Test/Factory.php b/Test/Factory.php index 9ec55b1e..7a8853ce 100644 --- a/Test/Factory.php +++ b/Test/Factory.php @@ -61,6 +61,14 @@ public static function getQueue() return \Swoolefy\Core\Application::getApp()->get('queue'); } + /** + * @return \Common\Library\Queues\RedisDelayQueue|ContainerObjectDto + */ + public static function getDelayQueue() + { + return \Swoolefy\Core\Application::getApp()->get('delayQueue'); + } + /** * @return RedisPubSub|ContainerObjectDto */ diff --git a/Test/Process/ListProcess/RedisList.php b/Test/Process/ListProcess/RedisList.php index 82f978c1..9fa7170b 100644 --- a/Test/Process/ListProcess/RedisList.php +++ b/Test/Process/ListProcess/RedisList.php @@ -14,11 +14,9 @@ class RedisList extends AbstractProcess { */ public function run() { - \Swoole\Timer::tick(2000, function () { - goApp(function () { - $queue = Factory::getQueue(); - $queue->push(['name'=> 'bingcool','num' => rand(1,10000)]); - }); + goTick(2000, function () { + $queue = Factory::getQueue(); + $queue->push(['name'=> 'bingcool','num' => rand(1,10000)]); }); $queue = Factory::getQueue(); @@ -27,18 +25,24 @@ public function run() try { // 控制协程并发数 if($this->getCurrentRunCoroutineNum() <= 20) { - $data = $queue->pop(3); + $result = $queue->pop(3); + if (empty($result)) { + continue; + } + $data = $result[1]; // 创建协程单例 - goApp(function () use($data){ + goApp(function () use($data) { $list = new \Test\Process\ListProcess\ListController($data); $list->doHandle(); }); + + //$queue->retry($data); //var_dump('This is Redis List Queue process, pop item='.$data); } }catch (\Throwable $e) { - + var_dump($e->getMessage()); } } diff --git a/Test/Process/QueueProcess/Queue.php b/Test/Process/QueueProcess/Queue.php new file mode 100644 index 00000000..00e59624 --- /dev/null +++ b/Test/Process/QueueProcess/Queue.php @@ -0,0 +1,29 @@ +addItem(["order_id" => 1111], 2) + ->addItem(["order_id" => 2222], 2) + ->push(); + }); + + while (true) { + $items = Factory::getDelayQueue()->pop(); +// var_dump($items); + foreach ($items as $item) { + var_dump($item); + //Factory::getDelayQueue()->retry($item, 5); + } + + sleep(1); + } + } +} \ No newline at end of file diff --git a/Test/Process/QueueProcess/QueueController.php b/Test/Process/QueueProcess/QueueController.php new file mode 100644 index 00000000..e69de29b diff --git a/Test/Process/SubscribeProcess/Subscribe.php b/Test/Process/SubscribeProcess/Subscribe.php index dfe9f915..f3813aaa 100644 --- a/Test/Process/SubscribeProcess/Subscribe.php +++ b/Test/Process/SubscribeProcess/Subscribe.php @@ -18,7 +18,7 @@ public function run() $num = 1; $timeNow = time(); - Timer::tick(500, function ($timeChannel) use(& $num, $timeNow) { + goTick(500, function ($timeChannel) use(&$num, $timeNow) { if (time() - $timeNow > 3) { var_dump('close close'); Timer::cancel($timeChannel); diff --git a/monitor_reload.sh b/monitor_reload.sh old mode 100644 new mode 100755 index 6d86ca07..3a62bfdc --- a/monitor_reload.sh +++ b/monitor_reload.sh @@ -8,6 +8,10 @@ appName="$1" +portCli=9501 +portDaemon=9602 +portCron=9603 + # 通过进程名称查找进程ID,并排除当前的进程 # monitor_reload.sh 终端进程,并排除当前的进程:grep -v $$ ps aux | grep monitor_reload.sh | grep -v grep | awk '{print $1}' | grep -v $$ | xargs kill -15 @@ -18,33 +22,68 @@ ps aux | grep inotifywait | grep -v grep | awk '{print $1}'| xargs kill -15 basepath=$(cd `dirname $0`; pwd) cd $basepath -# 先停止 -/usr/bin/php cli.php stop $appName --force=1 -# 守护进程模式启动 -/usr/bin/php cli.php start $appName --daemon=1 +# 检测端口是否是否被占用 +portUsingFun() { + local port="$1" + listen=$(netstat -an | grep "$port" | awk '{print $NF}') + if [ "$listen" = "LISTEN" ]; then + echo 1 + else + echo 0 + fi +} while true; do file_changes=$(inotifywait -r -e modify,create,delete "$basepath" --format '%w%f') php_files=$(echo "$file_changes" | grep -E '\.php$') - if [ -n "$php_files" ]; then - echo "PHP files modified:$php_files" - sleep 10; - # 先停止 - /usr/bin/php cli.php stop $appName --force=1 - # 守护进程模式启动 - /usr/bin/php cli.php start $appName --daemon=1 - else - env_files=$(echo "$file_changes" | grep -E '\.env$') - if [ -n "$env_files" ]; then - echo "Env files modified:$env_files" + if [ -n "$php_files" ]; then + for execBinFile in cli.php daemon.php cron.php;do + echo "PHP files modified:$php_files" + if [ "$execBinFile" = "cli.php" ]; then sleep 10; - # 先停止 - /usr/bin/php cli.php stop $appName --force=1 - # 守护进程模式启动 - /usr/bin/php cli.php start $appName --daemon=1 + else + sleep 1; + fi + # 先停止 + /usr/bin/php $execBinFile stop $appName --force=1 + + if [ "$execBinFile" = "cli.php" ]; then + while true; do + listen=$(portUsingFun $portCli) + if [ "$listen" = 0 ]; then + echo "端口=$portCli 不被占用,可以重启" + break + fi + sleep 1 + done + fi + + if [ "$execBinFile" = "daemon.php" ]; then + while true; do + listen=$(portUsingFun $portDaemon) + if [ "$listen" = 0 ]; then + echo "端口=$portDaemon 不被占用,可以重启" + break + fi + sleep 1 + done + fi + + if [ "$execBinFile" = "cron.php" ]; then + while true; do + listen=$(portUsingFun $portCron) + if [ "$listen" = 0 ]; then + echo "端口=$portCron 不被占用,可以重启" + break + fi + sleep 1 + done fi - fi + # 守护进程模式启动 + /usr/bin/php $execBinFile start $appName --daemon=1 + done + fi done diff --git a/src/Cmd/BaseCmd.php b/src/Cmd/BaseCmd.php index 26bc09b1..53bc79b8 100644 --- a/src/Cmd/BaseCmd.php +++ b/src/Cmd/BaseCmd.php @@ -27,6 +27,7 @@ protected function configure() $this->addOption('daemon', null,InputOption::VALUE_OPTIONAL, 'Daemon model run app', 0); // 强制停止 $this->addOption('force', null,InputOption::VALUE_OPTIONAL, 'Force stop app', 0); + $options = $this->beforeInputOptions(); foreach ($options as $name=>$value) { if (!$this->getDefinition()->hasOption($name)) { diff --git a/src/Cmd/CreateCmd.php b/src/Cmd/CreateCmd.php index a296aba4..2dc8e638 100644 --- a/src/Cmd/CreateCmd.php +++ b/src/Cmd/CreateCmd.php @@ -20,7 +20,7 @@ protected function configure() protected function execute(InputInterface $input, OutputInterface $output) { - $dirs = ['Config', 'Service', 'Protocol', 'Router', 'Storage']; + $dirs = ['Config', 'Service', 'Protocol', 'Router', 'Storage', 'Middleware']; $appName = $input->getArgument('app_name'); $appPathDir = APP_PATH; if (is_dir($appPathDir)) { @@ -35,9 +35,7 @@ protected function execute(InputInterface $input, OutputInterface $output) } if ($protocol == 'http') { - $dirs = [ - 'Config', 'Controller', 'Model', 'Module', 'Router', 'Validation', 'Storage', 'Protocol' - ]; + $dirs = ['Config', 'Controller', 'Model', 'Module', 'Router', 'Validation', 'Storage', 'Protocol', 'Middleware']; } $daemonFile = START_DIR_ROOT . '/daemon.php'; @@ -109,10 +107,12 @@ protected function execute(InputInterface $input, OutputInterface $output) break; } case 'Module': - @mkdir($appPathDir . '/' . $dir.'/Demo/Controller', 0777, true); - @mkdir($appPathDir . '/' . $dir.'/Demo/Validation', 0777, true); - @mkdir($appPathDir . '/' . $dir.'/Demo/Exception', 0777, true); + { + @mkdir($appPathDir . '/' . $dir . '/Demo/Controller', 0777, true); + @mkdir($appPathDir . '/' . $dir . '/Demo/Validation', 0777, true); + @mkdir($appPathDir . '/' . $dir . '/Demo/Exception', 0777, true); break; + } case 'Router': { switch ($protocol) { @@ -128,6 +128,7 @@ protected function execute(InputInterface $input, OutputInterface $output) default: break; } + break; } case 'Protocol': { @@ -152,7 +153,23 @@ protected function execute(InputInterface $input, OutputInterface $output) break; } } + break; + } + case 'Middleware': + { + switch ($protocol) { + case "http": + @mkdir($appPathDir . '/' . $dir.'/Group', 0777, true); + @mkdir($appPathDir . '/' . $dir.'/Route', 0777, true); + break; + default: + break; + } + break; } + + default: + break; } } return 0; diff --git a/src/Cmd/ReloadCmd.php b/src/Cmd/ReloadCmd.php index 2e86644b..772509dd 100644 --- a/src/Cmd/ReloadCmd.php +++ b/src/Cmd/ReloadCmd.php @@ -1,6 +1,7 @@ getArgument('app_name'); $force = $input->getOption('force'); + $lineValue = ""; if (empty($force)) { if (!isWorkerService()) { $lineValue = initConsoleStyleIo()->ask( "1、你确定停止应用【{$appName}】? (yes or no)"); @@ -31,7 +32,7 @@ protected function execute(InputInterface $input, OutputInterface $output) } } - if (strtolower($lineValue) == 'yes') { + if (strtolower($lineValue) == 'yes' || $force) { if (!isWorkerService()) { $this->commonStop($appName); } else { diff --git a/src/Constants.php b/src/Constants.php index 9ce1f982..2116669a 100644 --- a/src/Constants.php +++ b/src/Constants.php @@ -45,7 +45,7 @@ defined('MQTT_PROTOCOL_LEVEL3') or define('MQTT_PROTOCOL_LEVEL3', 4); defined('MQTT_PROTOCOL_LEVEL5') or define('MQTT_PROTOCOL_LEVEL5', 5); -defined('SWOOLEFY_VERSION') or define('SWOOLEFY_VERSION', '5.0.21'); +defined('SWOOLEFY_VERSION') or define('SWOOLEFY_VERSION', '5.0.22'); defined('SWOOLEFY_EOF_FLAG') or define('SWOOLEFY_EOF_FLAG', '::'); defined('WORKER_CLI_STOP') or define('WORKER_CLI_STOP', 'stop'); diff --git a/src/Core/BService.php b/src/Core/BService.php index 338e3785..23256c46 100644 --- a/src/Core/BService.php +++ b/src/Core/BService.php @@ -113,13 +113,14 @@ private function getTraceId() * @param int $fd * @param BaseResponseDto $dataDto * @param array $header - * @return mixed + * @return bool */ - public function send(int $fd, BaseResponseDto $dataDto, array $header = []) + public function send(int $fd, BaseResponseDto $dataDto, array $header = []): bool { if (!BaseServer::isRpcApp()) { throw new SystemException("BService::send() this method only can be called by tcp or rpc server!"); } + if (empty($dataDto->trace_id)) { $dataDto->trace_id = $this->getTraceId(); } @@ -133,7 +134,7 @@ public function send(int $fd, BaseResponseDto $dataDto, array $header = []) $text = \Swoolefy\Rpc\RpcServer::pack($dataDto->toArray()); return Swfy::getServer()->send($fd, $text); } - + return false; } /** @@ -142,15 +143,14 @@ public function send(int $fd, BaseResponseDto $dataDto, array $header = []) * @param string $ip * @param int|null $port * @param null $server_socket - * @return mixed + * @return bool */ public function sendTo( BaseResponseDto $dataDto, string $ip = '', ?int $port = null, int $server_socket = -1 - ) - { + ): bool { if (empty($ip)) { $ip = $this->clientInfo['address']; } @@ -185,7 +185,7 @@ public function push( BaseResponseDto $dataDto, int $opcode = 1, int $finish = 1 - ) + ): bool { if (!BaseServer::isWebsocketApp()) { throw new SystemException("BService::push() this method only can be called by websocket server!"); diff --git a/src/Core/BaseServer.php b/src/Core/BaseServer.php index b27ef714..696a8634 100644 --- a/src/Core/BaseServer.php +++ b/src/Core/BaseServer.php @@ -358,6 +358,7 @@ public static function filterFaviconIcon(\Swoole\Http\Request $request, \Swoole\ if ($request->server['path_info'] == '/favicon.ico' || $request->server['request_uri'] == '/favicon.ico') { return $response->end(); } + return null; } /** diff --git a/src/Core/Coroutine/GoWaitGroup.php b/src/Core/Coroutine/GoWaitGroup.php index b3355e80..5cbae76b 100644 --- a/src/Core/Coroutine/GoWaitGroup.php +++ b/src/Core/Coroutine/GoWaitGroup.php @@ -96,10 +96,10 @@ public function goApp(\Closure $callBack, ...$params) * var_dump($result); * * @param array $callBacks - * @param float $timeOut + * @param float $maxTimeOut * @return array */ - public static function batchParallelRunWait(array $callBacks, float $timeOut = 3.0): array + public static function batchParallelRunWait(array $callBacks, float $maxTimeOut = 3.0): array { $goWait = new static(); foreach ($callBacks as $key => $callBack) { @@ -115,7 +115,7 @@ public static function batchParallelRunWait(array $callBacks, float $timeOut = 3 } }); } - $result = $goWait->wait($timeOut); + $result = $goWait->wait($maxTimeOut); return $result; } diff --git a/src/Core/Coroutine/Parallel.php b/src/Core/Coroutine/Parallel.php index 19d9006b..8b50bd14 100644 --- a/src/Core/Coroutine/Parallel.php +++ b/src/Core/Coroutine/Parallel.php @@ -73,10 +73,10 @@ public function add(callable $callable, string $key = null) /** * runWait 并发后等待结果返回 * - * @param float $timeOut + * @param float $maxTimeOut * @return array */ - public function runWait(float $timeOut = 5.0) + public function runWait(float $maxTimeOut = 5.0) { if (empty($this->callbacks)) { return []; @@ -96,7 +96,7 @@ public function runWait(float $timeOut = 5.0) } if ($items) { - $res = GoWaitGroup::batchParallelRunWait($items, $timeOut); + $res = GoWaitGroup::batchParallelRunWait($items, $maxTimeOut); } $result = array_merge($result, $res ?? []); @@ -106,9 +106,9 @@ public function runWait(float $timeOut = 5.0) } /** - * 并发限制协程数量闭包处理 + * 并发限制协程数量闭包处理,无需等待结果返回 * - * @param int $concurrent 限制的并发协程数量 + * @param int $concurrent 限制的每批并发协程数量,防止瞬间产生大量的协程拖垮下游服务或者DB * @param array $list 数组 * @param \Closure $handleFn 回调处理 * @param float $sleepTime diff --git a/src/Core/Crontab/CrontabManager.php b/src/Core/Crontab/CrontabManager.php index 7e2884ce..ddf6ac1f 100644 --- a/src/Core/Crontab/CrontabManager.php +++ b/src/Core/Crontab/CrontabManager.php @@ -77,7 +77,7 @@ public function addRule(string $cronName, string|float $expression, mixed $func, $isNext = call_user_func($callPreFn); } - // 返回false停止继续往下执行 + // return false to over function if (isset($isNext) && $isNext === false) { return false; } @@ -127,7 +127,7 @@ public function addRule(string $cronName, string|float $expression, mixed $func, $isNext = call_user_func($callPreFn); } - // 返回false停止继续往下执行 + // return false to over function if (isset($isNext) && $isNext === false) { return false; } @@ -160,7 +160,7 @@ public function addRule(string $cronName, string|float $expression, mixed $func, $isNext = call_user_func($callPreFn); } - // 返回false停止继续往下执行 + // return false to over function if (isset($isNext) && $isNext === false) { return false; } diff --git a/src/Core/Func/function.php b/src/Core/Func/function.php index 8313723a..0d11e6b3 100644 --- a/src/Core/Func/function.php +++ b/src/Core/Func/function.php @@ -137,3 +137,47 @@ function goApp(callable $callback, ...$params) { }); }); } + +/** + * @param int $timeMs + * @param callable $callable + * @return void + */ +function goTick(int $timeMs, callable $callable) +{ + if (\Swoole\Coroutine::getCid() >= 0) { + \Swoolefy\Core\Coroutine\Timer::tick($timeMs, $callable); + }else { + \Swoole\Timer::tick($timeMs, function () use($callable) { + (new \Swoolefy\Core\EventApp)->registerApp(function() use($callable) { + try { + $callable(); + }catch (\Throwable $throwable) { + \Swoolefy\Core\BaseServer::catchException($throwable); + } + }); + }); + } +} + +/** + * @param int $timeMs + * @param callable $callable + * @return void + */ +function goAfter(int $timeMs, callable $callable) +{ + if (\Swoole\Coroutine::getCid() >= 0) { + \Swoolefy\Core\Coroutine\Timer::after($timeMs, $callable); + }else { + \Swoole\Timer::after($timeMs, function () use($callable) { + (new \Swoolefy\Core\EventApp)->registerApp(function() use($callable) { + try { + $callable(); + }catch (\Throwable $throwable) { + \Swoolefy\Core\BaseServer::catchException($throwable); + } + }); + }); + } +} diff --git a/src/Core/Process/AbstractProcess.php b/src/Core/Process/AbstractProcess.php index 2673509c..81c24d7a 100644 --- a/src/Core/Process/AbstractProcess.php +++ b/src/Core/Process/AbstractProcess.php @@ -335,9 +335,7 @@ public function reboot() goApp(function () { try { $this->runtimeCoroutineWait(); - (new \Swoolefy\Core\EventApp)->registerApp(function () { - $this->onShutDown(); - }); + $this->onShutDown(); } catch (\Throwable $throwable) { $this->onHandleException($throwable); } finally { diff --git a/src/Core/ServiceDispatch.php b/src/Core/ServiceDispatch.php index 9ba3c4b6..ebac972d 100644 --- a/src/Core/ServiceDispatch.php +++ b/src/Core/ServiceDispatch.php @@ -263,25 +263,41 @@ public static function getRouterMapService(string $uri) } if (isset($routerMap[$uri])) { - $routerHandle = $routerMap[$uri]; - if(!isset($routerHandle['dispatch_route'])) { + $routerHandleMiddleware = $routerMap[$uri]; + if(!isset($routerHandleMiddleware['dispatch_route'])) { throw new DispatchException('Missing dispatch_route option key'); }else { - $dispatchRoute = $routerHandle['dispatch_route']; + $dispatchRoute = $routerHandleMiddleware['dispatch_route']; } - $beforeMiddleware = []; - foreach($routerHandle as $alias => $middleware) { + $beforeMiddleware = $afterMiddleware = []; + foreach($routerHandleMiddleware as $alias => $handle) { if ($alias != 'dispatch_route') { - $beforeMiddleware[] = $middleware; - unset($routerHandle[$alias]); + if (is_array($handle)) { + foreach ($handle as $handleItem) { + $beforeMiddleware[] = $handleItem; + } + }else { + $beforeMiddleware[] = $handle; + } + unset($routerHandleMiddleware[$alias]); continue; } - unset($routerHandle[$alias]); + unset($routerHandleMiddleware[$alias]); break; } - $afterMiddleware = array_values($routerHandle); + $afterMiddlewareTemp = array_values($routerHandleMiddleware); + foreach ($afterMiddlewareTemp as $afterMiddlewareItem) { + if (is_array($afterMiddlewareItem)) { + foreach ($afterMiddlewareItem as $afterMiddlewareEvery) { + $afterMiddleware[] = $afterMiddlewareEvery; + } + }else { + $afterMiddleware[] = $afterMiddlewareItem; + } + } + $routeItems = [$beforeMiddleware, $dispatchRoute, $afterMiddleware]; self::$routeCache[$uri] = $routeItems; return $routeItems; diff --git a/src/Stubs/LogComStubs.php b/src/Stubs/LogComStubs.php index 67d8d240..30e74ee1 100644 --- a/src/Stubs/LogComStubs.php +++ b/src/Stubs/LogComStubs.php @@ -34,7 +34,7 @@ return $logger; }, - // 系统捕捉异常错误日志 + // 用户行为记录的错误日志 'error_log' => function($name) { $logger = new Log($name); $logger->setChannel('application'); @@ -51,7 +51,7 @@ return $logger; }, - // 系统捕捉抛出异常错误日志 + // 系统内置自动捕捉抛出异常错误日志 'system_error_log' => function($name) { $logger = new \Swoolefy\Util\Log($name); $logger->setChannel('application'); diff --git a/src/Worker/Cron/CronLocalProcess.php b/src/Worker/Cron/CronLocalProcess.php index 8fc2d9df..75e69553 100644 --- a/src/Worker/Cron/CronLocalProcess.php +++ b/src/Worker/Cron/CronLocalProcess.php @@ -52,7 +52,7 @@ public function run() { try { CrontabManager::getInstance()->addRule($this->cronName, $this->cronExpression, [$this->handleClass,'doCronTask'], - function () { + function (): bool { // 上一个任务未执行完,下一个任务到来时不执行,返回false结束 if ($this->withoutOverlapping && $this->handing) { return false;