
ocrd network: defer enqueue until workspace is free #1046

Closed
bertsky opened this issue May 8, 2023 · 14 comments

@bertsky
Collaborator

bertsky commented May 8, 2023

In the Processing Server, we currently add jobs to the queue unconditionally, without checking whether any job is already running on the respective workspace:

self.rmq_publisher.publish_to_queue(processor_name, encoded_processing_message)

Obviously, this will create inconsistent (or at least surprising) results when another request is made for the same workspace while running a job, or if two requests are made for the same workspace before either of them is scheduled.

IMO (since everything goes through it) we can simply query the database for effective locking. We could add a method db_has_processing_job(workspace_id) which checks whether there are any QUEUED or RUNNING jobs for that workspace.
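
A minimal sketch of such a check, assuming the jobs live in a MongoDB collection named DBProcessorJob with workspace_id and state fields (collection and field names are assumptions here, not the actual ocrd_network schema):

from motor.motor_asyncio import AsyncIOMotorClient

client = AsyncIOMotorClient("mongodb://localhost:27017")
jobs = client["ocrd"]["DBProcessorJob"]

async def db_has_processing_job(workspace_id: str) -> bool:
    """True if any job on this workspace is still QUEUED or RUNNING."""
    count = await jobs.count_documents({
        "workspace_id": workspace_id,
        "state": {"$in": ["QUEUED", "RUNNING"]},
    })
    return count > 0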

But what happens then? Ideally, the Processing Server would itself "wait" for the workspace to become available before actually enqueuing the new job. That of course must be atomic, and it must accommodate our asynchronous model.

One idea would be to enqueue the job in a "hidden" queue, which only the Processing Server itself subscribes to and can be identified via the workspace ID (e.g. queue_name="workspace." + data.workspace_id). And when the Processing Worker is done, it sends the OcrdResultMessage not only to the OcrdProcessingMessage's result queue and callback URL, but to a constant unique result queue as well. That queue in turn gets subscribed to by the Processing Server, in a background thread independent of the requests. When a result message is consumed on it, the Processing Server checks if there is any "hidden" OcrdProcessingMessage for the same workspace ID, and if so, consumes it and enqueues it on the actual processor queue.
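
A rough sketch of that dispatch decision as a Processing Server method, reusing the publish_to_queue call quoted above and db_has_processing_job from the sketch further up; the method name enqueue_or_defer is invented for illustration:

async def enqueue_or_defer(self, processor_name, data, encoded_processing_message):
    if await db_has_processing_job(data.workspace_id):
        # Workspace busy: park the message on a "hidden" queue that only the
        # Processing Server itself consumes, identified via the workspace ID.
        hidden_queue = "workspace." + data.workspace_id
        self.rmq_publisher.publish_to_queue(hidden_queue, encoded_processing_message)
    else:
        # Workspace free: enqueue on the actual processor queue as before.
        self.rmq_publisher.publish_to_queue(processor_name, encoded_processing_message)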

(But I think that would still not be atomic. Perhaps instead of the result queue mechanism and background thread, we should use the callback mechanism and add a regular endpoint like a DELETE to /processor/{processor_name}/{job_id} – to the Processing Server.)
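
A hedged sketch of what such an endpoint on the Processing Server could look like (FastAPI style); the handler body and both helpers are assumptions, not existing API:

from fastapi import FastAPI

app = FastAPI()

async def db_mark_job_finished(job_id: str) -> str:
    # Assumed helper: set the job's state in the DB and return its workspace_id.
    ...

async def enqueue_next_deferred_job(workspace_id: str) -> None:
    # Assumed helper: pop the next deferred message for this workspace (if any)
    # and publish it to its processor queue.
    ...

@app.delete("/processor/{processor_name}/{job_id}")
async def report_job_finished(processor_name: str, job_id: str):
    workspace_id = await db_mark_job_finished(job_id)
    await enqueue_next_deferred_job(workspace_id)
    return {"job_id": job_id, "state": "SUCCESS"}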

What I find intriguing is the perspective that with the additional machinery, we could avoid the need to poll the job status in the Workflow Server: the latter could simply post/push all step jobs for that workspace right away, and rely on the Processing Server to ensure they are executed in order. (One still needs conditionality on success status, but that's also currently needed in the Workflow Server.)

@MehmedGIT
Contributor

(But I think that would still not be atomic. Perhaps instead of the result queue mechanism and background thread, we should use the callback mechanism and add a regular endpoint like a DELETE to /processor/{processor_name}/{job_id} – to the Processing Server.)

I think this approach is better and more direct than creating a unique result queue.

the latter could simply post/push all step jobs for that workspace right away, and rely on the Processing Server to ensure they are executed in order.

Sounds good. But how will the Processing Server know which processing requests are chained together in a workflow? Since there is already a differentiation between DBProcessorJob (processing endpoint) and WorkflowJobDB (workflow endpoint), we could potentially cache all incoming processing requests (processing job_ids) under the same workflow job_id.

@bertsky
Collaborator Author

bertsky commented May 9, 2023

But how will the Processing Server know which processing requests are chained together in a workflow? Since there is already a differentiation between DBProcessorJob (processing endpoint) and WorkflowJobDB (workflow endpoint), we could potentially cache all incoming processing requests (processing job_ids) under the same workflow job_id.

The latter is not necessary AFAICS. The Processing Server must simply assume that all requests pertaining to the same workspace have a dependency relation and therefore enqueue them in the order they were received (regardless of where from – the Workflow Server in a single workflow, or multiple workflows, or independent, single-step requests to the Processing Server).

@MehmedGIT
Contributor

Yes, but the Processing Server is just a proxy (with some validation on top) as it is now and has no other role. I am not sure how the buffering of processing requests should happen without some sort of caching mechanism inside the Processing Server or the Database itself to enqueue all the requests of a specific Workspace.

@bertsky
Collaborator Author

bertsky commented May 10, 2023

but the Processing Server is just a proxy (with some validation on top) as it is now and has no other role.

The PS is the only one creating and enqueuing the processing requests. That's where you can control things. You don't enqueue to the "public" side (where workers can consume) more than one job per workspace at a time. The internal queue thus enforces order. See my explanation above.

@MehmedGIT
Contributor

MehmedGIT commented May 10, 2023

The internal queue thus enforces order. See my explanation above.

I was just thinking about a solution without utilizing any internal network queues at all (avoid additional connections to the RabbitMQ server). The Processing Server will be the only one pushing and pulling from these queues anyway. So maybe utilizing a Python list of OcrdProcessingMessage as an internal queue structure for a specific workspace_id is more straightforward? In the end, it's still an internal queue for caching processing requests to a workspace_id so that should work.

Once the Processing Worker/Processor Server is done with the execution, it can use a callback (if provided in the request), a DELETE to /processor/{processor_name}/{job_id} on the Processing Server, to indicate completion. The Processing Server will then pop the next request from the internal queue and submit it to the specific Process Queue. And so on.
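
A minimal sketch of that internal structure, assuming one FIFO of deferred OcrdProcessingMessage objects per workspace_id inside the Processing Server (all names here are illustrative, not the actual implementation):

from collections import defaultdict, deque

class DeferredRequestCache:
    """In-memory buffer of deferred processing requests per workspace_id."""

    def __init__(self):
        self._fifo = defaultdict(deque)  # workspace_id -> (queue_name, message) pairs

    def defer(self, workspace_id, queue_name, message):
        # Called when a request arrives while another job runs on the workspace.
        self._fifo[workspace_id].append((queue_name, message))

    def pop_next(self, workspace_id):
        # Called from the DELETE callback: return the next deferred request
        # to be published to its processor queue, or None if there is none.
        fifo = self._fifo.get(workspace_id)
        if not fifo:
            return None
        item = fifo.popleft()
        if not fifo:
            del self._fifo[workspace_id]
        return item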

This of course then removes the need for the Workflow Server to poll the processing request statuses. A callback method can be invoked by the Processing Server to the Workflow Server to indicate that all processing requests for a workspace_id have been finished.

However, an implementation should not come without considering the error handling and potential problems as well. Maybe I am missing something, but isn't the Nextflow Script becoming unneeded then? We can pretty much use plain ocrd-process descriptions and chaining of processors into a workflow.

@bertsky
Collaborator Author

bertsky commented May 10, 2023

So maybe utilizing a Python list of OcrdProcessingMessage as an internal queue structure for a specific workspace_id is more straightforward? In the end, it's still an internal queue for caching processing requests to a workspace_id so that should work.

Indeed. Much simpler!

This of course then removes the need for the Workflow Server to poll the processing request statuses. A callback method can be invoked by the Processing Server to the Workflow Server to indicate that all processing requests for a workspace_id have been finished.

Yes, exactly.

However, an implementation should not come without considering the error handling and potential problems as well.

I think it would make life easier, actually. We used to skip this level because the client could talk to the queue directly. But this is a better separation of concerns IMO. I have not revisited the error handling concept specifically, yet.

Maybe I am missing something, but isn't the Nextflow Script becoming unneeded then? We can pretty much use plain ocrd-process descriptions and chaining of processors into a workflow.

We would still need Nextflow to parse the workflow syntax and slice it up into single processing calls with all the parameters involved. Also, Nextflow can still help us with error handling and re-processing IIRC.

@MehmedGIT
Contributor

MehmedGIT commented May 10, 2023

Also, Nextflow can still help us with error handling and re-processing IIRC.

Yes, but we should not forget that invoking some endpoint of the Workflow Server to register errors from the Processing Server cannot affect the runtime execution of a Nextflow workflow. That is the reason why polling of processing requests is needed inside the script itself. This also allows workflow branching during runtime.

@bertsky
Collaborator Author

bertsky commented Jun 21, 2023

So maybe utilizing a Python list of OcrdProcessingMessage as an internal queue structure for a specific workspace_id is more straightforward? In the end, it's still an internal queue for caching processing requests to a workspace_id so that should work.

Indeed. Much simpler!

Sorry, I have not immersed myself in our chain of thought again: would that by any chance be something that the METS Server can help us with?

@MehmedGIT
Contributor

So maybe utilizing a Python list of OcrdProcessingMessage as an internal queue structure for a specific workspace_id is more straightforward? In the end, it's still an internal queue for caching processing requests to a workspace_id so that should work.

Indeed. Much simpler!

Sorry, I have not immersed myself in our chain of thought again: would that by any chance be something that the METS Server can help us with?

IMO, not really. There is still no clear vision of the METS Server's location in the network architecture, as partially discussed in #1035. However, from what I remember, the METS server should be started, used, and stopped by the workers/processor servers that actually run the OCR-D processors on the specific workspace. So, I would rather keep that as a low-level detail that workers know about but the Processing Server does not. The Processing Server should just buffer the incoming requests for a specific workspace internally and pass the messages into the designated worker/processor server queue.

Of course, with that approach, it is a bit inefficient to start/stop the METS server for each processing message. However, I guess a bit of inefficiency would be preferable to having a more complex design which may require more error handling and debugging in the future.

@bertsky
Collaborator Author

bertsky commented Jul 3, 2023

There is still no clear vision of the METS Server's location in the network architecture, as partially discussed in #1035.

Yes, but IMO we have now come to the point where we need to mold that vision into shape. If we do not want to use the METS Server as a synchronisation mechanism for workspaces, we at least need a good reason not to. I currently cannot see any.

However, from what I remember was that the METS server should be started, used, and stopped by the workers/processor servers that actually run the OCR-D processors on the specific workspace.
[...]
Of course, with that approach, it is a bit inefficient to start/stop the METS server for each processing message.

But IIRC efficiency (avoiding unnecessary METS de/serialisation between workflow steps or even between pages) was the very reason (besides synchronisation for page parallelisation) we conceived of the METS Server in the first place. (I cannot find a written discussion, I think it was @kba's idea, but #974 (comment) is an early mention.)

Put another way: what is the point of spinning up a METS Server for each processing step and tearing it down afterwards? – We could just initially parse and finally generate the METS file instead.

Also, when you first outlined the error handling architecture (#1015), you placed that task at the Processing Server (not the Workers) yourself. I still see it that way.

It could be as simple as "start if it does not already run, stop if not needed for more than 5 min". If all METS operations could be delegated to the METS Server (because it is guaranteed to run), then they would automatically get synchronised (whether in a page-parallel processor or in a multi-processor workflow) and get serialised as late as possible. The latter could be implicit (when the Processing Server is done processing a single client-side request without METS Server parameter), or explicit (when a Workflow Server requires it at the end of a workflow, or when the Workspace Server is requested to provide results).
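
As a rough illustration of that policy (start on demand, stop after idling), assuming the Processing Server keeps track of one METS Server per workspace; start_mets_server and stop_mets_server are assumed helpers, and the timeout is the 5 minutes mentioned above:

import time

IDLE_TIMEOUT = 5 * 60  # seconds, i.e. the "5 min" above

def start_mets_server(workspace_id: str) -> str:
    # Assumed helper: start a METS Server for the workspace, return its URL/socket.
    ...

def stop_mets_server(url: str) -> None:
    # Assumed helper: shut the METS Server down (writing back the METS file).
    ...

class MetsServerPool:
    def __init__(self):
        self._servers = {}  # workspace_id -> [url, last_used]

    def acquire(self, workspace_id: str) -> str:
        # Start a METS Server for the workspace if none is running yet,
        # and refresh its last-used timestamp.
        if workspace_id not in self._servers:
            self._servers[workspace_id] = [start_mets_server(workspace_id), time.time()]
        entry = self._servers[workspace_id]
        entry[1] = time.time()
        return entry[0]

    def reap_idle(self):
        # Run periodically: stop servers that have been idle for too long.
        now = time.time()
        for workspace_id, (url, last_used) in list(self._servers.items()):
            if now - last_used > IDLE_TIMEOUT:
                stop_mets_server(url)
                del self._servers[workspace_id]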

@MehmedGIT
Contributor

Yes, but IMO we have now come to the point where we need to mold that vision into shape.

Agree.

But IIRC efficiency (avoiding unnecessary METS de/serialisation between workflow steps or even between pages) was the very reason (besides synchronisation for page parallelisation) we conceived of the METS Server in the first place. (I cannot find a written discussion, I think it was @kba's idea, but #974 (comment) is an early mention.)

To be clear, I am not against the METS Server for synchronization. I was rather discussing the responsibilities for the METS Server, i.e. which module starts/stops it.

Put another way: what is the point of spinning up a METS Server for each processing step and tearing it down afterwards? – We could just initially parse and finally generate the METS file instead.

For internal page-wise processing on the worker side?

Also, when you first outlined the error handling architecture (#1015), you placed that task at the Processing Server (not the Workers) yourself. I still see it that way.

Right, but back then the scenario of placing workers on different hosts was not considered. Okay, we can stick to that choice. In that case, the Processing Server will spawn METS servers and provide the respective address in the processing request to the workers/processor servers.

It could be as simple as "start if it does not already run, stop if not needed for more than 5 min".

I am almost sure that defining the optimal time to decide when to stop the server would not be an easy task and would be error-prone. I would rather not rely on time to decide when to stop the METS server. Since the workers utilize a callback mechanism, the METS server should be shut down when the last processing step is finished.

@bertsky
Collaborator Author

bertsky commented Jul 3, 2023

what is the point of spinning up a METS Server for each processing step and tearing it down afterwards?

For internal page-wise processing on the worker side?

Oh, of course, for that it still makes sense.

Right, but back then the scenario of placing workers on different hosts was not considered.

True. But isn't that even more reason to centralise METS Server management?

If my interpretation is correct that METS Servers always run where the workspace is (i.e. no data file transfers through the METS Server), then for external Workers the problem would merely be about

  • how to get fast access to a copy of the data files for processing
  • retrieving the METS Server address along with the processing request itself

In that case, the Processing Server will spawn METS servers and provide the respective address in the processing request to the workers/processor servers.

Yes!

And in addition, requests to the Processing Server should get an optional METS Server parameter, so its lifetime can be controlled externally.
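
For illustration, such a parameter could be an optional field on the processing request model (pydantic style, as used for the network request models); the model and field names here are assumptions:

from typing import Optional
from pydantic import BaseModel

class ProcessingRequest(BaseModel):  # hypothetical request model
    workspace_id: str
    input_file_grps: list
    output_file_grps: list
    parameters: dict = {}
    # If set, the Processing Server passes this METS Server on to the worker
    # and does not manage its lifetime; if unset, it starts/stops one itself.
    mets_server_url: Optional[str] = None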

It could be as simple as "start if it does not already run, stop if not needed for more than 5 min".

I am almost sure that defining the optimal time to decide when to stop the server would not be an easy task and would be error-prone. I would rather not rely on time to decide when to stop the METS server. Since the workers utilize a callback mechanism, the METS server should be shut down when the last processing step is finished.

Indeed, for callback listeners, the write-back must be as early as possible anyway.

kba added a commit that referenced this issue Sep 4, 2023
@bertsky
Collaborator Author

bertsky commented Aug 1, 2024

AFAICS you have already implemented all the above by now, particularly in #1069 (regarding job interdependencies independent of granularity level page/workspace) and in #1083 (regarding internal utilisation of METS Servers). So we should close this, I think.

@MehmedGIT
Contributor

AFAICS you have already implemented all the above by now, particularly in #1069 (regarding job interdependencies independent of granularity level page/workspace) and in #1083 (regarding internal utilisation of METS Servers). So we should close this, I think.

Yes, I am closing it. I will now go through all other ocrd_network related issues and close issues already implemented and merged.
