Skip to content

Commit

Permalink
small improves and fix currentJob
Browse files Browse the repository at this point in the history
  • Loading branch information
spiritdead committed Apr 1, 2017
1 parent 65688b5 commit 2c53897
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 41 deletions.
6 changes: 3 additions & 3 deletions Resque.php
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,13 @@ class Resque
*/
public function __construct(ResqueBackend $backend = null, $pasive = false)
{
if ($backend == null) {
if ($backend === null) {
$this->backend = new ResqueBackend();
} else {
$this->backend = $backend;
}
$this->events = new ResqueEvent();
if(self::$redisGlobal == null){
if(self::$redisGlobal === null){
self::$redisGlobal = new ResqueRedis($this->backend);
}
$this->redis = self::$redisGlobal;
Expand All @@ -83,7 +83,7 @@ public function fork()

// Close the connection to Redis before forking.
// This is a workaround for issues phpredis has.
$this->redis = null;
//$this->redis = null;

$pid = pcntl_fork();
if ($pid === -1) {
Expand Down
12 changes: 10 additions & 2 deletions components/jobs/base/ResqueJobBase.php
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,18 @@ class ResqueJobBase implements ResqueJobInterfaceBase
*/
private $jobFactory;

/**
* @var bool
*/
public $running = false;

/**
* ResqueJobBase constructor.
* @param $queue
* @param $payload
* @param $running
*/
public function __construct($resqueInstance, $queue, $payload = null)
public function __construct($resqueInstance, $queue, $payload = null, $running = false)
{
$this->resqueInstance = $resqueInstance;
$this->queue = $queue;
Expand All @@ -67,6 +73,7 @@ public function __construct($resqueInstance, $queue, $payload = null)
$this->status = new ResqueJobStatus($this->resqueInstance, $this->payload['id']);
$this->jobFactory = new ResqueJobFactory();
}
$this->running = $running;
}

/**
Expand All @@ -82,7 +89,7 @@ public function __construct($resqueInstance, $queue, $payload = null)
*/
public function create($class, $args = null, $monitor = false, $id = null)
{
if ($id == null || empty($id)) {
if ($id === null || empty($id)) {
$id = ResqueHelper::generateJobId();
}

Expand Down Expand Up @@ -190,6 +197,7 @@ public function getInstance()
*/
public function perform()
{
$this->running = true;
try {
$this->resqueInstance->events->trigger('beforePerform', $this);

Expand Down
57 changes: 42 additions & 15 deletions components/workers/ResqueWorker.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,20 @@
*/
class ResqueWorker extends ResqueWorkerBase implements ResqueWorkerInterface
{
/**
* Name instances in redis
*/
const WORKER_NAME = 'worker';

/**
* Name instance in redis
* @return string
*/
public function workerName()
{
return self::WORKER_NAME;
}

/**
* The primary loop for a worker which when called on an instance starts
* the worker's life cycle.
Expand All @@ -35,6 +49,9 @@ public function work($interval = ResqueWorkerBase::DEFAULT_INTERVAL, $blocking =
$this->startup();

while (true) {
if (function_exists('pcntl_signal_dispatch')) {
pcntl_signal_dispatch();
}
if ($this->shutdown) {
break;
}
Expand Down Expand Up @@ -172,10 +189,9 @@ public static function find($resqueInst, $workerId)
return false;
}

list($hostname, $pid, $queues) = explode(':', $workerId, 3);
$queues = explode(',', $queues);
$worker = new self($resqueInst, $queues);
$worker->id = $workerId;

$worker = new self($resqueInst);
$worker->restore($workerId);
return $worker;
}

Expand All @@ -186,7 +202,7 @@ public static function find($resqueInst, $workerId)
*/
public static function all($resqueInst)
{
$workersRaw = $resqueInst->redis->smembers('workers');
$workersRaw = $resqueInst->redis->smembers(self::WORKER_NAME . 's');
$workers = [];
if (is_array($workersRaw) && count($workersRaw) > 0) {
foreach ($workersRaw as $workerId) {
Expand All @@ -205,16 +221,25 @@ public static function all($resqueInst)
*/
public static function exists($resqueInst, $workerId)
{
return (bool)$resqueInst->redis->sismember('workers', $workerId);
return (bool)$resqueInst->redis->sismember(self::WORKER_NAME . 's', $workerId);
}

/**
* Time start to work
* @return int
*/
public function getStartTime()
{
return $this->resqueInstance->redis->get(self::WORKER_NAME . ':' . $this->id . ':started');
}

/**
* Register this worker in Redis.
*/
public function registerWorker()
{
$this->resqueInstance->redis->sadd('workers', (string)$this);
$this->resqueInstance->redis->set('worker:' . (string)$this . ':started', strftime('%a %b %d %H:%M:%S %Z %Y'));
$this->resqueInstance->redis->sadd(self::WORKER_NAME . 's', (string)$this);
$this->resqueInstance->redis->set(self::WORKER_NAME . ':' . (string)$this . ':started', strtotime('now UTC'));
}

/**
Expand All @@ -226,9 +251,9 @@ public function unregisterWorker()
$this->currentJob->fail(new ResqueJobForceExitException());
}

$this->resqueInstance->redis->srem('workers', $this->id);
$this->resqueInstance->redis->del('worker:' . $this->id);
$this->resqueInstance->redis->del('worker:' . $this->id . ':started');
$this->resqueInstance->redis->srem(self::WORKER_NAME . 's', $this->id);
$this->resqueInstance->redis->del(self::WORKER_NAME . ':' . $this->id);
$this->resqueInstance->redis->del(self::WORKER_NAME . ':' . $this->id . ':started');
$this->resqueInstance->stats->clear('processed:' . $this->id);
$this->resqueInstance->stats->clear('failed:' . $this->id);
}
Expand All @@ -248,7 +273,9 @@ protected function pruneDeadWorkers()
foreach ($workers as $worker) {
if (is_object($worker)) {
list($host, $pid, $queues) = explode(':', (string)$worker, 3);
if ($host != $this->resqueInstance->backend->namespaceWorkers || in_array($pid, $workerPids) || $pid == getmypid()) {
if ($host != $this->resqueInstance->backend->namespaceWorkers || in_array($pid,
$workerPids) || $pid == getmypid()
) {
continue;
}
$this->logger->log(LogLevel::INFO, 'Pruning dead worker: {worker}',
Expand All @@ -271,10 +298,10 @@ public function workingOn($job)
$job->status->update(ResqueJobStatus::STATUS_RUNNING);
$data = json_encode([
'queue' => $job->queue,
'run_at' => strftime('%a %b %d %H:%M:%S %Z %Y'),
'run_at' => strtotime('now UTC'),
'payload' => $job->payload
]);
$this->resqueInstance->redis->set('worker:' . $job->worker, $data);
$this->resqueInstance->redis->set(self::WORKER_NAME . ':' . $job->worker, $data);
}

/**
Expand All @@ -295,6 +322,6 @@ public function doneWorking()
$this->working = false;
$this->resqueInstance->stats->incr('processed');
$this->resqueInstance->stats->incr('processed:' . (string)$this);
$this->resqueInstance->redis->del('worker:' . (string)$this);
$this->resqueInstance->redis->del(self::WORKER_NAME . ':' . (string)$this);
}
}
34 changes: 32 additions & 2 deletions components/workers/base/ResqueWorkerBase.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ abstract class ResqueWorkerBase
// Abstract functions
abstract protected function work($interval = ResqueWorkerBase::DEFAULT_INTERVAL, $blocking = false);

abstract protected function workerName();

abstract protected function pruneDeadWorkers();

abstract protected function perform(ResqueJobBase $job);
Expand All @@ -26,6 +28,8 @@ abstract protected function doneWorking();

abstract protected function getWorking();

abstract public function getStartTime();

/**
* DEFAULT INTERVAL WORK
*/
Expand Down Expand Up @@ -57,10 +61,15 @@ abstract protected function getWorking();
protected $paused = false;

/**
* @var string String identifying this worker.
* @var string identifying this worker.
*/
protected $id;

/**
* @var integer process id in the server
*/
protected $pid;

/**
* @var ResqueJobBase Current job, if any, being processed by this worker.
*/
Expand Down Expand Up @@ -92,7 +101,7 @@ abstract protected function getWorking();
*
* @param string|array $queues String with a single queue name, array with multiple.
*/
public function __construct(Resque $resqueInst, $queues)
public function __construct(Resque $resqueInst, $queues = '*')
{
$this->logger = new ResqueLog();

Expand All @@ -105,6 +114,27 @@ public function __construct(Resque $resqueInst, $queues)

$this->id = $this->resqueInstance->backend->namespaceWorkers . ':' . getmypid() . ':' . implode(',',
$this->queues);
$this->pid = getmypid();
}

/**
* Method for regenerate worker from the current ID saved in the redis and the instance in the server
* @param $workerInstance
*/
public function restore($workerInstance)
{
list($hostname, $pid, $queues) = explode(':', $workerInstance, 3);
if (!is_array($queues)) {
$queues = explode(',', $queues);
}
$this->queues = $queues;
$this->pid = $pid;
$this->id = $workerInstance; //regenerate worker
$data = $this->resqueInstance->redis->get($this->workerName() . ':' . $workerInstance);
if ($data !== false) {
$data = json_decode($data, true);
$this->currentJob = new ResqueJobBase($this->resqueInstance, $data['queue'], $data['payload'], true);
}
}

/**
Expand Down
2 changes: 1 addition & 1 deletion helpers/ResqueEvent.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
class ResqueEvent
{
/**
* @var array Array containing all registered callbacks, indexked by event name.
* @var array Array containing all registered callbacks, indexed by event name.
*/
private $events = [];

Expand Down
Loading

0 comments on commit 2c53897

Please sign in to comment.