Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add Cellar request delay #424

Merged
merged 1 commit into from
Jan 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions ted_sws/data_manager/adapters/sparql_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

import pandas as pd
import rdflib
from SPARQLWrapper import SPARQLWrapper, CSV, JSON, RDF
from SPARQLWrapper import SPARQLWrapper, CSV, JSON, RDF, POST

from ted_sws import config

Expand All @@ -35,13 +35,15 @@ class SPARQLClientPool(object):
connection_pool = {}

@staticmethod
def create_or_reuse_connection(endpoint_url: str, user: str, password: str, use_post_method: bool = False):
    """
    Return the pooled SPARQLWrapper client for an endpoint, creating it on first use.

    Clients are pooled per endpoint URL *and* per HTTP method. Previously the pool
    was keyed by the URL only, so a request for a POST client was silently answered
    with an already pooled GET client (the POST setting was never applied).

    :param endpoint_url: URL of the SPARQL endpoint
    :param user: user name passed to SPARQLWrapper.setCredentials
    :param password: password passed to SPARQLWrapper.setCredentials
    :param use_post_method: when True, the client sends its queries with HTTP POST
    :return: the pooled SPARQLWrapper instance
    """
    # GET clients keep the bare URL as pool key (unchanged for existing callers);
    # POST clients get a distinct key so both variants can coexist for one URL.
    # NOTE: credentials are only applied when the client is first created;
    # a pooled client is returned as-is even if other credentials are passed.
    pool_key = (endpoint_url, POST) if use_post_method else endpoint_url
    pool = SPARQLClientPool.connection_pool
    if pool_key not in pool:
        sparql_wrapper = SPARQLWrapper(endpoint_url)
        sparql_wrapper.setCredentials(
            user=user,
            passwd=password
        )
        if use_post_method:
            sparql_wrapper.setMethod(method=POST)
        pool[pool_key] = sparql_wrapper
    return pool[pool_key]

Expand Down Expand Up @@ -123,10 +125,11 @@ def add_data_to_repository(self, file_content, repository_name, mime_type):

class SPARQLTripleStoreEndpoint(TripleStoreEndpointABC):

def __init__(self, endpoint_url: str, user: str = None, password: str = None, use_post_method: bool = False):
    """
    Bind this endpoint to a pooled SPARQL client.

    :param endpoint_url: URL of the SPARQL endpoint
    :param user: user name; falls back to config.AGRAPH_SUPER_USER when empty
    :param password: password; falls back to config.AGRAPH_SUPER_PASSWORD when empty
    :param use_post_method: forwarded to the client pool to request a POST client
    """
    effective_user = user or config.AGRAPH_SUPER_USER
    effective_password = password or config.AGRAPH_SUPER_PASSWORD
    self.endpoint = SPARQLClientPool.create_or_reuse_connection(
        endpoint_url,
        effective_user,
        effective_password,
        use_post_method=use_post_method,
    )

