Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/ted 896 #345

Merged
merged 7 commits into from
Nov 8, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 21 additions & 25 deletions dags/notice_validation_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,16 @@ def notice_daily_validation_workflow():
))
)
def validate_fetched_notices():
"""
:return:
"""
from ted_sws import config
from ted_sws.supra_notice_manager.services.supra_notice_validator import validate_and_update_daily_supra_notice

publication_date = get_notice_publication_date()
mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL)
validate_and_update_daily_supra_notice(notice_publication_day=publication_date,
mongodb_client=mongodb_client
)
mongodb_client=mongodb_client)

@task
@event_log(TechnicalEventMessage(
Expand All @@ -50,15 +52,19 @@ def validate_fetched_notices():
))
)
def summarize_validation_for_daily_supra_notice():
from ted_sws import config
from ted_sws.supra_notice_manager.services.supra_notice_validator import \
summary_validation_for_daily_supra_notice

publication_date = get_notice_publication_date()
mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL)
summary_validation_for_daily_supra_notice(notice_publication_day=publication_date,
mongodb_client=mongodb_client
)
"""
:return:
"""
# Temporally disable DailySupraNotice validation summary
kaleanych marked this conversation as resolved.
Show resolved Hide resolved
# from ted_sws import config
# from ted_sws.supra_notice_manager.services.supra_notice_validator import \
# summary_validation_for_daily_supra_notice
#
# publication_date = get_notice_publication_date()
# mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL)
# summary_validation_for_daily_supra_notice(notice_publication_day=publication_date,
# mongodb_client=mongodb_client
# )

@task
@event_log(TechnicalEventMessage(
Expand All @@ -69,23 +75,13 @@ def summarize_validation_for_daily_supra_notice():
)
def validate_availability_of_notice_in_cellar():
from ted_sws import config
from ted_sws.data_manager.adapters.notice_repository import NoticeRepository
from ted_sws.data_manager.adapters.supra_notice_repository import DailySupraNoticeRepository
from ted_sws.notice_validator.services.check_availability_of_notice_in_cellar import \
validate_notice_availability_in_cellar
from ted_sws.supra_notice_manager.services.supra_notice_validator import \
validate_and_update_supra_notice_availability_in_cellar

notice_publication_day = get_notice_publication_date()
mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL)
repo = DailySupraNoticeRepository(mongodb_client=mongodb_client)
supra_notice = repo.get(reference=notice_publication_day)
if supra_notice:
notice_repository = NoticeRepository(mongodb_client=mongodb_client)
for notice_id in supra_notice.notice_ids:
notice = notice_repository.get(reference=notice_id)
old_notice_status = notice.status
notice = validate_notice_availability_in_cellar(notice=notice)
if notice.status != old_notice_status:
notice_repository.update(notice=notice)
validate_and_update_supra_notice_availability_in_cellar(notice_publication_day=notice_publication_day,
mongodb_client=mongodb_client)

validate_fetched_notices() >> summarize_validation_for_daily_supra_notice() >> validate_availability_of_notice_in_cellar()

Expand Down
3 changes: 2 additions & 1 deletion dags/operators/DagBatchPipelineOperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
DEFAULT_NUBER_OF_CELERY_WORKERS = 144
NOTICE_PROCESS_WORKFLOW_DAG_NAME = "notice_process_workflow"
DEFAULT_START_WITH_TASK_ID = "notice_normalisation_pipeline"
DEFAULT_PIPELINE_NAME_FOR_LOGS = "unknown_pipeline_name"
DEFAULT_PIPELINE_NAME_FOR_LOGS = "unknown_pipeline_name"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is this for?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is merged from main. ask Stefan



class BatchPipelineCallable(Protocol):

Expand Down
9 changes: 9 additions & 0 deletions dags/pipelines/notice_processor_pipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,23 @@ def notice_validation_pipeline(notice: Notice, mongodb_client: MongoClient) -> N
from ted_sws.notice_validator.services.validation_summary_runner import validation_summary_report_notice
from ted_sws.notice_validator.services.xpath_coverage_runner import validate_xpath_coverage_notice
from ted_sws.data_manager.adapters.mapping_suite_repository import MappingSuiteRepositoryMongoDB
from ted_sws.event_manager.services.log import log_notice_info

