Skip to content

Commit

Permalink
feat:optimize worker daemon code
Browse files Browse the repository at this point in the history
  • Loading branch information
bingcool committed Jun 1, 2024
1 parent 6e5d994 commit b4bbc94
Show file tree
Hide file tree
Showing 20 changed files with 219 additions and 134 deletions.
2 changes: 1 addition & 1 deletion Test/Event.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public function onInit() {
var_dump("slow sql 耗时:$runTime, sql:$realSql");
});

$wairGroup = new GoWaitGroup();
$waitGroup = new GoWaitGroup();

if(!$this->isWorkerService()) {
// 创建一个测试自定义进程
Expand Down
4 changes: 4 additions & 0 deletions Test/Protocol/conf.php
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@
// 是否内存化线上实时任务
'enable_table_tick_task' => true,

// 是否开启内存回收
'enable_gc_mem_cache' => false,
'gc_mem_cache_tick_time' => 10,

// 内存表定义
'table' => [
'table_process' => [
Expand Down
9 changes: 8 additions & 1 deletion Test/WorkerCron/LocalOrder/LocalOrderHandle.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
use Swoolefy\Core\Crontab\AbstractCronController;
use Swoolefy\Core\Log\LogManager;
use Swoolefy\Core\Swfy;
use Swoolefy\Core\SystemEnv;
use Swoolefy\Worker\AbstractBaseWorker;
use Test\Factory;
use Test\Logger\RunLog;

Expand All @@ -27,10 +29,15 @@ public function doCronTask($cron, string $cronName)
// var_dump(env("HOST_NAME"));
// var_dump(env('HOST_PASSWORD'));
// var_dump(Swfy::getConf()['bjg']);

SystemEnv::clearEnvRepository();
RunLog::info("this is a cron test log");
var_dump("cron start");
sleep(3);
var_dump(SystemEnv::get('WEB_SITE_HOST'));


sleep(60);
//AbstractBaseWorker::getProcessInstance()->reboot(3);
var_dump("cron end");

// goApp(function() {
Expand Down
4 changes: 2 additions & 2 deletions Test/WorkerCron/conf/product_conf.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@
'handler' => \Swoolefy\Worker\Cron\CronLocalProcess::class,
'worker_num' => 1, // 默认动态进程数量
'max_handle' => 100, //消费达到10000后reboot进程
'life_time' => 3600, // 每隔3600s重启进程
'life_time' => 60, // 每隔3600s重启进程
'limit_run_coroutine_num' => 10, //当前进程的实时协程数量,如果协程数量超过此设置的数量,则禁止继续消费队列处理业务,而是在等待
'extend_data' => [],
'args' => [
'cron_name' => 'cancel-order', // 定时任务名称
'handler_class' => \Test\WorkerCron\LocalOrder\LocalOrderHandle::class, //处理类
//'cron_expression' => '*/1 * * * *', // 每分钟执行一次
'with_block_lapping' => 0, // with_block_lapping = 1,表示每轮任务只能阻塞执行,必须等上一轮任务执行完毕,下一轮才能执行; with_block_lapping = 0, 表示每轮任务时间到了,都可执行,是并发非租塞的
'with_block_lapping' => 1, // with_block_lapping = 1,表示每轮任务只能阻塞执行,必须等上一轮任务执行完毕,下一轮才能执行; with_block_lapping = 0, 表示每轮任务时间到了,都可执行,是并发非租塞的
'run_in_background' => 1, // 后台运行,不受主进程的退出影响
'cron_expression' => 5, // 10s执行一次
],
Expand Down
28 changes: 14 additions & 14 deletions Test/WorkerCron/conf/schedule_conf.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,18 @@

return // 定时fork进程处理任务
[
[
'process_name' => 'system-schedule-task', // 进程名称
'handler' => \Swoolefy\Worker\Cron\CronForkProcess::class,
'description' => '系统fork模式任务调度',
'worker_num' => 1, // 默认动态进程数量
'max_handle' => 100, //消费达到10000后reboot进程
'life_time' => 3600, // 每隔3600s重启进程
'limit_run_coroutine_num' => 10, // 当前进程的实时协程数量,如果协程数量超过此设置的数量,则禁止继续消费队列处理业务,而是在等待
'extend_data' => [],
'args' => [
// 定时任务列表
'task_list' => \Test\Scripts\Kernel::buildScheduleTaskList()
],
],
// [
// 'process_name' => 'system-schedule-task', // 进程名称
// 'handler' => \Swoolefy\Worker\Cron\CronForkProcess::class,
// 'description' => '系统fork模式任务调度',
// 'worker_num' => 1, // 默认动态进程数量
// 'max_handle' => 100, //消费达到10000后reboot进程
// 'life_time' => 3600, // 每隔3600s重启进程
// 'limit_run_coroutine_num' => 10, // 当前进程的实时协程数量,如果协程数量超过此设置的数量,则禁止继续消费队列处理业务,而是在等待
// 'extend_data' => [],
// 'args' => [
// // 定时任务列表
// 'task_list' => \Test\Scripts\Kernel::buildScheduleTaskList()
// ],
// ],
];
104 changes: 62 additions & 42 deletions Test/WorkerDaemon/PipeWorkerProcess.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,58 +8,78 @@

class PipeWorkerProcess extends \Swoolefy\Worker\AbstractWorkerProcess
{
public function run()
{
//Application::getApp()->get('log')->addInfo('pllllllllllll');
while (1) {
if ($this->isExiting()) {
sleep(1);
continue;
}

// LogManager::getInstance()->getLogger('log')->info('kkkkkkkkkkkkkkkk');
// var_dump('CID='.\Swoole\Coroutine::getCid());
// var_dump('PipeWorker');
$a = 1;
$b = 2;
$c = 3;
goApp(function ($a, $b) use($c) {
public function loopHandle()
{
$a = 1;
$b = 2;
$c = 3;
goApp(function ($a, $b) use($c) {
goApp(function () use($a, $b) {
goApp(function () use($a, $b) {
var_dump($a, $b);
});
});
}, $a, $b);

sleep(10);
var_dump("gggggggggggggggggggggggggg");
}



// $db = Application::getApp()->get('db');
// $result = $db->createCommand('select * from tbl_users limit 1')->queryAll();
// dump($result);

// \Swoole\Coroutine::create(function () {
// (new \Swoolefy\Core\EventApp)->registerApp(function (EventController $eventApp) {
// var_dump('mmmmmmmmmmmmmmmmmmmmmmmmm');
// });
// });



// if($this->isWorker0()) {
// $this->notifyMasterCreateDynamicProcess($this->getProcessName(), 1);
// }

var_dump($this->limitCurrentRunCoroutineNum);
// if($this->isWorker0()) {
// sleep(5);
// $this->reboot();
// }
var_dump('start start');
sleep(15);
var_dump("end end end ");
}

// public function run()
// {
// //Application::getApp()->get('log')->addInfo('pllllllllllll');
//// while (1) {
//// if ($this->isExiting()) {
//// sleep(1);
//// continue;
//// }
////
////// LogManager::getInstance()->getLogger('log')->info('kkkkkkkkkkkkkkkk');
////// var_dump('CID='.\Swoole\Coroutine::getCid());
////// var_dump('PipeWorker');
//// $a = 1;
//// $b = 2;
//// $c = 3;
//// goApp(function ($a, $b) use($c) {
//// goApp(function () use($a, $b) {
//// goApp(function () use($a, $b) {
//// var_dump($a, $b);
//// });
//// });
//// }, $a, $b);
////
//// var_dump('start start');
//// sleep(120);
//// var_dump("gggggggggggggggggggggggggg");
//// }
//
//
//
//// $db = Application::getApp()->get('db');
//// $result = $db->createCommand('select * from tbl_users limit 1')->queryAll();
//// dump($result);
//
//// \Swoole\Coroutine::create(function () {
//// (new \Swoolefy\Core\EventApp)->registerApp(function (EventController $eventApp) {
//// var_dump('mmmmmmmmmmmmmmmmmmmmmmmmm');
//// });
//// });
//
//
//
//// if($this->isWorker0()) {
//// $this->notifyMasterCreateDynamicProcess($this->getProcessName(), 1);
//// }
//
// var_dump($this->limitCurrentRunCoroutineNum);
//// if($this->isWorker0()) {
//// sleep(5);
//// $this->reboot();
//// }
// }

public function onHandleException(\Throwable $throwable, array $context = [])
{
parent::onHandleException($throwable, $context);
Expand Down
22 changes: 11 additions & 11 deletions Test/WorkerDaemon/conf/monitor_conf.php
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
<?php

return [
[
// 监控进程
'process_name' => 'test-monitor-worker',
'handler' => \Test\WorkerDaemon\MonitorWorkerProcess::class,
'worker_num' => 1, // 默认动态进程数量
'max_handle' => 100, //消费达到10000后reboot进程
'life_time' => 60, // 每隔3600s重启进程
'limit_run_coroutine_num' => 10, //当前进程的实时协程数量,如果协程数量超过此设置的数量,则禁止继续消费队列处理业务,而是在等待
'extend_data' => [],
'args' => []
],
// [
// // 监控进程
// 'process_name' => 'test-monitor-worker',
// 'handler' => \Test\WorkerDaemon\MonitorWorkerProcess::class,
// 'worker_num' => 1, // 默认动态进程数量
// 'max_handle' => 100, //消费达到10000后reboot进程
// 'life_time' => 60, // 每隔3600s重启进程
// 'limit_run_coroutine_num' => 10, //当前进程的实时协程数量,如果协程数量超过此设置的数量,则禁止继续消费队列处理业务,而是在等待
// 'extend_data' => [],
// 'args' => []
// ],
];
24 changes: 12 additions & 12 deletions Test/WorkerDaemon/conf/pipe_conf.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,21 @@
[
'process_name' => 'test-pipe-worker',
'handler' => \Test\WorkerDaemon\PipeWorkerProcess::class,
'worker_num' => 3, // 默认动态进程数量
'worker_num' => 1, // 默认动态进程数量
'max_handle' => 100, //消费达到10000后reboot进程
'life_time' => 3600, // 每隔3600s重启进程
'limit_run_coroutine_num' => 10, //当前进程的实时协程数量,如果协程数量超过此设置的数量,则禁止继续消费队列处理业务,而是在等待
'extend_data' => [],
'args' => []
],
[
'process_name' => 'tick-pipe-worker-test',
'handler' => \Test\WorkerDaemon\PipeTestWorkerProcess::class,
'worker_num' => 3, // 默认动态进程数量
'max_handle' => 100, //消费达到10000后reboot进程
'life_time' => 3600, // 每隔3600s重启进程
'life_time' => '* * * * *', // 每隔3600s重启进程
'limit_run_coroutine_num' => 10, //当前进程的实时协程数量,如果协程数量超过此设置的数量,则禁止继续消费队列处理业务,而是在等待
'extend_data' => [],
'args' => []
],
// [
// 'process_name' => 'tick-pipe-worker-test',
// 'handler' => \Test\WorkerDaemon\PipeTestWorkerProcess::class,
// 'worker_num' => 3, // 默认动态进程数量
// 'max_handle' => 100, //消费达到10000后reboot进程
// 'life_time' => 3600, // 每隔3600s重启进程
// 'limit_run_coroutine_num' => 10, //当前进程的实时协程数量,如果协程数量超过此设置的数量,则禁止继续消费队列处理业务,而是在等待
// 'extend_data' => [],
// 'args' => []
// ],
];
2 changes: 1 addition & 1 deletion src/Cmd/BaseCmd.php
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ protected function checkRunning(array &$config)
fmtPrintError('[' . APP_NAME . ']' . " Server is running, pid={$pid}, pidFile={$pidFile}");
exit(0);
} else {
fmtPrintError('[' . WORKER_SERVICE_NAME . ']' . " is running, pid={$pid}, pidFile={$pidFile}");
fmtPrintError('[' . WORKER_SERVICE_NAME . '-server]' . " is running, pid={$pid}, pidFile={$pidFile}");
exit(0);
}
}
Expand Down
15 changes: 15 additions & 0 deletions src/Core/EventCtrl.php
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ public function workerStart($server, $worker_id)
if(!SystemEnv::isWorkerService()) {
$this->registerComponentPools();
}
$this->registerGcMemCaches();
static::onWorkerStart($server, $worker_id);
}

Expand Down Expand Up @@ -308,6 +309,20 @@ protected function clearComponentPools()
}
}

/**
* @return void
*/
protected function registerGcMemCaches()
{
$conf =BaseServer::getConf();
if (isset($conf['enable_gc_mem_cache']) && !empty($conf['enable_gc_mem_cache'])) {
$time = $conf['gc_mem_cache_tick_time'] ?? 30;
\Swoole\Timer::tick($time * 1000, function () {
gc_mem_caches();
});
}
}

/**
* eachStartInfo
*/
Expand Down
2 changes: 1 addition & 1 deletion src/Core/Process/AbstractProcess.php
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public function __start(Process $process)
\Swoole\Event::del($process->pipe);
\Swoole\Event::exit();

// 脚本模式下.任务进程退出时,父进程也得退出
// script 模式下.任务进程退出时,父进程也得退出
if (SystemEnv::isScriptService()) {
$swooleMasterPid = Swfy::getMasterPid();
\Swoole\Process::kill($swooleMasterPid, SIGTERM);
Expand Down
10 changes: 9 additions & 1 deletion src/Core/SystemEnv.php
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public static function getOption(string $name, bool $force = false)
$options = self::inputOptions();
}
}
$value = trim($options[$name],'\'') ?? '';
$value = trim($options[$name],'\'') ?? '';
$value = trim($value,' ');
return $value;
}
Expand Down Expand Up @@ -308,6 +308,14 @@ public static function disablePutenv()
static::$repository = null;
}

