Skip to content

Commit

Permalink
Merge pull request #1136 from OCR-D/network-processor-api
Browse files Browse the repository at this point in the history
Change api processor paths in ocrd_network
  • Loading branch information
kba authored Nov 21, 2023
2 parents 0df3c54 + e6e4b83 commit 9930d2c
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 28 deletions.
29 changes: 17 additions & 12 deletions ocrd_network/ocrd_network/processing_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
generate_id,
get_ocrd_workspace_physical_pages
)
import time
from urllib.parse import urljoin


class ProcessingServer(FastAPI):
Expand Down Expand Up @@ -135,7 +135,7 @@ def __init__(self, config_path: str, host: str, port: int) -> None:
)

self.router.add_api_route(
path='/processor/{processor_name}',
path='/processor/run/{processor_name}',
endpoint=self.push_processor_job,
methods=['POST'],
tags=['processing'],
Expand All @@ -147,7 +147,7 @@ def __init__(self, config_path: str, host: str, port: int) -> None:
)

self.router.add_api_route(
path='/processor/{processor_name}/{job_id}',
path='/processor/job/{job_id}',
endpoint=self.get_processor_job,
methods=['GET'],
tags=['processing'],
Expand All @@ -159,7 +159,7 @@ def __init__(self, config_path: str, host: str, port: int) -> None:
)

self.router.add_api_route(
path='/processor/{processor_name}/{job_id}/log',
path='/processor/log/{job_id}',
endpoint=self.get_processor_job_log,
methods=['GET'],
tags=['processing'],
Expand All @@ -177,7 +177,7 @@ def __init__(self, config_path: str, host: str, port: int) -> None:
)

self.router.add_api_route(
path='/processor/{processor_name}',
path='/processor/info/{processor_name}',
endpoint=self.get_processor_info,
methods=['GET'],
tags=['processing', 'discovery'],
Expand Down Expand Up @@ -343,7 +343,7 @@ def query_ocrd_tool_json_from_server(self, processor_name):
)
# Request the tool json from the Processor Server
response = requests.get(
processor_server_url,
urljoin(processor_server_url, 'info'),
headers={'Content-Type': 'application/json'}
)
if not response.status_code == 200:
Expand All @@ -361,6 +361,11 @@ async def push_processor_job(self, processor_name: str, data: PYJobInput) -> PYJ
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"Job id field is set but must not be: {data.job_id}"
)
if not data.workspace_id and not data.path_to_mets:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail="either 'path_to_mets' or 'workspace_id' must be provided"
)
# Generate processing job id
data.job_id = generate_id()

