fix cache: check context switch removals
MehmedGIT committed Sep 25, 2023
1 parent 56dd312 commit 81bb1ac
Showing 1 changed file with 34 additions and 25 deletions.
59 changes: 34 additions & 25 deletions ocrd_network/ocrd_network/server_cache.py
@@ -1,5 +1,5 @@
 from __future__ import annotations
-from typing import Dict, List, Union
+from typing import Dict, List
 from logging import DEBUG, getLogger, FileHandler
 
 from .database import db_get_processing_job, db_update_processing_job
@@ -145,22 +145,26 @@ async def consume_cached_requests(self, workspace_key: str) -> List[PYJobInput]:
         if not self.has_workspace_cached_requests(workspace_key=workspace_key):
             self.log.debug(f"No jobs to be consumed for workspace key: {workspace_key}")
             return []
-        found_request_indices = []
+        found_consume_requests = []
         for i, current_element in enumerate(self.processing_requests[workspace_key]):
             # Request has other job dependencies
             if current_element.depends_on:
                 satisfied_dependencies = await self.__check_if_job_deps_met(current_element.depends_on)
                 if not satisfied_dependencies:
                     continue
-            found_request_indices.append(i)
+            found_consume_requests.append(current_element)
         found_requests = []
-        for found_index in found_request_indices:
-            # Consume the request from the internal queue
-            found_request = (self.processing_requests[workspace_key]).pop(found_index)
-            # self.log.debug(f"Found cached request to be processed: {found_request}")
-            self.log.debug(f"Found cached request: {found_request.processor_name}, {found_request.page_id}, "
-                           f"{found_request.job_id}, depends_on: {found_request.depends_on}")
-            found_requests.append(found_request)
+        for found_element in found_consume_requests:
+            try:
+                (self.processing_requests[workspace_key]).remove(found_element)
+                # self.log.debug(f"Found cached request to be processed: {found_element}")
+                self.log.debug(f"Found cached request: {found_element.processor_name}, {found_element.page_id}, "
+                               f"{found_element.job_id}, depends_on: {found_element.depends_on}")
+                found_requests.append(found_element)
+            except ValueError:
+                # The ValueError is not an issue: the element was already
+                # removed by a concurrent call during a context switch
+                continue
         return found_requests

     def update_request_counter(self, workspace_key: str, by_value: int) -> int:
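This hunk is the heart of the fix: consume_cached_requests previously collected list indices and popped them afterwards, but __check_if_job_deps_met is awaited in between, so the event loop can switch to another coroutine that consumes from the same per-workspace list. The stored indices then point at shifted or missing entries. Remembering the elements themselves and removing them inside a try/except ValueError makes consumption tolerant of such concurrent removals. The following standalone sketch reproduces the hazard and the guard; the Job class and the module-level queue are hypothetical illustrations, not part of ocrd_network:

    import asyncio
    from dataclasses import dataclass
    from typing import List

    @dataclass
    class Job:  # hypothetical stand-in for PYJobInput
        job_id: str

    queue: List[Job] = [Job("a"), Job("b"), Job("c")]

    async def consume_unsafe() -> List[Job]:
        # BROKEN: indices collected before an await can be stale afterwards
        indices = list(range(len(queue)))
        await asyncio.sleep(0)  # context switch: another coroutine may mutate queue
        # pops the wrong items or raises IndexError once the list has shrunk
        return [queue.pop(i) for i in indices]

    async def consume_safe() -> List[Job]:
        # FIX: remember the elements, then remove each one under a guard;
        # ValueError only means a concurrent consumer removed it first.
        found = list(queue)
        await asyncio.sleep(0)  # context switch
        consumed = []
        for job in found:
            try:
                queue.remove(job)
                consumed.append(job)
            except ValueError:
                continue  # already consumed elsewhere: not an error
        return consumed

    async def main():
        first, second = await asyncio.gather(consume_safe(), consume_safe())
        # each job ends up in exactly one of the two result lists
        print([j.job_id for j in first], [j.job_id for j in second])

    asyncio.run(main())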
@@ -191,24 +195,29 @@ async def cancel_dependent_jobs(self, workspace_key: str, processing_job_id: str
self.log.debug(f"No jobs to be cancelled for workspace key: {workspace_key}")
return []
self.log.debug(f"Cancelling jobs dependent on job id: {processing_job_id}")
found_request_indices = []
found_cancel_requests = []
for i, current_element in enumerate(self.processing_requests[workspace_key]):
if processing_job_id in current_element.depends_on:
found_request_indices.append(i)
found_cancel_requests.append(current_element)
cancelled_jobs = []
for cancel_index in found_request_indices:
found_request = (self.processing_requests[workspace_key]).pop(cancel_index)
self.log.debug(f"For job id: `{processing_job_id}`, "
f"cancelling: {found_request.job_id}")
cancelled_jobs.append(found_request)
await db_update_processing_job(job_id=processing_job_id, state=StateEnum.cancelled)
# Recursively cancel dependent jobs for the cancelled job
recursively_cancelled = await self.cancel_dependent_jobs(
workspace_key=workspace_key,
processing_job_id=found_request.job_id
)
# Add the recursively cancelled jobs to the main list of cancelled jobs
cancelled_jobs.extend(recursively_cancelled)
for cancel_element in found_cancel_requests:
try:
self.processing_requests[workspace_key].remove(cancel_element)
self.log.debug(f"For job id: `{processing_job_id}`, "
f"cancelling: {cancel_element.job_id}")
cancelled_jobs.append(cancel_element)
await db_update_processing_job(job_id=processing_job_id, state=StateEnum.cancelled)
# Recursively cancel dependent jobs for the cancelled job
recursively_cancelled = await self.cancel_dependent_jobs(
workspace_key=workspace_key,
processing_job_id=cancel_element.job_id
)
# Add the recursively cancelled jobs to the main list of cancelled jobs
cancelled_jobs.extend(recursively_cancelled)
except ValueError:
# The ValueError is not an issue since the
# element was removed by another instance
continue
return cancelled_jobs

     async def is_caching_required(self, job_dependencies: List[str]) -> bool:
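cancel_dependent_jobs gets the same treatment, with the extra twist that it recurses: every request that depends on the cancelled job is removed from the cache, marked cancelled, and then treated as a cancelled job itself, so transitively dependent requests are cancelled too. A ValueError again only signals that a concurrent call already removed the entry. Below is a condensed synchronous sketch of that recursion, with a hypothetical Request type in place of PYJobInput and a plain dict standing in for the job database:

    from dataclasses import dataclass, field
    from typing import Dict, List

    @dataclass
    class Request:  # hypothetical stand-in for PYJobInput
        job_id: str
        depends_on: List[str] = field(default_factory=list)

    job_states: Dict[str, str] = {}  # stand-in for db_update_processing_job

    def cancel_dependent_jobs(queue: List[Request], job_id: str) -> List[Request]:
        # Collect the dependent elements first, then remove them under the guard
        dependents = [r for r in queue if job_id in r.depends_on]
        cancelled = []
        for req in dependents:
            try:
                queue.remove(req)
            except ValueError:
                continue  # already removed by a concurrent call
            job_states[req.job_id] = "cancelled"
            cancelled.append(req)
            # Recurse: requests depending on a cancelled job are cancelled too
            cancelled.extend(cancel_dependent_jobs(queue, req.job_id))
        return cancelled

    queue = [Request("b", ["a"]), Request("c", ["b"]), Request("d", ["x"])]
    print([r.job_id for r in cancel_dependent_jobs(queue, "a")])  # ['b', 'c']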
