Skip to content

Commit

Permalink
Merge pull request #194 from Icinga/separate-jobs-and-schedule-config
Browse files Browse the repository at this point in the history
Separate jobs and schedule config
  • Loading branch information
yhabteab authored Sep 14, 2023
2 parents f8cb537 + c45935d commit 47afc99
Show file tree
Hide file tree
Showing 39 changed files with 2,031 additions and 988 deletions.
194 changes: 136 additions & 58 deletions application/clicommands/JobsCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,36 +6,63 @@

use DateTime;
use Exception;
use Icinga\Application\Config;
use Icinga\Application\Logger;
use Icinga\Module\X509\CertificateUtils;
use Icinga\Module\X509\Command;
use Icinga\Module\X509\Common\JobUtils;
use Icinga\Module\X509\Hook\SniHook;
use Icinga\Module\X509\Job;
use Icinga\Module\X509\Model\X509Job;
use Icinga\Module\X509\Model\X509Schedule;
use Icinga\Module\X509\Schedule;
use InvalidArgumentException;
use ipl\Orm\Query;
use ipl\Scheduler\Contract\Frequency;
use ipl\Scheduler\Contract\Task;
use ipl\Scheduler\Cron;
use ipl\Scheduler\Scheduler;
use ipl\Stdlib\Filter;
use Ramsey\Uuid\Uuid;
use React\EventLoop\Loop;
use React\Promise\ExtendedPromiseInterface;
use stdClass;
use Throwable;