mapping_suite_id = notice.distilled_rdf_manifestation.mapping_suite_id
mapping_suite_repository = MappingSuiteRepositoryMongoDB(mongodb_client=mongodb_client)
mapping_suite = mapping_suite_repository.get(reference=mapping_suite_id)
log_notice_info(message="Validation :: XPATH coverage :: START", notice_id=notice.ted_id)
validate_xpath_coverage_notice(notice=notice, mapping_suite=mapping_suite, mongodb_client=mongodb_client)
log_notice_info(message="Validation :: XPATH coverage :: END", notice_id=notice.ted_id)
log_notice_info(message="Validation :: SPARQL :: START", notice_id=notice.ted_id)
validate_notice_with_sparql_suite(notice=notice, mapping_suite_package=mapping_suite)
log_notice_info(message="Validation :: SPARQL :: END", notice_id=notice.ted_id)
log_notice_info(message="Validation :: SHACL :: START", notice_id=notice.ted_id)
validate_notice_with_shacl_suite(notice=notice, mapping_suite_package=mapping_suite)
log_notice_info(message="Validation :: SHACL :: END", notice_id=notice.ted_id)
log_notice_info(message="Validation :: Summary :: START", notice_id=notice.ted_id)
validation_summary_report_notice(notice=notice)
log_notice_info(message="Validation :: Summary :: END", notice_id=notice.ted_id)
return NoticePipelineOutput(notice=notice)


