Workflow endpoint of the Processing Server #1083

Merged: 97 commits, Oct 11, 2023
Commits (97)
439dde4
First version of the workflow endpoint
joschrew Aug 28, 2023
f84286c
fix locking for cached requests
MehmedGIT Sep 7, 2023
c1466d9
improve code from kba feedback
MehmedGIT Sep 7, 2023
8d3ea65
improve run_workflow method
MehmedGIT Sep 7, 2023
1505445
support page_id in run_workflow
MehmedGIT Sep 7, 2023
aad8604
cancel dependent jobs of failed jobs
MehmedGIT Sep 11, 2023
26b1253
Add new job state - cancelled
MehmedGIT Sep 11, 2023
e9880e8
return models for update methods
MehmedGIT Sep 11, 2023
eefcfcc
include cancelled jobs in db
MehmedGIT Sep 11, 2023
fb7b560
Merge branch 'master' into workflow-endpoint
MehmedGIT Sep 12, 2023
af45bf9
Improve consumed message debug log
MehmedGIT Sep 12, 2023
0baee30
fix bug: wrong job_id assigned
MehmedGIT Sep 12, 2023
adcc683
Merge branch 'master' into workflow-endpoint
MehmedGIT Sep 12, 2023
0725ed0
improve code: locking/unlocking
MehmedGIT Sep 12, 2023
775beb6
Merge branch 'master' into workflow-endpoint
MehmedGIT Sep 12, 2023
9b250ef
remove unnecessary stuff
MehmedGIT Sep 12, 2023
13154c2
alpha: utilize mets server
MehmedGIT Sep 12, 2023
551741a
Merge branch 'master' into workflow-endpoint
MehmedGIT Sep 12, 2023
2373537
add mets url to run_cli
MehmedGIT Sep 12, 2023
7eb7e76
Remove already addressed notes
joschrew Sep 13, 2023
6cbdd05
fix: mets server related issues
MehmedGIT Sep 13, 2023
c9fd739
fix: exception -> info
MehmedGIT Sep 13, 2023
6c6251c
stop mets server when the cache queue is empty
MehmedGIT Sep 13, 2023
9e01a25
Merge branch 'master' into workflow-endpoint
MehmedGIT Sep 14, 2023
aa03c96
refactor process helpers
MehmedGIT Sep 14, 2023
837db61
remove chmod from mets server deployer
MehmedGIT Sep 14, 2023
0096625
fix: do not pass the logger
MehmedGIT Sep 14, 2023
0de15ed
implement page-wise processing
MehmedGIT Sep 15, 2023
51c44db
PYJobOutput returns more data
MehmedGIT Sep 15, 2023
3c3fbb6
feature: multiple job dependencies possible
MehmedGIT Sep 15, 2023
3e9e11c
Merge branch 'master' into workflow-endpoint
MehmedGIT Sep 15, 2023
a8eb261
integrate the mets server feature back
MehmedGIT Sep 15, 2023
57be596
adapt feedback
MehmedGIT Sep 18, 2023
d099445
wait before stopping server
MehmedGIT Sep 19, 2023
33f2d40
feature: more intuitive workflow output
MehmedGIT Sep 19, 2023
206acf6
allow page_wise without page_id
MehmedGIT Sep 19, 2023
1a54563
fix: notes
MehmedGIT Sep 19, 2023
22edc3a
change: return PYJobOutput again
MehmedGIT Sep 19, 2023
caf12f3
utilize dependency cache
MehmedGIT Sep 19, 2023
5727e82
reduce debug logs
MehmedGIT Sep 19, 2023
6733a87
feature: request counter
MehmedGIT Sep 20, 2023
a785995
refactor and fix internal counter
MehmedGIT Sep 20, 2023
2fe1e13
adapt from feedback
MehmedGIT Sep 20, 2023
f07574f
fix: mets server shutdown
MehmedGIT Sep 20, 2023
1bf124d
refactor: page locking/unlocking
MehmedGIT Sep 20, 2023
9b026c1
OcrdMets.physical_pages should return "proper" str
kba Sep 20, 2023
50c7eb7
add TODOs
MehmedGIT Sep 20, 2023
df15f8b
add todo for #1102
MehmedGIT Sep 20, 2023
941abd0
fix: stop mets server, get -> delete
MehmedGIT Sep 20, 2023
1e558bc
merge 1102
MehmedGIT Sep 25, 2023
27700f5
adapt after merging #1102
MehmedGIT Sep 25, 2023
a968798
utilize as a key again
MehmedGIT Sep 25, 2023
c8320d2
separate page locking cache
MehmedGIT Sep 25, 2023
5408b75
fix: all_pages placeholder
MehmedGIT Sep 25, 2023
6d1d797
separate request cache
MehmedGIT Sep 25, 2023
be33ffd
improve: deployer logs separately
MehmedGIT Sep 25, 2023
3425121
log processing server to files
MehmedGIT Sep 25, 2023
8cb9953
separate cache logging
MehmedGIT Sep 25, 2023
f96dea6
use direct access to cache
MehmedGIT Sep 25, 2023
224e468
fix: RMQ consumer set QoS
MehmedGIT Sep 25, 2023
973f2de
fix: iteration buggy state
MehmedGIT Sep 25, 2023
56dd312
fix: recent update typo
MehmedGIT Sep 25, 2023
81bb1ac
fix cache: check context switch removals
MehmedGIT Sep 25, 2023
64b3563
raise exception when missing
MehmedGIT Sep 26, 2023
52e9a73
fix bug: overwriting FAILED with CANCELLED
MehmedGIT Sep 26, 2023
23e2318
make more compact
MehmedGIT Sep 26, 2023
36cf428
Merge pull request #1104 from OCR-D/fix-1103-raise-excp
MehmedGIT Sep 26, 2023
0625050
fix error from PR #1104
MehmedGIT Sep 26, 2023
6683e96
Merge branch 'master' into workflow-endpoint
kba Sep 26, 2023
7c98254
logging: simulate ocrd_network logging behavior with ocrd_utils.logging
kba Sep 26, 2023
5c520a3
fix tests
kba Sep 26, 2023
a62a5e2
Update ocrd/ocrd/mets_server.py
kba Sep 27, 2023
2f50ee5
adapt rabbitmq loggers
MehmedGIT Sep 27, 2023
e911d81
Merge branch 'workflow-endpoint-logging-2023-09-26' of github.com:OCR…
MehmedGIT Sep 27, 2023
76ffff0
improve: worker logging suffix
MehmedGIT Sep 27, 2023
eebc5cd
add uvicorn loggers
MehmedGIT Sep 27, 2023
635d909
remove deployer log for processor servers
MehmedGIT Sep 27, 2023
b52a388
call initLogging in network modules
MehmedGIT Sep 27, 2023
7642ffb
remove test_logging
MehmedGIT Sep 27, 2023
e666049
fix: starting processor server
MehmedGIT Sep 27, 2023
986f9a0
clean old notices
MehmedGIT Sep 27, 2023
96da69d
remove dupl handler
MehmedGIT Sep 27, 2023
f1ee09e
synchronize servers impl
MehmedGIT Sep 27, 2023
585212c
pass log_level to processors
MehmedGIT Sep 27, 2023
e6ce39e
Add endpoint to query workflow status
Oct 3, 2023
0182352
Improve error handling for workflow requests
Oct 3, 2023
d7e2570
Don't return processing_job_ids on workflow execution
joschrew Oct 5, 2023
deb1188
Add processing_job_ids of failed processor to workflow-status
joschrew Oct 5, 2023
04a326b
Update code of previous 2 commits
joschrew Oct 5, 2023
0e43e72
Add page_id to workflow status failed jobs
joschrew Oct 5, 2023
00373c9
ocrd_utils.redirect_stderr_and_stdout_to_file
kba Oct 10, 2023
6fc6060
implement --log-filename, wip
kba Oct 10, 2023
1c9f3d7
Remove deployed_* log files
MehmedGIT Oct 10, 2023
b5fa221
Processor stdout/stderr to ocrd_worker_* logs
MehmedGIT Oct 10, 2023
ff2ee9c
move initLogging to within stdout/stderr redirect
kba Oct 10, 2023
d253492
invoke_processor: force initLogging because sys.stdout change
kba Oct 11, 2023
155ea2a
Merge remote-tracking branch 'origin/workflow-additions' into workflo…
kba Oct 11, 2023
Changes from all commits
1 change: 1 addition & 0 deletions ocrd/ocrd/decorators/__init__.py
@@ -39,6 +39,7 @@ def ocrd_cli_wrap_processor(
subcommand=None,
address=None,
queue=None,
log_filename=None,
database=None,
# ocrd_network params end #
**kwargs
1 change: 1 addition & 0 deletions ocrd/ocrd/decorators/ocrd_cli_options.py
@@ -48,6 +48,7 @@ def cli(mets_url):
option('-D', '--dump-module-dir', is_flag=True, default=False),
option('-h', '--help', is_flag=True, default=False),
option('-V', '--version', is_flag=True, default=False),
option('--log-filename', default=None),
# Subcommand, only used for 'worker'/'server'. Cannot be handled in
# click because processors use the @command decorator and even if they
# were using `group`, you cannot combine have a command with
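Taken together, the two decorator changes thread the new `--log-filename` option through to `ocrd_cli_wrap_processor`. A minimal sketch of how a worker could honor it, assuming `redirect_stderr_and_stdout_to_file` (added to `ocrd_utils` in commit 00373c9) is a context manager taking the target filename; `run_processing_worker` is a hypothetical stand-in, not part of the PR:

```python
from ocrd_utils import initLogging, redirect_stderr_and_stdout_to_file

def run_processing_worker():
    """Hypothetical stand-in for the actual worker loop."""
    print("worker running; this line ends up in the log file")

def start_worker(log_filename=None):
    if log_filename:
        with redirect_stderr_and_stdout_to_file(log_filename):
            # Re-init logging because sys.stdout/sys.stderr changed
            # (cf. commits ff2ee9c and d253492)
            initLogging()
            run_processing_worker()
    else:
        initLogging()
        run_processing_worker()

start_worker(log_filename='/tmp/ocrd_worker_demo.log')
```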
3 changes: 2 additions & 1 deletion ocrd/ocrd/lib.bash
@@ -145,6 +145,7 @@ ocrd__parse_argv () {
-I|--input-file-grp) ocrd__argv[input_file_grp]=$2 ; shift ;;
-w|--working-dir) ocrd__argv[working_dir]=$(realpath "$2") ; shift ;;
-m|--mets) ocrd__argv[mets_file]=$(realpath "$2") ; shift ;;
--log-filename) ocrd__argv[log_filename]="$2" ; shift ;;
--mets-server-url) ocrd_argv[mets_server_url]="$2" ; shift ;;
--overwrite) ocrd__argv[overwrite]=true ;;
--profile) ocrd__argv[profile]=true ;;
@@ -168,7 +169,7 @@
if ! [ -v ocrd__worker_queue ]; then
ocrd__raise "For the Processing Worker --queue is required"
fi
ocrd network processing-worker $OCRD_TOOL_NAME --queue "${ocrd__worker_queue}" --database "${ocrd__worker_database}"
ocrd network processing-worker $OCRD_TOOL_NAME --queue "${ocrd__worker_queue}" --database "${ocrd__worker_database}" --log-filename "${ocrd__argv[log_filename]}"
elif [ ${ocrd__subcommand} = "server" ]; then
if ! [ -v ocrd__worker_address ]; then
ocrd__raise "For the Processor Server --address is required"
2 changes: 1 addition & 1 deletion ocrd/ocrd/mets_server.py
@@ -198,7 +198,7 @@ def shutdown(self):
_exit(0)

def startup(self):
self.log.info("Starting down METS server")
self.log.info("Starting up METS server")

workspace = self.workspace

6 changes: 5 additions & 1 deletion ocrd/ocrd/processor/base.py
@@ -353,7 +353,11 @@ def zip_input_files(self, require_first=True, mimetype=None, on_error='skip'):
# Warn if no files found but pageId was specified because that
# might be because of invalid page_id (range)
if self.page_id and not files_:
LOG.warning(f"Could not find any files for --page-id {self.page_id} - compare '{self.page_id}' with the output of 'orcd workspace list-page'.")
msg = (f"Could not find any files for --page-id {self.page_id} - "
f"compare '{self.page_id}' with the output of 'orcd workspace list-page'.")
if on_error == 'abort':
raise ValueError(msg)
LOG.warning(msg)
for file_ in files_:
if not file_.pageId:
continue
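The new branch turns the former warning into a hard failure when `on_error='abort'`. An illustrative fragment, assuming `processor` is an instantiated `Processor` whose `--page-id` matches no files in the workspace:

```python
try:
    input_tuples = processor.zip_input_files(on_error='abort')
except ValueError as err:
    # Previously this case only produced LOG.warning(...); with
    # on_error='abort' the caller can now fail fast on a bad page range.
    print(f"aborting: {err}")
```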
2 changes: 2 additions & 0 deletions ocrd/ocrd/processor/helpers.py
@@ -251,6 +251,8 @@ def generate_processor_help(ocrd_tool, processor_instance=None, subcommand=None)
--database The MongoDB server address in format
"mongodb://{host}:{port}"
[mongodb://localhost:27018]
--log-filename Filename to redirect STDOUT/STDERR to,
if specified.
'''

processing_server_options = '''\
2 changes: 0 additions & 2 deletions ocrd/ocrd/workspace.py
@@ -123,7 +123,6 @@ def merge(self, other_workspace, copy_files=True, overwrite=False, **kwargs):
"""
def after_add_cb(f):
"""callback to run on merged OcrdFile instances in the destination"""
print(f)
if not f.local_filename:
# OcrdFile has no local_filename, so nothing to be copied
return
@@ -177,7 +176,6 @@ def download_file(self, f, _recursion_count=0):
"""
log = getLogger('ocrd.workspace.download_file')
with pushd_popd(self.directory):
print(f)
if f.local_filename:
file_path = Path(f.local_filename).absolute()
if file_path.exists():
4 changes: 2 additions & 2 deletions ocrd_models/ocrd_models/ocrd_mets.py
@@ -577,9 +577,9 @@ def physical_pages(self):
if self._cache_flag:
return list(self._page_cache.keys())

return self._tree.getroot().xpath(
return [str(x) for x in self._tree.getroot().xpath(
'mets:structMap[@TYPE="PHYSICAL"]/mets:div[@TYPE="physSequence"]/mets:div[@TYPE="page"]/@ID',
namespaces=NS)
namespaces=NS)]

def get_physical_pages(self, for_fileIds=None):
"""
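The wrapping in `str()` matters because lxml's XPath returns `_ElementUnicodeResult` "smart strings" that keep a reference back to the parsed tree; converting yields plain strings with no back-reference, which is safer when results cross serialization or process boundaries (the point of commit 9b026c1). A small self-contained demonstration:

```python
from lxml import etree

tree = etree.fromstring(b'<div ID="PHYS_0001"/>')
[page_id] = tree.xpath('@ID')
print(type(page_id))       # <class 'lxml.etree._ElementUnicodeResult'>
print(type(str(page_id)))  # <class 'str'> - a "proper" str
```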
24 changes: 0 additions & 24 deletions ocrd_network/ocrd_network/__init__.py
@@ -1,27 +1,3 @@
# This network package is supposed to contain all the packages and modules to realize the network architecture:
# https://github.com/OCR-D/spec/pull/222/files#diff-8d0dae8c9277ff1003df93c5359c82a12d3f5c8452281f87781921921204d283

# For reference, currently:
# 1. The WebAPI is available here: https://github.com/OCR-D/ocrd-webapi-implementation
# The ocrd-webapi-implementation repo implements the Discovery / Workflow / Workspace endpoints of the WebAPI currently.
# This Processing Server PR implements just the Processor endpoint of the WebAPI.
# Once we have this merged to core under ocrd-network, the other endpoints will be adapted to ocrd-network
# and then the ocrd-webapi-implementation repo can be archived for reference.

# 2. The RabbitMQ Library (i.e., utils) is used as an API to abstract and
# simplify (from the view point of processing server and workers) interactions with the RabbitMQ Server.
# The library was adopted from: https://github.com/OCR-D/ocrd-webapi-implementation/tree/main/ocrd_webapi/rabbitmq

# 3. Some potentially more useful code to be adopted for the Processing Server/Worker is available here:
# https://github.com/OCR-D/core/pull/884
# Update: Should be revisited again for adopting any relevant parts (if necessary).
# Nothing relevant is under the radar for now.

# 4. The Mets Server discussion/implementation is available here:
# https://github.com/OCR-D/core/pull/966

# Note: The Mets Server is still not placed on the architecture diagram and probably won't be a part of
# the network package. The reason, Mets Server is tightly coupled with the `OcrdWorkspace`.
from .client import Client
from .processing_server import ProcessingServer
from .processing_worker import ProcessingWorker
66 changes: 57 additions & 9 deletions ocrd_network/ocrd_network/database.py
@@ -13,10 +13,15 @@
database (runs in docker) currently has no volume set.
"""
from beanie import init_beanie
from beanie.operators import In
from motor.motor_asyncio import AsyncIOMotorClient
from uuid import uuid4
from pathlib import Path
from typing import List

from .models import (
DBProcessorJob,
DBWorkflowJob,
DBWorkspace
)
from .utils import call_sync
@@ -26,7 +31,7 @@ async def initiate_database(db_url: str):
client = AsyncIOMotorClient(db_url)
await init_beanie(
database=client.get_default_database(default='ocrd'),
document_models=[DBProcessorJob, DBWorkspace]
document_models=[DBProcessorJob, DBWorkflowJob, DBWorkspace]
)


@@ -35,6 +40,25 @@ async def sync_initiate_database(db_url: str):
await initiate_database(db_url)


async def db_create_workspace(mets_path: str) -> DBWorkspace:
""" Create a workspace-database entry only from a mets-path
"""
if not Path(mets_path).exists():
raise FileNotFoundError(f'Cannot create DB workspace entry, `{mets_path}` does not exist!')
try:
return await db_get_workspace(workspace_mets_path=mets_path)
except ValueError:
workspace_db = DBWorkspace(
workspace_id=str(uuid4()),
workspace_path=Path(mets_path).parent,
workspace_mets_path=mets_path,
ocrd_identifier="",
bagit_profile_identifier="",
)
await workspace_db.save()
return workspace_db


async def db_get_workspace(workspace_id: str = None, workspace_mets_path: str = None) -> DBWorkspace:
workspace = None
if not workspace_id and not workspace_mets_path:
@@ -59,7 +83,7 @@ async def sync_db_get_workspace(workspace_id: str = None, workspace_mets_path: s
return await db_get_workspace(workspace_id=workspace_id, workspace_mets_path=workspace_mets_path)


async def db_update_workspace(workspace_id: str = None, workspace_mets_path: str = None, **kwargs):
async def db_update_workspace(workspace_id: str = None, workspace_mets_path: str = None, **kwargs) -> DBWorkspace:
workspace = None
if not workspace_id and not workspace_mets_path:
raise ValueError(f'Either `workspace_id` or `workspace_mets_path` field must be used as a search key')
@@ -96,16 +120,17 @@ async def db_update_workspace(workspace_id: str = None, workspace_mets_path: str
workspace.bag_info_adds = value
elif key == 'deleted':
workspace.deleted = value
elif key == 'pages_locked':
workspace.pages_locked = value
elif key == 'mets_server_url':
workspace.mets_server_url = value
else:
raise ValueError(f'Field "{key}" is not updatable.')
await workspace.save()
return workspace


@call_sync
async def sync_db_update_workspace(workspace_id: str = None, workspace_mets_path: str = None, **kwargs):
await db_update_workspace(workspace_id=workspace_id, workspace_mets_path=workspace_mets_path, **kwargs)
async def sync_db_update_workspace(workspace_id: str = None, workspace_mets_path: str = None, **kwargs) -> DBWorkspace:
return await db_update_workspace(workspace_id=workspace_id, workspace_mets_path=workspace_mets_path, **kwargs)


async def db_get_processing_job(job_id: str) -> DBProcessorJob:
@@ -121,7 +146,7 @@ async def sync_db_get_processing_job(job_id: str) -> DBProcessorJob:
return await db_get_processing_job(job_id)


async def db_update_processing_job(job_id: str, **kwargs):
async def db_update_processing_job(job_id: str, **kwargs) -> DBProcessorJob:
job = await DBProcessorJob.find_one(
DBProcessorJob.job_id == job_id)
if not job:
@@ -144,8 +169,31 @@ async def db_update_processing_job(job_id: str, **kwargs):
else:
raise ValueError(f'Field "{key}" is not updatable.')
await job.save()
return job


@call_sync
async def sync_db_update_processing_job(job_id: str, **kwargs) -> DBProcessorJob:
return await db_update_processing_job(job_id=job_id, **kwargs)


async def db_get_workflow_job(job_id: str) -> DBWorkflowJob:
job = await DBWorkflowJob.find_one(DBWorkflowJob.job_id == job_id)
if not job:
raise ValueError(f'Workflow job with id "{job_id}" not in the DB.')
return job


@call_sync
async def sync_db_get_workflow_job(job_id: str) -> DBWorkflowJob:
return await db_get_workflow_job(job_id)


async def db_get_processing_jobs(job_ids: List[str]) -> [DBProcessorJob]:
jobs = await DBProcessorJob.find(In(DBProcessorJob.job_id, job_ids)).to_list()
return jobs


@call_sync
async def sync_db_update_processing_job(job_id: str, **kwargs):
await db_update_processing_job(job_id=job_id, **kwargs)
async def sync_db_get_processing_jobs(job_ids: List[str]) -> [DBProcessorJob]:
return await db_get_processing_jobs(job_ids)
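A hedged usage sketch of the new database helpers. The connection string and paths are placeholders; `processing_job_ids` as a field of `DBWorkflowJob` and `state` on the job documents are assumptions inferred from the commit messages (d7e2570, deb1188), not confirmed signatures:

```python
import asyncio
from ocrd_network.database import (
    initiate_database,
    db_create_workspace,
    db_get_workflow_job,
    db_get_processing_jobs,
)

async def inspect_workflow(workflow_job_id: str):
    await initiate_database('mongodb://localhost:27018')
    # Idempotent: returns the existing entry if one matches the METS path
    workspace = await db_create_workspace('/data/ws1/mets.xml')
    workflow_job = await db_get_workflow_job(workflow_job_id)
    jobs = await db_get_processing_jobs(workflow_job.processing_job_ids)
    return [(job.job_id, job.state) for job in jobs]

# asyncio.run(inspect_workflow('<some-job-id>'))
```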
75 changes: 65 additions & 10 deletions ocrd_network/ocrd_network/deployer.py
@@ -10,9 +10,11 @@
from typing import Dict, List, Union
from re import search as re_search
from os import getpid
from pathlib import Path
import subprocess
from time import sleep

from ocrd_utils import getLogger
from ocrd_utils import getLogger, safe_filename

from .deployment_utils import (
create_docker_client,
@@ -28,7 +30,11 @@
DataProcessorServer,
DataRabbitMQ
)
from .utils import validate_and_load_config
from .utils import (
is_mets_server_running,
stop_mets_server,
validate_and_load_config
)


class Deployer:
@@ -42,6 +48,7 @@ def __init__(self, config_path: str) -> None:
self.internal_callback_url = config.get('internal_callback_url', None)
for config_host in config['hosts']:
self.data_hosts.append(DataHost(config_host))
self.mets_servers: Dict = {} # {"mets_server_url": "mets_server_pid"}

# TODO: Reconsider this.
def find_matching_processors(
@@ -467,9 +474,12 @@ def start_native_processor(
# printed with `echo $!` but it is printed inbetween other output. Because of that I added
# `xyz` before and after the code to easily be able to filter out the pid via regex when
# returning from the function
log_path = '/tmp/ocrd-processing-server-startup.log'
stdin.write(f"echo starting processing worker with '{cmd}' >> '{log_path}'\n")
stdin.write(f'{cmd} >> {log_path} 2>&1 &\n')

# TODO: Check here again
# log_path = f'/tmp/deployed_{processor_name}.log'
# stdin.write(f"echo starting processing worker with '{cmd}' >> '{log_path}'\n")
# stdin.write(f'{cmd} >> {log_path} 2>&1 &\n')
stdin.write(f'{cmd} &\n')
stdin.write('echo xyz$!xyz \n exit \n')
output = stdout.read().decode('utf-8')
stdout.close()
@@ -505,13 +515,58 @@ def start_native_processor_server(
channel = ssh_client.invoke_shell()
stdin, stdout = channel.makefile('wb'), channel.makefile('rb')
cmd = f'{processor_name} server --address {agent_address} --database {database_url}'
port = agent_address.split(':')[1]
log_path = f'/tmp/server_{processor_name}_{port}_{getpid()}.log'
# TODO: This entire stdin/stdout thing is broken with servers!
stdin.write(f"echo starting processor server with '{cmd}' >> '{log_path}'\n")
stdin.write(f'{cmd} >> {log_path} 2>&1 &\n')
stdin.write(f"echo starting processor server with '{cmd}'\n")
stdin.write(f'{cmd} &\n')
stdin.write('echo xyz$!xyz \n exit \n')
output = stdout.read().decode('utf-8')
stdout.close()
stdin.close()
return re_search(r'xyz([0-9]+)xyz', output).group(1) # type: ignore

# TODO: No support for TCP version yet
def start_unix_mets_server(self, mets_path: str) -> str:
socket_file = f'{safe_filename(mets_path)}.sock'
log_path = f'/tmp/{safe_filename(mets_path)}.log'
mets_server_url = f'/tmp/{socket_file}'

if is_mets_server_running(mets_server_url=mets_server_url):
self.log.info(f"The mets server is already started: {mets_server_url}")
return mets_server_url

cwd = Path(mets_path).parent
self.log.info(f'Starting UDS mets server: {mets_server_url}')
sub_process = subprocess.Popen(
args=['nohup', 'ocrd', 'workspace', '--mets-server-url', f'{mets_server_url}',
'-d', f'{cwd}', 'server', 'start'],
shell=False,
stdout=open(log_path, 'w'),
stderr=open(log_path, 'a'),
cwd=cwd,
universal_newlines=True
)
# Wait for the mets server to start
sleep(2)
self.mets_servers[mets_server_url] = sub_process.pid
return mets_server_url

def stop_unix_mets_server(self, mets_server_url: str) -> None:
self.log.info(f'Stopping UDS mets server: {mets_server_url}')
if mets_server_url in self.mets_servers:
mets_server_pid = self.mets_servers[mets_server_url]
else:
raise Exception(f"Mets server not found: {mets_server_url}")

'''
subprocess.run(
args=['kill', '-s', 'SIGINT', f'{mets_server_pid}'],
shell=False,
universal_newlines=True
)
'''

# TODO: Reconsider this again
# Not having this sleep here causes connection errors
# on the last request processed by the processing worker.
# Sometimes 3 seconds is enough, sometimes not.
sleep(5)
stop_mets_server(mets_server_url=mets_server_url)
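The METS server lifecycle added here can be driven directly from a `Deployer`. A sketch under the assumption of a valid processing-server config file (the path below is a placeholder):

```python
from ocrd_network.deployer import Deployer

deployer = Deployer('/path/to/ps_config.yaml')

# Starts a METS server on a Unix domain socket under /tmp, or returns
# the URL of one that is already running for this workspace
mets_server_url = deployer.start_unix_mets_server('/data/ws1/mets.xml')

# ... dispatch processing requests that carry mets_server_url ...

# Waits out the grace period noted in the TODO, then stops the server
deployer.stop_unix_mets_server(mets_server_url)
```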
3 changes: 2 additions & 1 deletion ocrd_network/ocrd_network/deployment_utils.py
@@ -13,7 +13,8 @@
'create_docker_client',
'create_ssh_client',
'DeployType',
'wait_for_rabbitmq_availability'
'verify_mongodb_available',
'verify_rabbitmq_available'
]


4 changes: 4 additions & 0 deletions ocrd_network/ocrd_network/models/__init__.py
@@ -5,18 +5,22 @@

__all__ = [
'DBProcessorJob',
'DBWorkflowJob',
'DBWorkspace',
'PYJobInput',
'PYJobOutput',
'PYOcrdTool',
'PYResultMessage',
'PYWorkflowJobOutput',
'StateEnum',
]

from .job import (
DBProcessorJob,
DBWorkflowJob,
PYJobInput,
PYJobOutput,
PYWorkflowJobOutput,
StateEnum
)
from .messages import PYResultMessage
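With the export list extended, the workflow models become importable at package level; a quick sketch:

```python
from ocrd_network.models import DBWorkflowJob, PYWorkflowJobOutput, StateEnum

# StateEnum covers the cancelled job state introduced in commit 26b1253
```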