def _set_sparql_query(self, sparql_query: str):
"""
Expand Down
8 changes: 4 additions & 4 deletions ted_sws/master_data_registry/resources/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import pathlib

# Absolute path of the folder that contains this package's resources.
MASTER_DATA_REGISTRY_RESOURCES_PATH = pathlib.Path(__file__).parent.resolve()

# All SPARQL query templates of the master data registry live in one sub-folder.
SPARQL_QUERY_TEMPLATES_PATH = MASTER_DATA_REGISTRY_RESOURCES_PATH.joinpath("sparql_query_templates")
TRIPLES_BY_CET_URI_SPARQL_QUERY_TEMPLATE_PATH = SPARQL_QUERY_TEMPLATES_PATH.joinpath("get_by_cet_uri.rq")
PROCEDURE_SUBJECTS_SPARQL_QUERY_TEMPLATE_PATH = SPARQL_QUERY_TEMPLATES_PATH.joinpath("get_procedure_uris.rq")
RDF_FRAGMENT_BY_URI_SPARQL_QUERY_TEMPLATE_PATH = SPARQL_QUERY_TEMPLATES_PATH.joinpath(
    "get_2_dependency_levels_for_a_uri_as_root.rq"
)
12 changes: 5 additions & 7 deletions ted_sws/notice_validator/resources/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
#!/usr/bin/python3
import pathlib

# __init__.py
# Date: 11/02/2022
# Author: Eugeniu Costetchi
# Email: costezki.eugen@gmail.com

""" """
# Absolute path of the folder that contains this package's resources.
NOTICE_VALIDATOR_RESOURCES_PATH = pathlib.Path(__file__).parent.resolve()

# SPARQL query templates used to check notice availability.
SPARQL_QUERY_TEMPLATES_PATH = NOTICE_VALIDATOR_RESOURCES_PATH.joinpath("sparql_query_templates")
NOTICE_AVAILABILITY_SPARQL_QUERY_TEMPLATE_PATH = SPARQL_QUERY_TEMPLATES_PATH.joinpath(
    "check_notice_availability.rq"
)
NOTICES_AVAILABILITY_SPARQL_QUERY_TEMPLATE_PATH = SPARQL_QUERY_TEMPLATES_PATH.joinpath(
    "check_notices_availability.rq"
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# ASK query: answers true when the given notice URI is the subject of at least one triple.
# This file is a Python str.format template: the notice_uri placeholder is substituted
# by the caller, and the doubled curly braces stand for literal single braces.
ASK {{
VALUES ?instance {{<{notice_uri}>}}
?instance ?predicate [] .
}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Returns, among the supplied URIs, the distinct ones that are the subject of at least one triple.
# This file is a Python str.format template: the notice_uries placeholder is substituted
# by the caller with space-separated IRIs in angle brackets, and the doubled curly braces
# stand for literal single braces.
select distinct ?s
{{
VALUES ?s {{{notice_uries}}}
?s ?p ?o .
}}
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
import time
from typing import List, Set

from pymongo import MongoClient
from ted_sws.core.model.notice import Notice, NoticeStatus
from ted_sws.core.service.batch_processing import chunks
from ted_sws.data_manager.adapters.notice_repository import NoticeRepository
from ted_sws.data_manager.adapters.sparql_endpoint import SPARQLTripleStoreEndpoint
from ted_sws.notice_validator.resources import NOTICE_AVAILABILITY_SPARQL_QUERY_TEMPLATE_PATH, \
NOTICES_AVAILABILITY_SPARQL_QUERY_TEMPLATE_PATH

# Default SPARQL endpoint, used when callers do not pass endpoint_url.
WEBAPI_SPARQL_URL = "https://publications.europa.eu/webapi/rdf/sparql"
# Inline str.format template of the single-notice ASK query (placeholder: notice_uri).
CELLAR_NOTICE_AVAILABILITY_QUERY = "ASK {{ VALUES ?instance {{<{notice_uri}>}} ?instance ?predicate [] . }}"
# NOTE(review): the "$notice_uries" placeholder is not a str.format field, so
# str.format would leave it in place unsubstituted — confirm whether this constant is still needed.
CELLAR_NOTICES_AVAILABILITY_QUERY = "select distinct ?s {{VALUES ?s {{$notice_uries}} ?s ?p ?o . }}"
WEBAPI_SPARQL_RUN_FORMAT = "application/sparql-results+json"
# Presumably a URI that is known not to exist in Cellar — verify against its users.
INVALID_NOTICE_URI = 'https://www.w3.org/1999/02/22-rdf-syntax-ns#type-invalid'
# NOTE(review): assigned twice in a row; the later value (5000) is the one in effect.
DEFAULT_NOTICES_BATCH_SIZE = 1000
DEFAULT_NOTICES_BATCH_SIZE = 5000
# Default value, in seconds, of cellar_request_delay_in_seconds (passed to time.sleep).
DEFAULT_CELLAR_REQUEST_DELAY = 3


def check_availability_of_notice_in_cellar(notice_uri: str, endpoint_url: str = WEBAPI_SPARQL_URL) -> bool:
Expand All @@ -21,7 +23,8 @@ def check_availability_of_notice_in_cellar(notice_uri: str, endpoint_url: str =
:param endpoint_url:
:return:
"""
query = CELLAR_NOTICE_AVAILABILITY_QUERY.format(notice_uri=notice_uri)
query_template = NOTICE_AVAILABILITY_SPARQL_QUERY_TEMPLATE_PATH.read_text(encoding="utf-8")
query = query_template.format(notice_uri=notice_uri)
result = SPARQLTripleStoreEndpoint(endpoint_url=endpoint_url).with_query(sparql_query=query).fetch_tree()
return result['boolean']

