Skip to content

Commit

Permalink
Merge branch 'swoolefy-5.x'
Browse files Browse the repository at this point in the history
  • Loading branch information
bingcool committed Feb 29, 2024
2 parents adaee2b + 24f1e3a commit a5dcf4c
Show file tree
Hide file tree
Showing 23 changed files with 245 additions and 73 deletions.
9 changes: 9 additions & 0 deletions Test/Config/component/queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,14 @@
'queue' => function() {
$redis = Application::getApp()->get('redis')->getObject();
return new \Common\Library\Queues\Queue($redis,\Test\Process\ListProcess\RedisList::queue_order_list);
},

'delayQueue' => function() {
$redis = Application::getApp()->get('redis')->getObject();
return new \Common\Library\Queues\RedisDelayQueue($redis,\Test\Process\QueueProcess\Queue::queue_order_list);

// $predis = Application::getApp()->get('predis')->getObject();
// return new \Common\Library\Queues\PredisDelayQueue($predis,\Test\Process\QueueProcess\Queue::queue_order_list);
}

];
6 changes: 5 additions & 1 deletion Test/Event.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public function onInit() {
// ProcessManager::getInstance()->addProcess('tick', \Test\Process\TickProcess\Tick::class);

// 测试cron自定义进程
ProcessManager::getInstance()->addProcess('cron', \Test\Process\CronProcess\Cron::class);
// ProcessManager::getInstance()->addProcess('cron', \Test\Process\CronProcess\Cron::class);

// 这里为什么获取不到pid,那是应为process需要server执行start后才会创建,而在这里只是创建实例,server还没正式启动
//$pid = ProcessManager::getInstance()->getProcessByName('cron')->getPid();
Expand All @@ -40,6 +40,10 @@ public function onInit() {
// redis的队列消费
// ProcessManager::getInstance()->addProcess('redis_list_test', \Test\Process\ListProcess\RedisList::class,true, [], null, true);

// redis的延迟队列消费
ProcessManager::getInstance()->addProcess('redis_delay_list_test', \Test\Process\QueueProcess\Queue::class,true, [], null, true);


// amqp-direct 生产队
//ProcessManager::getInstance()->addProcess('amqp-publish', \Test\Process\AmqpProcess\AmqpPublish::class);
// amqp-direct 消费队列
Expand Down
8 changes: 8 additions & 0 deletions Test/Factory.php
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,14 @@ public static function getQueue()
return \Swoolefy\Core\Application::getApp()->get('queue');
}

/**
* @return \Common\Library\Queues\RedisDelayQueue|ContainerObjectDto
*/
public static function getDelayQueue()
{
return \Swoolefy\Core\Application::getApp()->get('delayQueue');
}

/**
* @return RedisPubSub|ContainerObjectDto
*/
Expand Down
20 changes: 12 additions & 8 deletions Test/Process/ListProcess/RedisList.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,9 @@ class RedisList extends AbstractProcess {
*/
public function run()
{
\Swoole\Timer::tick(2000, function () {
goApp(function () {
$queue = Factory::getQueue();
$queue->push(['name'=> 'bingcool','num' => rand(1,10000)]);
});
goTick(2000, function () {
$queue = Factory::getQueue();
$queue->push(['name'=> 'bingcool','num' => rand(1,10000)]);
});

$queue = Factory::getQueue();
Expand All @@ -27,18 +25,24 @@ public function run()
try {
// 控制协程并发数
if($this->getCurrentRunCoroutineNum() <= 20) {
$data = $queue->pop(3);
$result = $queue->pop(3);
if (empty($result)) {
continue;
}
$data = $result[1];
// 创建协程单例
goApp(function () use($data){
goApp(function () use($data) {
$list = new \Test\Process\ListProcess\ListController($data);
$list->doHandle();
});

//$queue->retry($data);
//var_dump('This is Redis List Queue process, pop item='.$data);
}

}catch (\Throwable $e)
{

var_dump($e->getMessage());
}
}

Expand Down
29 changes: 29 additions & 0 deletions Test/Process/QueueProcess/Queue.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
<?php
namespace Test\Process\QueueProcess;

use Swoolefy\Core\Process\AbstractProcess;
use Test\Factory;

class Queue extends AbstractProcess {
const queue_order_list = 'queue:order:delay';
public function run()
{
goAfter(2000, function () {
Factory::getDelayQueue()
->addItem(["order_id" => 1111], 2)
->addItem(["order_id" => 2222], 2)
->push();
});

while (true) {
$items = Factory::getDelayQueue()->pop();
// var_dump($items);
foreach ($items as $item) {
var_dump($item);
//Factory::getDelayQueue()->retry($item, 5);
}

sleep(1);
}
}
}
Empty file.
2 changes: 1 addition & 1 deletion Test/Process/SubscribeProcess/Subscribe.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public function run()
$num = 1;
$timeNow = time();

Timer::tick(500, function ($timeChannel) use(& $num, $timeNow) {
goTick(500, function ($timeChannel) use(&$num, $timeNow) {
if (time() - $timeNow > 3) {
var_dump('close close');
Timer::cancel($timeChannel);
Expand Down
79 changes: 59 additions & 20 deletions monitor_reload.sh
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@

appName="$1"

portCli=9501
portDaemon=9602
portCron=9603

# 通过进程名称查找进程ID,并排除当前的进程
# monitor_reload.sh 终端进程,并排除当前的进程:grep -v $$
ps aux | grep monitor_reload.sh | grep -v grep | awk '{print $1}' | grep -v $$ | xargs kill -15
Expand All @@ -18,33 +22,68 @@ ps aux | grep inotifywait | grep -v grep | awk '{print $1}'| xargs kill -15
basepath=$(cd `dirname $0`; pwd)
cd $basepath

# 先停止
/usr/bin/php cli.php stop $appName --force=1
# 守护进程模式启动
/usr/bin/php cli.php start $appName --daemon=1
# 检测端口是否是否被占用
portUsingFun() {
local port="$1"
listen=$(netstat -an | grep "$port" | awk '{print $NF}')
if [ "$listen" = "LISTEN" ]; then
echo 1
else
echo 0
fi
}

while true; do
file_changes=$(inotifywait -r -e modify,create,delete "$basepath" --format '%w%f')
php_files=$(echo "$file_changes" | grep -E '\.php$')
if [ -n "$php_files" ]; then
echo "PHP files modified:$php_files"
sleep 10;
# 先停止
/usr/bin/php cli.php stop $appName --force=1
# 守护进程模式启动
/usr/bin/php cli.php start $appName --daemon=1
else
env_files=$(echo "$file_changes" | grep -E '\.env$')
if [ -n "$env_files" ]; then
echo "Env files modified:$env_files"
if [ -n "$php_files" ]; then
for execBinFile in cli.php daemon.php cron.php;do
echo "PHP files modified:$php_files"
if [ "$execBinFile" = "cli.php" ]; then
sleep 10;
# 先停止
/usr/bin/php cli.php stop $appName --force=1
# 守护进程模式启动
/usr/bin/php cli.php start $appName --daemon=1
else
sleep 1;
fi
# 先停止
/usr/bin/php $execBinFile stop $appName --force=1

if [ "$execBinFile" = "cli.php" ]; then
while true; do
listen=$(portUsingFun $portCli)
if [ "$listen" = 0 ]; then
echo "端口=$portCli 不被占用,可以重启"
break
fi
sleep 1
done
fi

if [ "$execBinFile" = "daemon.php" ]; then
while true; do
listen=$(portUsingFun $portDaemon)
if [ "$listen" = 0 ]; then
echo "端口=$portDaemon 不被占用,可以重启"
break
fi
sleep 1
done
fi

if [ "$execBinFile" = "cron.php" ]; then
while true; do
listen=$(portUsingFun $portCron)
if [ "$listen" = 0 ]; then
echo "端口=$portCron 不被占用,可以重启"
break
fi
sleep 1
done
fi
fi

# 守护进程模式启动
/usr/bin/php $execBinFile start $appName --daemon=1
done
fi
done


1 change: 1 addition & 0 deletions src/Cmd/BaseCmd.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ protected function configure()
$this->addOption('daemon', null,InputOption::VALUE_OPTIONAL, 'Daemon model run app', 0);
// 强制停止
$this->addOption('force', null,InputOption::VALUE_OPTIONAL, 'Force stop app', 0);

$options = $this->beforeInputOptions();
foreach ($options as $name=>$value) {
if (!$this->getDefinition()->hasOption($name)) {
Expand Down
31 changes: 24 additions & 7 deletions src/Cmd/CreateCmd.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ protected function configure()

protected function execute(InputInterface $input, OutputInterface $output)
{
$dirs = ['Config', 'Service', 'Protocol', 'Router', 'Storage'];
$dirs = ['Config', 'Service', 'Protocol', 'Router', 'Storage', 'Middleware'];
$appName = $input->getArgument('app_name');
$appPathDir = APP_PATH;
if (is_dir($appPathDir)) {
Expand All @@ -35,9 +35,7 @@ protected function execute(InputInterface $input, OutputInterface $output)
}

if ($protocol == 'http') {
$dirs = [
'Config', 'Controller', 'Model', 'Module', 'Router', 'Validation', 'Storage', 'Protocol'
];
$dirs = ['Config', 'Controller', 'Model', 'Module', 'Router', 'Validation', 'Storage', 'Protocol', 'Middleware'];
}

$daemonFile = START_DIR_ROOT . '/daemon.php';
Expand Down Expand Up @@ -109,10 +107,12 @@ protected function execute(InputInterface $input, OutputInterface $output)
break;
}
case 'Module':
@mkdir($appPathDir . '/' . $dir.'/Demo/Controller', 0777, true);
@mkdir($appPathDir . '/' . $dir.'/Demo/Validation', 0777, true);
@mkdir($appPathDir . '/' . $dir.'/Demo/Exception', 0777, true);
{
@mkdir($appPathDir . '/' . $dir . '/Demo/Controller', 0777, true);
@mkdir($appPathDir . '/' . $dir . '/Demo/Validation', 0777, true);
@mkdir($appPathDir . '/' . $dir . '/Demo/Exception', 0777, true);
break;
}
case 'Router':
{
switch ($protocol) {
Expand All @@ -128,6 +128,7 @@ protected function execute(InputInterface $input, OutputInterface $output)
default:
break;
}
break;
}
case 'Protocol':
{
Expand All @@ -152,7 +153,23 @@ protected function execute(InputInterface $input, OutputInterface $output)
break;
}
}
break;
}
case 'Middleware':
{
switch ($protocol) {
case "http":
@mkdir($appPathDir . '/' . $dir.'/Group', 0777, true);
@mkdir($appPathDir . '/' . $dir.'/Route', 0777, true);
break;
default:
break;
}
break;
}

default:
break;
}
}
return 0;
Expand Down
3 changes: 2 additions & 1 deletion src/Cmd/ReloadCmd.php
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
<?php
namespace Swoolefy\Cmd;

use Swoolefy\Core\SystemEnv;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
Expand All @@ -20,7 +21,7 @@ protected function configure()

protected function execute(InputInterface $input, OutputInterface $output)
{
if (isWorkerService()) {
if (SystemEnv::isWorkerService()) {
fmtPrintError("WorkerServer, CronService, ScriptService is not support reload command");
return 0;
}
Expand Down
3 changes: 2 additions & 1 deletion src/Cmd/StopCmd.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ protected function execute(InputInterface $input, OutputInterface $output)
{
$appName = $input->getArgument('app_name');
$force = $input->getOption('force');
$lineValue = "";
if (empty($force)) {
if (!isWorkerService()) {
$lineValue = initConsoleStyleIo()->ask( "1、你确定停止应用【{$appName}】? (yes or no)");
Expand All @@ -31,7 +32,7 @@ protected function execute(InputInterface $input, OutputInterface $output)
}
}

if (strtolower($lineValue) == 'yes') {
if (strtolower($lineValue) == 'yes' || $force) {
if (!isWorkerService()) {
$this->commonStop($appName);
} else {
Expand Down
2 changes: 1 addition & 1 deletion src/Constants.php
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
defined('MQTT_PROTOCOL_LEVEL3') or define('MQTT_PROTOCOL_LEVEL3', 4);
defined('MQTT_PROTOCOL_LEVEL5') or define('MQTT_PROTOCOL_LEVEL5', 5);

defined('SWOOLEFY_VERSION') or define('SWOOLEFY_VERSION', '5.0.21');
defined('SWOOLEFY_VERSION') or define('SWOOLEFY_VERSION', '5.0.22');
defined('SWOOLEFY_EOF_FLAG') or define('SWOOLEFY_EOF_FLAG', '::');

defined('WORKER_CLI_STOP') or define('WORKER_CLI_STOP', 'stop');
Expand Down
Loading

0 comments on commit a5dcf4c

Please sign in to comment.