Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make ProcessingWorker create its queue by default #1093

Merged
merged 2 commits into from
Sep 12, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ Some parts of the software are configured via environement variables:
* `OCRD_NETWORK_SERVER_ADDR_PROCESSING`: Default address of Processing Server to connect to (for `ocrd network client processing`).
* `OCRD_NETWORK_SERVER_ADDR_WORKFLOW`: Default address of Workflow Server to connect to (for `ocrd network client workflow`).
* `OCRD_NETWORK_SERVER_ADDR_WORKSPACE`: Default address of Workspace Server to connect to (for `ocrd network client workspace`).
* `OCRD_NETWORK_WORKER_QUEUE_CONNECT_ATTEMPTS`: Number of attempts for a worker to create its queue. Helpfull if the rabbitmq-server needs time to be fully started.


## Packages
Expand Down Expand Up @@ -283,9 +284,9 @@ To be used in a **loop over all selected pages**:
local in_pageId=($(ocrd__input_file $n pageId))
local out_id=$(ocrd__input_file $n outputFileId)
local out_fpath="${ocrd__argv[output_file_grp]}/${out_id}.xml

# process $in_fpath to $out_fpath ...

declare -a options
if [ -n "$in_pageId" ]; then
options=( -g $in_pageId )
Expand All @@ -303,11 +304,11 @@ To be used in a **loop over all selected pages**:
> **Note**: If the `--input-file-grp` is **multi-valued** (N fileGrps separated by commas),
> then usage is similar:
> * The function `ocrd__input_file` can be used, but
> its results will be **lists** (delimited by whitespace and surrounded by single quotes),
> its results will be **lists** (delimited by whitespace and surrounded by single quotes),
> e.g. `[url]='file1.xml file2.xml' [ID]='id_file1 id_file2' [mimetype]='application/vnd.prima.page+xml image/tiff' ...`.
> * Therefore its results should be encapsulated in a (non-associative) **array variable**
> and without extra quotes, e.g. `in_file=($(ocrd__input_file 3 url))`, or as shown above.
> * This will yield the first fileGrp's results on index 0,
> * This will yield the first fileGrp's results on index 0,
> which in bash will always be the same as if you referenced the array without index
> (so code does not need to be changed much), e.g. `test -f $in_file` which equals `test -f ${in_file[0]}`.
> * Additional fileGrps will have to be fetched from higher indexes, e.g. `test -f ${in_file[1]}`.
Expand Down
2 changes: 2 additions & 0 deletions ocrd/ocrd/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
\b
{config.describe('OCRD_NETWORK_SERVER_ADDR_WORKSPACE')}
\b
{config.describe('OCRD_NETWORK_WORKER_QUEUE_CONNECT_ATTEMPTS')}
\b
{config.describe('OCRD_PROFILE_FILE')}
\b
{config.describe('OCRD_PROFILE', wrap_text=False)}
Expand Down
19 changes: 1 addition & 18 deletions ocrd_network/ocrd_network/cli/processing_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,7 @@
help='The URL of the MongoDB, format: mongodb://host:port',
type=DatabaseParamType(),
required=True)
@click.option('--create-queue',
is_flag=True,
help='Create the rabbitmq-queue for the worker. Usually the processing server starts'
'the workers and creates the queues. This is to make external addition of workers for'
'new processors possible')
@click.option('--queue-connect-attempts',
type=int,
default=1,
help='Number of attempts to establish the connection to rabbitmq for creating the '
'queue. There is two seconds wait between attempts. Helpfull if the server needs '
'time to be fully started')
def processing_worker_cli(processor_name: str, queue: str, database: str, create_queue: bool,
queue_connect_attempts: int):
def processing_worker_cli(processor_name: str, queue: str, database: str):
"""
Start Processing Worker
(a specific ocr-d processor consuming tasks from RabbitMQ queue)
Expand All @@ -55,11 +43,6 @@ def processing_worker_cli(processor_name: str, queue: str, database: str, create
ocrd_tool=ocrd_tool,
processor_class=None, # For readability purposes assigned here
)
if create_queue:
processing_worker.create_queue(
connection_attempts=queue_connect_attempts,
retry_delay=2
)
# The RMQConsumer is initialized and a connection to the RabbitMQ is performed
processing_worker.connect_consumer()
# Start consuming from the queue with name `processor_name`
Expand Down
8 changes: 7 additions & 1 deletion ocrd_network/ocrd_network/processing_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
verify_database_uri,
verify_and_parse_mq_uri
)
from ocrd_utils import config


class ProcessingWorker:
Expand Down Expand Up @@ -79,6 +80,11 @@ def __init__(self, rabbitmq_addr, mongodb_addr, processor_name, ocrd_tool: dict,
# The publisher is connected when the `result_queue` field of the OcrdProcessingMessage is set for first time
# Used to publish OcrdResultMessage type message to the queue with name {processor_name}-result
self.rmq_publisher = None
# Always create a queue (idempotent)
self.create_queue(
MehmedGIT marked this conversation as resolved.
Show resolved Hide resolved
connection_attempts=config.OCRD_NETWORK_WORKER_QUEUE_CONNECT_ATTEMPTS,
retry_delay=2
)

def connect_consumer(self) -> None:
self.log.info(f'Connecting RMQConsumer to RabbitMQ server: '
Expand Down Expand Up @@ -261,7 +267,7 @@ def publish_to_result_queue(self, result_queue: str, result_message: OcrdResultM
message=encoded_result_message
)

def create_queue(self, connection_attempts=1, retry_delay=1):
def create_queue(self, connection_attempts: int = 1, retry_delay: int = 1):
MehmedGIT marked this conversation as resolved.
Show resolved Hide resolved
"""Create the queue for this worker

Originally only the processing-server created the queues for the workers according to the
Expand Down
5 changes: 5 additions & 0 deletions ocrd_utils/ocrd_utils/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,11 @@ def _ocrd_download_timeout_parser(val):
description="Default address of Workspace Server to connect to (for `ocrd network client workspace`).",
default=(True, ''))

config.add("OCRD_NETWORK_WORKER_QUEUE_CONNECT_ATTEMPTS",
description="Number of attempts for a worker to create its queue. Helpfull if the rabbitmq-server needs time to be fully started",
parser=int,
default=(True, 1))
MehmedGIT marked this conversation as resolved.
Show resolved Hide resolved

config.add("HOME",
description="Directory to look for `ocrd_logging.conf`, fallback for unset XDG variables.",
# description="HOME directory, cf. https://specifications.freedesktop.org/basedir-spec/basedir-spec-latest.html",
Expand Down