Skip to content

Commit

Permalink
Merge pull request #1142 from OCR-D/fix-ws-locks
Browse files Browse the repository at this point in the history
Fix potential workspace deadlocks (ocrd_network)
  • Loading branch information
kba authored Dec 5, 2023
2 parents 5b7c9f0 + a582643 commit d2a1546
Show file tree
Hide file tree
Showing 11 changed files with 133 additions and 105 deletions.
23 changes: 12 additions & 11 deletions ocrd/ocrd/decorators/__init__.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
import sys

from ocrd_utils import (
config,
initLogging,
is_local_filename,
get_local_filename,
getLogger,
parse_json_string_with_comments,
set_json_key_value_overrides,
)

from ocrd_utils import getLogger, initLogging, parse_json_string_with_comments, config
from ocrd_validators import WorkspaceValidator

from ocrd_network import ProcessingWorker, ProcessorServer
from ocrd_network import ProcessingWorker, ProcessorServer, NETWORK_AGENT_SERVER, NETWORK_AGENT_WORKER

from ..resolver import Resolver
from ..processor.base import run_processor
Expand All @@ -19,7 +20,8 @@
from .ocrd_cli_options import ocrd_cli_options
from .mets_find_options import mets_find_options

SUBCOMMANDS = ['worker', 'server']
SUBCOMMANDS = [NETWORK_AGENT_WORKER, NETWORK_AGENT_SERVER]


