Skip to content

Commit

Permalink
First version of the workflow endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
joschrew committed Aug 31, 2023
1 parent 79016bf commit 439dde4
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 4 deletions.
57 changes: 56 additions & 1 deletion ocrd_network/ocrd_network/processing_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,13 @@
from typing import Dict, List
import uvicorn

from fastapi import FastAPI, status, Request, HTTPException
from fastapi import (
FastAPI,
status,
Request,
HTTPException,
UploadFile
)
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse

Expand Down Expand Up @@ -33,12 +39,14 @@
expand_page_ids,
validate_and_return_mets_path,
validate_job_input,
create_db_workspace,
)
from .utils import (
download_ocrd_all_tool_json,
generate_created_time,
generate_id
)
from ocrd.task_sequence import ProcessorTask


class ProcessingServer(FastAPI):
Expand Down Expand Up @@ -150,6 +158,15 @@ def __init__(self, config_path: str, host: str, port: int) -> None:
summary='Get a list of all available processors',
)

self.router.add_api_route(
path='/workflow',
endpoint=self.run_workflow,
methods=['POST'],
tags=['workflow', 'processing'],
status_code=status.HTTP_200_OK,
summary='Run a workflow',
)

@self.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
exc_str = f'{exc}'.replace('\n', ' ').replace(' ', ' ')
Expand Down Expand Up @@ -670,3 +687,41 @@ async def list_processors(self) -> List[str]:
unique_only=True
)
return processor_names_list

# TODO: think about providing arguments in another way
# TODO: this function "just" writes to the queue and returns. A network-client functionality
# should be available to react to every processor's callback. With this feedback
# a blocking mechanism could be provided to inform about starting the chain, waiting for
# the processors to finish and printing when responses are received from the processors
async def run_workflow(self, workflow: UploadFile, workspace_path, callback_url=None) -> List:
    """Queue every task of an uploaded workflow as a chain of processing jobs.

    The uploaded file is decoded as UTF-8 text with one task description per
    line; blank lines are skipped. Each task is submitted through
    ``push_processor_job`` and every task but the first is declared to depend
    on the job created for the task before it.

    Args:
        workflow: uploaded text file containing the task descriptions.
        workspace_path: used to look up or create the workspace database entry
            and passed as ``path_to_mets`` to every job.
        callback_url: forwarded unchanged to every job.

    Returns:
        The results of ``push_processor_job`` for all tasks, in workflow order.

    Raises:
        HTTPException: 422 if ``create_db_workspace`` yields nothing for
            ``workspace_path``, or if the workflow cannot be decoded or parsed.
    """
    import logging

    # core cannot create workspaces by api, but the processing-server needs the workspace in
    # the database. The entry is created here if the path exists and is not yet in the db.
    if not await create_db_workspace(workspace_path):
        raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                            detail=f"Workspace with path: '{workspace_path}' not existing")

    raw_workflow = await workflow.read()
    try:
        # Decoding happens inside the try so that a non-UTF-8 upload is reported
        # as a 422 like any other malformed workflow instead of an unhandled error.
        workflow_text = raw_workflow.decode("utf-8")
        tasks = [
            ProcessorTask.parse(task_str)
            for task_str in workflow_text.splitlines()
            if task_str.strip()
        ]
    except Exception as e:
        # Exception (not BaseException) so KeyboardInterrupt/SystemExit still propagate.
        logging.getLogger(__name__).warning("Error parsing tasks: %s", e)
        raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                            detail=f"Error parsing tasks: {e}") from e

    outputs = []
    last_job_id = ""
    for task in tasks:
        data = PYJobInput(
            agent_type='worker',
            path_to_mets=workspace_path,
            input_file_grps=task.input_file_grps,
            output_file_grps=task.output_file_grps,
            parameters=task.parameters,
            callback_url=callback_url,
            # Chain the jobs: each one waits for the job queued just before it.
            depends_on=[last_job_id] if last_job_id else [],
        )
        output = await self.push_processor_job(task.executable, data)
        outputs.append(output)
        last_job_id = output.job_id
    return outputs
27 changes: 24 additions & 3 deletions ocrd_network/ocrd_network/server_utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import re
from fastapi import HTTPException, status
from typing import Dict, List
from typing import List
from ocrd_validators import ParameterValidator
from ocrd_utils import (
generate_range,
Expand All @@ -10,7 +10,9 @@
db_get_processing_job,
db_get_workspace,
)
from .models import PYJobInput, PYJobOutput
from .models import PYJobInput, PYJobOutput, DBWorkspace
import uuid
from pathlib import Path


async def _get_processor_job(logger, processor_name: str, job_id: str) -> PYJobOutput:
Expand Down Expand Up @@ -78,10 +80,29 @@ def validate_job_input(logger, processor_name: str, ocrd_tool: dict, job_input:
logger.exception(f'Failed to validate processing job against the ocrd_tool: {e}')
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f'Failed to validate processing job against the ocrd_tool'
detail='Failed to validate processing job against the ocrd_tool'
)
else:
if not report.is_valid:
logger.exception(f'Failed to validate processing job '
f'against the ocrd_tool, errors: {report.errors}')
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=report.errors)


async def create_db_workspace(workspace_path: str) -> "DBWorkspace | None":
    """Return the workspace database entry for a METS path, creating it if needed.

    Args:
        workspace_path: path of the workspace's METS file; it is used as the
            ``workspace_mets_path`` of the entry.

    Returns:
        The entry returned by ``db_get_workspace``, or a newly saved entry when
        that lookup raises ``ValueError``. ``None`` when ``workspace_path``
        does not exist on disk.
    """
    mets_path = Path(workspace_path)
    if not mets_path.exists():
        return None
    try:
        return await db_get_workspace(workspace_mets_path=workspace_path)
    except ValueError:
        # NOTE(review): ValueError is presumably how db_get_workspace reports a
        # missing entry - confirm against its implementation.
        workspace_db = DBWorkspace(
            workspace_id=str(uuid.uuid4()),
            # Passed as a plain string, like the other path field below.
            workspace_path=str(mets_path.parent),
            workspace_mets_path=workspace_path,
            ocrd_identifier="",
            bagit_profile_identifier="",
        )
        await workspace_db.save()
        return workspace_db

0 comments on commit 439dde4

Please sign in to comment.