Change API paths for processor_server
Align with the changes made to the Processing Server so that endpoints are
accessed in a similar manner. Additionally, fix some bugs found while
testing.
joschrew committed Nov 16, 2023
1 parent 0cf5dde commit e6e4b83
Showing 2 changed files with 22 additions and 14 deletions.
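
As a rough illustration of the new layout (not part of the commit itself), a client could address a Processor Server like this. Only the paths ('info', 'run', 'job/{job_id}', 'log/{job_id}') come from the diff below; the server address and the payload field names are assumptions:

from urllib.parse import urljoin

import requests

# Hypothetical client-side sketch: only the endpoint paths come from this
# commit; the server address and the payload fields are assumptions.
processor_server_url = 'http://localhost:8080'

# GET /info returns the processor's ocrd-tool.json
ocrd_tool = requests.get(urljoin(processor_server_url, 'info')).json()

# POST /run submits a processing job
job_input = {
    'path_to_mets': '/data/workspace/mets.xml',
    'input_file_grps': ['OCR-D-IMG'],
    'output_file_grps': ['OCR-D-BIN'],
}
job = requests.post(urljoin(processor_server_url, 'run'), json=job_input).json()

# GET /job/{job_id} polls the job state, GET /log/{job_id} fetches its log file
state = requests.get(urljoin(processor_server_url, f"job/{job['job_id']}")).json()
log_text = requests.get(urljoin(processor_server_url, f"log/{job['job_id']}")).text
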
ocrd_network/ocrd_network/processing_server.py (8 changes: 4 additions & 4 deletions)

@@ -63,7 +63,7 @@
     generate_id,
     get_ocrd_workspace_physical_pages
 )
-import time
+from urllib.parse import urljoin


 class ProcessingServer(FastAPI):
@@ -343,7 +343,7 @@ def query_ocrd_tool_json_from_server(self, processor_name):
         )
         # Request the tool json from the Processor Server
         response = requests.get(
-            processor_server_url,
+            urljoin(processor_server_url, 'info'),
             headers={'Content-Type': 'application/json'}
         )
         if not response.status_code == 200:
@@ -522,7 +522,7 @@ async def push_to_processor_server(
         timeout = httpx.Timeout(timeout=request_timeout, connect=30.0)
         async with httpx.AsyncClient(timeout=timeout) as client:
             response = await client.post(
-                processor_server_url,
+                urljoin(processor_server_url, 'run'),
                 headers={'Content-Type': 'application/json'},
                 json=json.loads(json_data)
             )
@@ -648,7 +648,7 @@ async def get_processor_info(self, processor_name) -> Dict:
         ocrd_tool = self.ocrd_all_tool_json.get(processor_name, None)
         if not ocrd_tool:
             raise HTTPException(
-                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+                status_code=status.HTTP_404_NOT_FOUND,
                 detail=f"Ocrd tool JSON of '{processor_name}' not available!"
             )
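
Both request URLs above are now composed with urllib.parse.urljoin. A small standalone sketch of how that composition behaves (the base URLs are made up); note that a base with a non-empty path needs a trailing slash, otherwise the relative part replaces the last path segment:

from urllib.parse import urljoin

# Base without a path: the relative part is appended as expected.
urljoin('http://localhost:8080', 'info')        # 'http://localhost:8080/info'
urljoin('http://localhost:8080/', 'run')        # 'http://localhost:8080/run'

# Base with a path but no trailing slash: the last segment is replaced.
urljoin('http://localhost:8080/proc', 'info')   # 'http://localhost:8080/info'
urljoin('http://localhost:8080/proc/', 'info')  # 'http://localhost:8080/proc/info'
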
ocrd_network/ocrd_network/processor_server.py (28 changes: 18 additions & 10 deletions)

@@ -18,9 +18,13 @@
     DBProcessorJob,
     db_get_workspace,
     db_update_processing_job,
+    db_get_processing_job,
     initiate_database
 )
-from .logging import get_processor_server_logging_file_path
+from .logging import (
+    get_processor_server_logging_file_path,
+    get_processing_job_logging_file_path,
+)
 from .models import (
     PYJobInput,
     PYJobOutput,
@@ -76,7 +80,7 @@ def __init__(self, mongodb_addr: str, processor_name: str = "", processor_class=

         # Create routes
         self.router.add_api_route(
-            path='/',
+            path='/info',
             endpoint=self.get_processor_info,
             methods=['GET'],
             tags=['Processing'],
@@ -88,7 +92,7 @@ def __init__(self, mongodb_addr: str, processor_name: str = "", processor_class=
         )

         self.router.add_api_route(
-            path='/',
+            path='/run',
             endpoint=self.create_processor_task,
             methods=['POST'],
             tags=['Processing'],
@@ -100,7 +104,7 @@ def __init__(self, mongodb_addr: str, processor_name: str = "", processor_class=
         )

         self.router.add_api_route(
-            path='/{job_id}',
+            path='/job/{job_id}',
             endpoint=self.get_processor_job,
             methods=['GET'],
             tags=['Processing'],
@@ -112,7 +116,7 @@ def __init__(self, mongodb_addr: str, processor_name: str = "", processor_class=
         )

         self.router.add_api_route(
-            path='/{job_id}/log',
+            path='/log/{job_id}',
             endpoint=self.get_processor_job_log,
             methods=['GET'],
             tags=['processing'],
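
The four registrations above replace the ambiguous '/' and '/{job_id}' paths with distinct ones. A minimal, self-contained FastAPI sketch of the resulting layout (the handler bodies are placeholders, not the real ProcessorServer implementation):

from fastapi import FastAPI

app = FastAPI()

# Placeholder handlers that only mirror the new path layout.
@app.get('/info')
def get_processor_info():
    return {'executable': 'ocrd-dummy'}  # would return the processor's ocrd-tool.json

@app.post('/run')
def create_processor_task(job_input: dict):
    return {'job_id': 'some-generated-id', 'state': 'QUEUED'}

@app.get('/job/{job_id}')
def get_processor_job(job_id: str):
    return {'job_id': job_id, 'state': 'RUNNING'}

@app.get('/log/{job_id}')
def get_processor_job_log(job_id: str):
    return {'job_id': job_id, 'log': 'not implemented in this sketch'}
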
@@ -143,7 +147,6 @@ async def create_processor_task(self, job_input: PYJobInput):
         validate_job_input(self.log, self.processor_name, self.ocrd_tool, job_input)
         job_input.path_to_mets = await validate_and_return_mets_path(self.log, job_input)

-        job = None
         # The request is not forwarded from the Processing Server, assign a job_id
         if not job_input.job_id:
             job_id = generate_id()
@@ -155,19 +158,23 @@
                 state=StateEnum.queued
             )
             await job.insert()
+        else:
+            job = await db_get_processing_job(job_input.job_id)
         await self.run_processor_task(job=job)
         return job.to_job_output()

     async def run_processor_task(self, job: DBProcessorJob):
         execution_failed = False
         start_time = datetime.now()
+        job_log_file = get_processing_job_logging_file_path(job_id=job.job_id)
         await db_update_processing_job(
             job_id=job.job_id,
             state=StateEnum.running,
-            start_time=start_time
+            start_time=start_time,
+            log_file_path=job_log_file
         )

-        mets_server_url = await db_get_workspace(workspace_mets_path=job.path_to_mets).mets_server_url
+        mets_server_url = (await db_get_workspace(workspace_mets_path=job.path_to_mets)).mets_server_url
         try:
             invoke_processor(
                 processor_class=self.processor_class,
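
One of the bug fixes above is the added parentheses around the await: attribute access binds tighter than await, so without them '.mets_server_url' would be looked up on the coroutine object instead of on the awaited workspace document. A tiny standalone illustration (the class and attribute here are made up):

import asyncio

class Workspace:
    mets_server_url = 'http://localhost:1234'

async def get_workspace() -> Workspace:
    return Workspace()

async def main():
    # Wrong: raises AttributeError, because the attribute is read from the
    # coroutine object before it is ever awaited.
    # url = await get_workspace().mets_server_url
    # Right: await first, then read the attribute from the result.
    url = (await get_workspace()).mets_server_url
    print(url)

asyncio.run(main())
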
@@ -177,7 +184,8 @@ async def run_processor_task(self, job: DBProcessorJob):
                 output_file_grps=job.output_file_grps,
                 page_id=job.page_id,
                 parameters=job.parameters,
-                mets_server_url=mets_server_url
+                mets_server_url=mets_server_url,
+                log_filename=job_log_file,
             )
         except Exception as error:
             self.log.debug(f"processor_name: {self.processor_name}, path_to_mets: {job.path_to_mets}, "
@@ -233,7 +241,7 @@ def get_version(self) -> str:
         if self.version:
             return self.version

-        """
+        """
         if self.processor_class:
             # The way of accessing the version like in the line below may be problematic
             # version_str = self.processor_class(workspace=None, version=True).version