def ocrd_cli_wrap_processor(
processorClass,
Expand Down Expand Up @@ -63,8 +65,7 @@ def ocrd_cli_wrap_processor(
# Used for checking/starting network agents for the WebAPI architecture
check_and_run_network_agent(processorClass, subcommand, address, database, queue)
elif address or queue or database:
raise ValueError(f"Subcommand options --adress --queue and --database are only valid for subcommands 'worker' or 'server'")

raise ValueError(f"Subcommand options --address --queue and --database are only valid for subcommands: {SUBCOMMANDS}")

initLogging()

Expand Down Expand Up @@ -141,19 +142,19 @@ def check_and_run_network_agent(ProcessorClass, subcommand: str, address: str, d
if not database:
raise ValueError(f"Option '--database' is invalid for subcommand {subcommand}")

if subcommand == 'server':
if subcommand == NETWORK_AGENT_SERVER:
if not address:
raise ValueError(f"Option '--address' required for subcommand {subcommand}")
if queue:
raise ValueError(f"Option '--queue' invalid for subcommand {subcommand}")
if subcommand == 'worker':
if subcommand == NETWORK_AGENT_WORKER:
if address:
raise ValueError(f"Option '--address' invalid for subcommand {subcommand}")
if not queue:
raise ValueError(f"Option '--queue' required for subcommand {subcommand}")

processor = ProcessorClass(workspace=None)
if subcommand == 'worker':
if subcommand == NETWORK_AGENT_WORKER:
processing_worker = ProcessingWorker(
rabbitmq_addr=queue,
mongodb_addr=database,
Expand All @@ -165,7 +166,7 @@ def check_and_run_network_agent(ProcessorClass, subcommand: str, address: str, d
processing_worker.connect_consumer()
# Start consuming from the queue with name `processor_name`
processing_worker.start_consuming()
elif subcommand == 'server':
elif subcommand == NETWORK_AGENT_SERVER:
# TODO: Better validate that inside the ProcessorServer itself
host, port = address.split(':')
processor_server = ProcessorServer(
Expand Down
4 changes: 3 additions & 1 deletion ocrd/ocrd/decorators/ocrd_cli_options.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import click
from click import option, Path, group, command, argument
from ocrd_network import NETWORK_AGENT_SERVER, NETWORK_AGENT_WORKER
from .parameter_option import parameter_option, parameter_override_option
from .loglevel_option import loglevel_option
from ocrd_network import (
Expand Down Expand Up @@ -54,7 +55,8 @@ def cli(mets_url):
# were using `group`, you cannot combine have a command with
# subcommands. So we have to work around that by creating a
# pseudo-subcommand handled in ocrd_cli_wrap_processor
argument('subcommand', nargs=1, required=False, type=click.Choice(['worker', 'server'])),
argument('subcommand', nargs=1, required=False,
type=click.Choice([NETWORK_AGENT_WORKER, NETWORK_AGENT_SERVER])),
]
for param in params:
param(f)
Expand Down
1 change: 1 addition & 0 deletions ocrd_network/ocrd_network/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from .client import Client
from .constants import NETWORK_AGENT_SERVER, NETWORK_AGENT_WORKER
from .processing_server import ProcessingServer
from .processing_worker import ProcessingWorker
from .processor_server import ProcessorServer
Expand Down
2 changes: 2 additions & 0 deletions ocrd_network/ocrd_network/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
NETWORK_AGENT_SERVER = 'server'
NETWORK_AGENT_WORKER = 'worker'
5 changes: 3 additions & 2 deletions ocrd_network/ocrd_network/deployer.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from ocrd_utils import config, getLogger, safe_filename

from .constants import NETWORK_AGENT_SERVER, NETWORK_AGENT_WORKER
from .deployment_utils import (
create_docker_client,
DeployType,
Expand Down Expand Up @@ -467,7 +468,7 @@ def start_native_processor(
self.log.info(f'Starting native processing worker: {processor_name}')
channel = ssh_client.invoke_shell()
stdin, stdout = channel.makefile('wb'), channel.makefile('rb')
cmd = f'{processor_name} worker --database {database_url} --queue {queue_url} &'
cmd = f'{processor_name} {NETWORK_AGENT_WORKER} --database {database_url} --queue {queue_url} &'
# the only way (I could find) to make it work to start a process in the background and
# return early is this construction. The pid of the last started background process is
# printed with `echo $!` but it is printed inbetween other output. Because of that I added
Expand Down Expand Up @@ -510,7 +511,7 @@ def start_native_processor_server(
self.log.info(f"Starting native processor server: {processor_name} on {agent_address}")
channel = ssh_client.invoke_shell()
stdin, stdout = channel.makefile('wb'), channel.makefile('rb')
cmd = f'{processor_name} server --address {agent_address} --database {database_url} &'
cmd = f'{processor_name} {NETWORK_AGENT_SERVER} --address {agent_address} --database {database_url} &'
self.log.debug(f'About to execute command: {cmd}')
stdin.write(f'{cmd}\n')
stdin.write('echo xyz$!xyz \n exit \n')
Expand Down
7 changes: 4 additions & 3 deletions ocrd_network/ocrd_network/logging.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from pathlib import Path
from ocrd_utils import safe_filename, config

from .constants import NETWORK_AGENT_SERVER, NETWORK_AGENT_WORKER

OCRD_NETWORK_MODULES = [
"mets_servers",
Expand Down Expand Up @@ -32,15 +33,15 @@ def get_processing_job_logging_file_path(job_id: str) -> Path:


def get_processing_server_logging_file_path(pid: int) -> Path:
return get_root_logging_dir("processing_servers") / f"server.{pid}.log"
return get_root_logging_dir("processing_servers") / f"processing_server.{pid}.log"


def get_processing_worker_logging_file_path(processor_name: str, pid: int) -> Path:
return get_root_logging_dir("processing_workers") / f"worker.{pid}.{processor_name}.log"
return get_root_logging_dir("processing_workers") / f"{NETWORK_AGENT_WORKER}.{pid}.{processor_name}.log"


def get_processor_server_logging_file_path(processor_name: str, pid: int) -> Path:
return get_root_logging_dir("processor_servers") / f"server.{pid}.{processor_name}.log"
return get_root_logging_dir("processor_servers") / f"{NETWORK_AGENT_SERVER}.{pid}.{processor_name}.log"


def get_mets_server_logging_file_path(mets_path: str) -> Path:
Expand Down
9 changes: 5 additions & 4 deletions ocrd_network/ocrd_network/models/job.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from beanie import Document
from datetime import datetime
from enum import Enum
from pydantic import BaseModel
from typing import Dict, List, Optional

from beanie import Document
from pydantic import BaseModel
from ocrd_network import NETWORK_AGENT_WORKER


class StateEnum(str, Enum):
Expand Down Expand Up @@ -34,9 +35,9 @@ class PYJobInput(BaseModel):
parameters: dict = {} # Always set to empty dict when None, otherwise it fails ocr-d-validation
result_queue_name: Optional[str] = None
callback_url: Optional[str] = None
# Used to toggle between sending requests to 'worker and 'server',
# Used to toggle between sending requests to 'worker' and 'server',
# i.e., Processing Worker and Processor Server, respectively
agent_type: Optional[str] = 'worker'
agent_type: Optional[str] = NETWORK_AGENT_WORKER
# Auto generated by the Processing Server when forwarding to the Processor Server
job_id: Optional[str] = None
# If set, specifies a list of job ids this job depends on
Expand Down
5 changes: 3 additions & 2 deletions ocrd_network/ocrd_network/process_helpers.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
from contextlib import nullcontext
import json
from typing import List, Optional
from contextlib import nullcontext

from ocrd.processor.helpers import run_cli, run_processor
from .utils import get_ocrd_workspace_instance
from ocrd_utils import redirect_stderr_and_stdout_to_file, initLogging

from .utils import get_ocrd_workspace_instance


# A wrapper for run_processor() and run_cli()
def invoke_processor(
Expand Down
Loading

0 comments on commit d2a1546

Please sign in to comment.