Skip to content

Commit

Permalink
[wip] obtain remote dir listing for CWL Directory input
Browse files Browse the repository at this point in the history
  • Loading branch information
fmigneault committed Oct 13, 2022
1 parent d3d0b5b commit d3703a9
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 46 deletions.
132 changes: 88 additions & 44 deletions weaver/processes/wps_package.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@
get_log_fmt,
get_sane_name,
get_settings,
list_directory,
request_extra,
setup_loggers
)
Expand Down Expand Up @@ -133,6 +134,7 @@
AnyValueType,
CWL,
CWL_AnyRequirements,
CWL_IO_ComplexType,
CWL_Requirement,
CWL_RequirementsDict,
CWL_RequirementNames,
Expand Down Expand Up @@ -1551,10 +1553,62 @@ def make_inputs(self,
raise PackageTypeError(f"Undefined package input for execution: {type(input_i)}.")
return cwl_inputs

def make_location_input_security_check(self, input_scheme, input_type, input_id, input_location, input_definition):
# type: (str, CWL_IO_ComplexType, str, str, ComplexInput) -> str
"""
Perform security access validation of the reference, and resolve it afterwards if accessible.
Auto-map local file if possible to avoid useless download from current server.
Resolve :term:`Vault` reference with local file stored after decryption.
:returns: Updated file location if any resolution occurred.
"""
if input_scheme == "vault":
if input_type != "File":
raise PackageExecutionError(
f"Vault reference must be a file, but resolved [{input_type}] type "
f"instead for input [{input_id}] from location [{input_location}]."
)
vault_id = bytes2str(urlparse(input_location).hostname)
input_url = get_vault_url(vault_id, self.settings)
resp = request_extra("HEAD", input_url, settings=self.settings, headers=self.auth)
if resp.status_code == 200:
self.logger.debug("Detected and validated remotely accessible reference [%s] "
"matching local Vault [%s]. Replacing URL reference for local access.",
input_location, input_url)
# pre-fetch by move and delete file from vault and decrypt it (as download would)
# to save transfer time/data from local file already available
auth = parse_vault_token(self.auth.get(sd.XAuthVaultFileHeader.name), unique=False)
file = get_authorized_file(vault_id, auth.get(vault_id), self.settings)
input_location = map_vault_location(input_url, self.settings)
input_location = decrypt_from_vault(file, input_location,
out_dir=input_definition.workdir, delete_encrypted=True)
self.logger.debug("Moved Vault file to temporary location: [%s]. "
"File not accessible from Vault endpoint anymore. "
"Location will be deleted after process execution.",
input_location)
else:
self.logger.error("Detected Vault file reference that is not accessible [%s] caused "
"by HTTP [%s] Detail:\n%s", input_location,
resp.status_code, repr_json(resp.text, indent=2))
raise PackageAuthenticationError(
f"Input {input_id} with Vault reference [{vault_id}] is not accessible."
)
else:
input_local_ref = map_wps_output_location(input_location, self.settings)
if input_local_ref:
resp = request_extra("HEAD", input_location, settings=self.settings, headers=self.auth)
if resp.status_code == 200: # if failed, following fetch will produce the appropriate HTTP error
self.logger.debug("Detected and validated remotely accessible reference [%s] "
"matching local WPS outputs [%s]. Skipping fetch using direct reference.",
input_location, input_local_ref)
input_location = input_local_ref
return input_location

def make_location_input(self, input_type, input_definition):
# type: (str, ComplexInput) -> Optional[JSON]
# type: (CWL_IO_ComplexType, ComplexInput) -> Optional[JSON]
"""
Generates the JSON content required to specify a `CWL` ``File`` input definition from a location.
Generates the JSON content required to specify a `CWL` ``File`` or ``Directory`` input from a location.
If the input reference corresponds to an HTTP URL that is detected as matching the local WPS output endpoint,
implicitly convert the reference to the local WPS output directory to avoid useless download of available file.
Expand All @@ -1567,14 +1621,19 @@ def make_location_input(self, input_type, input_definition):
Any other variant of file reference will be fetched as applicable by the relevant schemes.
If the reference corresponds to a ``Directory``, all files that can be located in it will be fetched as
applicable by the relevant scheme of the reference. It is up to the remote location to provide listing
capabilities accordingly to view available files.
.. seealso::
Documentation details of resolution based on schemes defined in :ref:`file_reference_types` section.
"""
# NOTE:
# When running as EMS, must not call data/file methods if URL reference, otherwise contents
# get fetched automatically by PyWPS objects.
input_location = None
# cannot rely only on 'as_reference' as often it is not provided by the request although it's an href
input_id = input_definition.identifier
# cannot rely only on 'as_reference' as often it is not provided by the request, although it's an href
if input_definition.as_reference:
input_location = input_definition.url
# FIXME: PyWPS bug
Expand Down Expand Up @@ -1623,52 +1682,37 @@ def make_location_input(self, input_type, input_definition):
any(default_format_def.get("mimeType") == fmt.mime_type and fmt.mime_type is not None
for fmt in input_definition.supported_formats)
):
self.logger.debug("File input (%s) DROPPED. Detected default format as data.", input_definition.identifier)
self.logger.debug("%s input (%s) DROPPED. Detected default format as data.", input_type, input_id)
return None

