From d3703a9d4f0cfb6a13065ccc1d08e617234d24b4 Mon Sep 17 00:00:00 2001 From: Francis Charette Migneault Date: Thu, 13 Oct 2022 16:52:25 -0400 Subject: [PATCH] [wip] obtain remote dir listing for CWL Directory input --- weaver/processes/wps_package.py | 132 +++++++++++++++++++++----------- weaver/utils.py | 25 +++++- 2 files changed, 111 insertions(+), 46 deletions(-) diff --git a/weaver/processes/wps_package.py b/weaver/processes/wps_package.py index f7101e73e..be72ffbe3 100644 --- a/weaver/processes/wps_package.py +++ b/weaver/processes/wps_package.py @@ -96,6 +96,7 @@ get_log_fmt, get_sane_name, get_settings, + list_directory, request_extra, setup_loggers ) @@ -133,6 +134,7 @@ AnyValueType, CWL, CWL_AnyRequirements, + CWL_IO_ComplexType, CWL_Requirement, CWL_RequirementsDict, CWL_RequirementNames, @@ -1551,10 +1553,62 @@ def make_inputs(self, raise PackageTypeError(f"Undefined package input for execution: {type(input_i)}.") return cwl_inputs + def make_location_input_security_check(self, input_scheme, input_type, input_id, input_location, input_definition): + # type: (str, CWL_IO_ComplexType, str, str, ComplexInput) -> str + """ + Perform security access validation of the reference, and resolve it afterwards if accessible. + + Auto-map local file if possible to avoid useless download from current server. + Resolve :term:`Vault` reference with local file stored after decryption. + + :returns: Updated file location if any resolution occurred. + """ + if input_scheme == "vault": + if input_type != "File": + raise PackageExecutionError( + f"Vault reference must be a file, but resolved [{input_type}] type " + f"instead for input [{input_id}] from location [{input_location}]." + ) + vault_id = bytes2str(urlparse(input_location).hostname) + input_url = get_vault_url(vault_id, self.settings) + resp = request_extra("HEAD", input_url, settings=self.settings, headers=self.auth) + if resp.status_code == 200: + self.logger.debug("Detected and validated remotely accessible reference [%s] " + "matching local Vault [%s]. Replacing URL reference for local access.", + input_location, input_url) + # pre-fetch by move and delete file from vault and decrypt it (as download would) + # to save transfer time/data from local file already available + auth = parse_vault_token(self.auth.get(sd.XAuthVaultFileHeader.name), unique=False) + file = get_authorized_file(vault_id, auth.get(vault_id), self.settings) + input_location = map_vault_location(input_url, self.settings) + input_location = decrypt_from_vault(file, input_location, + out_dir=input_definition.workdir, delete_encrypted=True) + self.logger.debug("Moved Vault file to temporary location: [%s]. " + "File not accessible from Vault endpoint anymore. " + "Location will be deleted after process execution.", + input_location) + else: + self.logger.error("Detected Vault file reference that is not accessible [%s] caused " + "by HTTP [%s] Detail:\n%s", input_location, + resp.status_code, repr_json(resp.text, indent=2)) + raise PackageAuthenticationError( + f"Input {input_id} with Vault reference [{vault_id}] is not accessible." + ) + else: + input_local_ref = map_wps_output_location(input_location, self.settings) + if input_local_ref: + resp = request_extra("HEAD", input_location, settings=self.settings, headers=self.auth) + if resp.status_code == 200: # if failed, following fetch will produce the appropriate HTTP error + self.logger.debug("Detected and validated remotely accessible reference [%s] " + "matching local WPS outputs [%s]. Skipping fetch using direct reference.", + input_location, input_local_ref) + input_location = input_local_ref + return input_location + def make_location_input(self, input_type, input_definition): - # type: (str, ComplexInput) -> Optional[JSON] + # type: (CWL_IO_ComplexType, ComplexInput) -> Optional[JSON] """ - Generates the JSON content required to specify a `CWL` ``File`` input definition from a location. + Generates the JSON content required to specify a `CWL` ``File`` or ``Directory`` input from a location. If the input reference corresponds to an HTTP URL that is detected as matching the local WPS output endpoint, implicitly convert the reference to the local WPS output directory to avoid useless download of available file. @@ -1567,6 +1621,10 @@ def make_location_input(self, input_type, input_definition): Any other variant of file reference will be fetched as applicable by the relevant schemes. + If the reference corresponds to a ``Directory``, all files that can be located in it will be fetched as + applicable by the relevant scheme of the reference. It is up to the remote location to provide listing + capabilities accordingly to view available files. + .. seealso:: Documentation details of resolution based on schemes defined in :ref:`file_reference_types` section. """ @@ -1574,7 +1632,8 @@ def make_location_input(self, input_type, input_definition): # When running as EMS, must not call data/file methods if URL reference, otherwise contents # get fetched automatically by PyWPS objects. input_location = None - # cannot rely only on 'as_reference' as often it is not provided by the request although it's an href + input_id = input_definition.identifier + # cannot rely only on 'as_reference' as often it is not provided by the request, although it's an href if input_definition.as_reference: input_location = input_definition.url # FIXME: PyWPS bug @@ -1623,52 +1682,37 @@ def make_location_input(self, input_type, input_definition): any(default_format_def.get("mimeType") == fmt.mime_type and fmt.mime_type is not None for fmt in input_definition.supported_formats) ): - self.logger.debug("File input (%s) DROPPED. Detected default format as data.", input_definition.identifier) + self.logger.debug("%s input (%s) DROPPED. Detected default format as data.", input_type, input_id) return None - # auto-map local file if possible after security check - if input_scheme == "vault": - vault_id = bytes2str(urlparse(input_location).hostname) - input_url = get_vault_url(vault_id, self.settings) - resp = request_extra("HEAD", input_url, settings=self.settings, headers=self.auth) - if resp.status_code == 200: - self.logger.debug("Detected and validated remotely accessible reference [%s] " - "matching local Vault [%s]. Replacing URL reference for local access.", - input_location, input_url) - # pre-fetch by move and delete file from vault and decrypt it (as download would) - # to save transfer time/data from local file already available - auth = parse_vault_token(self.auth.get(sd.XAuthVaultFileHeader.name), unique=False) - file = get_authorized_file(vault_id, auth.get(vault_id), self.settings) - input_location = map_vault_location(input_url, self.settings) - input_location = decrypt_from_vault(file, input_location, - out_dir=input_definition.workdir, delete_encrypted=True) - self.logger.debug("Moved Vault file to temporary location: [%s]. " - "File not accessible from Vault endpoint anymore. " - "Location will be deleted after process execution.", - input_location) - else: - self.logger.error("Detected Vault file reference that is not accessible [%s] caused " - "by HTTP [%s] Detail:\n%s", input_location, - resp.status_code, repr_json(resp.text, indent=2)) - raise PackageAuthenticationError( - f"Input {input_definition.identifier} with Vault reference [{vault_id}] is not accessible." - ) - else: - input_local_ref = map_wps_output_location(input_location, self.settings) - if input_local_ref: - resp = request_extra("HEAD", input_location, settings=self.settings, headers=self.auth) - if resp.status_code == 200: # if failed, following fetch will produce the appropriate HTTP error - self.logger.debug("Detected and validated remotely accessible reference [%s] " - "matching local WPS outputs [%s]. Skipping fetch using direct reference.", - input_location, input_local_ref) - input_location = input_local_ref + input_location = self.make_location_input_security_check( + input_scheme, + input_type, + input_id, + input_location, + input_definition + ) if self.must_fetch(input_location): - self.logger.info("File input (%s) ATTEMPT fetch: [%s]", input_definition.identifier, input_location) - input_location = fetch_file(input_location, input_definition.workdir, - settings=self.settings, headers=self.auth) + self.logger.info("%s input (%s) ATTEMPT fetch: [%s]", input_type, input_id, input_location) + if input_type == "File": + input_location = fetch_file(input_location, input_definition.workdir, + settings=self.settings, headers=self.auth) + elif input_type == "Directory": + self.logger.info("Directory") + locations = list_directory(input_location, settings=self.settings, headers=self.auth) + for url_loc in locations: + self.logger.info("File input from listing for (%s) ATTEMPT fetch: [%s]", input_id, url_loc) + dir_loc = fetch_file(input_location, input_definition.workdir, + settings=self.settings, headers=self.auth) + self.logger.debug("Resolved file [%s] -> [%s]", url_loc, dir_loc) + else: + raise PackageExecutionError( + f"Unknown reference staging resolution method for [{input_type}] type " + f"specified for input [{input_id}] from location [{input_location}]." + ) else: - self.logger.info("File input (%s) SKIPPED fetch: [%s]", input_definition.identifier, input_location) + self.logger.info("%s input (%s) SKIPPED fetch: [%s]", input_type, input_id, input_location) location = {"location": input_location, "class": input_type} if input_definition.data_format is not None and input_definition.data_format.mime_type: diff --git a/weaver/utils.py b/weaver/utils.py index 25edbcd8d..ac3ba6a85 100644 --- a/weaver/utils.py +++ b/weaver/utils.py @@ -1748,7 +1748,7 @@ def download_file_http(file_reference, file_outdir, settings=None, **request_kwa def fetch_file(file_reference, file_outdir, settings=None, link=None, move=False, **request_kwargs): # type: (str, str, Optional[AnySettingsContainer], Optional[bool], bool, **Any) -> str """ - Fetches a file from local path, AWS-S3 bucket or remote URL, and dumps it's content to the output directory. + Fetches a file from local path, AWS-S3 bucket or remote URL, and dumps its content to the output directory. The output directory is expected to exist prior to this function call. The file reference scheme (protocol) determines from where to fetch the content. @@ -1770,7 +1770,7 @@ def fetch_file(file_reference, file_outdir, settings=None, link=None, move=False :param move: Move local file to the output directory instead of copying or linking it. No effect if the output directory already contains the local file. - No effect if download must occurs for remote file. + No effect if download must occur for remote file. :param request_kwargs: Additional keywords to forward to request call (if needed). :return: Path of the local copy of the fetched file. :raises HTTPException: applicable HTTP-based exception if any occurred during the operation. @@ -1879,6 +1879,27 @@ def is_remote_file(file_location): return scheme != "" and not posixpath.ismount(f"{scheme}:") # windows partition +def list_directory(location): + # type: (str) -> List[str] + """ + Obtain directory listing from a local or remote location. + + :param location: Directory reference (URL, S3, local). Trailing slash expected. + :returns: File locations obtained from listing. References will have the same scheme as the directory reference. + """ + if not location.endswith("/"): + raise ValueError(f"Invalid directory location [{location}] must have a trailing slash.") + if location.startswith("s3://") or location.startswith("https://s3."): + pass + elif location.startswith("http://") or location.startswith("https://"): + pass + elif location.startswith("file://") or location.startswith("/"): + pass + else: + raise ValueError(f"Unknown scheme for directory location [{location}].") + return [] + + REGEX_SEARCH_INVALID_CHARACTERS = re.compile(r"[^a-zA-Z0-9_\-]") REGEX_ASSERT_INVALID_CHARACTERS = re.compile(r"^[a-zA-Z0-9_\-]+$")