/**
* @return void
*/
public static function clearEnvRepository()
{
static::$repository = null;
}

/**
* Get the environment repository instance.
*
Expand Down
3 changes: 3 additions & 0 deletions src/Http/config.php
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@

// 是否内存化线上实时任务
'enable_table_tick_task' => true,
// 是否开启内存回收
'enable_gc_mem_cache' => true,
'gc_mem_cache_tick_time' => 10,

// 内存表定义
'table' => [
Expand Down
4 changes: 4 additions & 0 deletions src/Mqtt/config.php
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@

'enable_table_tick_task' => true,

// 是否开启内存回收
'enable_gc_mem_cache' => true,
'gc_mem_cache_tick_time' => 10,

'mqtt' => [
'username' => '',
'password' => '',
Expand Down
4 changes: 4 additions & 0 deletions src/Udp/config.php
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@

'enable_table_tick_task' => true,

// 是否开启内存回收
'enable_gc_mem_cache' => true,
'gc_mem_cache_tick_time' => 10,

// 依赖于EnableSysCollector = true,否则设置没有意义,不生效
'enable_pv_collector' => true,
'enable_sys_collector' => true,
Expand Down
4 changes: 4 additions & 0 deletions src/Websocket/config.php
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@

'enable_table_tick_task' => true,

// 是否开启内存回收
'enable_gc_mem_cache' => true,
'gc_mem_cache_tick_time' => 10,

// 依赖于EnableSysCollector = true,否则设置没有意义,不生效
'enable_pv_collector' => true,
'enable_sys_collector' => true,
Expand Down
Loading

0 comments on commit b4bbc94

Please sign in to comment.