class JobsCommand extends Command
{
use JobUtils;

/**
* Run all configured jobs based on their schedule
*
* USAGE:
*
* icingacli x509 jobs run
* icingacli x509 jobs run [OPTIONS]
*
* OPTIONS
*
* --job=<name>
* Run all configured schedules only of the specified job.
*
* --schedule=<name>
* Run only the given schedule of the specified job. Providing a schedule name
* without a job will fail immediately.
*
* --parallel=<number>
* Allow parallel scanning of targets up to the specified number. Defaults to 256.
* May cause **too many open files** error if set to a number higher than the configured one (ulimit).
*/
public function runAction()
public function runAction(): void
{
$parallel = (int) $this->Config()->get('scan', 'parallel', 256);
$parallel = (int) $this->params->get('parallel', Job::DEFAULT_PARALLEL);
if ($parallel <= 0) {
$this->fail("The 'parallel' option must be set to at least 1");
}

$jobName = (string) $this->params->get('job');
$scheduleName = (string) $this->params->get('schedule');
if (! $jobName && $scheduleName) {
throw new InvalidArgumentException('You cannot provide a schedule without a job');
}

$scheduler = new Scheduler();
$this->attachJobsLogging($scheduler);

Expand All @@ -53,13 +80,14 @@ public function runAction()
$scheduled = [];
// Periodically check configuration changes to ensure that new jobs are scheduled, jobs are updated,
// and deleted jobs are canceled.
$watchdog = function () use (&$watchdog, $scheduler, &$scheduled, $parallel) {
$jobs = $this->fetchJobs();
$watchdog = function () use (&$watchdog, &$scheduled, $scheduler, $parallel, $jobName, $scheduleName) {
$jobs = $this->fetchSchedules($jobName, $scheduleName);
$outdatedJobs = array_diff_key($scheduled, $jobs);
foreach ($outdatedJobs as $job) {
Logger::info(
'Removing scheduled job %s, as it either no longer exists in the configuration or its config has'
. ' been changed',
'Removing schedule %s of job %s, as it either no longer exists in the configuration or its'
. ' config has been changed',
$job->getSchedule()->getName(),
$job->getName()
);

Expand All @@ -70,30 +98,21 @@ public function runAction()
foreach ($newJobs as $job) {
$job->setParallel($parallel);

$config = $job->getConfig();
if (! isset($config->frequencyType)) {
if (! Cron::isValid($config->schedule)) {
Logger::error('Job %s has invalid schedule expression %s', $job->getName(), $config->schedule);

continue;
}
/** @var stdClass $config */
$config = $job->getSchedule()->getConfig();
try {
/** @var Frequency $type */
$type = $config->type;
$frequency = $type::fromJson($config->frequency);
} catch (Throwable $err) {
Logger::error(
'Cannot create schedule %s of job %s: %s',
$job->getSchedule()->getName(),
$job->getName(),
$err->getMessage()
);

$frequency = new Cron($config->schedule);
} else {
try {
/** @var Frequency $type */
$type = $config->frequencyType;
$frequency = $type::fromJson($config->schedule);
} catch (Exception $err) {
Logger::error(
'Job %s has invalid schedule expression %s: %s',
$job->getName(),
$config->schedule,
$err->getMessage()
);

continue;
}
continue;
}

$scheduler->schedule($job, $frequency);
Expand All @@ -108,28 +127,53 @@ public function runAction()
}

/**
* Fetch jobs from disk
* Fetch job schedules from database
*
* @param ?string $jobName
* @param ?string $scheduleName
*
* @return Job[]
*/
protected function fetchJobs(): array
protected function fetchSchedules(?string $jobName, ?string $scheduleName): array
{
$configs = Config::module($this->getModuleName(), 'jobs', true);
$defaultSchedule = $configs->get('jobs', 'default_schedule');
$jobs = X509Job::on($this->getDb());
if ($jobName) {
$jobs->filter(Filter::equal('name', $jobName));
}

$jobSchedules = [];
$snimap = SniHook::getAll();
/** @var X509Job $jobConfig */
foreach ($jobs as $jobConfig) {
$cidrs = $this->parseCIDRs($jobConfig->cidrs);
$ports = $this->parsePorts($jobConfig->ports);
$job = (new Job($jobConfig->name, $cidrs, $ports, $snimap))
->setId($jobConfig->id)
->setExcludes($this->parseExcludes($jobConfig->exclude_targets));

/** @var Query $schedules */
$schedules = $jobConfig->schedule;
if ($scheduleName) {
$schedules->filter(Filter::equal('name', $scheduleName));
}

$jobs = [];
foreach ($configs as $name => $config) {
if (! $config->get('schedule', $defaultSchedule)) {
Logger::debug('Job %s cannot be scheduled', $name);
$jobSchedules = [];
/** @var X509Schedule $scheduleModel */
foreach ($schedules as $scheduleModel) {
$schedule = Schedule::fromModel($scheduleModel);
$job = (clone $job)
->setSchedule($schedule)
->setUuid(Uuid::fromBytes($job->getChecksum()));

continue;
$jobSchedules[$job->getUuid()->toString()] = $job;
}

$job = new Job($name, $config, SniHook::getAll());
$jobs[$job->getUuid()->toString()] = $job;
if (! isset($jobSchedules[$job->getUuid()->toString()])) {
Logger::info('Skipping job %s because no schedules are configured', $job->getName());
}
}

return $jobs;
return $jobSchedules;
}

/**
Expand All @@ -139,15 +183,34 @@ protected function fetchJobs(): array
*/
protected function attachJobsLogging(Scheduler $scheduler): void
{
$scheduler->on(Scheduler::ON_TASK_CANCEL, function (Task $job, array $_) {
Logger::info('Job %s canceled', $job->getName());
$scheduler->on(Scheduler::ON_TASK_CANCEL, function (Job $task, array $_) {
Logger::info('Schedule %s of job %s canceled', $task->getSchedule()->getName(), $task->getName());
});

$scheduler->on(Scheduler::ON_TASK_DONE, function (Task $job, $targets = 0) {
$scheduler->on(Scheduler::ON_TASK_DONE, function (Job $task, $targets = 0) {
if ($targets === 0) {
Logger::warning('The job %s does not have any targets', $job->getName());
$sinceLastScan = $task->getSinceLastScan();
if ($sinceLastScan) {
Logger::info(
'Schedule %s of job %s does not have any targets to be rescanned matching since last scan: %s',
$task->getSchedule()->getName(),
$task->getName(),
$sinceLastScan->format('Y-m-d H:i:s')
);
} else {
Logger::warning(
'Schedule %s of job %s does not have any targets',
$task->getSchedule()->getName(),
$task->getName()
);
}
} else {
Logger::info('Scanned %d target(s) from job %s', $targets, $job->getName());
Logger::info(
'Scanned %d target(s) by schedule %s of job %s',
$targets,
$task->getSchedule()->getName(),
$task->getName()
);

try {
$verified = CertificateUtils::verifyCertificates($this->getDb());
Expand All @@ -160,21 +223,36 @@ protected function attachJobsLogging(Scheduler $scheduler): void
}
});

$scheduler->on(Scheduler::ON_TASK_FAILED, function (Task $job, Throwable $e) {
Logger::error('Failed to run job %s: %s', $job->getName(), $e->getMessage());
$scheduler->on(Scheduler::ON_TASK_FAILED, function (Job $task, Throwable $e) {
Logger::error(
'Failed to run schedule %s of job %s: %s',
$task->getSchedule()->getName(),
$task->getName(),
$e->getMessage()
);
Logger::debug($e->getTraceAsString());
});

$scheduler->on(Scheduler::ON_TASK_RUN, function (Task $job, ExtendedPromiseInterface $_) {
Logger::info('Running job %s', $job->getName());
$scheduler->on(Scheduler::ON_TASK_RUN, function (Job $task, ExtendedPromiseInterface $_) {
Logger::info('Running schedule %s of job %s', $task->getSchedule()->getName(), $task->getName());
});

$scheduler->on(Scheduler::ON_TASK_SCHEDULED, function (Task $job, DateTime $dateTime) {
Logger::info('Scheduling job %s to run at %s', $job->getName(), $dateTime->format('Y-m-d H:i:s'));
$scheduler->on(Scheduler::ON_TASK_SCHEDULED, function (Job $task, DateTime $dateTime) {
Logger::info(
'Scheduling %s of job %s to run at %s',
$task->getSchedule()->getName(),
$task->getName(),
$dateTime->format('Y-m-d H:i:s')
);
});

$scheduler->on(Scheduler::ON_TASK_EXPIRED, function (Task $task, DateTime $dateTime) {
Logger::info(sprintf('Detaching expired job %s at %s', $task->getName(), $dateTime->format('Y-m-d H:i:s')));
$scheduler->on(Scheduler::ON_TASK_EXPIRED, function (Job $task, DateTime $dateTime) {
Logger::info(
'Detaching expired schedule %s of job %s at %s',
$task->getSchedule()->getName(),
$task->getName(),
$dateTime->format('Y-m-d H:i:s')
);
});
}
}
59 changes: 30 additions & 29 deletions application/clicommands/ScanCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,22 @@

namespace Icinga\Module\X509\Clicommands;

use DateTime;
use Exception;
use Icinga\Application\Logger;
use Icinga\Module\X509\CertificateUtils;
use Icinga\Module\X509\Command;
use Icinga\Module\X509\Common\JobUtils;
use Icinga\Module\X509\Hook\SniHook;
use Icinga\Module\X509\Job;
use InvalidArgumentException;
use Icinga\Module\X509\Model\X509Job;
use ipl\Stdlib\Filter;
use React\EventLoop\Loop;
use Throwable;

class ScanCommand extends Command
{
use JobUtils;

/**
* Scan targets to find their X.509 certificates and track changes to them.
*
Expand Down Expand Up @@ -45,6 +48,10 @@ class ScanCommand extends Command
* which can also be an English textual datetime description like "2 days".
* Defaults to "-24 hours".
*
* --parallel=<number>
* Allow parallel scanning of targets up to the specified number. Defaults to 256.
* May cause **too many open files** error if set to a number higher than the configured one (ulimit).
*
* --rescan
* Rescan only targets that have been scanned before.
*
Expand Down Expand Up @@ -74,51 +81,45 @@ class ScanCommand extends Command
*
* icingacli x509 scan --job <name> --full
*/
public function indexAction()
public function indexAction(): void
{
/** @var string $name */
$name = $this->params->shiftRequired('job');
$fullScan = (bool) $this->params->get('full', false);
$rescan = (bool) $this->params->get('rescan', false);

$parallel = (int) $this->Config()->get('scan', 'parallel', 256);
/** @var string $sinceLastScan */
$sinceLastScan = $this->params->get('since-last-scan', Job::DEFAULT_SINCE_LAST_SCAN);
if ($sinceLastScan === 'null') {
$sinceLastScan = null;
}

/** @var int $parallel */
$parallel = $this->params->get('parallel', Job::DEFAULT_PARALLEL);
if ($parallel <= 0) {
throw new Exception('The \'parallel\' option must be set to at least 1');
}

$jobs = $this->Config('jobs');
if (! $jobs->hasSection($name)) {
/** @var X509Job $jobConfig */
$jobConfig = X509Job::on($this->getDb())
->filter(Filter::equal('name', $name))
->first();
if ($jobConfig === null) {
throw new Exception(sprintf('Job %s not found', $name));
}

$jobDescription = $this->Config('jobs')->getSection($name);
if (! strlen($jobDescription->get('cidrs'))) {
if (! strlen($jobConfig->cidrs)) {
throw new Exception(sprintf('The job %s does not specify any CIDRs', $name));
}

$sinceLastScan = $this->params->get('since-last-scan', '-24 hours');
if ($sinceLastScan === 'null') {
$sinceLastScan = null;
} else {
if ($sinceLastScan[0] !== '-') {
// When the user specified "2 days" as a threshold strtotime() will compute the
// timestamp NOW() + 2 days, but it has to be NOW() + (-2 days)
$sinceLastScan = "-$sinceLastScan";
}

try {
$sinceLastScan = new DateTime($sinceLastScan);
} catch (Exception $_) {
throw new InvalidArgumentException(sprintf(
'The specified last scan time is in an unknown format: %s',
$this->params->get('since-last-scan')
));
}
}

$job = (new Job($name, $jobDescription, SniHook::getAll()))
$cidrs = $this->parseCIDRs($jobConfig->cidrs);
$ports = $this->parsePorts($jobConfig->ports);
$job = (new Job($name, $cidrs, $ports, SniHook::getAll()))
->setId($jobConfig->id)
->setFullScan($fullScan)
->setRescan($rescan)
->setParallel($parallel)
->setExcludes($this->parseExcludes($jobConfig->exclude_targets))
->setLastScan($sinceLastScan);

$promise = $job->run();
Expand Down
Loading

0 comments on commit 47afc99

Please sign in to comment.