Expand All @@ -33,9 +36,11 @@ def check_availability_of_notices_in_cellar(notice_uries: List[str], endpoint_ur
:param endpoint_url:
:return:
"""
query_template = NOTICES_AVAILABILITY_SPARQL_QUERY_TEMPLATE_PATH.read_text(encoding="utf-8")
notice_uries = " ".join([f"<{notice_uri}>" for notice_uri in notice_uries])
query = CELLAR_NOTICE_AVAILABILITY_QUERY.format(notice_uri=notice_uries)
result = SPARQLTripleStoreEndpoint(endpoint_url=endpoint_url).with_query(sparql_query=query).fetch_tabular()
query = query_template.format(notice_uries=notice_uries)
result = SPARQLTripleStoreEndpoint(endpoint_url=endpoint_url,
use_post_method=True).with_query(sparql_query=query).fetch_tabular()
return set(result['s'].to_list())


Expand Down Expand Up @@ -66,11 +71,13 @@ def validate_notice_availability_in_cellar(notice: Notice, notice_uri: str = Non
return notice


def validate_notices_availability_in_cellar(notice_statuses: List[NoticeStatus], mongodb_client: MongoClient):
def validate_notices_availability_in_cellar(notice_statuses: List[NoticeStatus], mongodb_client: MongoClient,
cellar_request_delay_in_seconds: int = DEFAULT_CELLAR_REQUEST_DELAY):
"""
This function validate availability in cellar foreach notice from notices with a notice_status in notice_statuses.
:param notice_statuses:
:param mongodb_client:
:param cellar_request_delay_in_seconds:
:return:
"""
notice_repository = NoticeRepository(mongodb_client=mongodb_client)
Expand All @@ -90,3 +97,4 @@ def validate_notices_availability_in_cellar(notice_statuses: List[NoticeStatus],
else:
notice.update_status_to(new_status=NoticeStatus.PUBLICLY_UNAVAILABLE)
notice_repository.update(notice=notice)
time.sleep(cellar_request_delay_in_seconds)
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from ted_sws.core.model.notice import NoticeStatus
from ted_sws.notice_validator.services.check_availability_of_notice_in_cellar import \
check_availability_of_notice_in_cellar, validate_notice_availability_in_cellar
check_availability_of_notice_in_cellar, validate_notice_availability_in_cellar, \
check_availability_of_notices_in_cellar, DEFAULT_NOTICES_BATCH_SIZE


def test_check_availability_of_notice_in_cellar(valid_cellar_uri, invalid_cellar_uri):
Expand All @@ -19,3 +20,10 @@ def test_validate_notice_availability_in_cellar(fake_notice_F03, valid_cellar_ur
fake_notice_F03._status = NoticeStatus.PUBLISHED
validate_notice_availability_in_cellar(notice=fake_notice_F03, notice_uri=invalid_cellar_uri)
assert fake_notice_F03.status == NoticeStatus.PUBLICLY_UNAVAILABLE


def test_validate_notices_availability_in_cellar(valid_cellar_uri, invalid_cellar_uri):
    """
    Query a batch of DEFAULT_NOTICES_BATCH_SIZE valid URIs plus one invalid URI and
    check that only the valid URI is reported as available.
    """
    requested_uries = [valid_cellar_uri for _ in range(DEFAULT_NOTICES_BATCH_SIZE)]
    requested_uries.append(invalid_cellar_uri)
    found_uries = check_availability_of_notices_in_cellar(notice_uries=requested_uries)
    assert valid_cellar_uri in found_uries
    assert invalid_cellar_uri not in found_uries