From c976fd468aa63fc5e0a5735467eaeb62dc94f436 Mon Sep 17 00:00:00 2001
From: Maja Massarini
Date: Tue, 1 Oct 2024 12:57:20 +0200
Subject: [PATCH 1/4] Add timeout to requests

Requests are blocking, and a blocking call can be the cause of a
hanging Celery task. Hopefully this fixes point 4 of the following
analysis:
https://gist.github.com/majamassarini/85033b59607a3f3da570cf5aaf2b8c7a
---
 packit_service/worker/helpers/testing_farm.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/packit_service/worker/helpers/testing_farm.py b/packit_service/worker/helpers/testing_farm.py
index be0fde567..a1a3c4ffc 100644
--- a/packit_service/worker/helpers/testing_farm.py
+++ b/packit_service/worker/helpers/testing_farm.py
@@ -13,6 +13,7 @@
 from packit.config import JobConfig, PackageConfig
 from packit.exceptions import PackitConfigException, PackitException
 from packit.utils import nested_get
+from packit.constants import HTTP_REQUEST_TIMEOUT
 from packit_service.config import ServiceConfig
 from packit_service.constants import (
     CONTACTS_URL,
@@ -1005,6 +1006,7 @@ def get_raw_request(
             params=params,
             json=data,
             verify=not self.insecure,
+            timeout=HTTP_REQUEST_TIMEOUT,
         )
 
         try:

From b601b340d055c8a788845529eeeffd17f0751e3c Mon Sep 17 00:00:00 2001
From: Maja Massarini
Date: Tue, 1 Oct 2024 14:11:19 +0200
Subject: [PATCH 2/4] Fix reporting of initial status check time

We currently report a "first initial status check time" more than
once. Only the first report is correct; all the others are wrong.
Should fix point 7 in this analysis:
https://gist.github.com/majamassarini/85033b59607a3f3da570cf5aaf2b8c7a
---
 packit_service/worker/jobs.py | 13 +++++++++----
 tests/unit/test_jobs.py       |  5 ++++-
 2 files changed, 13 insertions(+), 5 deletions(-)

diff --git a/packit_service/worker/jobs.py b/packit_service/worker/jobs.py
index 2cac6564b..07f07a9c9 100644
--- a/packit_service/worker/jobs.py
+++ b/packit_service/worker/jobs.py
@@ -459,6 +459,7 @@ def process_jobs(self) -> List[TaskResults]:
         allowlist = Allowlist(service_config=self.service_config)
 
         processing_results: List[TaskResults] = []
+        statuses_check_feedback: List[datetime] = []
         for handler_kls in handler_classes:
             # TODO: merge to to get_handlers_for_event so
             # so we don't need to go through the similar process twice.
@@ -483,12 +484,18 @@
                 for job_config in job_configs
             ]
 
-            processing_results.extend(self.create_tasks(job_configs, handler_kls))
+            processing_results.extend(
+                self.create_tasks(job_configs, handler_kls, statuses_check_feedback)
+            )
+        self.push_statuses_metrics(statuses_check_feedback)
 
         return processing_results
 
     def create_tasks(
-        self, job_configs: List[JobConfig], handler_kls: Type[JobHandler]
+        self,
+        job_configs: List[JobConfig],
+        handler_kls: Type[JobHandler],
+        statuses_check_feedback: list[datetime],
     ) -> List[TaskResults]:
         """
         Create handler tasks for handler and job configs.
@@ -499,7 +506,6 @@ def create_tasks(
         """
         processing_results: List[TaskResults] = []
         signatures = []
-        statuses_check_feedback = []
         # we want to run handlers for all possible jobs, not just the first one
         for job_config in job_configs:
             if self.should_task_be_created_for_job_config_and_handler(
@@ -528,7 +534,6 @@ def create_tasks(
                         event=self.event,
                     )
                 )
-        self.push_statuses_metrics(statuses_check_feedback)
         # https://docs.celeryq.dev/en/stable/userguide/canvas.html#groups
         celery.group(signatures).apply_async()
         return processing_results

diff --git a/tests/unit/test_jobs.py b/tests/unit/test_jobs.py
index c244afc1a..29fcb34b0 100644
--- a/tests/unit/test_jobs.py
+++ b/tests/unit/test_jobs.py
@@ -3430,7 +3430,10 @@ def actor(self):
     flexmock(celery).should_receive("group").with_args(
         tasks_created * [None]
     ).and_return(flexmock().should_receive("apply_async").mock())
-    assert tasks_created == len(SteveJobs(event).create_tasks(jobs, handler_kls))
+    statuses_check_feedback = flexmock()
+    assert tasks_created == len(
+        SteveJobs(event).create_tasks(jobs, handler_kls, statuses_check_feedback)
+    )
 
 
 def test_monorepo_jobs_matching_event():

From de3f78ceaa4ffe4bbd8c87958b63ff7e0916d597 Mon Sep 17 00:00:00 2001
From: Maja Massarini
Date: Wed, 2 Oct 2024 10:20:14 +0200
Subject: [PATCH 3/4] Report uncaught exceptions before cleaning the handler

---
 packit_service/worker/handlers/abstract.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/packit_service/worker/handlers/abstract.py b/packit_service/worker/handlers/abstract.py
index ffc9b1ab2..126bb4300 100644
--- a/packit_service/worker/handlers/abstract.py
+++ b/packit_service/worker/handlers/abstract.py
@@ -217,6 +217,9 @@ def run_n_clean(self) -> TaskResults:
                 for k, v in self.get_tag_info().items():
                     scope.set_tag(k, v)
                 return self.run()
+        except Exception as ex:
+            logger.error(f"Failed to run the handler: {ex}")
+            raise
         finally:
             self.clean()
 

From e22ed4424149bf34ecc4b53a9112eab2435080c6 Mon Sep 17 00:00:00 2001
From: Maja Massarini
Date: Wed, 2 Oct 2024 10:42:06 +0200
Subject: [PATCH 4/4] Add logs for debugging Celery task hanging

This should allow us to debug scenario 6 of the following analysis:
https://gist.github.com/majamassarini/85033b59607a3f3da570cf5aaf2b8c7a
---
 packit_service/worker/jobs.py | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/packit_service/worker/jobs.py b/packit_service/worker/jobs.py
index 07f07a9c9..f5cbc29b2 100644
--- a/packit_service/worker/jobs.py
+++ b/packit_service/worker/jobs.py
@@ -526,6 +526,10 @@ def create_tasks(
                 signatures.append(
                     handler_kls.get_signature(event=self.event, job=job_config)
                 )
+                logger.debug(
+                    f"Got signature for handler={handler_kls} "
+                    f"and job_config={job_config}."
+                )
                 processing_results.append(
                     TaskResults.create_from(
                         success=True,
@@ -534,8 +538,10 @@ def create_tasks(
                         event=self.event,
                     )
                 )
+        logger.debug("Signatures are going to be sent to Celery.")
        # https://docs.celeryq.dev/en/stable/userguide/canvas.html#groups
         celery.group(signatures).apply_async()
+        logger.debug("Signatures were sent to Celery.")
         return processing_results
 
     def should_task_be_created_for_job_config_and_handler(
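
The patches above apply two complementary failure-handling patterns: patch 1/4
bounds a blocking requests call with a timeout, and patch 3/4 logs an uncaught
exception before the handler's cleanup runs. Below is a minimal standalone
sketch of both, for reference only: the helper names and the timeout value are
illustrative assumptions, not Packit code, and the real constant comes from
packit.constants.HTTP_REQUEST_TIMEOUT.

    import logging

    import requests

    logger = logging.getLogger(__name__)

    # Illustrative value; the real constant is packit.constants.HTTP_REQUEST_TIMEOUT.
    HTTP_REQUEST_TIMEOUT = 30  # seconds

    def bounded_request(url: str, method: str = "GET", insecure: bool = False) -> requests.Response:
        # Without `timeout`, requests blocks forever on a stuck connection;
        # that blocked call is what leaves a Celery task hanging. With it,
        # a stuck connection raises requests.Timeout instead.
        return requests.request(
            method=method,
            url=url,
            verify=not insecure,
            timeout=HTTP_REQUEST_TIMEOUT,
        )

    def run_n_clean(run, clean):
        # Same shape as the patched handler method: log the uncaught
        # exception before cleanup and re-raise; `finally` still runs
        # clean() on every path, success or failure.
        try:
            return run()
        except Exception as ex:
            logger.error(f"Failed to run the handler: {ex}")
            raise
        finally:
            clean()

With both in place, a hung connection becomes a requests.Timeout that
run_n_clean logs and re-raises, so Celery can mark the task failed instead of
leaving it stuck.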