# auto-map local file if possible after security check
if input_scheme == "vault":
vault_id = bytes2str(urlparse(input_location).hostname)
input_url = get_vault_url(vault_id, self.settings)
resp = request_extra("HEAD", input_url, settings=self.settings, headers=self.auth)
if resp.status_code == 200:
self.logger.debug("Detected and validated remotely accessible reference [%s] "
"matching local Vault [%s]. Replacing URL reference for local access.",
input_location, input_url)
# pre-fetch by move and delete file from vault and decrypt it (as download would)
# to save transfer time/data from local file already available
auth = parse_vault_token(self.auth.get(sd.XAuthVaultFileHeader.name), unique=False)
file = get_authorized_file(vault_id, auth.get(vault_id), self.settings)
input_location = map_vault_location(input_url, self.settings)
input_location = decrypt_from_vault(file, input_location,
out_dir=input_definition.workdir, delete_encrypted=True)
self.logger.debug("Moved Vault file to temporary location: [%s]. "
"File not accessible from Vault endpoint anymore. "
"Location will be deleted after process execution.",
input_location)
else:
self.logger.error("Detected Vault file reference that is not accessible [%s] caused "
"by HTTP [%s] Detail:\n%s", input_location,
resp.status_code, repr_json(resp.text, indent=2))
raise PackageAuthenticationError(
f"Input {input_definition.identifier} with Vault reference [{vault_id}] is not accessible."
)
else:
input_local_ref = map_wps_output_location(input_location, self.settings)
if input_local_ref:
resp = request_extra("HEAD", input_location, settings=self.settings, headers=self.auth)
if resp.status_code == 200: # if failed, following fetch will produce the appropriate HTTP error
self.logger.debug("Detected and validated remotely accessible reference [%s] "
"matching local WPS outputs [%s]. Skipping fetch using direct reference.",
input_location, input_local_ref)
input_location = input_local_ref
input_location = self.make_location_input_security_check(
input_scheme,
input_type,
input_id,
input_location,
input_definition
)

if self.must_fetch(input_location):
self.logger.info("File input (%s) ATTEMPT fetch: [%s]", input_definition.identifier, input_location)
input_location = fetch_file(input_location, input_definition.workdir,
settings=self.settings, headers=self.auth)
self.logger.info("%s input (%s) ATTEMPT fetch: [%s]", input_type, input_id, input_location)
if input_type == "File":
input_location = fetch_file(input_location, input_definition.workdir,
settings=self.settings, headers=self.auth)
elif input_type == "Directory":
self.logger.info("Directory")
locations = list_directory(input_location, settings=self.settings, headers=self.auth)
for url_loc in locations:
self.logger.info("File input from listing for (%s) ATTEMPT fetch: [%s]", input_id, url_loc)
dir_loc = fetch_file(input_location, input_definition.workdir,
settings=self.settings, headers=self.auth)
self.logger.debug("Resolved file [%s] -> [%s]", url_loc, dir_loc)
else:
raise PackageExecutionError(
f"Unknown reference staging resolution method for [{input_type}] type "
f"specified for input [{input_id}] from location [{input_location}]."
)
else:
self.logger.info("File input (%s) SKIPPED fetch: [%s]", input_definition.identifier, input_location)
self.logger.info("%s input (%s) SKIPPED fetch: [%s]", input_type, input_id, input_location)

location = {"location": input_location, "class": input_type}
if input_definition.data_format is not None and input_definition.data_format.mime_type:
Expand Down
25 changes: 23 additions & 2 deletions weaver/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1748,7 +1748,7 @@ def download_file_http(file_reference, file_outdir, settings=None, **request_kwa
def fetch_file(file_reference, file_outdir, settings=None, link=None, move=False, **request_kwargs):
# type: (str, str, Optional[AnySettingsContainer], Optional[bool], bool, **Any) -> str
"""
Fetches a file from local path, AWS-S3 bucket or remote URL, and dumps it's content to the output directory.
Fetches a file from local path, AWS-S3 bucket or remote URL, and dumps its content to the output directory.
The output directory is expected to exist prior to this function call.
The file reference scheme (protocol) determines from where to fetch the content.
Expand All @@ -1770,7 +1770,7 @@ def fetch_file(file_reference, file_outdir, settings=None, link=None, move=False
:param move:
Move local file to the output directory instead of copying or linking it.
No effect if the output directory already contains the local file.
No effect if download must occurs for remote file.
No effect if download must occur for remote file.
:param request_kwargs: Additional keywords to forward to request call (if needed).
:return: Path of the local copy of the fetched file.
:raises HTTPException: applicable HTTP-based exception if any occurred during the operation.
Expand Down Expand Up @@ -1879,6 +1879,27 @@ def is_remote_file(file_location):
return scheme != "" and not posixpath.ismount(f"{scheme}:") # windows partition


def list_directory(location):
# type: (str) -> List[str]
"""
Obtain directory listing from a local or remote location.
:param location: Directory reference (URL, S3, local). Trailing slash expected.
:returns: File locations obtained from listing. References will have the same scheme as the directory reference.
"""
if not location.endswith("/"):
raise ValueError(f"Invalid directory location [{location}] must have a trailing slash.")
if location.startswith("s3://") or location.startswith("https://s3."):
pass
elif location.startswith("http://") or location.startswith("https://"):
pass
elif location.startswith("file://") or location.startswith("/"):
pass
else:
raise ValueError(f"Unknown scheme for directory location [{location}].")
return []


REGEX_SEARCH_INVALID_CHARACTERS = re.compile(r"[^a-zA-Z0-9_\-]")
REGEX_ASSERT_INVALID_CHARACTERS = re.compile(r"^[a-zA-Z0-9_\-]+$")

Expand Down

0 comments on commit d3703a9

Please sign in to comment.