Skip to content

Commit

Permalink
Fixes related with packit service slowliness (#2555)
Browse files Browse the repository at this point in the history
Fixes related with packit service slowliness

Should partially fix #2512

Reviewed-by: Matej Focko
Reviewed-by: Laura Barcziová
  • Loading branch information
2 parents d0ad4fc + e22ed44 commit be730d0
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 5 deletions.
3 changes: 3 additions & 0 deletions packit_service/worker/handlers/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,9 @@ def run_n_clean(self) -> TaskResults:
for k, v in self.get_tag_info().items():
scope.set_tag(k, v)
return self.run()
except Exception as ex:
logger.error(f"Failed to run the handler: {ex}")
raise
finally:
self.clean()

Expand Down
2 changes: 2 additions & 0 deletions packit_service/worker/helpers/testing_farm.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from packit.config import JobConfig, PackageConfig
from packit.exceptions import PackitConfigException, PackitException
from packit.utils import nested_get
from packit.constants import HTTP_REQUEST_TIMEOUT
from packit_service.config import ServiceConfig
from packit_service.constants import (
CONTACTS_URL,
Expand Down Expand Up @@ -1005,6 +1006,7 @@ def get_raw_request(
params=params,
json=data,
verify=not self.insecure,
timeout=HTTP_REQUEST_TIMEOUT,
)

try:
Expand Down
19 changes: 15 additions & 4 deletions packit_service/worker/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,7 @@ def process_jobs(self) -> List[TaskResults]:
allowlist = Allowlist(service_config=self.service_config)
processing_results: List[TaskResults] = []

statuses_check_feedback: List[datetime] = []
for handler_kls in handler_classes:
# TODO: merge to to get_handlers_for_event so
# so we don't need to go through the similar process twice.
Expand All @@ -483,12 +484,18 @@ def process_jobs(self) -> List[TaskResults]:
for job_config in job_configs
]

processing_results.extend(self.create_tasks(job_configs, handler_kls))
processing_results.extend(
self.create_tasks(job_configs, handler_kls, statuses_check_feedback)
)
self.push_statuses_metrics(statuses_check_feedback)

return processing_results

def create_tasks(
self, job_configs: List[JobConfig], handler_kls: Type[JobHandler]
self,
job_configs: List[JobConfig],
handler_kls: Type[JobHandler],
statuses_check_feedback: list[datetime],
) -> List[TaskResults]:
"""
Create handler tasks for handler and job configs.
Expand All @@ -499,7 +506,6 @@ def create_tasks(
"""
processing_results: List[TaskResults] = []
signatures = []
statuses_check_feedback = []
# we want to run handlers for all possible jobs, not just the first one
for job_config in job_configs:
if self.should_task_be_created_for_job_config_and_handler(
Expand All @@ -520,6 +526,10 @@ def create_tasks(
signatures.append(
handler_kls.get_signature(event=self.event, job=job_config)
)
logger.debug(
f"Got signature for handler={handler_kls} "
f"and job_config={job_config}."
)
processing_results.append(
TaskResults.create_from(
success=True,
Expand All @@ -528,9 +538,10 @@ def create_tasks(
event=self.event,
)
)
self.push_statuses_metrics(statuses_check_feedback)
logger.debug("Signatures are going to be sent to Celery.")
# https://docs.celeryq.dev/en/stable/userguide/canvas.html#groups
celery.group(signatures).apply_async()
logger.debug("Signatures were sent to Celery.")
return processing_results

def should_task_be_created_for_job_config_and_handler(
Expand Down
5 changes: 4 additions & 1 deletion tests/unit/test_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -3430,7 +3430,10 @@ def actor(self):
flexmock(celery).should_receive("group").with_args(
tasks_created * [None]
).and_return(flexmock().should_receive("apply_async").mock())
assert tasks_created == len(SteveJobs(event).create_tasks(jobs, handler_kls))
statuses_check_feedback = flexmock()
assert tasks_created == len(
SteveJobs(event).create_tasks(jobs, handler_kls, statuses_check_feedback)
)


def test_monorepo_jobs_matching_event():
Expand Down

0 comments on commit be730d0

Please sign in to comment.