Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix airflow workers logs and notice_fetcher_worker #270

Merged
merged 1 commit into from
Sep 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 24 additions & 27 deletions dags/fetch_notices_per_day_worker.py
Original file line number Diff line number Diff line change
@@ -1,69 +1,66 @@
import time
from random import randint
from datetime import datetime

from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from pymongo import MongoClient

from dags import DEFAULT_DAG_ARGUMENTS
from dags.index_and_normalise_notice_worker import NOTICE_ID
from ted_sws import config
from ted_sws.data_manager.adapters.notice_repository import NoticeRepository
from ted_sws.data_sampler.services.notice_xml_indexer import index_notice
from ted_sws.event_manager.adapters.event_log_decorator import event_log
from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \
EventMessageProcessType
from ted_sws.event_manager.services.log import log_error
from ted_sws.notice_fetcher.adapters.ted_api import TedAPIAdapter, TedRequestAPI
from ted_sws.notice_fetcher.services.notice_fetcher import NoticeFetcher
from ted_sws.notice_metadata_processor.services.metadata_normalizer import normalise_notice
from ted_sws.supra_notice_manager.services.daily_supra_notice_manager import \
create_and_store_in_mongo_db_daily_supra_notice

DAG_NAME = "fetch_notices_per_day_worker"
DATE_WILD_CARD_KEY = "date_wild_card"


@dag(default_args=DEFAULT_DAG_ARGUMENTS,
max_active_runs=128,
max_active_tasks=128,
max_active_runs=144,
max_active_tasks=144,
schedule_interval=None,
tags=['worker', 'fetch_notices_per_day'])
def fetch_notices_per_day_worker():
@task
@event_log(TechnicalEventMessage(
message="fetch_notices_and_trigger_index_and_normalise_notice_worker",
message="fetch_notices_and_index_and_normalise_each_notice",
metadata=EventMessageMetadata(
process_type=EventMessageProcessType.DAG, process_name=DAG_NAME
))
)
def fetch_notices_and_trigger_index_and_normalise_notice_worker():
def fetch_notices_and_index_and_normalise_each_notice():
context = get_current_context()
dag_conf = context["dag_run"].conf

if DATE_WILD_CARD_KEY not in dag_conf.keys():
raise "Config key [date] is not present in dag context"
raise Exception(f"Config key {DATE_WILD_CARD_KEY} is not present in dag context")

mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL)
notice_ids = NoticeFetcher(notice_repository=NoticeRepository(mongodb_client=mongodb_client),
notice_repository = NoticeRepository(mongodb_client=mongodb_client)
notice_ids = NoticeFetcher(notice_repository=notice_repository,
ted_api_adapter=TedAPIAdapter(
request_api=TedRequestAPI())).fetch_notices_by_date_wild_card(
wildcard_date=dag_conf[DATE_WILD_CARD_KEY]) # "20220203*"
notice_publication_date = datetime.strptime(dag_conf[DATE_WILD_CARD_KEY], "%Y%m%d*").date()
create_and_store_in_mongo_db_daily_supra_notice(notice_ids=notice_ids, mongodb_client=mongodb_client,
notice_fetched_date=notice_publication_date)
for notice_id in notice_ids:
restart_dag_operator = True
while restart_dag_operator:
restart_dag_operator = False
try:
time.sleep(randint(10, 500) / 1000)
TriggerDagRunOperator(
task_id=f'trigger_index_and_normalise_notice_worker_dag_{notice_id}',
trigger_dag_id="index_and_normalise_notice_worker",
trigger_run_id=notice_id,
conf={NOTICE_ID: notice_id}
).execute(context=context)
except Exception as e:
notice = notice_repository.get(reference=notice_id)
notice = index_notice(notice=notice)
try:
normalised_notice = normalise_notice(notice=notice)
notice_repository.update(notice=normalised_notice)
except Exception as e:
log_error(message=str(e))

restart_dag_operator = True
print("trigger dag operator restarted !!!")
print("EXCEPTION message: ", e)

fetch_notices_and_trigger_index_and_normalise_notice_worker()
fetch_notices_and_index_and_normalise_each_notice()


dag = fetch_notices_per_day_worker()
2 changes: 1 addition & 1 deletion infra/airflow-cluster/docker-compose-worker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ services:
<<: *airflow-common
container_name: airflow-worker-${ENVIRONMENT}
command: celery worker
hostname: ${AIRFLOW_WORKER_HOSTNAME}.${SUBDOMAIN}remote_worker
hostname: ${AIRFLOW_WORKER_HOSTNAME}
ports:
- "8793:8793"
healthcheck:
Expand Down
7 changes: 6 additions & 1 deletion infra/airflow-cluster/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ x-airflow-common:
- ${AIRFLOW_INFRA_FOLDER}/ted_sws:/opt/airflow/ted_sws
- ${AIRFLOW_INFRA_FOLDER}/tests:/opt/airflow/tests
user: "${AIRFLOW_UID:-50000}:0"
extra_hosts:
- "hermes:65.108.123.44"
- "newton:65.109.3.225"
- "pascal:65.21.59.9"
- "srv.meaningfy.ws:168.119.148.205"
command: bash -c "export PYTHONPATH='/opt/airflow/'"
depends_on:
&airflow-common-depends-on
Expand Down Expand Up @@ -208,7 +213,7 @@ services:
<<: *airflow-common
container_name: airflow-worker-${ENVIRONMENT}
command: celery worker
hostname: ${AIRFLOW_WORKER_HOSTNAME}.${SUBDOMAIN}local_worker
hostname: ${AIRFLOW_WORKER_HOSTNAME}
healthcheck:
test: ["CMD-SHELL",'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"']
interval: 10s
Expand Down