Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update notice_batch_orchestrator.py #266

Merged
merged 2 commits into from
Sep 8, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions dags/notice_batch_orchestrator.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from itertools import chain, islice
from typing import Iterator

from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
Expand All @@ -9,7 +10,7 @@
from dags.notice_batch_worker import NOTICE_BATCH_KEY
from ted_sws import config
from ted_sws.core.model.notice import NoticeStatus
from ted_sws.data_manager.adapters.notice_repository import NoticeRepository
from ted_sws.data_manager.adapters.notice_repository import NoticeRepository, NOTICE_STATUS
from ted_sws.event_manager.adapters.event_log_decorator import event_log
from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \
EventMessageProcessType
Expand All @@ -28,6 +29,11 @@ def chunks(iterable, chunk_size: int):
yield chain([first], islice(iterator, chunk_size - 1))


def get_notice_ids(notice_repository: NoticeRepository, notice_status: str) -> Iterator[str]:
for result_dict in notice_repository.collection.find({NOTICE_STATUS: notice_status}, {"ted_id": 1}):
yield result_dict["ted_id"]


@dag(default_args=DEFAULT_DAG_ARGUMENTS, schedule_interval=None, tags=['master', 'notice_batch_orchestrator'])
def notice_batch_orchestrator():
@task
Expand All @@ -49,7 +55,8 @@ def generate_notice_batches_and_trigger_worker():
mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL)
notice_repository = NoticeRepository(mongodb_client=mongodb_client)
notice_batch_counter = 0
for notice_batch in chunks(notice_repository.get_notice_by_status(notice_status=NoticeStatus[notice_status]),
for notice_batch in chunks(get_notice_ids(notice_repository=notice_repository,
notice_status=notice_status),
chunk_size=notice_batch_size):
TriggerDagRunOperator(
task_id=f'trigger_notice_batch_worker_dag_{notice_batch_counter}',
Expand Down