From 4284e5bba5e2308cb87e6cfbf030ffd83c780b48 Mon Sep 17 00:00:00 2001 From: Francis Charette-Migneault Date: Fri, 30 Sep 2022 19:37:59 -0400 Subject: [PATCH 01/19] patch requirements with latest CWL --- docs/examples/docker-python-script-report.cwl | 4 +-- requirements-sys.txt | 8 ++--- requirements.txt | 19 ++++------- weaver/processes/convert.py | 33 +++++++++++++----- weaver/processes/wps_workflow.py | 34 ++++++++++++------- 5 files changed, 58 insertions(+), 40 deletions(-) diff --git a/docs/examples/docker-python-script-report.cwl b/docs/examples/docker-python-script-report.cwl index 8d609a1b1..891ec73ad 100644 --- a/docs/examples/docker-python-script-report.cwl +++ b/docs/examples/docker-python-script-report.cwl @@ -23,5 +23,5 @@ requirements: entry: | amount = $(inputs.amount) cost = $(inputs.cost) - with open("report.txt", "w") as report: - report.write(f"Order Total: {amount * cost:0.2f}$\n") + with open("report.txt", mode="w", encoding="utf-8") as report: + report.write(f"Order Total: {amount * cost:0.2f}$\\n") diff --git a/requirements-sys.txt b/requirements-sys.txt index cc7446905..657f26e5e 100644 --- a/requirements-sys.txt +++ b/requirements-sys.txt @@ -1,8 +1,4 @@ pip>=20.2.2; python_version <= "3.6" pip>=22.0.4; python_version >= "3.7" -# celery enforces some specific versions of setuptools (<60) -# FIXME: when cwltool and rdflib-jsonld versions are updated to avoid failing setuptools install, update more recent -# 'rdflib-jsonld==0.5.0' (required by: cwltool -> schema_salad) uses 'use_2to3' which is invalid for setuptools>=58 -# setuptools<58; python_version <= "3.6" -# setuptools>=59,<59.7; python_version >= "3.7" -setuptools<58 +setuptools<58; python_version <= "3.6" +setuptools>=60; python_version >= "3.7" diff --git a/requirements.txt b/requirements.txt index 04036d78e..c2a2b9410 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,8 +12,6 @@ billiard>3.2,<3.4; sys_platform == "win32" # celery's CLI interface changed # 
https://github.com/celery/celery/blob/master/docs/whatsnew-5.2.rst#upgrading-from-celery-4x celery[mongodb]<4; sys_platform == "win32" # rq.filter: <4 # pyup: ignore -# FIXME: use celery 5.2 specifically to resolve security issue -# best match for now since cannot use setuptools>=59 until cwltool/rdflib-jsonld are updated celery[mongodb]>=5.2.2,<6; sys_platform != "win32" and python_version >= "3.7" # technically, >=5.2 preferred to resolve security issue, but dependency resolver cannot find it # FIXME: drop support? more recent versions not avilable for end-of-life python @@ -30,10 +28,7 @@ cryptography # use cwltool gpu-enabled support until integrated within the original tool # (https://github.com/common-workflow-language/common-workflow-language/issues/587) ### git+https://github.com/crim-ca/cwltool@docker-gpu#egg=cwltool; python_version >= "3" -# FIXME: remove extra CWL code and let it handle it for use -# - changes since cause error with invalid get_listing import location -# - https://github.com/crim-ca/weaver/issues/154 -cwltool==3.0.20200324120055 +cwltool==3.1.20220913185150 docker duration git+https://github.com/ESGF/esgf-compute-api.git@v2.3.7#egg=esgf-compute-api @@ -51,7 +46,8 @@ oauthlib owslib==0.27.2 psutil # FIXME: pymongo>=4 breaks with kombu corresponding to pinned Celery (https://github.com/crim-ca/weaver/issues/386) -pymongo>=3.12.0,<4 +pymongo>=3.12.0,<4; python_version <= "3.6" +pymongo>=4; python_version >= "3.7" pyramid>=1.7.3 pyramid_beaker>=0.8 pyramid_celery>=4.0.0 # required for celery>=5 @@ -62,13 +58,12 @@ pyramid_storage pytz pywps==4.5.1 pyyaml>=5.2 +rdflib>=5 requests requests_file -# let cwltool define ruamel.yaml version (<=0.16.5) -# ensure minimal 0.15.78 to solve install issue (python 3.8) -# (https://bitbucket.org/ruamel/yaml/issues/261/error-while-installing-ruamelyaml-setuppy) -ruamel.yaml>=0.15.78,<=0.16.5 -shapely +ruamel.yaml>=0.16 +schema-salad>=8.2,<9 +shapely==1.8.2 simplejson urlmatch xmltodict diff --git 
a/weaver/processes/convert.py b/weaver/processes/convert.py index d9d719a36..78b612ac7 100644 --- a/weaver/processes/convert.py +++ b/weaver/processes/convert.py @@ -859,7 +859,7 @@ def is_cwl_array_type(io_info, strict=True): io_return = { "array": False, "allow": AnyValue, - "type": io_info["type"], + "type": get_cwl_io_type_name(io_info["type"]), "mode": MODE.NONE, } @@ -897,19 +897,21 @@ def _update_if_sub_enum(_io_item): if io_type["type"] != PACKAGE_ARRAY_BASE: raise PackageTypeError(f"Unsupported I/O 'array' definition: '{io_info!r}'.") # parse enum in case we got an array of allowed symbols - is_enum = _update_if_sub_enum(io_type["items"]) + io_items = get_cwl_io_type_name(io_type["items"]) + is_enum = _update_if_sub_enum(io_items) if not is_enum: io_return["type"] = io_type["items"] - if io_return["type"] not in PACKAGE_ARRAY_ITEMS: # includes Complex, so implicit literal-only check possible - io_type = any2cwl_literal_datatype(io_return["type"]) + io_type = get_cwl_io_type_name(io_return["type"]) + if io_type not in PACKAGE_ARRAY_ITEMS: # includes Complex, so implicit literal-only check possible + io_type = any2cwl_literal_datatype(io_type) if strict or io_type not in PACKAGE_ARRAY_ITEMS: raise PackageTypeError(f"Unsupported I/O 'array' definition: '{io_info!r}'.") io_return["type"] = io_type LOGGER.debug("I/O [%s] parsed as 'array' with nested dict notation", io_info["name"]) io_return["array"] = True # array type conversion when defined as string '[]' - elif isinstance(io_return["type"], str) and io_return["type"] in PACKAGE_ARRAY_TYPES: - io_return["type"] = io_return["type"][:-2] # remove '[]' + elif isinstance(io_return["type"], str) and get_cwl_io_type_name(io_return["type"]) in PACKAGE_ARRAY_TYPES: + io_return["type"] = get_cwl_io_type_name(io_return["type"][:-2]) # remove '[]' if io_return["type"] in PACKAGE_CUSTOM_TYPES: # parse 'enum[]' for array of allowed symbols, provide expected structure for sub-item parsing io_item = deepcopy(io_info) @@ 
-935,7 +937,7 @@ def is_cwl_enum_type(io_info): - ``io_allow``: validation values of the enum. :raises PackageTypeError: if the enum doesn't have the required parameters and valid format. """ - io_type = io_info["type"] + io_type = get_cwl_io_type_name(io_info["type"]) if not isinstance(io_type, dict) or "type" not in io_type or io_type["type"] not in PACKAGE_CUSTOM_TYPES: return False, io_type, MODE.NONE, None @@ -964,6 +966,19 @@ def is_cwl_enum_type(io_info): return True, io_type, MODE.SIMPLE, io_allow # allowed value validator mode must be set for input +def get_cwl_io_type_name(io_type): + # type: (Any) -> Any + """ + Obtain the simple type-name representation of a :term:`CWL` I/O. + + Depending on :mod:`cwltool` version, types are represented with or without an extended prefix, and using an + explicit quoted class representation rather than plain strings. + """ + if isinstance(io_type, str): + return str(io_type.replace("org.w3id.cwl.cwl.", "")) + return io_type + + @dataclass class CWLIODefinition: """ @@ -1003,7 +1018,7 @@ def get_cwl_io_type(io_info, strict=True): :param strict: Indicates if only pure :term:`CWL` definition is allowed, or allow implicit data-type conversions. :return: tuple of guessed base type and flag indicating if it can be null (optional input). """ - io_type = io_info["type"] + io_type = get_cwl_io_type_name(io_info["type"]) is_null = False # parse multi-definition @@ -1077,6 +1092,8 @@ def get_cwl_io_type(io_info, strict=True): LOGGER.debug("type(io_type): [%s]", type(io_type)) raise TypeError(f"I/O type has not been properly decoded. 
Should be a string, got: '{io_type!r}'") + io_type = get_cwl_io_type_name(io_type) + # parse shorthand notation for nullable if io_type.endswith("?"): io_type = io_type[:-1] diff --git a/weaver/processes/wps_workflow.py b/weaver/processes/wps_workflow.py index 729800d8f..886861b72 100644 --- a/weaver/processes/wps_workflow.py +++ b/weaver/processes/wps_workflow.py @@ -15,9 +15,9 @@ from cwltool.context import LoadingContext, RuntimeContext, getdefault from cwltool.errors import WorkflowException from cwltool.job import JobBase, relink_initialworkdir -from cwltool.pathmapper import adjustDirObjs, adjustFileObjs, get_listing, trim_listing, visit_class from cwltool.process import ( Process as ProcessCWL, + avroize_type, compute_checksums, normalizeFilesDirs, shortname, @@ -25,7 +25,12 @@ uniquename ) from cwltool.stdfsaccess import StdFsAccess -from cwltool.utils import aslist, bytes2str_in_dicts, onWindows +from cwltool.utils import ( + aslist, + adjustDirObjs, adjustFileObjs, get_listing, trim_listing, visit_class, + bytes2str_in_dicts, CWLObjectType, OutputCallbackType, + JobsGeneratorType +) from cwltool.workflow import Workflow from schema_salad import validate from schema_salad.sourceline import SourceLine @@ -242,8 +247,9 @@ def make_workflow_exception(msg): adjustFileObjs(ret, partial(compute_checksums, fs_access)) validate.validate_ex( - self.names.get_name("outputs_record_schema", ""), ret, - strict=False, logger=LOGGER) + self.names.get_name("outputs_record_schema", None), ret, + strict=False, logger=LOGGER, vocab={typ: avroize_type(typ) for typ in ["File", "Directory"]} + ) if ret is not None and builder.mutation_manager is not None: adjustFileObjs(ret, builder.mutation_manager.set_generation) return ret if ret is not None else {} @@ -457,13 +463,23 @@ def __init__(self, output_id = shortname(output["id"]) self.expected_outputs[output_id] = output["outputBinding"]["glob"] + def _required_env(self): + # type: () -> Dict[str, str] + env = {} + env["HOME"] = 
self.outdir + env["TMPDIR"] = self.tmpdir + env["PATH"] = os.environ["PATH"] + if "SYSTEMROOT" in os.environ: + env["SYSTEMROOT"] = os.environ["SYSTEMROOT"] + return env + def run(self, runtimeContext, # type: RuntimeContext tmpdir_lock=None, # type: Optional[ThreadLock] ): # type: (...) -> None make_dirs(self.tmpdir, exist_ok=True) - env = self.environment + env = self._required_env() vars_to_preserve = runtimeContext.preserve_environment if runtimeContext.preserve_entire_environment: vars_to_preserve = os.environ @@ -471,13 +487,7 @@ def run(self, for key, value in os.environ.items(): if key in vars_to_preserve and key not in env: # On Windows, subprocess env can't handle unicode. - env[key] = str(value) if onWindows() else value - env["HOME"] = str(self.outdir) if onWindows() else self.outdir - env["TMPDIR"] = str(self.tmpdir) if onWindows() else self.tmpdir - if "PATH" not in env: - env["PATH"] = str(os.environ["PATH"]) if onWindows() else os.environ["PATH"] - if "SYSTEMROOT" not in env and "SYSTEMROOT" in os.environ: - env["SYSTEMROOT"] = str(os.environ["SYSTEMROOT"]) if onWindows() else os.environ["SYSTEMROOT"] + env[key] = value # stageFiles(self.pathmapper, ignoreWritable=True, symLink=True, secret_store=runtimeContext.secret_store) if self.generatemapper: From 26847c1303ab1bd5f77479e11ce54dcda0a7698e Mon Sep 17 00:00:00 2001 From: Francis Charette-Migneault Date: Mon, 3 Oct 2022 15:53:23 -0400 Subject: [PATCH 02/19] fix spacings in dockers --- docker/Dockerfile-base | 10 +++++----- docker/Dockerfile-worker | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/docker/Dockerfile-base b/docker/Dockerfile-base index b330a11a8..b91ff78f5 100644 --- a/docker/Dockerfile-base +++ b/docker/Dockerfile-base @@ -17,11 +17,11 @@ COPY requirements* setup.py README.rst CHANGES.rst ${APP_DIR}/ # install runtime/package dependencies RUN apt-get update && apt-get install -y --no-install-recommends \ - ca-certificates \ - netbase \ - gcc \ - git \ - && pip 
install --no-cache-dir --upgrade -r requirements-sys.txt \ + ca-certificates \ + netbase \ + gcc \ + git \ + && pip install --no-cache-dir --upgrade -r requirements-sys.txt \ && pip install --no-cache-dir -r requirements.txt \ && pip install --no-cache-dir -e ${APP_DIR} \ && apt-get remove -y \ diff --git a/docker/Dockerfile-worker b/docker/Dockerfile-worker index 9f2aa3b40..fb48c5abe 100644 --- a/docker/Dockerfile-worker +++ b/docker/Dockerfile-worker @@ -15,7 +15,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ # Only install CLI package, 'docker-ce' and 'containerd.io' not required as they should be provided by host. # Docker sibling execution is expected. See 'docker/docker-compose.yml.example' for details. && apt install --no-install-recommends docker-ce-cli \ - && rm -rf /var/lib/apt/lists/* + && rm -rf /var/lib/apt/lists/* # run app # see CHANGES (4.15.0), celery>=5 needs '-A' before 'worker' From 8a31034a18b90e17efe75665bb52464b1c9804c5 Mon Sep 17 00:00:00 2001 From: Francis Charette-Migneault Date: Mon, 3 Oct 2022 15:53:38 -0400 Subject: [PATCH 03/19] update definitions to work with latest cwltool --- requirements.txt | 4 ++-- weaver/processes/convert.py | 2 +- weaver/processes/wps_workflow.py | 32 ++++++++++++++++++++++---------- weaver/typedefs.py | 8 +++++++- 4 files changed, 32 insertions(+), 14 deletions(-) diff --git a/requirements.txt b/requirements.txt index c2a2b9410..bea4c5db0 100644 --- a/requirements.txt +++ b/requirements.txt @@ -45,7 +45,7 @@ mako oauthlib owslib==0.27.2 psutil -# FIXME: pymongo>=4 breaks with kombu corresponding to pinned Celery (https://github.com/crim-ca/weaver/issues/386) +# pymongo>=4 breaks with kombu corresponding to pinned Celery (https://github.com/crim-ca/weaver/issues/386) pymongo>=3.12.0,<4; python_version <= "3.6" pymongo>=4; python_version >= "3.7" pyramid>=1.7.3 @@ -56,7 +56,7 @@ python-dateutil pyramid_rewrite pyramid_storage pytz -pywps==4.5.1 +pywps==4.5.2 pyyaml>=5.2 rdflib>=5 
requests diff --git a/weaver/processes/convert.py b/weaver/processes/convert.py index 78b612ac7..0a1625889 100644 --- a/weaver/processes/convert.py +++ b/weaver/processes/convert.py @@ -900,7 +900,7 @@ def _update_if_sub_enum(_io_item): io_items = get_cwl_io_type_name(io_type["items"]) is_enum = _update_if_sub_enum(io_items) if not is_enum: - io_return["type"] = io_type["items"] + io_return["type"] = io_items io_type = get_cwl_io_type_name(io_return["type"]) if io_type not in PACKAGE_ARRAY_ITEMS: # includes Complex, so implicit literal-only check possible io_type = any2cwl_literal_datatype(io_type) diff --git a/weaver/processes/wps_workflow.py b/weaver/processes/wps_workflow.py index 886861b72..1fc8bc16f 100644 --- a/weaver/processes/wps_workflow.py +++ b/weaver/processes/wps_workflow.py @@ -27,9 +27,12 @@ from cwltool.stdfsaccess import StdFsAccess from cwltool.utils import ( aslist, - adjustDirObjs, adjustFileObjs, get_listing, trim_listing, visit_class, - bytes2str_in_dicts, CWLObjectType, OutputCallbackType, - JobsGeneratorType + adjustDirObjs, + adjustFileObjs, + get_listing, + trim_listing, + visit_class, + bytes2str_in_dicts ) from cwltool.workflow import Workflow from schema_salad import validate @@ -51,8 +54,8 @@ from threading import Lock as ThreadLock from typing import Any, Dict, Generator, List, Optional, Set, Union - from cwltool.command_line_tool import OutputPorts - from cwltool.provenance import ProvenanceProfile + from cwltool.command_line_tool import OutputPortsType + from cwltool.provenance_profile import ProvenanceProfile from weaver.typedefs import ( AnyValueType, @@ -81,6 +84,15 @@ def default_make_tool(toolpath_object, # type: CWL_ToolPathObjectTy loading_context, # type: LoadingContext get_job_process_definition, # type: JobProcessDefinitionCallback ): # type: (...) -> ProcessCWL + """ + Generate the tool class object from the :term:`CWL` definition to handle its execution. + + .. 
warning:: + Package :mod:`cwltool` introduces explicit typing definitions with :mod:`mypy_extensions`. + This can cause ``TypeError("interpreted classes cannot inherit from compiled")`` when using + :class:`cwltool.process.Process` as base class for our custom definitions below. + To avoid the error, we must enforce the type using :func:`cast`. + """ if not isinstance(toolpath_object, collections.abc.MutableMapping): raise WorkflowException(f"Not a dict: '{toolpath_object}'") if "class" in toolpath_object: @@ -88,8 +100,8 @@ def default_make_tool(toolpath_object, # type: CWL_ToolPathObjectTy builtin_process_hints = [h.get("process") for h in toolpath_object.get("hints") if h.get("class", "").endswith(CWL_REQUIREMENT_APP_BUILTIN)] if len(builtin_process_hints) == 1: - return BuiltinProcess(toolpath_object, loading_context) - return WpsWorkflow(toolpath_object, loading_context, get_job_process_definition) + return cast(BuiltinProcess, BuiltinProcess(toolpath_object, loading_context)) + return cast(WpsWorkflow, WpsWorkflow(toolpath_object, loading_context, get_job_process_definition)) if toolpath_object["class"] == "ExpressionTool": return command_line_tool.ExpressionTool(toolpath_object, loading_context) if toolpath_object["class"] == "Workflow": @@ -212,8 +224,8 @@ def collect_output_ports(self, compute_checksum=True, # type: bool job_name="", # type: Text readers=None # type: Dict[Text, Any] - ): # type: (...) -> OutputPorts - ret = {} # type: OutputPorts + ): # type: (...) 
-> OutputPortsType + ret = {} # type: OutputPortsType debug = LOGGER.isEnabledFor(logging.DEBUG) try: fs_access = builder.make_fs_access(outdir) @@ -442,7 +454,7 @@ def collect_output(self, class WpsWorkflowJob(JobBase): def __init__(self, builder, # type: Builder - job_order, # type: Dict[Text, Union[Dict[Text, Any], List, Text, None]] + job_order, # type: Dict[Text, Union[Dict[Text, Any], List, Text, None]] requirements, # type: List[Dict[Text, Text]] hints, # type: List[Dict[Text, Text]] name, # type: Text diff --git a/weaver/typedefs.py b/weaver/typedefs.py index fa0ebebc9..c59fe93da 100644 --- a/weaver/typedefs.py +++ b/weaver/typedefs.py @@ -89,6 +89,12 @@ _JsonItem = Union[AnyValueType, _JsonObjectItem, _JsonListItem, _JSON] JSON = Union[Dict[str, _JsonItem], List[_JsonItem], AnyValueType] + # JSON-like definition employed by cwltool + try: + from ruamel.yaml.comments import CommentedMap + except (AttributeError, ImportError, NameError): + CommentedMap = Dict[str, JSON] + Link = TypedDict("Link", { "rel": str, "title": str, @@ -213,7 +219,7 @@ # CWL loading CWL_WorkflowInputs = Dict[str, AnyValueType] # mapping of ID:value (any type) CWL_ExpectedOutputs = Dict[str, AnyValueType] # mapping of ID:pattern (File only) - CWL_ToolPathObjectType = Dict[str, Any] + CWL_ToolPathObjectType = Union[Dict[str, Any], CommentedMap] # CWL document definition JobProcessDefinitionCallback = Callable[[str, Dict[str, str], Dict[str, Any]], WpsProcessInterface] # CWL runtime From 990464c263709b7d4a329e3bb4772234b2dd1897 Mon Sep 17 00:00:00 2001 From: Francis Charette-Migneault Date: Mon, 3 Oct 2022 16:04:52 -0400 Subject: [PATCH 04/19] pin pywps pre/post Python 3.7 --- requirements.txt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index bea4c5db0..b1c43b77b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -56,7 +56,8 @@ python-dateutil pyramid_rewrite pyramid_storage pytz -pywps==4.5.2 +pywps==4.5.1; python_version 
<= "3.6" +pywps==4.5.2; python_version >= "3.7" pyyaml>=5.2 rdflib>=5 requests From ef8581b97c468662d1ed1a3733fd341de296d90f Mon Sep 17 00:00:00 2001 From: Francis Charette-Migneault Date: Mon, 3 Oct 2022 16:39:39 -0400 Subject: [PATCH 05/19] fix linting --- .pylintrc | 4 +++- weaver/processes/wps_workflow.py | 6 +++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/.pylintrc b/.pylintrc index a98857e77..6bcf2d912 100644 --- a/.pylintrc +++ b/.pylintrc @@ -3,7 +3,9 @@ # A comma-separated list of package or module names from where C extensions may # be loaded. Extensions are loading into the active Python interpreter and may # run arbitrary code. -extension-pkg-whitelist=lxml.etree +extension-pkg-whitelist=lxml.etree, + schema_salad.sourceline, + schema_salad.validate # Add files or directories to the blacklist. They should be base names, not # paths. diff --git a/weaver/processes/wps_workflow.py b/weaver/processes/wps_workflow.py index 1fc8bc16f..e93f6bc0e 100644 --- a/weaver/processes/wps_workflow.py +++ b/weaver/processes/wps_workflow.py @@ -26,13 +26,13 @@ ) from cwltool.stdfsaccess import StdFsAccess from cwltool.utils import ( - aslist, adjustDirObjs, adjustFileObjs, + aslist, + bytes2str_in_dicts, get_listing, trim_listing, - visit_class, - bytes2str_in_dicts + visit_class ) from cwltool.workflow import Workflow from schema_salad import validate From 779d70ba2ef9824ae9a4f45d528e2aad27f5ed78 Mon Sep 17 00:00:00 2001 From: Francis Charette-Migneault Date: Mon, 3 Oct 2022 16:41:37 -0400 Subject: [PATCH 06/19] ignore rdflib pyup safety --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index b1c43b77b..6acfd105c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -59,7 +59,7 @@ pytz pywps==4.5.1; python_version <= "3.6" pywps==4.5.2; python_version >= "3.7" pyyaml>=5.2 -rdflib>=5 +rdflib>=5 # pyup: ignore requests requests_file ruamel.yaml>=0.16 From 
d98492dfeb8e54bf47b72e0c7b5b947db57a9517 Mon Sep 17 00:00:00 2001 From: Francis Charette-Migneault Date: Mon, 3 Oct 2022 16:55:45 -0400 Subject: [PATCH 07/19] ignore security in test cases --- tests/processes/test_convert.py | 14 +++++++------- tests/utils.py | 5 +++-- tests/wps/test_utils.py | 12 ++++++------ tests/wps_restapi/test_jobs.py | 2 +- 4 files changed, 17 insertions(+), 16 deletions(-) diff --git a/tests/processes/test_convert.py b/tests/processes/test_convert.py index f29b8e33a..a31107aed 100644 --- a/tests/processes/test_convert.py +++ b/tests/processes/test_convert.py @@ -952,11 +952,11 @@ def test_cwl2json_input_values_ogc_format(): "test1": {"value": "value"}, "test2": {"value": 1}, "test3": {"value": 1.23}, - "test4": {"href": "/tmp/random.txt"}, + "test4": {"href": "/tmp/random.txt"}, # nosec: B108 "test5": [{"value": "val1"}, {"value": "val2"}], "test6": [{"value": 1}, {"value": 2}], "test7": [{"value": 1.23}, {"value": 4.56}], - "test8": [{"href": "/tmp/other.txt"}] + "test8": [{"href": "/tmp/other.txt"}] # nosec: B108 } result = cwl2json_input_values(values, ProcessSchema.OGC) assert result == expect @@ -1096,17 +1096,17 @@ def test_repr2json_input_values(): {"id": "test6", "value": 2}, {"id": "test7", "value": 1.23}, {"id": "test7", "value": 4.56}, - {"id": "test8", "href": "/tmp/other.txt"}, + {"id": "test8", "href": "/tmp/other.txt"}, # nosec: B108 {"id": "test9", "value": "short"}, {"id": "test10", "value": "long"}, - {"id": "test11", "href": "/tmp/file.json", "format": { + {"id": "test11", "href": "/tmp/file.json", "format": { # nosec: B108 "mediaType": ContentType.APP_JSON, "schema": "http://schema.org/random.json" }}, - {"id": "test12", "href": "/tmp/other.xml", "format": { + {"id": "test12", "href": "/tmp/other.xml", "format": { # nosec: B108 "mediaType": ContentType.TEXT_XML, "schema": "http://schema.org/random.xml" }}, - {"id": "test13", "href": "/tmp/one.json", "format": {"mediaType": ContentType.APP_JSON}}, - {"id": "test13", 
"href": "/tmp/two.xml", "format": {"mediaType": ContentType.TEXT_XML}}, + {"id": "test13", "href": "/tmp/one.json", "format": {"mediaType": ContentType.APP_JSON}}, # nosec: B108 + {"id": "test13", "href": "/tmp/two.xml", "format": {"mediaType": ContentType.TEXT_XML}}, # nosec: B108 ] result = repr2json_input_values(values) assert result == expect diff --git a/tests/utils.py b/tests/utils.py index 484afa285..be3a8d185 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -12,7 +12,7 @@ import mimetypes import os import re -import subprocess +import subprocess # nosec: B404 import sys import tempfile import uuid @@ -358,7 +358,8 @@ def run_command(command, trim=True, expect_error=False, entrypoint=None): command = command.split(" ") command = [str(arg) for arg in command] if entrypoint is None: - out, _ = subprocess.Popen(["which", "python"], universal_newlines=True, stdout=subprocess.PIPE).communicate() + func = ["which", "python"] + out, _ = subprocess.Popen(func, universal_newlines=True, stdout=subprocess.PIPE).communicate() # nosec: B603 if not out: out = sys.executable # fallback for some systems that fail above call python_path = os.path.split(out)[0] diff --git a/tests/wps/test_utils.py b/tests/wps/test_utils.py index 5e395af54..ee1af7456 100644 --- a/tests/wps/test_utils.py +++ b/tests/wps/test_utils.py @@ -218,24 +218,24 @@ def test_get_wps_output_context_resolution(): def test_map_wps_output_location_duplicate_subdir(): for tmp_dir in [ - "/tmp/tmp/tmp", - "/tmp/tmpdir" + "/tmp/tmp/tmp", # nosec: B108 # don't care hardcoded for test + "/tmp/tmpdir" # nosec: B108 # don't care hardcoded for test ]: - wps_out = "http:///localhost/wps-output/tmp" + wps_out = "http:///localhost/wps-output/tmp" # nosec: B108 # don't care hardcoded for test settings = { "weaver.wps_output_dir": tmp_dir, "weaver.wps_output_url": wps_out } path = map_wps_output_location(f"{wps_out}/tmp/some-file-tmp.tmp", settings, exists=False) - assert path == f"{tmp_dir}/tmp/some-file-tmp.tmp" + 
assert path == f"{tmp_dir}/tmp/some-file-tmp.tmp" # nosec: B108 # don't care hardcoded for test path = map_wps_output_location(f"{tmp_dir}/here/some-file-tmp.tmp", settings, exists=False, url=True) - assert path == f"{wps_out}/here/some-file-tmp.tmp" + assert path == f"{wps_out}/here/some-file-tmp.tmp" # nosec: B108 # don't care hardcoded for test def test_map_wps_output_location_exists(): wps_url = "http:///localhost/wps-output/tmp" - wps_dir = "/tmp/weaver-test/test-outputs" + wps_dir = "/tmp/weaver-test/test-outputs" # nosec: B108 # don't care hardcoded for test settings = { "weaver.wps_output_dir": wps_dir, "weaver.wps_output_url": wps_url diff --git a/tests/wps_restapi/test_jobs.py b/tests/wps_restapi/test_jobs.py index b5238d5ed..b6c3c9214 100644 --- a/tests/wps_restapi/test_jobs.py +++ b/tests/wps_restapi/test_jobs.py @@ -63,7 +63,7 @@ def setUpClass(cls): cls.settings = { "weaver.url": "https://localhost", "weaver.wps_email_encrypt_salt": "weaver-test", - "weaver.wps_output_dir": "/tmp/weaver-test/wps-outputs", + "weaver.wps_output_dir": "/tmp/weaver-test/wps-outputs", # nosec: B108 # don't care hardcoded for test } cls.config = setup_config_with_mongodb(settings=cls.settings) cls.app = get_test_weaver_app(config=cls.config) From 4be442e2403efa355a0def9c9417d20c491284d9 Mon Sep 17 00:00:00 2001 From: Francis Charette-Migneault Date: Mon, 3 Oct 2022 17:18:05 -0400 Subject: [PATCH 08/19] security check workaround to ignore misbehaving bandit excludes not considered (relates to https://github.com/PyCQA/bandit/issues/657) --- Makefile | 3 ++- setup.cfg | 22 +++++++++++----------- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/Makefile b/Makefile index 56f861336..d391781a0 100644 --- a/Makefile +++ b/Makefile @@ -532,8 +532,9 @@ check-security-deps-only: mkdir-reports ## run security checks on package depen $(SAFETY_IGNORE) \ 1> >(tee "$(REPORTS_DIR)/check-security-deps.txt")' +# FIXME: bandit excludes not working 
(https://github.com/PyCQA/bandit/issues/657), clean-src beforehand to avoid error .PHONY: check-security-code-only -check-security-code-only: mkdir-reports ## run security checks on source code +check-security-code-only: mkdir-reports clean-src ## run security checks on source code @echo "Running security code checks..." @-rm -fr "$(REPORTS_DIR)/check-security-code.txt" @bash -c '$(CONDA_CMD) \ diff --git a/setup.cfg b/setup.cfg index 1f3745847..808cf8268 100644 --- a/setup.cfg +++ b/setup.cfg @@ -5,23 +5,23 @@ tag = True tag_name = {new_version} [bumpversion:file:CHANGES.rst] -search = +search = `Unreleased `_ (latest) ======================================================================== -replace = +replace = `Unreleased `_ (latest) ======================================================================== - + Changes: -------- - No change. - + Fixes: ------ - No change. - + .. _changes_{new_version}: - + `{new_version} `_ ({now:%%Y-%%m-%%d}) ======================================================================== @@ -42,14 +42,14 @@ search = LABEL version="{current_version}" replace = LABEL version="{new_version}" [tool:pytest] -addopts = +addopts = --strict-markers --tb=native weaver/ log_cli = false log_level = DEBUG python_files = test_*.py -markers = +markers = cli: mark test as related to CLI operations testbed14: mark test as 'testbed14' validation functional: mark test as functionality validation @@ -80,7 +80,7 @@ targets = . 
[flake8] ignore = E126,E226,E402,F401,W503,W504 max-line-length = 120 -exclude = +exclude = src, .git, __pycache__, @@ -112,14 +112,14 @@ add_select = D201,D213 branch = true source = ./ include = weaver/* -omit = +omit = setup.py docs/* tests/* *_mako [coverage:report] -exclude_lines = +exclude_lines = pragma: no cover raise AssertionError raise NotImplementedError From 60019d61a25516eabad1e437929b90dc37eba1bf Mon Sep 17 00:00:00 2001 From: Francis Charette Migneault Date: Mon, 3 Oct 2022 18:01:36 -0400 Subject: [PATCH 09/19] fix cwl type name resolution --- weaver/processes/convert.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/weaver/processes/convert.py b/weaver/processes/convert.py index 0a1625889..15dfcb0e2 100644 --- a/weaver/processes/convert.py +++ b/weaver/processes/convert.py @@ -1039,8 +1039,9 @@ def get_cwl_io_type(io_info, strict=True): io_type_many = set() io_base_type = None for i, typ in enumerate(io_type): + typ = get_cwl_io_type_name(typ) io_name = io_info["name"] - sub_type = {"type": typ, "name": f"{io_name}[{i}]"} + sub_type = {"type": typ, "name": f"{io_name}[{i}]"} # type: CWL_IO_Type is_array, array_elem, _, _ = is_cwl_array_type(sub_type, strict=strict) is_enum, enum_type, _, _ = is_cwl_enum_type(sub_type) # array base type more important than enum because later array conversion also handles allowed values From f76a44f2e0841cd312285c7843ff9173c09fa137 Mon Sep 17 00:00:00 2001 From: Francis Charette Migneault Date: Mon, 3 Oct 2022 18:10:28 -0400 Subject: [PATCH 10/19] drop CallbackJob --- weaver/processes/wps_workflow.py | 25 +++---------------------- 1 file changed, 3 insertions(+), 22 deletions(-) diff --git a/weaver/processes/wps_workflow.py b/weaver/processes/wps_workflow.py index e93f6bc0e..415c6b37a 100644 --- a/weaver/processes/wps_workflow.py +++ b/weaver/processes/wps_workflow.py @@ -55,10 +55,9 @@ from typing import Any, Dict, Generator, List, Optional, Set, Union from cwltool.command_line_tool import 
OutputPortsType - from cwltool.provenance_profile import ProvenanceProfile + from cwltool.utils import CWLObjectType, JobsGeneratorType from weaver.typedefs import ( - AnyValueType, CWL_ExpectedOutputs, CWL_Output_Type, CWL_ToolPathObjectType, @@ -113,24 +112,6 @@ def default_make_tool(toolpath_object, # type: CWL_ToolPathObjectTy ) -class CallbackJob(object): - def __init__(self, job, output_callback, cachebuilder, jobcache): - # type: (WpsWorkflow, Callable[[Any, Any], Any], Builder, Text) -> None - self.job = job - self.output_callback = output_callback - self.cache_builder = cachebuilder - self.output_dir = jobcache - self.prov_obj = None # type: Optional[ProvenanceProfile] - - def run(self, loading_context): - # type: (RuntimeContext) -> None - self.output_callback(self.job.collect_output_ports( - self.job.tool["outputs"], - self.cache_builder, - self.output_dir, - getdefault(loading_context.compute_checksum, True)), "success") - - class WpsWorkflow(ProcessCWL): """ Definition of a `CWL` ``workflow`` that can execute ``WPS`` application packages as intermediate job steps. @@ -156,10 +137,10 @@ def __init__(self, toolpath_object, loading_context, get_job_process_definition) # pylint: disable=W0221,W0237 # naming using python like arguments def job(self, - job_order, # type: Dict[Text, AnyValueType] + job_order, # type: CWLObjectType output_callbacks, # type: Callable[[Any, Any], Any] runtime_context, # type: RuntimeContext - ): # type: (...) -> Generator[Union[JobBase, CallbackJob], None, None] + ): # type: (...) -> JobsGeneratorType """ Workflow job generator. 
From 59f3796a40d6e7a33a20915147faea32c7427921 Mon Sep 17 00:00:00 2001 From: Francis Charette Migneault Date: Tue, 4 Oct 2022 11:44:55 -0400 Subject: [PATCH 11/19] drop duplicate CWL redefinitions for WpsWorkflow (fixes #154) --- weaver/processes/wps_package.py | 7 +- weaver/processes/wps_process_base.py | 2 +- weaver/processes/wps_workflow.py | 499 +++++---------------------- weaver/typedefs.py | 19 +- 4 files changed, 98 insertions(+), 429 deletions(-) diff --git a/weaver/processes/wps_package.py b/weaver/processes/wps_package.py index e6bc19c0a..cb4f61679 100644 --- a/weaver/processes/wps_package.py +++ b/weaver/processes/wps_package.py @@ -138,7 +138,7 @@ CWL_RequirementNames, CWL_RequirementsList, CWL_Results, - CWL_ToolPathObjectType, + CWL_ToolPathObject, CWL_WorkflowStepPackage, CWL_WorkflowStepPackageMap, CWL_WorkflowStepReference, @@ -1017,6 +1017,9 @@ def insert_package_log(self, result): if isinstance(result, CWLException): result = getattr(result, "out") status = Status.FAILED + if not result: + LOGGER.warning("Could not retrieve any internal application log from empty result.") + return [] stderr_file = result.get(self.package_log_hook_stderr, {}).get("location", "").replace("file://", "") stdout_file = result.get(self.package_log_hook_stdout, {}).get("location", "").replace("file://", "") with_stderr_file = os.path.isfile(stderr_file) @@ -1771,7 +1774,7 @@ def make_location_output(self, cwl_result, output_id): self.logger.info("Resolved WPS output [%s] as file reference: [%s]", output_id, result_wps) def make_tool(self, toolpath_object, loading_context): - # type: (CWL_ToolPathObjectType, LoadingContext) -> ProcessCWL + # type: (CWL_ToolPathObject, LoadingContext) -> ProcessCWL from weaver.processes.wps_workflow import default_make_tool return default_make_tool(toolpath_object, loading_context, self.get_job_process_definition) diff --git a/weaver/processes/wps_process_base.py b/weaver/processes/wps_process_base.py index cbfd62504..066a2f57c 100644 
--- a/weaver/processes/wps_process_base.py +++ b/weaver/processes/wps_process_base.py @@ -431,7 +431,7 @@ def format_outputs(self, workflow_outputs): return workflow_outputs def dispatch(self, process_inputs, process_outputs): - # type: (JobInputs, JobOutputs) -> Any + # type: (JobInputs, JobOutputs) -> str LOGGER.debug("Execute process %s request for [%s]", self.process_type, self.process) execute_body = { "mode": ExecuteMode.ASYNC, diff --git a/weaver/processes/wps_workflow.py b/weaver/processes/wps_workflow.py index 415c6b37a..11fc94fd4 100644 --- a/weaver/processes/wps_workflow.py +++ b/weaver/processes/wps_workflow.py @@ -1,44 +1,23 @@ import collections.abc -import hashlib -import json -import locale import logging import os -import shutil import tempfile -from functools import cmp_to_key, partial -from pathlib import Path -from typing import TYPE_CHECKING, Callable, MutableMapping, Text, cast # these are actually used in the code +from functools import partial +from typing import TYPE_CHECKING, cast # these are actually used in the code from cwltool import command_line_tool -from cwltool.builder import CONTENT_LIMIT, Builder, substitute from cwltool.context import LoadingContext, RuntimeContext, getdefault from cwltool.errors import WorkflowException -from cwltool.job import JobBase, relink_initialworkdir +from cwltool.job import CommandLineJob from cwltool.process import ( Process as ProcessCWL, - avroize_type, - compute_checksums, - normalizeFilesDirs, shortname, supportedProcessRequirements, uniquename ) from cwltool.stdfsaccess import StdFsAccess -from cwltool.utils import ( - adjustDirObjs, - adjustFileObjs, - aslist, - bytes2str_in_dicts, - get_listing, - trim_listing, - visit_class -) from cwltool.workflow import Workflow -from schema_salad import validate -from schema_salad.sourceline import SourceLine -from weaver.formats import repr_json from weaver.processes.builtin import BuiltinProcess from weaver.processes.constants import ( 
CWL_REQUIREMENT_APP_BUILTIN, @@ -47,30 +26,31 @@ CWL_REQUIREMENT_APP_WPS1 ) from weaver.processes.convert import is_cwl_file_type -from weaver.utils import get_settings, make_dirs, now +from weaver.utils import get_settings from weaver.wps.utils import get_wps_output_dir if TYPE_CHECKING: - from threading import Lock as ThreadLock - from typing import Any, Dict, Generator, List, Optional, Set, Union + from subprocess import Popen + from typing import Any, Callable, Generator, List, MutableMapping, Optional - from cwltool.command_line_tool import OutputPortsType - from cwltool.utils import CWLObjectType, JobsGeneratorType + from cwltool.builder import Builder + from cwltool.pathmapper import PathMapper + from cwltool.utils import CWLObjectType, CWLOutputType, JobsGeneratorType from weaver.typedefs import ( CWL_ExpectedOutputs, CWL_Output_Type, - CWL_ToolPathObjectType, + CWL_RequirementsList, + CWL_ToolPathObject, JobProcessDefinitionCallback, ) from weaver.processes.wps_process_base import WpsProcessInterface + MonitorFunction = Optional[Callable[[Popen[str]], None]] + LOGGER = logging.getLogger(__name__) DEFAULT_TMP_PREFIX = "tmp" -# TODO: The code started as a copy of the class cwltool/command_line_tool.py, -# and still has useless code in the context of a WPS workflow - # Extend the supported process requirements supportedProcessRequirements += [ CWL_REQUIREMENT_APP_BUILTIN, @@ -79,7 +59,7 @@ ] -def default_make_tool(toolpath_object, # type: CWL_ToolPathObjectType +def default_make_tool(toolpath_object, # type: CWL_ToolPathObject loading_context, # type: LoadingContext get_job_process_definition, # type: JobProcessDefinitionCallback ): # type: (...) -> ProcessCWL @@ -112,21 +92,19 @@ def default_make_tool(toolpath_object, # type: CWL_ToolPathObjectTy ) -class WpsWorkflow(ProcessCWL): +class WpsWorkflow(command_line_tool.CommandLineTool): """ Definition of a `CWL` ``workflow`` that can execute ``WPS`` application packages as intermediate job steps. 
Steps are expected to be defined as individual :class:`weaver.processes.wps_package.WpsPackage` references. """ - # pylint: disable=R1260,too-complex # FIXME: simplify operations - # imposed by original CWL implementation # pylint: disable=C0103,invalid-name # pylint: disable=W0201,attribute-defined-outside-init def __init__(self, toolpath_object, loading_context, get_job_process_definition): - # type: (Dict[Text, Any], LoadingContext, JobProcessDefinitionCallback) -> None + # type: (CWL_ToolPathObject, LoadingContext, JobProcessDefinitionCallback) -> None super(WpsWorkflow, self).__init__(toolpath_object, loading_context) self.prov_obj = loading_context.prov_obj self.get_job_process_definition = get_job_process_definition @@ -149,118 +127,54 @@ def job(self, :param runtime_context: configs about execution environment :return: """ - require_prefix = "" - if self.metadata["cwlVersion"] == "v1.0": - require_prefix = "http://commonwl.org/cwltool#" - job_name = uniquename(runtime_context.name or shortname(self.tool.get("id", "job"))) # outdir must be served by the EMS because downstream step will need access to upstream steps output weaver_out_dir = get_wps_output_dir(get_settings()) runtime_context.outdir = tempfile.mkdtemp( prefix=getdefault(runtime_context.tmp_outdir_prefix, DEFAULT_TMP_PREFIX), - dir=weaver_out_dir) + dir=weaver_out_dir + ) builder = self._init_job(job_order, runtime_context) # `job_name` is the step name and `job_order` is the actual step inputs - wps_workflow_job = WpsWorkflowJob(builder, builder.job, self.requirements, self.hints, job_name, - self.get_job_process_definition(job_name, job_order, self.tool), - self.tool["outputs"]) + wps_workflow_job = WpsWorkflowJob( + builder, + builder.job, + self.make_path_mapper, + self.requirements, + self.hints, + job_name, + self.get_job_process_definition(job_name, job_order, self.tool), + self.tool["outputs"] + ) wps_workflow_job.prov_obj = self.prov_obj wps_workflow_job.successCodes = 
self.tool.get("successCodes") wps_workflow_job.temporaryFailCodes = self.tool.get("temporaryFailCodes") wps_workflow_job.permanentFailCodes = self.tool.get("permanentFailCodes") - - # TODO Taken from command_line_tool.py maybe this could let us use the revmap if required at all - # reffiles = copy.deepcopy(builder.files) - # builder.pathmapper = self.make_path_mapper( - # reffiles, builder.stagedir, runtimeContext, True) - # builder.requirements = wps_workflow_job.requirements - wps_workflow_job.outdir = builder.outdir wps_workflow_job.tmpdir = builder.tmpdir wps_workflow_job.stagedir = builder.stagedir - - readers = {} # type: Dict[Text, Any] - timelimit = self.get_requirement(require_prefix + "TimeLimit")[0] - if timelimit: - with SourceLine(timelimit, "timelimit", validate.ValidationException): - wps_workflow_job.timelimit = builder.do_eval(timelimit["timelimit"]) - if not isinstance(wps_workflow_job.timelimit, int) or wps_workflow_job.timelimit < 0: - raise Exception(f"timelimit must be an integer >= 0, got: {wps_workflow_job.timelimit}") - wps_workflow_job.collect_outputs = partial( - self.collect_output_ports, self.tool["outputs"], builder, + self.collect_output_ports, + self.tool["outputs"], + builder, compute_checksum=getdefault(runtime_context.compute_checksum, True), - job_name=job_name, - readers=readers) + jobname=job_name, + readers={} + ) wps_workflow_job.output_callback = output_callbacks yield wps_workflow_job - def collect_output_ports(self, - ports, # type: Set[Dict[Text, Any]] - builder, # type: Builder - outdir, # type: Text - compute_checksum=True, # type: bool - job_name="", # type: Text - readers=None # type: Dict[Text, Any] - ): # type: (...) 
-> OutputPortsType - ret = {} # type: OutputPortsType - debug = LOGGER.isEnabledFor(logging.DEBUG) - try: - fs_access = builder.make_fs_access(outdir) - custom_output = fs_access.join(outdir, "cwl.output.json") - if fs_access.exists(custom_output): - with fs_access.open(custom_output, "r") as f: - ret = json.load(f) - if debug: - LOGGER.debug(u"Raw output from %s: %s", custom_output, json.dumps(ret, indent=4)) - else: - for i, port in enumerate(ports): - def make_workflow_exception(msg): - name = shortname(port["id"]) - return WorkflowException(f"Error collecting output for parameter '{name}':\n{msg}") - with SourceLine(ports, i, make_workflow_exception, debug): - fragment = shortname(port["id"]) - ret[fragment] = self.collect_output(port, builder, outdir, fs_access, - compute_checksum=compute_checksum) - if ret: - # revmap = partial(command_line_tool.revmap_file, builder, outdir) - adjustDirObjs(ret, trim_listing) - - # TODO: Attempt to avoid a crash because the revmap fct is not functional - # (intend for a docker usage only?) 
- # visit_class(ret, ("File", "Directory"), cast(Callable[[Any], Any], revmap)) - visit_class(ret, ("File", "Directory"), command_line_tool.remove_path) - normalizeFilesDirs(ret) - visit_class(ret, ("File", "Directory"), partial(command_line_tool.check_valid_locations, fs_access)) - - if compute_checksum: - adjustFileObjs(ret, partial(compute_checksums, fs_access)) - - validate.validate_ex( - self.names.get_name("outputs_record_schema", None), ret, - strict=False, logger=LOGGER, vocab={typ: avroize_type(typ) for typ in ["File", "Directory"]} - ) - if ret is not None and builder.mutation_manager is not None: - adjustFileObjs(ret, builder.mutation_manager.set_generation) - return ret if ret is not None else {} - except validate.ValidationException as exc: - raise WorkflowException(f"Error validating output record: {exc!s}\nIn:\n{repr_json(ret, indent=2)}") - finally: - if builder.mutation_manager and readers: - for reader in readers.values(): - builder.mutation_manager.release_reader(job_name, reader) - - def collect_output(self, - schema, # type: Dict[Text, Any] - builder, # type: Builder - outdir, # type: Text - fs_access, # type: StdFsAccess - compute_checksum=True # type: bool - ): - # type: (...) -> Optional[Union[Dict[Text, Any], List[Union[Dict[Text, Any], Text]]]] + def collect_output( + self, + schema, # type: CWLObjectType + builder, # type: Builder + outdir, # type: str + fs_access, # type: StdFsAccess + compute_checksum=True, # type: bool + ): # type: (...) -> Optional[CWLOutputType] """ Collect outputs from the step :term:`Process` following its execution. @@ -278,173 +192,37 @@ def collect_output(self, ignoring any nested dirs where the modified *outputBindings* definition will be able to match as if each step :term:`Process` outputs were generated locally. 
""" - result = [] # type: List[Any] - empty_and_optional = False - debug = LOGGER.isEnabledFor(logging.DEBUG) - if "outputBinding" in schema: - binding = schema["outputBinding"] - globpatterns = [] # type: List[Text] - - revmap = partial(command_line_tool.revmap_file, builder, outdir) - - if "glob" in binding: - with SourceLine(binding, "glob", WorkflowException, debug): - for glob in aslist(binding["glob"]): - glob = builder.do_eval(glob) - if glob: - globpatterns.extend(aslist(glob)) - - # rebase glob pattern as applicable (see note) - for glob in list(globpatterns): - if not any(glob.startswith(part) for part in [".", "/", "~"]) and "/" in glob: - glob = builder.do_eval(glob.split("/")[-1]) - if glob: - globpatterns.extend(aslist(glob)) - - for glob in globpatterns: - if glob.startswith(outdir): - glob = glob[len(outdir) + 1:] - elif glob == ".": - glob = outdir - elif glob.startswith("/"): - raise WorkflowException("glob patterns must not start with '/'") - try: - prefix = fs_access.glob(outdir) - key = cmp_to_key(cast(Callable[[Text, Text], int], locale.strcoll)) - - # In case of stdout.log or stderr.log file not created - if "stdout" in self.tool and "stderr" in self.tool \ - and glob in (self.tool["stdout"], self.tool["stderr"]): - filepath = Path(fs_access.join(outdir, glob)) - if not filepath.is_file(): - Path(filepath).touch() - - result.extend([{ - "location": g, - "path": fs_access.join(builder.outdir, g[len(prefix[0])+1:]), - "basename": os.path.basename(g), - "nameroot": os.path.splitext(os.path.basename(g))[0], - "nameext": os.path.splitext(os.path.basename(g))[1], - "class": "File" if fs_access.isfile(g) else "Directory" - } for g in sorted(fs_access.glob(fs_access.join(outdir, glob)), key=key)]) - except (OSError, IOError) as exc: - LOGGER.warning(Text(exc)) - except Exception: - LOGGER.exception("Unexpected error from fs_access") - raise - - for files in result: - rfile = files.copy() - # TODO This function raise an exception and seems to be 
related to docker (which is not used here) - # revmap(rfile) - if files["class"] == "Directory": - load_listing = builder.loadListing or (binding and binding.get("loadListing")) - if load_listing and load_listing != "no_listing": - get_listing(fs_access, files, (load_listing == "deep_listing")) - else: - with fs_access.open(rfile["location"], "rb") as f: - contents = b"" - if binding.get("loadContents") or compute_checksum: - contents = f.read(CONTENT_LIMIT) - if binding.get("loadContents"): - files["contents"] = contents.decode("utf-8") - if compute_checksum: - checksum = hashlib.sha1() # nosec: B303 - while contents != b"": - checksum.update(contents) - contents = f.read(1024 * 1024) - files["checksum"] = f"sha1${checksum.hexdigest()}" - f.seek(0, 2) - file_size = f.tell() - files["size"] = file_size - - optional = False - single = False - if isinstance(schema["type"], list): - if "null" in schema["type"]: - optional = True - if "File" in schema["type"] or "Directory" in schema["type"]: - single = True - elif schema["type"] == "File" or schema["type"] == "Directory": - single = True - - if "outputEval" in binding: - with SourceLine(binding, "outputEval", WorkflowException, debug): - result = builder.do_eval(binding["outputEval"], context=result) - - if single: - if not result and not optional: - with SourceLine(binding, "glob", WorkflowException, debug): - raise WorkflowException(f"Did not find output file with glob pattern: '{globpatterns}'") - elif not result and optional: - pass - elif isinstance(result, list): - if len(result) > 1: - raise WorkflowException("Multiple matches for output item that is a single file.") - result = result[0] - - if "secondaryFiles" in schema: - with SourceLine(schema, "secondaryFiles", WorkflowException, debug): - for primary in aslist(result): - if isinstance(primary, dict): - primary.setdefault("secondaryFiles", []) - pathprefix = primary["path"][0:primary["path"].rindex("/")+1] - for file in aslist(schema["secondaryFiles"]): - 
if isinstance(file, dict) or "$(" in file or "${" in file: - sfpath = builder.do_eval(file, context=primary) - subst = False - else: - sfpath = file - subst = True - for sfitem in aslist(sfpath): - if isinstance(sfitem, str): - if subst: - sfitem = {"path": substitute(primary["path"], sfitem)} - else: - sfitem = {"path": pathprefix+sfitem} - if "path" in sfitem and "location" not in sfitem: - revmap(sfitem) - if fs_access.isfile(sfitem["location"]): - sfitem["class"] = "File" - primary["secondaryFiles"].append(sfitem) - elif fs_access.isdir(sfitem["location"]): - sfitem["class"] = "Directory" - primary["secondaryFiles"].append(sfitem) - - if "format" in schema: - for primary in aslist(result): - primary["format"] = builder.do_eval(schema["format"], context=primary) - - # Ensure files point to local references outside of the run environment - # TODO: Again removing revmap.... - # adjustFileObjs(result, revmap) - - if not result and optional: - return None - - if not empty_and_optional and isinstance(schema["type"], dict) and schema["type"]["type"] == "record": - out = {} - for f in schema["type"]["fields"]: - out[shortname(f["name"])] = self.collect_output( # type: ignore - f, builder, outdir, fs_access, - compute_checksum=compute_checksum) - return out - return result - - -class WpsWorkflowJob(JobBase): + if "outputBinding" in schema and "glob" in schema["outputBinding"]: + glob = schema["outputBinding"]["glob"] + glob = os.path.split(glob)[-1] + schema["outputBinding"]["glob"] = glob + return super(WpsWorkflow, self).collect_output( + schema, + builder, + outdir, + fs_access, + compute_checksum=compute_checksum, + ) + + +class WpsWorkflowJob(CommandLineJob): def __init__(self, builder, # type: Builder - job_order, # type: Dict[Text, Union[Dict[Text, Any], List, Text, None]] - requirements, # type: List[Dict[Text, Text]] - hints, # type: List[Dict[Text, Text]] - name, # type: Text + job_order, # type: CWLObjectType + make_path_mapper, # type: Callable[..., 
PathMapper] + requirements, # type: CWL_RequirementsList + hints, # type: CWL_RequirementsList + name, # type: str wps_process, # type: WpsProcessInterface expected_outputs, # type: List[CWL_Output_Type] ): # type: (...) -> None - super(WpsWorkflowJob, self).__init__(builder, job_order, None, requirements, hints, name) - self.wps_process = wps_process - self.expected_outputs = {} # type: CWL_ExpectedOutputs # {id: file-pattern} + super(WpsWorkflowJob, self).__init__(builder, job_order, make_path_mapper, requirements, hints, name) + + # avoid error on builder 'revmap' when 'WpsWorkflow.collect_output' gets called + builder.pathmapper = self.pathmapper + + self.wps_process = wps_process # type: WpsProcessInterface + self.expected_outputs = {} # type: CWL_ExpectedOutputs # {id: file-pattern} for output in expected_outputs: # TODO Should we support something else? if is_cwl_file_type(output): @@ -454,136 +232,23 @@ def __init__(self, # "outputBinding": {"glob": output_name } # } output_id = shortname(output["id"]) - self.expected_outputs[output_id] = output["outputBinding"]["glob"] - - def _required_env(self): - # type: () -> Dict[str, str] - env = {} - env["HOME"] = self.outdir - env["TMPDIR"] = self.tmpdir - env["PATH"] = os.environ["PATH"] - if "SYSTEMROOT" in os.environ: - env["SYSTEMROOT"] = os.environ["SYSTEMROOT"] - return env - - def run(self, - runtimeContext, # type: RuntimeContext - tmpdir_lock=None, # type: Optional[ThreadLock] - ): # type: (...) -> None - - make_dirs(self.tmpdir, exist_ok=True) - env = self._required_env() - vars_to_preserve = runtimeContext.preserve_environment - if runtimeContext.preserve_entire_environment: - vars_to_preserve = os.environ - if vars_to_preserve is not None: - for key, value in os.environ.items(): - if key in vars_to_preserve and key not in env: - # On Windows, subprocess env can't handle unicode. 
- env[key] = value - - # stageFiles(self.pathmapper, ignoreWritable=True, symLink=True, secret_store=runtimeContext.secret_store) - if self.generatemapper: - # FIXME: see if this is needed... func doesn't exist anymore in cwltool 2.x - # stageFiles(self.generatemapper, ignoreWritable=self.inplace_update, - # symLink=True, secret_store=runtimeContext.secret_store) - relink_initialworkdir(self.generatemapper, self.outdir, - self.builder.outdir, inplace_update=self.inplace_update) - - self.execute([], env, runtimeContext) + output_glob = output["outputBinding"]["glob"].split("/")[-1] + self.expected_outputs[output_id] = ( + output_id + "/" + output_glob + if self.wps_process.stage_output_id_nested else + output_glob + ) # pylint: disable=W0221,arguments-differ # naming using python like arguments - def execute(self, runtime, env, runtime_context): # noqa: E811 - # type: (List[Text], MutableMapping[Text, Text], RuntimeContext) -> None + def _execute(self, + runtime, # type: List[str] + env, # type: MutableMapping[str, str] + runtime_context, # type: RuntimeContext + monitor_function=None, # type: MonitorFunction + ): # type: (...) -> None """ Execute the :term:`WPS` :term:`Process` defined as :term:`Workflow` step and chains their intermediate results. 
""" - - # pylint: disable=R1260,too-complex # FIXME: simplify operations - self.wps_process.execute(self.builder.job, self.outdir, self.expected_outputs) - - if self.joborder and runtime_context.research_obj: - job_order = self.joborder - assert runtime_context.prov_obj - assert runtime_context.process_run_id - runtime_context.prov_obj.used_artifacts(job_order, runtime_context.process_run_id, str(self.name)) - outputs = {} # type: Dict[Text, Text] - try: - rcode = 0 - - if self.successCodes: - process_status = "success" - elif self.temporaryFailCodes: - process_status = "temporaryFail" - elif self.permanentFailCodes: - process_status = "permanentFail" - elif rcode == 0: - process_status = "success" - else: - process_status = "permanentFail" - - if self.generatefiles["listing"]: - assert self.generatemapper is not None - relink_initialworkdir( - self.generatemapper, self.outdir, self.builder.outdir, - inplace_update=self.inplace_update) - - outputs = self.collect_outputs(self.outdir) # type: ignore - outputs = bytes2str_in_dicts(outputs) # type: ignore - except OSError as exc: - if exc.errno == 2: - if runtime: - LOGGER.exception(u"'%s' not found", runtime[0]) - else: - LOGGER.exception(u"'%s' not found", self.command_line[0]) - else: - LOGGER.exception("Exception while running job") - process_status = "permanentFail" - except WorkflowException as err: - LOGGER.exception(u"[job %s] Job error:\n%s", self.name, err) - process_status = "permanentFail" - except Exception: # noqa: W0703 # nosec: B110 - LOGGER.exception("Exception while running job") - process_status = "permanentFail" - if runtime_context.research_obj and self.prov_obj and runtime_context.process_run_id: - # creating entities for the outputs produced by each step (in the provenance document) - self.prov_obj.generate_output_prov( - outputs, runtime_context.process_run_id, str(self.name)) - self.prov_obj.document.wasEndedBy( - runtime_context.process_run_id, None, self.prov_obj.workflow_run_uri, - now()) - 
if process_status != "success": - LOGGER.warning(u"[job %s] completed %s", self.name, process_status) - else: - LOGGER.info(u"[job %s] completed %s", self.name, process_status) - - if LOGGER.isEnabledFor(logging.DEBUG): - LOGGER.debug(u"[job %s] %s", self.name, json.dumps(outputs, indent=4)) - - if self.generatemapper and runtime_context.secret_store: - # Delete any runtime-generated files containing secrets. - for _, path_item in self.generatemapper.items(): - if path_item.type == "CreateFile": - if runtime_context.secret_store.has_secret(path_item.resolved): - host_outdir = self.outdir - container_outdir = self.builder.outdir - host_outdir_tgt = path_item.target - if path_item.target.startswith(container_outdir + "/"): - host_outdir_tgt = os.path.join( - host_outdir, path_item.target[len(container_outdir)+1:]) - os.remove(host_outdir_tgt) - - if runtime_context.workflow_eval_lock is None: - raise WorkflowException("runtime_context.workflow_eval_lock must not be None") - - with runtime_context.workflow_eval_lock: - self.output_callback(outputs, process_status) - - if self.stagedir and os.path.exists(self.stagedir): - LOGGER.debug(u"[job %s] Removing input staging directory %s", self.name, self.stagedir) - shutil.rmtree(self.stagedir, True) - - if runtime_context.rm_tmpdir: - LOGGER.debug(u"[job %s] Removing temporary directory %s", self.name, self.tmpdir) - shutil.rmtree(self.tmpdir, True) + outputs = self.collect_outputs(self.outdir, 0) + self.output_callback(outputs, "success") diff --git a/weaver/typedefs.py b/weaver/typedefs.py index c59fe93da..040cd1ea0 100644 --- a/weaver/typedefs.py +++ b/weaver/typedefs.py @@ -89,12 +89,6 @@ _JsonItem = Union[AnyValueType, _JsonObjectItem, _JsonListItem, _JSON] JSON = Union[Dict[str, _JsonItem], List[_JsonItem], AnyValueType] - # JSON-like definition employed by cwltool - try: - from ruamel.yaml.comments import CommentedMap - except (AttributeError, ImportError, NameError): - CommentedMap = Dict[str, JSON] - Link = 
TypedDict("Link", { "rel": str, "title": str, @@ -216,10 +210,17 @@ }) CWL_WorkflowStepPackageMap = Dict[CWL_WorkflowStepID, CWL_WorkflowStepPackage] + # JSON-like definition employed by cwltool + try: + from ruamel.yaml.comments import CommentedMap + + CWL_ToolPathObject = CommentedMap # CWL document definition + except (AttributeError, ImportError, NameError): + CWL_ToolPathObject = CWL # CWL document definition + # CWL loading - CWL_WorkflowInputs = Dict[str, AnyValueType] # mapping of ID:value (any type) - CWL_ExpectedOutputs = Dict[str, AnyValueType] # mapping of ID:pattern (File only) - CWL_ToolPathObjectType = Union[Dict[str, Any], CommentedMap] # CWL document definition + CWL_WorkflowInputs = Dict[str, AnyValueType] # mapping of ID:value (any type) + CWL_ExpectedOutputs = Dict[str, AnyValueType] # mapping of ID:pattern (File only) JobProcessDefinitionCallback = Callable[[str, Dict[str, str], Dict[str, Any]], WpsProcessInterface] # CWL runtime From 5179179874be1e9af9e2b5c0175ee3db488364f5 Mon Sep 17 00:00:00 2001 From: Francis Charette Migneault Date: Tue, 4 Oct 2022 11:48:07 -0400 Subject: [PATCH 12/19] adjust comments --- weaver/typedefs.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/weaver/typedefs.py b/weaver/typedefs.py index 040cd1ea0..7170aa4c6 100644 --- a/weaver/typedefs.py +++ b/weaver/typedefs.py @@ -210,13 +210,13 @@ }) CWL_WorkflowStepPackageMap = Dict[CWL_WorkflowStepID, CWL_WorkflowStepPackage] - # JSON-like definition employed by cwltool + # JSON-like CWL definition employed by cwltool try: from ruamel.yaml.comments import CommentedMap - CWL_ToolPathObject = CommentedMap # CWL document definition + CWL_ToolPathObject = CommentedMap except (AttributeError, ImportError, NameError): - CWL_ToolPathObject = CWL # CWL document definition + CWL_ToolPathObject = CWL # CWL loading CWL_WorkflowInputs = Dict[str, AnyValueType] # mapping of ID:value (any type) From 6bf968043bd230694786585b70ce100356c23bd3 Mon Sep 17 00:00:00 
2001 From: Francis Charette Migneault Date: Tue, 4 Oct 2022 11:52:11 -0400 Subject: [PATCH 13/19] revert log result check --- weaver/processes/wps_package.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/weaver/processes/wps_package.py b/weaver/processes/wps_package.py index cb4f61679..050974039 100644 --- a/weaver/processes/wps_package.py +++ b/weaver/processes/wps_package.py @@ -1017,9 +1017,6 @@ def insert_package_log(self, result): if isinstance(result, CWLException): result = getattr(result, "out") status = Status.FAILED - if not result: - LOGGER.warning("Could not retrieve any internal application log from empty result.") - return [] stderr_file = result.get(self.package_log_hook_stderr, {}).get("location", "").replace("file://", "") stdout_file = result.get(self.package_log_hook_stdout, {}).get("location", "").replace("file://", "") with_stderr_file = os.path.isfile(stderr_file) From 2f3f808bbf4f32481b8938ea66dbe63ac8b99f83 Mon Sep 17 00:00:00 2001 From: Francis Charette Migneault Date: Tue, 4 Oct 2022 14:22:10 -0400 Subject: [PATCH 14/19] avoid missing stderr/stdout validation error for WpsWorkflow steps not expecting any --- weaver/processes/wps_package.py | 5 ++++- weaver/processes/wps_workflow.py | 3 ++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/weaver/processes/wps_package.py b/weaver/processes/wps_package.py index 050974039..f7101e73e 100644 --- a/weaver/processes/wps_package.py +++ b/weaver/processes/wps_package.py @@ -1362,7 +1362,10 @@ def _handler(self, request, response): self.package_requirement = get_application_requirement(self.package) try: # workflows do not support stdout/stderr - log_stdout_stderr = self.package_type != ProcessType.WORKFLOW + log_stdout_stderr = ( + self.package_type != ProcessType.WORKFLOW + and self.package_requirement.get("class") not in CWL_REQUIREMENT_APP_REMOTE + ) self.setup_loggers(log_stdout_stderr) self.update_status("Preparing package logs done.", PACKAGE_PROGRESS_PREP_LOG, 
Status.RUNNING) except Exception as exc: diff --git a/weaver/processes/wps_workflow.py b/weaver/processes/wps_workflow.py index 11fc94fd4..df0e43e4b 100644 --- a/weaver/processes/wps_workflow.py +++ b/weaver/processes/wps_workflow.py @@ -196,13 +196,14 @@ def collect_output( glob = schema["outputBinding"]["glob"] glob = os.path.split(glob)[-1] schema["outputBinding"]["glob"] = glob - return super(WpsWorkflow, self).collect_output( + output = super(WpsWorkflow, self).collect_output( schema, builder, outdir, fs_access, compute_checksum=compute_checksum, ) + return output or {} class WpsWorkflowJob(CommandLineJob): From 7fa9973cf9879987b99fdb90621d958201536bd7 Mon Sep 17 00:00:00 2001 From: Francis Charette Migneault Date: Tue, 4 Oct 2022 14:31:50 -0400 Subject: [PATCH 15/19] fix lint arg name --- weaver/processes/wps_workflow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/weaver/processes/wps_workflow.py b/weaver/processes/wps_workflow.py index df0e43e4b..38b8d2c3b 100644 --- a/weaver/processes/wps_workflow.py +++ b/weaver/processes/wps_workflow.py @@ -240,7 +240,7 @@ def __init__(self, output_glob ) - # pylint: disable=W0221,arguments-differ # naming using python like arguments + # pylint: disable=W0221,W0237 # naming using python like arguments def _execute(self, runtime, # type: List[str] env, # type: MutableMapping[str, str] From c677c125c46c9ca5e15430e8502a5a8758ce4855 Mon Sep 17 00:00:00 2001 From: Francis Charette Migneault Date: Tue, 4 Oct 2022 14:49:14 -0400 Subject: [PATCH 16/19] fix security check --- weaver/processes/wps_workflow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/weaver/processes/wps_workflow.py b/weaver/processes/wps_workflow.py index 38b8d2c3b..9d461db27 100644 --- a/weaver/processes/wps_workflow.py +++ b/weaver/processes/wps_workflow.py @@ -30,7 +30,7 @@ from weaver.wps.utils import get_wps_output_dir if TYPE_CHECKING: - from subprocess import Popen + from subprocess import Popen # nosec: B404 
from typing import Any, Callable, Generator, List, MutableMapping, Optional from cwltool.builder import Builder From 14504312a7f3e226ee877576ec948ed1604c1403 Mon Sep 17 00:00:00 2001 From: Francis Charette Migneault Date: Tue, 4 Oct 2022 15:55:15 -0400 Subject: [PATCH 17/19] fix import lint --- weaver/processes/wps_workflow.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/weaver/processes/wps_workflow.py b/weaver/processes/wps_workflow.py index 9d461db27..b0c2f44ea 100644 --- a/weaver/processes/wps_workflow.py +++ b/weaver/processes/wps_workflow.py @@ -9,12 +9,7 @@ from cwltool.context import LoadingContext, RuntimeContext, getdefault from cwltool.errors import WorkflowException from cwltool.job import CommandLineJob -from cwltool.process import ( - Process as ProcessCWL, - shortname, - supportedProcessRequirements, - uniquename -) +from cwltool.process import Process as ProcessCWL, shortname, supportedProcessRequirements, uniquename from cwltool.stdfsaccess import StdFsAccess from cwltool.workflow import Workflow From fc95daa49cc1f0be24346467f8103e42f7153907 Mon Sep 17 00:00:00 2001 From: Francis Charette Migneault Date: Tue, 4 Oct 2022 16:15:45 -0400 Subject: [PATCH 18/19] remove unused typing import --- weaver/processes/wps_workflow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/weaver/processes/wps_workflow.py b/weaver/processes/wps_workflow.py index b0c2f44ea..d146325ce 100644 --- a/weaver/processes/wps_workflow.py +++ b/weaver/processes/wps_workflow.py @@ -26,7 +26,7 @@ if TYPE_CHECKING: from subprocess import Popen # nosec: B404 - from typing import Any, Callable, Generator, List, MutableMapping, Optional + from typing import Any, Callable, List, MutableMapping, Optional from cwltool.builder import Builder from cwltool.pathmapper import PathMapper From ae6306c7288095b685169faa3839dd272f796200 Mon Sep 17 00:00:00 2001 From: Francis Charette Migneault Date: Wed, 5 Oct 2022 09:59:31 -0400 Subject: [PATCH 19/19] 
update changelog (fix #154) --- CHANGES.rst | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/CHANGES.rst b/CHANGES.rst index a41c6d930..7943db1e1 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -12,7 +12,9 @@ Changes Changes: -------- -- No change. +- Refactor ``weaver.processes.wps_workflow`` definitions to delegate implementation to ``cwltool`` core classes, + removing code duplication and allowing update to latest revisions + (resolves `#154 <https://github.com/crim-ca/weaver/issues/154>`_). Fixes: ------