diff --git a/ocrd/ocrd/decorators/__init__.py b/ocrd/ocrd/decorators/__init__.py
index 9c48e5c146..ecfef5dbb0 100644
--- a/ocrd/ocrd/decorators/__init__.py
+++ b/ocrd/ocrd/decorators/__init__.py
@@ -1,15 +1,16 @@
 import sys
 
 from ocrd_utils import (
+    config,
+    initLogging,
     is_local_filename,
     get_local_filename,
+    getLogger,
+    parse_json_string_with_comments,
     set_json_key_value_overrides,
 )
-
-from ocrd_utils import getLogger, initLogging, parse_json_string_with_comments, config
 from ocrd_validators import WorkspaceValidator
-
-from ocrd_network import ProcessingWorker, ProcessorServer
+from ocrd_network import ProcessingWorker, ProcessorServer, NETWORK_AGENT_SERVER, NETWORK_AGENT_WORKER
 
 from ..resolver import Resolver
 from ..processor.base import run_processor
@@ -19,7 +20,8 @@
 from .ocrd_cli_options import ocrd_cli_options
 from .mets_find_options import mets_find_options
 
-SUBCOMMANDS = ['worker', 'server']
+SUBCOMMANDS = [NETWORK_AGENT_WORKER, NETWORK_AGENT_SERVER]
+
 
 def ocrd_cli_wrap_processor(
     processorClass,
@@ -63,8 +65,7 @@ def ocrd_cli_wrap_processor(
         # Used for checking/starting network agents for the WebAPI architecture
         check_and_run_network_agent(processorClass, subcommand, address, database, queue)
     elif address or queue or database:
-        raise ValueError(f"Subcommand options --adress --queue and --database are only valid for subcommands 'worker' or 'server'")
-
+        raise ValueError(f"Subcommand options --address --queue and --database are only valid for subcommands: {SUBCOMMANDS}")
 
     initLogging()
 
@@ -141,19 +142,19 @@ def check_and_run_network_agent(ProcessorClass, subcommand: str, address: str, d
     if not database:
         raise ValueError(f"Option '--database' is invalid for subcommand {subcommand}")
 
-    if subcommand == 'server':
+    if subcommand == NETWORK_AGENT_SERVER:
         if not address:
             raise ValueError(f"Option '--address' required for subcommand {subcommand}")
         if queue:
             raise ValueError(f"Option '--queue' invalid for subcommand {subcommand}")
-    if subcommand == 'worker':
+    if subcommand == NETWORK_AGENT_WORKER:
         if address:
             raise ValueError(f"Option '--address' invalid for subcommand {subcommand}")
         if not queue:
             raise ValueError(f"Option '--queue' required for subcommand {subcommand}")
 
     processor = ProcessorClass(workspace=None)
-    if subcommand == 'worker':
+    if subcommand == NETWORK_AGENT_WORKER:
         processing_worker = ProcessingWorker(
             rabbitmq_addr=queue,
             mongodb_addr=database,
@@ -165,7 +166,7 @@ def check_and_run_network_agent(ProcessorClass, subcommand: str, address: str, d
         processing_worker.connect_consumer()
         # Start consuming from the queue with name `processor_name`
         processing_worker.start_consuming()
-    elif subcommand == 'server':
+    elif subcommand == NETWORK_AGENT_SERVER:
         # TODO: Better validate that inside the ProcessorServer itself
         host, port = address.split(':')
         processor_server = ProcessorServer(
diff --git a/ocrd/ocrd/decorators/ocrd_cli_options.py b/ocrd/ocrd/decorators/ocrd_cli_options.py
index f5210d2d13..c4cd7e34be 100644
--- a/ocrd/ocrd/decorators/ocrd_cli_options.py
+++ b/ocrd/ocrd/decorators/ocrd_cli_options.py
@@ -1,5 +1,6 @@
 import click
 from click import option, Path, group, command, argument
+from ocrd_network import NETWORK_AGENT_SERVER, NETWORK_AGENT_WORKER
 from .parameter_option import parameter_option, parameter_override_option
 from .loglevel_option import loglevel_option
 from ocrd_network import (
@@ -54,7 +55,8 @@ def cli(mets_url):
         # were using `group`, you cannot combine have a command with
         # subcommands. So we have to work around that by creating a
         # pseudo-subcommand handled in ocrd_cli_wrap_processor
-        argument('subcommand', nargs=1, required=False, type=click.Choice(['worker', 'server'])),
+        argument('subcommand', nargs=1, required=False,
+                 type=click.Choice([NETWORK_AGENT_WORKER, NETWORK_AGENT_SERVER])),
     ]
     for param in params:
         param(f)
diff --git a/ocrd_network/ocrd_network/__init__.py b/ocrd_network/ocrd_network/__init__.py
index d851bcee1e..08153e5943 100644
--- a/ocrd_network/ocrd_network/__init__.py
+++ b/ocrd_network/ocrd_network/__init__.py
@@ -1,4 +1,5 @@
 from .client import Client
+from .constants import NETWORK_AGENT_SERVER, NETWORK_AGENT_WORKER
 from .processing_server import ProcessingServer
 from .processing_worker import ProcessingWorker
 from .processor_server import ProcessorServer
diff --git a/ocrd_network/ocrd_network/constants.py b/ocrd_network/ocrd_network/constants.py
new file mode 100644
index 0000000000..cbaccd4cf7
--- /dev/null
+++ b/ocrd_network/ocrd_network/constants.py
@@ -0,0 +1,2 @@
+NETWORK_AGENT_SERVER = 'server'
+NETWORK_AGENT_WORKER = 'worker'
diff --git a/ocrd_network/ocrd_network/deployer.py b/ocrd_network/ocrd_network/deployer.py
index 5f25a58d17..ff54e0578a 100644
--- a/ocrd_network/ocrd_network/deployer.py
+++ b/ocrd_network/ocrd_network/deployer.py
@@ -15,6 +15,7 @@
 
 from ocrd_utils import config, getLogger, safe_filename
 
+from .constants import NETWORK_AGENT_SERVER, NETWORK_AGENT_WORKER
 from .deployment_utils import (
     create_docker_client,
     DeployType,
@@ -467,7 +468,7 @@ def start_native_processor(
         self.log.info(f'Starting native processing worker: {processor_name}')
         channel = ssh_client.invoke_shell()
         stdin, stdout = channel.makefile('wb'), channel.makefile('rb')
-        cmd = f'{processor_name} worker --database {database_url} --queue {queue_url} &'
+        cmd = f'{processor_name} {NETWORK_AGENT_WORKER} --database {database_url} --queue {queue_url} &'
         # the only way (I could find) to make it work to start a process in the background and
         # return early is this construction. The pid of the last started background process is
         # printed with `echo $!` but it is printed inbetween other output. Because of that I added
@@ -510,7 +511,7 @@ def start_native_processor_server(
         self.log.info(f"Starting native processor server: {processor_name} on {agent_address}")
         channel = ssh_client.invoke_shell()
         stdin, stdout = channel.makefile('wb'), channel.makefile('rb')
-        cmd = f'{processor_name} server --address {agent_address} --database {database_url} &'
+        cmd = f'{processor_name} {NETWORK_AGENT_SERVER} --address {agent_address} --database {database_url} &'
         self.log.debug(f'About to execute command: {cmd}')
         stdin.write(f'{cmd}\n')
         stdin.write('echo xyz$!xyz \n exit \n')
diff --git a/ocrd_network/ocrd_network/logging.py b/ocrd_network/ocrd_network/logging.py
index 1d870e21ac..3365e2ddcc 100644
--- a/ocrd_network/ocrd_network/logging.py
+++ b/ocrd_network/ocrd_network/logging.py
@@ -1,6 +1,7 @@
 from pathlib import Path
 
 from ocrd_utils import safe_filename, config
+from .constants import NETWORK_AGENT_SERVER, NETWORK_AGENT_WORKER
 
 OCRD_NETWORK_MODULES = [
     "mets_servers",
@@ -32,15 +33,15 @@ def get_processing_job_logging_file_path(job_id: str) -> Path:
 
 
 def get_processing_server_logging_file_path(pid: int) -> Path:
-    return get_root_logging_dir("processing_servers") / f"server.{pid}.log"
+    return get_root_logging_dir("processing_servers") / f"processing_server.{pid}.log"
 
 
 def get_processing_worker_logging_file_path(processor_name: str, pid: int) -> Path:
-    return get_root_logging_dir("processing_workers") / f"worker.{pid}.{processor_name}.log"
+    return get_root_logging_dir("processing_workers") / f"{NETWORK_AGENT_WORKER}.{pid}.{processor_name}.log"
 
 
 def get_processor_server_logging_file_path(processor_name: str, pid: int) -> Path:
-    return get_root_logging_dir("processor_servers") / f"server.{pid}.{processor_name}.log"
+    return get_root_logging_dir("processor_servers") / f"{NETWORK_AGENT_SERVER}.{pid}.{processor_name}.log"
 
 
 def get_mets_server_logging_file_path(mets_path: str) -> Path:
diff --git a/ocrd_network/ocrd_network/models/job.py b/ocrd_network/ocrd_network/models/job.py
index 779f6e066c..e5230aa5fd 100644
--- a/ocrd_network/ocrd_network/models/job.py
+++ b/ocrd_network/ocrd_network/models/job.py
@@ -1,9 +1,10 @@
+from beanie import Document
 from datetime import datetime
 from enum import Enum
+from pydantic import BaseModel
 from typing import Dict, List, Optional
 
-from beanie import Document
-from pydantic import BaseModel
+from ocrd_network import NETWORK_AGENT_WORKER
 
 
 class StateEnum(str, Enum):
@@ -34,9 +35,9 @@ class PYJobInput(BaseModel):
     parameters: dict = {}  # Always set to empty dict when None, otherwise it fails ocr-d-validation
     result_queue_name: Optional[str] = None
     callback_url: Optional[str] = None
-    # Used to toggle between sending requests to 'worker and 'server',
+    # Used to toggle between sending requests to 'worker' and 'server',
     # i.e., Processing Worker and Processor Server, respectively
-    agent_type: Optional[str] = 'worker'
+    agent_type: Optional[str] = NETWORK_AGENT_WORKER
     # Auto generated by the Processing Server when forwarding to the Processor Server
     job_id: Optional[str] = None
     # If set, specifies a list of job ids this job depends on
diff --git a/ocrd_network/ocrd_network/process_helpers.py b/ocrd_network/ocrd_network/process_helpers.py
index f42a32308d..04dcd17d00 100644
--- a/ocrd_network/ocrd_network/process_helpers.py
+++ b/ocrd_network/ocrd_network/process_helpers.py
@@ -1,11 +1,12 @@
+from contextlib import nullcontext
 import json
 from typing import List, Optional
-from contextlib import nullcontext
 
 from ocrd.processor.helpers import run_cli, run_processor
-from .utils import get_ocrd_workspace_instance
 from ocrd_utils import redirect_stderr_and_stdout_to_file, initLogging
 
+from .utils import get_ocrd_workspace_instance
+
 
 # A wrapper for run_processor() and run_cli()
 def invoke_processor(
diff --git a/ocrd_network/ocrd_network/processing_server.py b/ocrd_network/ocrd_network/processing_server.py
index 5f740d3c5b..e6606f90f0 100644
--- a/ocrd_network/ocrd_network/processing_server.py
+++ b/ocrd_network/ocrd_network/processing_server.py
@@ -1,9 +1,12 @@
+from hashlib import md5
+import httpx
 import json
 from logging import FileHandler, Formatter
-import requests
-import httpx
 from os import getpid
+from pathlib import Path
+import requests
 from typing import Dict, List, Union
+from urllib.parse import urljoin
 import uvicorn
 
 from fastapi import (
@@ -16,12 +19,13 @@
 )
 from fastapi.exceptions import RequestValidationError
 from fastapi.responses import FileResponse, JSONResponse, PlainTextResponse
-
 from pika.exceptions import ChannelClosedByBroker
+
+from ocrd import Resolver, Workspace
 from ocrd.task_sequence import ProcessorTask
 from ocrd_utils import initLogging, getLogger, LOG_FORMAT
-from ocrd import Resolver, Workspace
-from pathlib import Path
+
+from .constants import NETWORK_AGENT_SERVER, NETWORK_AGENT_WORKER
 from .database import (
     initiate_database,
     db_create_workspace,
@@ -46,14 +50,8 @@
     PYWorkflowJobOutput,
     StateEnum
 )
-from .rabbitmq_utils import (
-    RMQPublisher,
-    OcrdProcessingMessage
-)
-from .server_cache import (
-    CacheLockedPages,
-    CacheProcessingRequests
-)
+from .rabbitmq_utils import RMQPublisher, OcrdProcessingMessage
+from .server_cache import CacheLockedPages, CacheProcessingRequests
 from .server_utils import (
     _get_processor_job,
     _get_processor_job_log,
@@ -68,8 +66,6 @@
     get_ocrd_workspace_physical_pages,
     validate_workflow,
 )
-from urllib.parse import urljoin
-from hashlib import md5
 
 
 class ProcessingServer(FastAPI):
@@ -350,41 +346,54 @@ def create_processing_message(job: DBProcessorJob) -> OcrdProcessingMessage:
         )
         return processing_message
 
-    def check_if_queue_exists(self, processor_name):
+    def check_if_queue_exists(self, processor_name) -> bool:
         try:
             # Only checks if the process queue exists, if not raises ChannelClosedByBroker
             self.rmq_publisher.create_queue(processor_name, passive=True)
+            return True
         except ChannelClosedByBroker as error:
             self.log.warning(f"Process queue with id '{processor_name}' not existing: {error}")
-            # Reconnect publisher - not efficient, but works
             # TODO: Revisit when reconnection strategy is implemented
+            # Reconnect publisher, i.e., restore the connection - not efficient, but works
            self.connect_publisher(enable_acks=True)
-            raise HTTPException(
-                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
-                detail=f"Process queue with id '{processor_name}' not existing"
-            )
+            return False
 
-    def query_ocrd_tool_json_from_server(self, processor_name):
-        processor_server_url = self.deployer.resolve_processor_server_url(processor_name)
-        if not processor_server_url:
-            self.log.exception(f"Processor Server of '{processor_name}' is not available")
-            raise HTTPException(
-                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
-                detail=f"Processor Server of '{processor_name}' is not available"
-            )
+    def query_ocrd_tool_json_from_server(self, processor_server_url: str):
         # Request the tool json from the Processor Server
         response = requests.get(
             urljoin(processor_server_url, 'info'),
-            headers={'Content-Type': 'application/json'}
+            headers={"Content-Type": "application/json"}
         )
         if not response.status_code == 200:
-            self.log.exception(f"Failed to retrieve '{processor_name}' from: {processor_server_url}")
+            msg = f"Failed to retrieve ocrd tool json from: {processor_server_url}, status code: {response.status_code}"
+            self.log.exception(msg)
             raise HTTPException(
-                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
-                detail=f"Failed to retrieve '{processor_name}' from: {processor_server_url}"
+                status_code=status.HTTP_404_NOT_FOUND,
+                detail=msg
             )
         ocrd_tool = response.json()
-        return ocrd_tool, processor_server_url
+        return ocrd_tool
+
+    def processing_agent_exists(self, processor_name: str, agent_type: str) -> bool:
+        if agent_type not in [NETWORK_AGENT_SERVER, NETWORK_AGENT_WORKER]:
+            return False
+        if agent_type == NETWORK_AGENT_WORKER:
+            if not self.check_if_queue_exists(processor_name):
+                return False
+        if agent_type == NETWORK_AGENT_SERVER:
+            processor_server_url = self.deployer.resolve_processor_server_url(processor_name)
+            if not processor_server_url:
+                return False
+        return True
+
+    async def get_processing_agent_ocrd_tool(self, processor_name: str, agent_type: str) -> dict:
+        ocrd_tool = {}
+        if agent_type == NETWORK_AGENT_WORKER:
+            ocrd_tool = await self.get_processor_info(processor_name)
+        if agent_type == NETWORK_AGENT_SERVER:
+            processor_server_url = self.deployer.resolve_processor_server_url(processor_name)
+            ocrd_tool = self.query_ocrd_tool_json_from_server(processor_server_url)
+        return ocrd_tool
 
     async def push_processor_job(self, processor_name: str, data: PYJobInput) -> PYJobOutput:
         if data.job_id:
@@ -399,15 +408,32 @@ async def push_processor_job(self, processor_name: str, data: PYJobInput) -> PYJ
             )
         # Generate processing job id
         data.job_id = generate_id()
-        # Append the processor name to the request itself
         data.processor_name = processor_name
 
-        if data.agent_type not in ['worker', 'server']:
+        # Check if the processing agent (worker/server) exists (is deployed)
+        if not self.processing_agent_exists(data.processor_name, data.agent_type):
+            msg = f"Agent of type '{data.agent_type}' does not exist for '{data.processor_name}'"
+            self.log.exception(msg)
             raise HTTPException(
-                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
-                detail=f"Unknown network agent with value: {data.agent_type}"
+                status_code=status.HTTP_404_NOT_FOUND,
+                detail=msg
+            )
+
+        ocrd_tool = await self.get_processing_agent_ocrd_tool(
+            processor_name=data.processor_name,
+            agent_type=data.agent_type
+        )
+        if not ocrd_tool:
+            msg = f"Agent of type '{data.agent_type}' does not exist for '{data.processor_name}'"
+            self.log.exception(msg)
+            raise HTTPException(
+                status_code=status.HTTP_404_NOT_FOUND,
+                detail=msg
             )
+
+        validate_job_input(self.log, data.processor_name, ocrd_tool, data)
+
 
         db_workspace = await db_get_workspace(
             workspace_id=data.workspace_id,
             workspace_mets_path=data.path_to_mets
@@ -484,19 +510,18 @@ async def push_processor_job(self, processor_name: str, data: PYJobInput) -> PYJ
         )
         await db_queued_job.insert()
         self.cache_processing_requests.update_request_counter(workspace_key=workspace_key, by_value=1)
-        job_output = None
-        if data.agent_type == 'worker':
-            ocrd_tool = await self.get_processor_info(data.processor_name)
-            validate_job_input(self.log, data.processor_name, ocrd_tool, data)
-            processing_message = self.create_processing_message(db_queued_job)
+        job_output = await self.push_to_processing_agent(data=data, db_job=db_queued_job)
+        return job_output
+
+    async def push_to_processing_agent(self, data: PYJobInput, db_job: DBProcessorJob) -> PYJobOutput:
+        if data.agent_type == NETWORK_AGENT_WORKER:
+            processing_message = self.create_processing_message(db_job)
             self.log.debug(f"Pushing to processing worker: {data.processor_name}, {data.page_id}, {data.job_id}")
             await self.push_to_processing_queue(data.processor_name, processing_message)
-            job_output = db_queued_job.to_job_output()
-        if data.agent_type == 'server':
-            ocrd_tool, processor_server_url = self.query_ocrd_tool_json_from_server(data.processor_name)
-            validate_job_input(self.log, data.processor_name, ocrd_tool, data)
+            job_output = db_job.to_job_output()
+        else:  # data.agent_type == NETWORK_AGENT_SERVER
             self.log.debug(f"Pushing to processor server: {data.processor_name}, {data.page_id}, {data.job_id}")
-            job_output = await self.push_to_processor_server(data.processor_name, processor_server_url, data)
+            job_output = await self.push_to_processor_server(data.processor_name, data)
         if not job_output:
             self.log.exception('Failed to create job output')
             raise HTTPException(
@@ -505,18 +530,9 @@ async def push_processor_job(self, processor_name: str, data: PYJobInput) -> PYJ
             )
         return job_output
 
-    # TODO: Revisit and remove duplications between push_to_* methods
     async def push_to_processing_queue(self, processor_name: str, processing_message: OcrdProcessingMessage):
         if not self.rmq_publisher:
             raise Exception('RMQPublisher is not connected')
-        deployed_processors = self.deployer.find_matching_processors(
-            worker_only=True,
-            str_names_only=True,
-            unique_only=True
-        )
-        if processor_name not in deployed_processors:
-            self.check_if_queue_exists(processor_name)
-
         try:
             self.rmq_publisher.publish_to_queue(
                 queue_name=processor_name,
@@ -532,18 +548,20 @@
     async def push_to_processor_server(
         self,
         processor_name: str,
-        processor_server_url: str,
         job_input: PYJobInput
     ) -> PYJobOutput:
         try:
             json_data = json.dumps(job_input.dict(exclude_unset=True, exclude_none=True))
         except Exception as e:
-            self.log.exception(f"Failed to json dump the PYJobInput, error: {e}")
+            msg = f"Failed to json dump the PYJobInput, error: {e}"
+            self.log.exception(msg)
             raise HTTPException(
                 status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
-                detail=f"Failed to json dump the PYJobInput, error: {e}"
+                detail=msg
             )
 
+        processor_server_url = self.deployer.resolve_processor_server_url(processor_name)
+
         # TODO: The amount of pages should come as a request input
         # TODO: cf https://github.com/OCR-D/core/pull/1030/files#r1152551161
         # currently, use 200 as a default
@@ -655,21 +673,7 @@ async def remove_from_request_cache(self, result_message: PYResultMessage):
                 page_ids=expand_page_ids(data.page_id)
             )
             self.cache_processing_requests.update_request_counter(workspace_key=workspace_key, by_value=1)
-            job_output = None
-            if data.agent_type == 'worker':
-                ocrd_tool = await self.get_processor_info(data.processor_name)
-                validate_job_input(self.log, data.processor_name, ocrd_tool, data)
-                processing_message = self.create_processing_message(db_consumed_job)
-                self.log.debug(f"Pushing cached to processing worker: "
-                               f"{data.processor_name}, {data.page_id}, {data.job_id}")
-                await self.push_to_processing_queue(data.processor_name, processing_message)
-                job_output = db_consumed_job.to_job_output()
-            if data.agent_type == 'server':
-                ocrd_tool, processor_server_url = self.query_ocrd_tool_json_from_server(data.processor_name)
-                validate_job_input(self.log, data.processor_name, ocrd_tool, data)
-                self.log.debug(f"Pushing cached to processor server: "
-                               f"{data.processor_name}, {data.page_id}, {data.job_id}")
-                job_output = await self.push_to_processor_server(data.processor_name, processor_server_url, data)
+            job_output = await self.push_to_processing_agent(data=data, db_job=db_consumed_job)
 
             if not job_output:
                 self.log.exception(f'Failed to create job output for job input data: {data}')
@@ -704,7 +708,7 @@ async def task_sequence_to_processing_jobs(
         tasks: List[ProcessorTask],
         mets_path: str,
         page_id: str,
-        agent_type: str = 'worker',
+        agent_type: str = NETWORK_AGENT_WORKER,
     ) -> List[PYJobOutput]:
         file_group_cache = {}
         responses = []
@@ -740,7 +744,7 @@ async def run_workflow(
         mets_path: str,
         workflow: Union[UploadFile, None] = File(None),
         workflow_id: str = None,
-        agent_type: str = 'worker',
+        agent_type: str = NETWORK_AGENT_WORKER,
         page_id: str = None,
         page_wise: bool = False,
         workflow_callback_url: str = None
@@ -773,6 +777,7 @@ async def run_workflow(
             raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                                 detail=f"Error parsing tasks: {e}")
 
+        # Validate the input file groups of the first task in the workflow
         available_groups = Workspace(Resolver(), Path(mets_path).parents[0]).mets.file_groups
         for grp in tasks[0].input_file_grps:
             if grp not in available_groups:
@@ -780,6 +785,21 @@ async def run_workflow(
                     status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                     detail=f"Input file grps of 1st processor not found: {tasks[0].input_file_grps}"
                 )
+
+        # Validate existence of agents (processing workers/processor servers)
+        # for the ocr-d processors referenced inside tasks
+        missing_agents = []
+        for task in tasks:
+            if not self.processing_agent_exists(processor_name=task.executable, agent_type=agent_type):
+                missing_agents.append({task.executable, agent_type})
+        if missing_agents:
+            raise HTTPException(
+                status_code=status.HTTP_406_NOT_ACCEPTABLE,
+                detail=f"Workflow validation has failed. Processing agents not found: {missing_agents}. "
+                       f"Make sure the desired processors are deployed either as a processing "
+                       f"worker or processor server"
+            )
+
         try:
             if page_id:
                 page_range = expand_page_ids(page_id)
diff --git a/ocrd_network/ocrd_network/rabbitmq_utils/constants.py b/ocrd_network/ocrd_network/rabbitmq_utils/constants.py
index 7c1db1d884..21596ef61f 100644
--- a/ocrd_network/ocrd_network/rabbitmq_utils/constants.py
+++ b/ocrd_network/ocrd_network/rabbitmq_utils/constants.py
@@ -1,5 +1,3 @@
-import logging
-
 __all__ = [
     'DEFAULT_EXCHANGER_NAME',
     'DEFAULT_EXCHANGER_TYPE',
diff --git a/ocrd_network/ocrd_network/server_utils.py b/ocrd_network/ocrd_network/server_utils.py
index 2142e78d46..fd7c0f7966 100644
--- a/ocrd_network/ocrd_network/server_utils.py
+++ b/ocrd_network/ocrd_network/server_utils.py
@@ -89,6 +89,6 @@ def validate_job_input(logger, processor_name: str, ocrd_tool: dict, job_input:
         )
     else:
         if not report.is_valid:
-            logger.exception(f'Failed to validate processing job '
-                             f'against the ocrd_tool, errors: {report.errors}')
-            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=report.errors)
+            log_msg = f'Failed to validate processing job against the ocrd_tool, errors: {report.errors}'
+            logger.exception(log_msg)
+            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=log_msg)
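Note (not part of the diff): a minimal sketch of how the new constants map onto the two CLI invocation forms assembled by `Deployer.start_native_processor()` and `Deployer.start_native_processor_server()` above. The processor name and the queue/database/server addresses below are illustrative assumptions, not values taken from this change.

```python
from ocrd_network import NETWORK_AGENT_SERVER, NETWORK_AGENT_WORKER

# Assumed example values -- replace with the details of your own deployment.
processor_name = "ocrd-dummy"
queue_url = "admin:admin@localhost:5672"
database_url = "mongodb://localhost:27018"
agent_address = "localhost:8080"

# Worker form, mirroring the f-string in start_native_processor():
worker_cmd = f'{processor_name} {NETWORK_AGENT_WORKER} --database {database_url} --queue {queue_url} &'

# Server form, mirroring the f-string in start_native_processor_server():
server_cmd = f'{processor_name} {NETWORK_AGENT_SERVER} --address {agent_address} --database {database_url} &'

print(worker_cmd)  # ocrd-dummy worker --database ... --queue ... &
print(server_cmd)  # ocrd-dummy server --address ... --database ... &
```

Either form corresponds to the optional `subcommand` argument added in `ocrd_cli_options.py`, so the same invocations apply to any OCR-D processor CLI handled by `ocrd_cli_wrap_processor`.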