Skip to content

Commit

Permalink
Merge pull request #424 from OP-TED/feature/TED-1138
Browse files Browse the repository at this point in the history
add Cellar request dellay
  • Loading branch information
CaptainOfHacks committed Jan 18, 2023
2 parents d6a12c8 + 69e6c59 commit c3250dc
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 23 deletions.
11 changes: 7 additions & 4 deletions ted_sws/data_manager/adapters/sparql_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

import pandas as pd
import rdflib
from SPARQLWrapper import SPARQLWrapper, CSV, JSON, RDF
from SPARQLWrapper import SPARQLWrapper, CSV, JSON, RDF, POST

from ted_sws import config

Expand All @@ -35,13 +35,15 @@ class SPARQLClientPool(object):
connection_pool = {}

@staticmethod
def create_or_reuse_connection(endpoint_url: str, user: str, password: str):
def create_or_reuse_connection(endpoint_url: str, user: str, password: str, use_post_method: bool = False):
if endpoint_url not in SPARQLClientPool.connection_pool:
sparql_wrapper = SPARQLWrapper(endpoint_url)
sparql_wrapper.setCredentials(
user=user,
passwd=password
)
if use_post_method:
sparql_wrapper.setMethod(method=POST)
SPARQLClientPool.connection_pool[endpoint_url] = sparql_wrapper
return SPARQLClientPool.connection_pool[endpoint_url]

Expand Down Expand Up @@ -123,10 +125,11 @@ def add_data_to_repository(self, file_content, repository_name, mime_type):

class SPARQLTripleStoreEndpoint(TripleStoreEndpointABC):

def __init__(self, endpoint_url: str, user: str = None, password: str = None):
def __init__(self, endpoint_url: str, user: str = None, password: str = None, use_post_method: bool = False):
user = user if user else config.AGRAPH_SUPER_USER
password = password if password else config.AGRAPH_SUPER_PASSWORD
self.endpoint = SPARQLClientPool.create_or_reuse_connection(endpoint_url, user, password)
self.endpoint = SPARQLClientPool.create_or_reuse_connection(endpoint_url, user, password,
use_post_method=use_post_method)

def _set_sparql_query(self, sparql_query: str):
"""
Expand Down
8 changes: 4 additions & 4 deletions ted_sws/master_data_registry/resources/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import pathlib

MASTER_DATA_REGISTRY_RESOURCES_PATH = pathlib.Path(__file__).parent.resolve()

TRIPLES_BY_CET_URI_SPARQL_QUERY_TEMPLATE_PATH = MASTER_DATA_REGISTRY_RESOURCES_PATH / "sparql_query_templates/get_by_cet_uri.rq"
PROCEDURE_SUBJECTS_SPARQL_QUERY_TEMPLATE_PATH = MASTER_DATA_REGISTRY_RESOURCES_PATH / "sparql_query_templates/get_procedure_uris.rq"
RDF_FRAGMENT_BY_URI_SPARQL_QUERY_TEMPLATE_PATH = MASTER_DATA_REGISTRY_RESOURCES_PATH / "sparql_query_templates/get_2_dependency_levels_for_a_uri_as_root.rq"
SPARQL_QUERY_TEMPLATES_PATH = MASTER_DATA_REGISTRY_RESOURCES_PATH / "sparql_query_templates"
TRIPLES_BY_CET_URI_SPARQL_QUERY_TEMPLATE_PATH = SPARQL_QUERY_TEMPLATES_PATH / "get_by_cet_uri.rq"
PROCEDURE_SUBJECTS_SPARQL_QUERY_TEMPLATE_PATH = SPARQL_QUERY_TEMPLATES_PATH / "get_procedure_uris.rq"
RDF_FRAGMENT_BY_URI_SPARQL_QUERY_TEMPLATE_PATH = SPARQL_QUERY_TEMPLATES_PATH / "get_2_dependency_levels_for_a_uri_as_root.rq"
12 changes: 5 additions & 7 deletions ted_sws/notice_validator/resources/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
#!/usr/bin/python3
import pathlib

# __init__.py
# Date: 11/02/2022
# Author: Eugeniu Costetchi
# Email: costezki.eugen@gmail.com

""" """
NOTICE_VALIDATOR_RESOURCES_PATH = pathlib.Path(__file__).parent.resolve()
SPARQL_QUERY_TEMPLATES_PATH = NOTICE_VALIDATOR_RESOURCES_PATH / "sparql_query_templates"
NOTICE_AVAILABILITY_SPARQL_QUERY_TEMPLATE_PATH = SPARQL_QUERY_TEMPLATES_PATH / "check_notice_availability.rq"
NOTICES_AVAILABILITY_SPARQL_QUERY_TEMPLATE_PATH = SPARQL_QUERY_TEMPLATES_PATH / "check_notices_availability.rq"
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
ASK {{
VALUES ?instance {{<{notice_uri}>}}
?instance ?predicate [] .
}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
select distinct ?s
{{
VALUES ?s {{{notice_uries}}}
?s ?p ?o .
}}
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
import time
from typing import List, Set

from pymongo import MongoClient
from ted_sws.core.model.notice import Notice, NoticeStatus
from ted_sws.core.service.batch_processing import chunks
from ted_sws.data_manager.adapters.notice_repository import NoticeRepository
from ted_sws.data_manager.adapters.sparql_endpoint import SPARQLTripleStoreEndpoint
from ted_sws.notice_validator.resources import NOTICE_AVAILABILITY_SPARQL_QUERY_TEMPLATE_PATH, \
NOTICES_AVAILABILITY_SPARQL_QUERY_TEMPLATE_PATH

WEBAPI_SPARQL_URL = "https://publications.europa.eu/webapi/rdf/sparql"
CELLAR_NOTICE_AVAILABILITY_QUERY = "ASK {{ VALUES ?instance {{<{notice_uri}>}} ?instance ?predicate [] . }}"
CELLAR_NOTICES_AVAILABILITY_QUERY = "select distinct ?s {{VALUES ?s {{$notice_uries}} ?s ?p ?o . }}"
WEBAPI_SPARQL_RUN_FORMAT = "application/sparql-results+json"
INVALID_NOTICE_URI = 'https://www.w3.org/1999/02/22-rdf-syntax-ns#type-invalid'
DEFAULT_NOTICES_BATCH_SIZE = 1000
DEFAULT_NOTICES_BATCH_SIZE = 5000
DEFAULT_CELLAR_REQUEST_DELAY = 3


def check_availability_of_notice_in_cellar(notice_uri: str, endpoint_url: str = WEBAPI_SPARQL_URL) -> bool:
Expand All @@ -21,7 +23,8 @@ def check_availability_of_notice_in_cellar(notice_uri: str, endpoint_url: str =
:param endpoint_url:
:return:
"""
query = CELLAR_NOTICE_AVAILABILITY_QUERY.format(notice_uri=notice_uri)
query_template = NOTICE_AVAILABILITY_SPARQL_QUERY_TEMPLATE_PATH.read_text(encoding="utf-8")
query = query_template.format(notice_uri=notice_uri)
result = SPARQLTripleStoreEndpoint(endpoint_url=endpoint_url).with_query(sparql_query=query).fetch_tree()
return result['boolean']