Expand Down
3 changes: 2 additions & 1 deletion ted_sws/core/model/supra_notice.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,10 @@ class SupraNoticeValidationReport(Manifestation):
Result of checking whether all the notices published in TED are present in the internal database.
"""
missing_notice_ids: Optional[List[str]]
not_published_notice_ids: Optional[List[str]]

def is_valid(self):
if not self.missing_notice_ids:
if not self.missing_notice_ids and not self.not_published_notice_ids:
return True
return False

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def notice_shacl_summary(self, notice: Notice, report: RDFManifestationValidatio
validation_results = shacl_report.validation_results
is_new, result_validation = self.shacl_summary_result(shacl_report, result_counts)
result_count: SHACLSummarySeverityCountReport = result_validation.result_severity.aggregate
if validation_results:
if validation_results and validation_results.results_dict:
bindings = validation_results.results_dict['results']['bindings']
for binding in bindings:
result_severity = binding['resultSeverity']
Expand Down
36 changes: 34 additions & 2 deletions ted_sws/supra_notice_manager/services/supra_notice_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@
from ted_sws.data_manager.adapters.supra_notice_repository import DailySupraNoticeRepository
from ted_sws.notice_fetcher.adapters.ted_api import TedAPIAdapter, RequestAPI, TedRequestAPI
from ted_sws.data_manager.adapters.notice_repository import NoticeRepository
from ted_sws.core.model.notice import Notice
from ted_sws.core.model.notice import Notice, NoticeStatus
from ted_sws.notice_validator.services.validation_summary_runner import generate_validation_summary_report_notices
from typing import List
from ted_sws.notice_validator.services.check_availability_of_notice_in_cellar import \
validate_notice_availability_in_cellar

day_type = Union[datetime, date]

Expand All @@ -37,7 +39,7 @@ def validate_and_update_daily_supra_notice(notice_publication_day: day_type, mon
api_notice_ids_list = [document["ND"] for document in documents] if documents and len(documents) else []
api_notice_ids = set(api_notice_ids_list)

validation_report = SupraNoticeValidationReport(object_data="")
validation_report = supra_notice.validation_report or SupraNoticeValidationReport(object_data="")
missing_notice_ids = api_notice_ids - fetched_notice_ids
if len(missing_notice_ids):
validation_report.missing_notice_ids = missing_notice_ids
Expand Down Expand Up @@ -68,3 +70,33 @@ def summary_validation_for_daily_supra_notice(notice_publication_day: day_type,
# no notice_ids needed to be stored for supra_notice
kaleanych marked this conversation as resolved.
Show resolved Hide resolved
# supra_notice.validation_summary.notice_ids = []
repo.update(daily_supra_notice=supra_notice)


def validate_and_update_supra_notice_availability_in_cellar(notice_publication_day: day_type,
mongodb_client: MongoClient):
if isinstance(notice_publication_day, date):
notice_publication_day = datetime.combine(notice_publication_day, time())

repo = DailySupraNoticeRepository(mongodb_client=mongodb_client)
supra_notice: DailySupraNotice = repo.get(reference=notice_publication_day)

if not supra_notice:
raise ValueError("SupraNotice not found in Database!")

if supra_notice:
not_published_notice_ids: List[str] = []
notice_repository = NoticeRepository(mongodb_client=mongodb_client)
for notice_id in supra_notice.notice_ids:
notice = notice_repository.get(reference=notice_id)
if notice:
old_notice_status = notice.status
notice = validate_notice_availability_in_cellar(notice=notice)
if notice.status == NoticeStatus.PUBLICLY_UNAVAILABLE:
not_published_notice_ids.append(notice_id)
if notice.status != old_notice_status:
notice_repository.update(notice=notice)

validation_report = supra_notice.validation_report or SupraNoticeValidationReport(object_data="")
validation_report.not_published_notice_ids = not_published_notice_ids
supra_notice.validation_report = validation_report
repo.update(daily_supra_notice=supra_notice)
6 changes: 6 additions & 0 deletions tests/e2e/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from pymongo import MongoClient

from ted_sws import config
from ted_sws.data_manager.adapters.notice_repository import NoticeRepository
from ted_sws.data_manager.adapters.triple_store import AllegroGraphTripleStore, FusekiAdapter

from tests import TEST_DATA_PATH
Expand Down Expand Up @@ -72,3 +73,8 @@ def fake_repository_path():
@pytest.fixture
def path_to_file_system_repository():
return TEST_DATA_PATH / "notice_transformer" / "test_repository"


@pytest.fixture
def fake_notice_repository(fake_mongodb_client):
return NoticeRepository(mongodb_client=fake_mongodb_client)
3 changes: 2 additions & 1 deletion tests/e2e/master_data_registry/test_entity_deduplication.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ def test_deduplicate_entities_by_cet_uri(notice_with_rdf_manifestation, organisa
sparql_endpoint = fuseki_triple_store.get_sparql_triple_store_endpoint(repository_name=TEST_MDR_REPOSITORY)
unique_names = sparql_endpoint.with_query(sparql_query=TEST_QUERY_UNIQUE_NAMES).fetch_tabular()
unique_cet_roots = sparql_endpoint.with_query(sparql_query=TEST_QUERY_UNIQUE_CET_ROOTS).fetch_tabular()
assert len(unique_names) == len(unique_cet_roots)

fuseki_triple_store.delete_repository(repository_name=TEST_MDR_REPOSITORY)

assert len(unique_names) == len(unique_cet_roots)
1 change: 0 additions & 1 deletion tests/e2e/notice_validator/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

from ted_sws.core.model.manifestation import XMLManifestation
from ted_sws.core.model.notice import Notice
from tests import TEST_DATA_PATH


@pytest.fixture
Expand Down
4 changes: 2 additions & 2 deletions tests/e2e/supra_notice_manager/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@


@pytest.fixture
def daily_supra_notice_repository(mongodb_client) -> DailySupraNoticeRepository:
return DailySupraNoticeRepository(mongodb_client=mongodb_client)
def daily_supra_notice_repository(fake_mongodb_client) -> DailySupraNoticeRepository:
return DailySupraNoticeRepository(mongodb_client=fake_mongodb_client)
39 changes: 35 additions & 4 deletions tests/e2e/supra_notice_manager/test_supra_notice_validator.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,52 @@
from datetime import datetime, time, timedelta

import pytest

from ted_sws.core.model.manifestation import XMLManifestation
from ted_sws.core.model.notice import Notice, NoticeStatus
from ted_sws.supra_notice_manager.services.daily_supra_notice_manager import \
create_and_store_in_mongo_db_daily_supra_notice
from ted_sws.supra_notice_manager.services.supra_notice_validator import validate_and_update_daily_supra_notice
from ted_sws.supra_notice_manager.services.supra_notice_validator import validate_and_update_daily_supra_notice, \
validate_and_update_supra_notice_availability_in_cellar


def test_supra_notice_validator(mongodb_client, daily_supra_notice_repository):
def test_supra_notice_validator(fake_mongodb_client, daily_supra_notice_repository):
day = datetime.combine(datetime.today() - timedelta(days=1), time())

notice_ids = ["XYZ067623-2022023"]
create_and_store_in_mongo_db_daily_supra_notice(notice_ids=notice_ids, mongodb_client=mongodb_client,
create_and_store_in_mongo_db_daily_supra_notice(notice_ids=notice_ids, mongodb_client=fake_mongodb_client,
notice_fetched_date=day)
validate_and_update_daily_supra_notice(day, mongodb_client)
validate_and_update_daily_supra_notice(day, fake_mongodb_client)
result = daily_supra_notice_repository.get(reference=day)
assert result
assert result.notice_ids is not None
if result.validation_report.missing_notice_ids is not None:
assert result.validation_report.missing_notice_ids
assert notice_ids[0] not in result.validation_report.missing_notice_ids
assert not result.validation_report.is_valid()


def test_validate_and_update_supra_notice_availability_in_cellar(fake_mongodb_client, daily_supra_notice_repository,
fake_notice_repository):
day = datetime.combine(datetime.today() - timedelta(days=1), time())

with pytest.raises(ValueError):
validate_and_update_supra_notice_availability_in_cellar(day, fake_mongodb_client)

notice_id = "TEST-XYZ067623-2022023"
notice_ids = [notice_id]

notice = Notice(ted_id=notice_id, xml_manifestation=XMLManifestation(object_data=""))
notice._status = NoticeStatus.PUBLISHED

fake_notice_repository.add(notice)

create_and_store_in_mongo_db_daily_supra_notice(notice_ids=notice_ids, mongodb_client=fake_mongodb_client,
notice_fetched_date=day)
validate_and_update_supra_notice_availability_in_cellar(day, fake_mongodb_client)
result = daily_supra_notice_repository.get(reference=day)

assert result
assert len(result.validation_report.not_published_notice_ids) > 0
assert notice_id in result.validation_report.not_published_notice_ids
assert not result.validation_report.is_valid()