Skip to content

Commit

Permalink
first impl to handle CWL Directory type (relates to #466)
Browse files Browse the repository at this point in the history
  • Loading branch information
fmigneault committed Oct 15, 2022
1 parent dde8f87 commit 25f47e3
Show file tree
Hide file tree
Showing 9 changed files with 241 additions and 11 deletions.
2 changes: 1 addition & 1 deletion CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -880,7 +880,7 @@ Fixes:

Changes:
--------
- No change.
- Add support of `CWL` ``Directory`` type (resolves `#466 <https://github.com/crim-ca/weaver/issues/466>`_).

Fixes:
------
Expand Down
23 changes: 23 additions & 0 deletions tests/functional/application-packages/DirectoryProcess/deploy.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
processDescription:
id: DirectoryProcess
title: List contents of an input directory to the output text file.
version: "0.0.1"
inputs:
- id: input_dir
formats:
- mimeType: application/directory
default: true
minOccurs: 1
maxOccurs: 1
outputs:
- id: output_file
formats:
- mimeType: text/plain
default: true
minOccurs: 1
maxOccurs: 1
visibility: public
executionUnit:
- test: DirectoryProcess.cwl
immediateDeployment: true
deploymentProfileName: "http://www.opengis.net/profiles/eoc/dockerizedApplication"
27 changes: 27 additions & 0 deletions tests/functional/application-packages/DirectoryProcess/package.cwl
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
cwlVersion: v1.0
class: CommandLineTool
baseCommand:
- bash
- script.sh
requirements:
DockerRequirement:
dockerPull: debian:stretch-slim
InitialWorkDirRequirement:
listing:
- entryname: script.sh
entry: |
set -x
echo "Input: $2"
echo "Output: $1"
find "$1" ! -type d -exec ls -lht {} + > output.txt
cat output.txt
inputs:
input_dir:
type: Directory
inputBinding:
position: 1
outputs:
output_file:
type: File
outputBinding:
glob: "output.txt"
148 changes: 146 additions & 2 deletions tests/functional/test_wps_package.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
from weaver.processes.constants import CWL_REQUIREMENT_APP_DOCKER, CWL_REQUIREMENT_INIT_WORKDIR, ProcessSchema
from weaver.processes.types import ProcessType
from weaver.status import Status
from weaver.utils import get_any_value
from weaver.utils import get_any_value, fetch_file, load_file
from weaver.wps.utils import get_wps_output_dir, map_wps_output_location

if TYPE_CHECKING:
Expand Down Expand Up @@ -713,6 +713,20 @@ def test_deploy_merge_literal_io_from_package_and_offering(self):
"Additional detail only within WPS output", \
"Additional details defined only in WPS matching CWL I/O by ID should be preserved."

def test_deploy_resolve_complex_io_format_directory(self):
"""
Test that directory complex type is resolved from CWL.
.. versionadded:: 4.26
"""
body = self.retrieve_payload("DirectoryProcess", "deploy", local=True)
pkg = self.retrieve_payload("DirectoryProcess", "package", local=True)
# remove definitions in deploy body to evaluate auto-resolution from CWL 'type: Directory'
body["processDescription"].pop("inputs")
body["executionUnit"] = [{"unit": pkg}]
desc, _ = self.deploy_process(body, describe_schema=ProcessSchema.OGC)
assert desc["inputs"]["input_dir"]["formats"][0]["mediaType"] == ContentType.APP_DIR

def test_deploy_merge_complex_io_format_references(self):
"""
Test validates that known `WPS` I/O formats (i.e.: `MIME-type`) considered as valid, but not corresponding to
Expand Down Expand Up @@ -2179,6 +2193,132 @@ def test_execute_job_with_custom_file_name(self):
f"Original: [{tmp_name_random}]"
)

def test_execute_with_browsable_directory(self):
"""
Test that HTTP browsable directory-like structure retrieves children files recursively for the process.
.. versionadded:: 4.26
"""
body = self.retrieve_payload("DirectoryProcess", "deploy", local=True)
pkg = self.retrieve_payload("DirectoryProcess", "package", local=True)
body["executionUnit"] = [{"unit": pkg}]
desc, _ = self.deploy_process(body, describe_schema=ProcessSchema.OGC)
assert desc["inputs"]["input_dir"]["formats"][0]["mediaType"] == ContentType.APP_DIR

with contextlib.ExitStack() as stack:
tmp_host = "https://mocked-file-server.com" # must match in 'Execute_WorkflowSelectCopyNestedOutDir.json'
tmp_dir = stack.enter_context(tempfile.TemporaryDirectory())
stack.enter_context(mocked_file_server(tmp_dir, tmp_host, self.settings))
test_http_dir = f"{tmp_host}/dir/"
test_http_dir_files = [
"dir/file.txt",
"dir/sub/file.txt",
"dir/sub/nested/file.txt",
"other/file.txt",
"root.file.txt"
]
for file in test_http_dir_files:
path = os.path.join(tmp_dir, file)
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, mode="w", encoding="utf-8") as f:
f.write("test data")

exec_body = {
"mode": ExecuteMode.ASYNC,
"response": ExecuteResponse.DOCUMENT,
"inputs": [
{"id": "input_dir", "href": test_http_dir},
],
"outputs": [
{"id": "output_file", "transmissionMode": ExecuteTransmissionMode.REFERENCE},
]
}
for mock_exec in mocked_execute_celery():
stack.enter_context(mock_exec)
proc_url = f"/processes/DirectoryProcess/jobs"
resp = mocked_sub_requests(self.app, "post_json", proc_url, timeout=5,
data=exec_body, headers=self.json_headers, only_local=True)
assert resp.status_code in [200, 201], f"Failed with: [{resp.status_code}]\nReason:\n{resp.json}"
status_url = resp.json["location"]

results = self.monitor_job(status_url)
assert "output_file" in results
stack.enter_context(mocked_wps_output(self.settings))
tmpdir = stack.enter_context(tempfile.TemporaryDirectory())
output_file = fetch_file(results["output_file"]["href"], tmpdir, self.settings)
output_data = load_file(output_file, text=True)

# because files under dir are fetched and mounted in stage dir, random sub-dir from CWL is generated
# ignore this part of the paths for testing invariant results
# ignore prefixed file metadata generated by the process listing
cwl_stage_dir = "/var/lib/cwl/" # /stg<UUID>/<expected-files>
output_listing = [file.rsplit(" ")[-1] for file in output_data.split("\n")]
expect_http_files = "\n".join([file for file in test_http_dir_files if file.startswith("dir/")])
assert len(output_listing) == len(expect_http_files)
assert all(file.startswith(cwl_stage_dir) for file in output_listing)
assert all(any(file.endswith(dir_file) for file in output_listing) for dir_file in expect_http_files)

@mocked_aws_credentials
@mocked_aws_s3
def test_execute_with_bucket_directory(self):
"""
Test that directory pointing at a S3 bucket downloads all children files recursively for the process.
.. versionadded:: 4.26
"""
body = self.retrieve_payload("DirectoryProcess", "deploy", local=True)
pkg = self.retrieve_payload("DirectoryProcess", "package", local=True)
body["executionUnit"] = [{"unit": pkg}]
desc, _ = self.deploy_process(body, describe_schema=ProcessSchema.OGC)
assert desc["inputs"]["input_dir"]["formats"][0]["mediaType"] == ContentType.APP_DIR

test_bucket_files = [
"dir/file.txt",
"dir/sub/file.txt",
"dir/sub/nested/file.txt",
"other/file.txt",
"root.file.txt"
]
test_bucket_ref = "wps-process-test-bucket"
for file in test_bucket_files:
mocked_aws_s3_bucket_test_file(test_bucket_ref, file)
test_bucket_dir = f"s3://{test_bucket_ref}/dir/"
exec_body = {
"mode": ExecuteMode.ASYNC,
"response": ExecuteResponse.DOCUMENT,
"inputs": [
{"id": "input_dir", "href": test_bucket_dir},
],
"outputs": [
{"id": "output_file", "transmissionMode": ExecuteTransmissionMode.REFERENCE},
]
}
with contextlib.ExitStack() as stack:
for mock_exec in mocked_execute_celery():
stack.enter_context(mock_exec)
proc_url = f"/processes/DirectoryProcess/jobs"
resp = mocked_sub_requests(self.app, "post_json", proc_url, timeout=5,
data=exec_body, headers=self.json_headers, only_local=True)
assert resp.status_code in [200, 201], f"Failed with: [{resp.status_code}]\nReason:\n{resp.json}"
status_url = resp.json["location"]

results = self.monitor_job(status_url)
assert "output_file" in results
stack.enter_context(mocked_wps_output(self.settings))
tmpdir = stack.enter_context(tempfile.TemporaryDirectory())
output_file = fetch_file(results["output_file"]["href"], tmpdir, self.settings)
output_data = load_file(output_file, text=True)

# because files under dir are fetched and mounted in stage dir, random sub-dir from CWL is generated
# ignore this part of the paths for testing invariant results
# ignore prefixed file metadata generated by the process listing
cwl_stage_dir = "/var/lib/cwl/" # /stg<UUID>/<expected-files>
output_listing = [file.rsplit(" ")[-1] for file in output_data.split("\n")]
expect_bucket_files = "\n".join([file for file in test_bucket_files if file.startswith("dir/")])
assert len(output_listing) == len(expect_bucket_files)
assert all(file.startswith(cwl_stage_dir) for file in output_listing)
assert all(any(file.endswith(dir_file) for file in output_listing) for dir_file in expect_bucket_files)

# FIXME: create a real async test (threading/multiprocess) to evaluate this correctly
def test_dismiss_job(self):
"""
Expand Down Expand Up @@ -2688,6 +2828,10 @@ def test_deploy_multi_outputs_file_from_wps_xml_reference(self):

@pytest.mark.functional
class WpsPackageAppWithS3BucketTest(WpsConfigBase):
"""
Test with output results uploaded to S3 bucket.
"""

@classmethod
def setUpClass(cls):
cls.settings = {
Expand All @@ -2704,7 +2848,7 @@ def setUpClass(cls):

@mocked_aws_credentials
@mocked_aws_s3
def test_execute_application_package_process_with_bucket(self):
def test_execute_application_package_process_with_bucket_results(self):
"""
Test validates:
Expand Down
9 changes: 9 additions & 0 deletions tests/test_formats.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ def test_get_extension():
assert f.get_extension("application/unknown") == ".unknown"


def test_get_extension_directory():
assert f.get_extension(f.ContentType.APP_DIR, dot=True) == "/"
assert f.get_extension(f.ContentType.APP_DIR, dot=False) == "/"


def test_get_extension_glob_any():
assert f.get_extension(f.ContentType.ANY) == ".*"

Expand All @@ -32,6 +37,10 @@ def test_get_content_type():
assert f.get_content_type(".yaml") == f.ContentType.APP_YAML


def test_get_content_type_directory():
assert f.get_content_type("/") == f.ContentType.APP_DIR


def test_get_content_type_extra_parameters():
assert f.get_content_type(".unknown") is None
assert f.get_content_type(".unknown", default=f.ContentType.TEXT_PLAIN) == f.ContentType.TEXT_PLAIN
Expand Down
17 changes: 16 additions & 1 deletion tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,15 @@
# [WPS1-URL, GetCapPathXML, [DescribePathXML], [ExecutePathXML]]
MockConfigWPS1 = Sequence[str, str, Optional[Sequence[str]], Optional[Sequence[str]]]
MockReturnType = TypeVar("MockReturnType")
MockHttpMethod = Union[
responses.HEAD,
responses.GET,
responses.POST,
responses.PATCH,
responses.PUT,
responses.DELETE,
responses.OPTIONS,
]

CommandType = Callable[[Union[str, Tuple[str]]], int]

Expand Down Expand Up @@ -759,14 +768,16 @@ def get_xml(ref): # type: (str) -> str
all_request.add((responses.GET, getcap_with_proc_id_url + version_query, get_cap_xml))

def apply_mocks(_mock_resp, _requests):
# type: (responses.RequestsMock, Iterable[Tuple[MockHttpMethod, str, str]]) -> None
xml_header = {"Content-Type": ContentType.APP_XML}
for meth, url, body in _requests:
_mock_resp.add(meth, url, body=body, headers=xml_header)

def mocked_remote_server_wrapper(test):
# type: (Callable) -> Callable
# type: (Callable[[..., Any], Any]) -> Callable[[..., Any], Any]
@functools.wraps(test)
def mock_requests_wps1(*args, **kwargs):
# type: (*Any, **Any) -> Any
"""
Mock ``requests`` responses fetching ``test_server_wps`` WPS reference.
"""
Expand Down Expand Up @@ -1029,6 +1040,7 @@ def mocked_aws_credentials(test_func):
mistakenly overriding real bucket files.
"""
def wrapped(*args, **kwargs):
# type: (*Any, **Any) -> Any
with mock.patch.dict(os.environ, {
"AWS_ACCESS_KEY_ID": "testing",
"AWS_SECRET_ACCESS_KEY": "testing",
Expand All @@ -1049,6 +1061,7 @@ def mocked_aws_s3(test_func):
attempt writing to real bucket.
"""
def wrapped(*args, **kwargs):
# type: (*Any, **Any) -> Any
with moto.mock_s3():
return test_func(*args, **kwargs)
return wrapped
Expand Down Expand Up @@ -1093,12 +1106,14 @@ def mocked_http_file(test_func):
- :func:`mocked_reference_test_file`
"""
def mocked_file_request(file_reference, file_outdir, **kwargs):
# type: (str, str, **Any) -> str
if file_reference and file_reference.startswith(MOCK_HTTP_REF):
file_reference = file_reference.replace(MOCK_HTTP_REF, "")
file_path = fetch_file(file_reference, file_outdir, **kwargs)
return file_path

def wrapped(*args, **kwargs):
# type: (*Any, **Any) -> Any
with mock.patch("weaver.processes.wps_package.fetch_file", side_effect=mocked_file_request):
return test_func(*args, **kwargs)
return wrapped
Expand Down
14 changes: 11 additions & 3 deletions weaver/formats.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class ContentType(Constants):
<type> "/" [x- | <tree> "."] <subtype> ["+" suffix] *[";" parameter=value]
"""

APP_DIR = "application/directory"
APP_CWL = "application/cwl"
APP_CWL_JSON = "application/cwl+json"
APP_CWL_YAML = "application/cwl+yaml"
Expand Down Expand Up @@ -204,7 +205,8 @@ class SchemaRole(Constants):
ContentType.APP_TAR_GZ: ".tar.gz",
ContentType.APP_YAML: ".yml",
ContentType.IMAGE_TIFF: ".tif", # common alternate to .tiff
ContentType.ANY: ".*", # any for glob
ContentType.ANY: ".*", # any for glob
ContentType.APP_DIR: "/", # force href to finish with explicit '/' to mark directory
ContentType.APP_OCTET_STREAM: "",
ContentType.APP_FORM: "",
ContentType.MULTI_PART_FORM: "",
Expand Down Expand Up @@ -440,6 +442,8 @@ def _handle_dot(_ext):

fmt = _CONTENT_TYPE_FORMAT_MAPPING.get(mime_type)
if fmt:
if not fmt.extension.startswith("."):
return fmt.extension
return _handle_dot(fmt.extension)
ext = _CONTENT_TYPE_EXTENSION_MAPPING.get(mime_type)
if ext:
Expand All @@ -462,11 +466,15 @@ def get_content_type(extension, charset=None, default=None):
:param default: Default Content-Type to return if no extension is matched.
:return: Matched or default Content-Type.
"""
ctype = None
if not extension:
return default
if not extension.startswith("."):
extension = f".{extension}"
ctype = _EXTENSION_CONTENT_TYPES_MAPPING.get(extension)
ctype = _EXTENSION_CONTENT_TYPES_MAPPING.get(extension)
if not ctype:
extension = f".{extension}"
if not ctype:
ctype = _EXTENSION_CONTENT_TYPES_MAPPING.get(extension)
if not ctype:
return default
return add_content_type_charset(ctype, charset)
Expand Down
2 changes: 1 addition & 1 deletion weaver/processes/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ class OpenSearchField(Constants):
PACKAGE_EXTENSIONS = frozenset(["yaml", "yml", "json", "cwl", "job"])
PACKAGE_SIMPLE_TYPES = frozenset(["string", "boolean", "float", "int", "integer", "long", "double"])
PACKAGE_LITERAL_TYPES = frozenset(PACKAGE_SIMPLE_TYPES | {"null", "Any"})
PACKAGE_COMPLEX_TYPES = frozenset(["File"]) # FIXME: type "Directory" not supported
PACKAGE_COMPLEX_TYPES = frozenset(["File", "Directory"])
PACKAGE_ENUM_BASE = "enum"
PACKAGE_CUSTOM_TYPES = frozenset([PACKAGE_ENUM_BASE]) # can be anything, but support "enum" which is more common
PACKAGE_ARRAY_BASE = "array"
Expand Down
Loading

0 comments on commit 25f47e3

Please sign in to comment.