diff --git a/Resque.php b/Resque.php index 3881350..201cc43 100644 --- a/Resque.php +++ b/Resque.php @@ -53,13 +53,13 @@ class Resque */ public function __construct(ResqueBackend $backend = null, $pasive = false) { - if ($backend == null) { + if ($backend === null) { $this->backend = new ResqueBackend(); } else { $this->backend = $backend; } $this->events = new ResqueEvent(); - if(self::$redisGlobal == null){ + if(self::$redisGlobal === null){ self::$redisGlobal = new ResqueRedis($this->backend); } $this->redis = self::$redisGlobal; @@ -83,7 +83,7 @@ public function fork() // Close the connection to Redis before forking. // This is a workaround for issues phpredis has. - $this->redis = null; + //$this->redis = null; $pid = pcntl_fork(); if ($pid === -1) { diff --git a/components/jobs/base/ResqueJobBase.php b/components/jobs/base/ResqueJobBase.php index 31b6c75..c0f7d37 100644 --- a/components/jobs/base/ResqueJobBase.php +++ b/components/jobs/base/ResqueJobBase.php @@ -53,12 +53,18 @@ class ResqueJobBase implements ResqueJobInterfaceBase */ private $jobFactory; + /** + * @var bool + */ + public $running = false; + /** * ResqueJobBase constructor. * @param $queue * @param $payload + * @param $running */ - public function __construct($resqueInstance, $queue, $payload = null) + public function __construct($resqueInstance, $queue, $payload = null, $running = false) { $this->resqueInstance = $resqueInstance; $this->queue = $queue; @@ -67,6 +73,7 @@ public function __construct($resqueInstance, $queue, $payload = null) $this->status = new ResqueJobStatus($this->resqueInstance, $this->payload['id']); $this->jobFactory = new ResqueJobFactory(); } + $this->running = $running; } /** @@ -82,7 +89,7 @@ public function __construct($resqueInstance, $queue, $payload = null) */ public function create($class, $args = null, $monitor = false, $id = null) { - if ($id == null || empty($id)) { + if ($id === null || empty($id)) { $id = ResqueHelper::generateJobId(); } @@ -190,6 +197,7 @@ public function getInstance() */ public function perform() { + $this->running = true; try { $this->resqueInstance->events->trigger('beforePerform', $this); diff --git a/components/workers/ResqueWorker.php b/components/workers/ResqueWorker.php index 984adce..e5fa6d0 100644 --- a/components/workers/ResqueWorker.php +++ b/components/workers/ResqueWorker.php @@ -17,6 +17,20 @@ */ class ResqueWorker extends ResqueWorkerBase implements ResqueWorkerInterface { + /** + * Name instances in redis + */ + const WORKER_NAME = 'worker'; + + /** + * Name instance in redis + * @return string + */ + public function workerName() + { + return self::WORKER_NAME; + } + /** * The primary loop for a worker which when called on an instance starts * the worker's life cycle. @@ -35,6 +49,9 @@ public function work($interval = ResqueWorkerBase::DEFAULT_INTERVAL, $blocking = $this->startup(); while (true) { + if (function_exists('pcntl_signal_dispatch')) { + pcntl_signal_dispatch(); + } if ($this->shutdown) { break; } @@ -172,10 +189,9 @@ public static function find($resqueInst, $workerId) return false; } - list($hostname, $pid, $queues) = explode(':', $workerId, 3); - $queues = explode(',', $queues); - $worker = new self($resqueInst, $queues); - $worker->id = $workerId; + + $worker = new self($resqueInst); + $worker->restore($workerId); return $worker; } @@ -186,7 +202,7 @@ public static function find($resqueInst, $workerId) */ public static function all($resqueInst) { - $workersRaw = $resqueInst->redis->smembers('workers'); + $workersRaw = $resqueInst->redis->smembers(self::WORKER_NAME . 's'); $workers = []; if (is_array($workersRaw) && count($workersRaw) > 0) { foreach ($workersRaw as $workerId) { @@ -205,7 +221,16 @@ public static function all($resqueInst) */ public static function exists($resqueInst, $workerId) { - return (bool)$resqueInst->redis->sismember('workers', $workerId); + return (bool)$resqueInst->redis->sismember(self::WORKER_NAME . 's', $workerId); + } + + /** + * Time start to work + * @return int + */ + public function getStartTime() + { + return $this->resqueInstance->redis->get(self::WORKER_NAME . ':' . $this->id . ':started'); } /** @@ -213,8 +238,8 @@ public static function exists($resqueInst, $workerId) */ public function registerWorker() { - $this->resqueInstance->redis->sadd('workers', (string)$this); - $this->resqueInstance->redis->set('worker:' . (string)$this . ':started', strftime('%a %b %d %H:%M:%S %Z %Y')); + $this->resqueInstance->redis->sadd(self::WORKER_NAME . 's', (string)$this); + $this->resqueInstance->redis->set(self::WORKER_NAME . ':' . (string)$this . ':started', strtotime('now UTC')); } /** @@ -226,9 +251,9 @@ public function unregisterWorker() $this->currentJob->fail(new ResqueJobForceExitException()); } - $this->resqueInstance->redis->srem('workers', $this->id); - $this->resqueInstance->redis->del('worker:' . $this->id); - $this->resqueInstance->redis->del('worker:' . $this->id . ':started'); + $this->resqueInstance->redis->srem(self::WORKER_NAME . 's', $this->id); + $this->resqueInstance->redis->del(self::WORKER_NAME . ':' . $this->id); + $this->resqueInstance->redis->del(self::WORKER_NAME . ':' . $this->id . ':started'); $this->resqueInstance->stats->clear('processed:' . $this->id); $this->resqueInstance->stats->clear('failed:' . $this->id); } @@ -248,7 +273,9 @@ protected function pruneDeadWorkers() foreach ($workers as $worker) { if (is_object($worker)) { list($host, $pid, $queues) = explode(':', (string)$worker, 3); - if ($host != $this->resqueInstance->backend->namespaceWorkers || in_array($pid, $workerPids) || $pid == getmypid()) { + if ($host != $this->resqueInstance->backend->namespaceWorkers || in_array($pid, + $workerPids) || $pid == getmypid() + ) { continue; } $this->logger->log(LogLevel::INFO, 'Pruning dead worker: {worker}', @@ -271,10 +298,10 @@ public function workingOn($job) $job->status->update(ResqueJobStatus::STATUS_RUNNING); $data = json_encode([ 'queue' => $job->queue, - 'run_at' => strftime('%a %b %d %H:%M:%S %Z %Y'), + 'run_at' => strtotime('now UTC'), 'payload' => $job->payload ]); - $this->resqueInstance->redis->set('worker:' . $job->worker, $data); + $this->resqueInstance->redis->set(self::WORKER_NAME . ':' . $job->worker, $data); } /** @@ -295,6 +322,6 @@ public function doneWorking() $this->working = false; $this->resqueInstance->stats->incr('processed'); $this->resqueInstance->stats->incr('processed:' . (string)$this); - $this->resqueInstance->redis->del('worker:' . (string)$this); + $this->resqueInstance->redis->del(self::WORKER_NAME . ':' . (string)$this); } } \ No newline at end of file diff --git a/components/workers/base/ResqueWorkerBase.php b/components/workers/base/ResqueWorkerBase.php index c283de3..bc5ba4b 100644 --- a/components/workers/base/ResqueWorkerBase.php +++ b/components/workers/base/ResqueWorkerBase.php @@ -12,6 +12,8 @@ abstract class ResqueWorkerBase // Abstract functions abstract protected function work($interval = ResqueWorkerBase::DEFAULT_INTERVAL, $blocking = false); + abstract protected function workerName(); + abstract protected function pruneDeadWorkers(); abstract protected function perform(ResqueJobBase $job); @@ -26,6 +28,8 @@ abstract protected function doneWorking(); abstract protected function getWorking(); + abstract public function getStartTime(); + /** * DEFAULT INTERVAL WORK */ @@ -57,10 +61,15 @@ abstract protected function getWorking(); protected $paused = false; /** - * @var string String identifying this worker. + * @var string identifying this worker. */ protected $id; + /** + * @var integer process id in the server + */ + protected $pid; + /** * @var ResqueJobBase Current job, if any, being processed by this worker. */ @@ -92,7 +101,7 @@ abstract protected function getWorking(); * * @param string|array $queues String with a single queue name, array with multiple. */ - public function __construct(Resque $resqueInst, $queues) + public function __construct(Resque $resqueInst, $queues = '*') { $this->logger = new ResqueLog(); @@ -105,6 +114,27 @@ public function __construct(Resque $resqueInst, $queues) $this->id = $this->resqueInstance->backend->namespaceWorkers . ':' . getmypid() . ':' . implode(',', $this->queues); + $this->pid = getmypid(); + } + + /** + * Method for regenerate worker from the current ID saved in the redis and the instance in the server + * @param $workerInstance + */ + public function restore($workerInstance) + { + list($hostname, $pid, $queues) = explode(':', $workerInstance, 3); + if (!is_array($queues)) { + $queues = explode(',', $queues); + } + $this->queues = $queues; + $this->pid = $pid; + $this->id = $workerInstance; //regenerate worker + $data = $this->resqueInstance->redis->get($this->workerName() . ':' . $workerInstance); + if ($data !== false) { + $data = json_decode($data, true); + $this->currentJob = new ResqueJobBase($this->resqueInstance, $data['queue'], $data['payload'], true); + } } /** diff --git a/helpers/ResqueEvent.php b/helpers/ResqueEvent.php index f57091d..f9fa875 100644 --- a/helpers/ResqueEvent.php +++ b/helpers/ResqueEvent.php @@ -9,7 +9,7 @@ class ResqueEvent { /** - * @var array Array containing all registered callbacks, indexked by event name. + * @var array Array containing all registered callbacks, indexed by event name. */ private $events = []; diff --git a/plugins/schedule/workers/ResqueWorkerScheduler.php b/plugins/schedule/workers/ResqueWorkerScheduler.php index e03f31d..c6ac6a7 100644 --- a/plugins/schedule/workers/ResqueWorkerScheduler.php +++ b/plugins/schedule/workers/ResqueWorkerScheduler.php @@ -19,6 +19,20 @@ class ResqueWorkerScheduler extends ResqueWorkerBase implements ResqueWorkerInte */ protected $resqueInstance; + /** + * Name instances in redis + */ + const WORKER_NAME = 'worker-scheduler'; + + /** + * Name instance in redis + * @return string + */ + public function workerName() + { + return self::WORKER_NAME; + } + /** * Instantiate a new worker, given a list of queues that it should be working * on. The list of queues should be supplied in the priority that they should @@ -42,7 +56,7 @@ public function __construct(ResqueScheduler $resqueInst, $queues = 'delayed_queu */ public static function all($resqueInst) { - $workersRaw = $resqueInst->redis->smembers('worker-schedulers'); + $workersRaw = $resqueInst->redis->smembers(self::WORKER_NAME . 's'); $workers = []; if (is_array($workersRaw) && count($workersRaw) > 0) { foreach ($workersRaw as $workerId) { @@ -64,11 +78,8 @@ public static function find($resqueInst, $workerId) if (!self::exists($resqueInst, $workerId) || false === strpos($workerId, ":")) { return false; } - - list($hostname, $pid, $queues) = explode(':', $workerId, 3); - $queues = explode(',', $queues); - $worker = new self($resqueInst, $queues); - $worker->id = $workerId; + $worker = new self($resqueInst); + $worker->restore($workerId); return $worker; } @@ -81,7 +92,7 @@ public static function find($resqueInst, $workerId) */ public static function exists($resqueInst, $workerId) { - return (bool)$resqueInst->redis->sismember('worker-schedulers', $workerId); + return (bool)$resqueInst->redis->sismember(self::WORKER_NAME . 's', $workerId); } /** @@ -110,6 +121,9 @@ public function work($interval = self::DEFAULT_INTERVAL, $blockinge = false) $this->startup(); while (true) { + if (function_exists('pcntl_signal_dispatch')) { + pcntl_signal_dispatch(); + } if ($this->shutdown) { break; } @@ -177,14 +191,22 @@ public function startup() parent::startup(); } + /** + * Time start to work + * @return int + */ + public function getStartTime() + { + return $this->resqueInstance->redis->get(self::WORKER_NAME . ':' . $this->id . ':started'); + } + /** * Register this worker in Redis. */ public function registerWorker() { - $this->resqueInstance->redis->sadd('worker-schedulers', (string)$this); - $this->resqueInstance->redis->set('worker-scheduler:' . (string)$this . ':started', - strftime('%a %b %d %H:%M:%S %Z %Y')); + $this->resqueInstance->redis->sadd(self::WORKER_NAME . 's', (string)$this); + $this->resqueInstance->redis->set(self::WORKER_NAME . ':' . (string)$this . ':started', strtotime('now UTC')); } /** @@ -192,9 +214,9 @@ public function registerWorker() */ public function unregisterWorker() { - $this->resqueInstance->redis->srem('worker-schedulers', $this->id); - $this->resqueInstance->redis->del('worker-scheduler:' . $this->id); - $this->resqueInstance->redis->del('worker-scheduler:' . $this->id . ':started'); + $this->resqueInstance->redis->srem(self::WORKER_NAME . 's', $this->id); + $this->resqueInstance->redis->del(self::WORKER_NAME . ':' . $this->id); + $this->resqueInstance->redis->del(self::WORKER_NAME . ':' . $this->id . ':started'); $this->resqueInstance->stats->clear('processed:' . $this->id); $this->resqueInstance->stats->clear('failed:' . $this->id); } @@ -214,7 +236,9 @@ protected function pruneDeadWorkers() foreach ($workers as $worker) { if (is_object($worker)) { list($host, $pid, $queues) = explode(':', (string)$worker, 3); - if ($host != $this->resqueInstance->backend->namespaceWorkers || in_array($pid, $workerPids) || $pid == getmypid()) { + if ($host != $this->resqueInstance->backend->namespaceWorkers || in_array($pid, + $workerPids) || $pid == getmypid() + ) { continue; } $this->logger->log(LogLevel::INFO, 'Pruning dead worker: {worker}', @@ -234,10 +258,10 @@ public function workingOn($item) $this->working = true; $data = json_encode([ 'queue' => 'schedule', - 'run_at' => strftime('%a %b %d %H:%M:%S %Z %Y'), + 'run_at' => strtotime('now UTC'), 'payload' => $item ]); - $this->resqueInstance->redis->set('worker-scheduler:' . $this, $data); + $this->resqueInstance->redis->set(self::WORKER_NAME . ':' . $this, $data); } /** @@ -249,7 +273,7 @@ public function doneWorking() $this->currentJob = null; $this->working = false; $this->resqueInstance->stats->incr('processed:' . (string)$this); - $this->resqueInstance->redis->del('worker-scheduler:' . (string)$this); + $this->resqueInstance->redis->del(self::WORKER_NAME . ':' . (string)$this); } /** @@ -267,7 +291,7 @@ public function getWorking() */ public function job() { - $job = $this->resqueInstance->redis->get('worker-scheduler:' . $this); + $job = $this->resqueInstance->redis->get(self::WORKER_NAME . ':' . $this); if (!$job) { return []; } else {