Expand All @@ -33,9 +36,11 @@ def check_availability_of_notices_in_cellar(notice_uries: List[str], endpoint_ur
:param endpoint_url:
:return:
"""
query_template = NOTICES_AVAILABILITY_SPARQL_QUERY_TEMPLATE_PATH.read_text(encoding="utf-8")
notice_uries = " ".join([f"<{notice_uri}>" for notice_uri in notice_uries])
query = CELLAR_NOTICE_AVAILABILITY_QUERY.format(notice_uri=notice_uries)
result = SPARQLTripleStoreEndpoint(endpoint_url=endpoint_url).with_query(sparql_query=query).fetch_tabular()
query = query_template.format(notice_uries=notice_uries)
result = SPARQLTripleStoreEndpoint(endpoint_url=endpoint_url,
use_post_method=True).with_query(sparql_query=query).fetch_tabular()
return set(result['s'].to_list())


Expand Down Expand Up @@ -66,11 +71,13 @@ def validate_notice_availability_in_cellar(notice: Notice, notice_uri: str = Non
return notice


def validate_notices_availability_in_cellar(notice_statuses: List[NoticeStatus], mongodb_client: MongoClient):
def validate_notices_availability_in_cellar(notice_statuses: List[NoticeStatus], mongodb_client: MongoClient,
cellar_request_delay_in_seconds: int = DEFAULT_CELLAR_REQUEST_DELAY):
"""
This function validate availability in cellar foreach notice from notices with a notice_status in notice_statuses.
:param notice_statuses:
:param mongodb_client:
:param cellar_request_delay_in_seconds:
:return:
"""
notice_repository = NoticeRepository(mongodb_client=mongodb_client)
Expand All @@ -90,3 +97,4 @@ def validate_notices_availability_in_cellar(notice_statuses: List[NoticeStatus],
else:
notice.update_status_to(new_status=NoticeStatus.PUBLICLY_UNAVAILABLE)
notice_repository.update(notice=notice)
time.sleep(cellar_request_delay_in_seconds)
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from ted_sws.core.model.notice import NoticeStatus
from ted_sws.notice_validator.services.check_availability_of_notice_in_cellar import \
check_availability_of_notice_in_cellar, validate_notice_availability_in_cellar
check_availability_of_notice_in_cellar, validate_notice_availability_in_cellar, \
check_availability_of_notices_in_cellar, DEFAULT_NOTICES_BATCH_SIZE


def test_check_availability_of_notice_in_cellar(valid_cellar_uri, invalid_cellar_uri):
Expand All @@ -19,3 +20,10 @@ def test_validate_notice_availability_in_cellar(fake_notice_F03, valid_cellar_ur
fake_notice_F03._status = NoticeStatus.PUBLISHED
validate_notice_availability_in_cellar(notice=fake_notice_F03, notice_uri=invalid_cellar_uri)
assert fake_notice_F03.status == NoticeStatus.PUBLICLY_UNAVAILABLE


def test_validate_notices_availability_in_cellar(valid_cellar_uri, invalid_cellar_uri):
notice_uries = [valid_cellar_uri] * DEFAULT_NOTICES_BATCH_SIZE + [invalid_cellar_uri]
available_uries = check_availability_of_notices_in_cellar(notice_uries=notice_uries)
assert valid_cellar_uri in available_uries
assert invalid_cellar_uri not in available_uries

0 comments on commit c3250dc

Please sign in to comment.