Expand Down Expand Up @@ -517,7 +522,7 @@ async def push_to_processor_server(
timeout = httpx.Timeout(timeout=request_timeout, connect=30.0)
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.post(
processor_server_url,
urljoin(processor_server_url, 'run'),
headers={'Content-Type': 'application/json'},
json=json.loads(json_data)
)
Expand All @@ -531,11 +536,11 @@ async def push_to_processor_server(
job_output = response.json()
return job_output

async def get_processor_job(self, job_id: str) -> PYJobOutput:
    """Return the processing job identified by `job_id` from the database.

    Route handler for GET /processor/job/{job_id}. After the path change in
    PR #1136 the job is looked up by its id alone; the processor name is no
    longer part of the route. Raises HTTPException (422) via the helper when
    no job with that id exists.
    """
    # Delegate to the shared server_utils helper so the Processing Server
    # and the Processor Server resolve jobs identically.
    return await _get_processor_job(self.log, job_id)

async def get_processor_job_log(self, job_id: str) -> FileResponse:
    """Return the log file of the processing job identified by `job_id`.

    Route handler for GET /processor/log/{job_id}. Looks the job up by id
    alone (the processor name was dropped from the route in PR #1136) and
    streams its log file back as a FileResponse. Raises HTTPException (422)
    via the helper when no job with that id exists.
    """
    # Delegate to the shared server_utils helper for consistent lookup.
    return await _get_processor_job_log(self.log, job_id)

async def remove_from_request_cache(self, result_message: PYResultMessage):
result_job_id = result_message.job_id
Expand Down Expand Up @@ -643,7 +648,7 @@ async def get_processor_info(self, processor_name) -> Dict:
ocrd_tool = self.ocrd_all_tool_json.get(processor_name, None)
if not ocrd_tool:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Ocrd tool JSON of '{processor_name}' not available!"
)

Expand Down
28 changes: 18 additions & 10 deletions ocrd_network/ocrd_network/processor_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@
DBProcessorJob,
db_get_workspace,
db_update_processing_job,
db_get_processing_job,
initiate_database
)
from .logging import get_processor_server_logging_file_path
from .logging import (
get_processor_server_logging_file_path,
get_processing_job_logging_file_path,
)
from .models import (
PYJobInput,
PYJobOutput,
Expand Down Expand Up @@ -76,7 +80,7 @@ def __init__(self, mongodb_addr: str, processor_name: str = "", processor_class=

# Create routes
self.router.add_api_route(
path='/',
path='/info',
endpoint=self.get_processor_info,
methods=['GET'],
tags=['Processing'],
Expand All @@ -88,7 +92,7 @@ def __init__(self, mongodb_addr: str, processor_name: str = "", processor_class=
)

self.router.add_api_route(
path='/',
path='/run',
endpoint=self.create_processor_task,
methods=['POST'],
tags=['Processing'],
Expand All @@ -100,7 +104,7 @@ def __init__(self, mongodb_addr: str, processor_name: str = "", processor_class=
)

self.router.add_api_route(
path='/{job_id}',
path='/job/{job_id}',
endpoint=self.get_processor_job,
methods=['GET'],
tags=['Processing'],
Expand All @@ -112,7 +116,7 @@ def __init__(self, mongodb_addr: str, processor_name: str = "", processor_class=
)

self.router.add_api_route(
path='/{job_id}/log',
path='/log/{job_id}',
endpoint=self.get_processor_job_log,
methods=['GET'],
tags=['processing'],
Expand Down Expand Up @@ -143,7 +147,6 @@ async def create_processor_task(self, job_input: PYJobInput):
validate_job_input(self.log, self.processor_name, self.ocrd_tool, job_input)
job_input.path_to_mets = await validate_and_return_mets_path(self.log, job_input)

job = None
# The request is not forwarded from the Processing Server, assign a job_id
if not job_input.job_id:
job_id = generate_id()
Expand All @@ -155,19 +158,23 @@ async def create_processor_task(self, job_input: PYJobInput):
state=StateEnum.queued
)
await job.insert()
else:
job = await db_get_processing_job(job_input.job_id)
await self.run_processor_task(job=job)
return job.to_job_output()

async def run_processor_task(self, job: DBProcessorJob):
execution_failed = False
start_time = datetime.now()
job_log_file = get_processing_job_logging_file_path(job_id=job.job_id)
await db_update_processing_job(
job_id=job.job_id,
state=StateEnum.running,
start_time=start_time
start_time=start_time,
log_file_path=job_log_file
)

mets_server_url = await db_get_workspace(workspace_mets_path=job.path_to_mets).mets_server_url
mets_server_url = (await db_get_workspace(workspace_mets_path=job.path_to_mets)).mets_server_url
try:
invoke_processor(
processor_class=self.processor_class,
Expand All @@ -177,7 +184,8 @@ async def run_processor_task(self, job: DBProcessorJob):
output_file_grps=job.output_file_grps,
page_id=job.page_id,
parameters=job.parameters,
mets_server_url=mets_server_url
mets_server_url=mets_server_url,
log_filename=job_log_file,
)
except Exception as error:
self.log.debug(f"processor_name: {self.processor_name}, path_to_mets: {job.path_to_mets}, "
Expand Down Expand Up @@ -233,7 +241,7 @@ def get_version(self) -> str:
if self.version:
return self.version

"""
"""
if self.processor_class:
# The way of accessing the version like in the line below may be problematic
# version_str = self.processor_class(workspace=None, version=True).version
Expand Down
11 changes: 5 additions & 6 deletions ocrd_network/ocrd_network/server_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,22 @@
from .models import PYJobInput, PYJobOutput


async def _get_processor_job(logger, job_id: str) -> PYJobOutput:
    """Return processing job information from the database.

    Args:
        logger: logger used to record a failed lookup.
        job_id: id of the processing job to fetch.

    Returns:
        The job converted to its public output model (PYJobOutput).

    Raises:
        HTTPException: 422 UNPROCESSABLE_ENTITY when no job with `job_id`
            exists in the database.
    """
    try:
        job = await db_get_processing_job(job_id)
        return job.to_job_output()
    except ValueError as e:
        # db_get_processing_job raises ValueError for an unknown id;
        # translate it into an HTTP error the API client can act on.
        logger.exception(f"Processing job with id '{job_id}' not existing, error: {e}")
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=f"Processing job with id '{job_id}' not existing"
        )


async def _get_processor_job_log(logger, job_id: str) -> FileResponse:
    """Return the log file of a processing job as a downloadable response.

    Args:
        logger: logger passed through to the job lookup helper.
        job_id: id of the processing job whose log is requested.

    Returns:
        FileResponse serving the job's `log_file_path`, with the file's
        own name as the download filename.

    Raises:
        HTTPException: 422 UNPROCESSABLE_ENTITY (via _get_processor_job)
            when no job with `job_id` exists.
    """
    db_job = await _get_processor_job(logger, job_id)
    # NOTE(review): assumes db_job.log_file_path is always set once the job
    # exists — confirm against DBProcessorJob's schema.
    log_file_path = Path(db_job.log_file_path)
    return FileResponse(path=log_file_path, filename=log_file_path.name)

Expand Down

0 comments on commit 9930d2c

Please sign in to comment.