Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updatable inputs #317

Merged
merged 30 commits into from
May 10, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
2f9136d
Implement mutation manager to validate that files can be safely modif…
Feb 28, 2017
ca0611b
Test workflows for mutable inputs.
Feb 28, 2017
da1cea7
Add in-place update tests.
Mar 1, 2017
33a29c8
Fix elif
Mar 1, 2017
e454c20
Set up Docker binds to support writable items.
Mar 2, 2017
a09cf8f
Didn't break anything.
Mar 3, 2017
3c6bf89
Inplace update works with & without Docker.
Mar 3, 2017
887c685
Add InplaceUpdateRequirement.
Mar 16, 2017
055efd9
Refactor job to split Docker and non-Docker execution into separate c…
Mar 3, 2017
d0168e9
Manage readers for inplace updates.
Mar 4, 2017
8c3e6d0
Record readers, last update step.
Mar 4, 2017
5cf82f9
Add testing.
Mar 6, 2017
a8d2a97
Test & mypy fixes WIP.
Mar 7, 2017
35966c9
Make mypy happying.
Mar 7, 2017
0dfe86a
Fix imports for casts.
Mar 7, 2017
0043494
Fix WritableDirectory. Add tests.
Mar 10, 2017
648490f
Rebase on master. Fix tests.
Mar 16, 2017
973d855
Check that readers is not falsy.
Mar 16, 2017
3e0caa2
Downgrade validation warnings when checking input & output objects.
Mar 24, 2017
bef5afc
Recursive copy (#356)
gijzelaerr Mar 30, 2017
caa992a
Merge branch 'master' into updatable-inputs
Mar 30, 2017
5c9fc20
Fix lint
Mar 30, 2017
10ff572
Merge branch 'master' into updatable-inputs
Apr 28, 2017
a80d8f1
Merge branch 'master' into updatable-inputs
tetron May 3, 2017
07423ec
Use logger filter, less hacky and should make tox happier.
May 10, 2017
ca81c56
Better handling for relocating directory outputs to final output.
May 10, 2017
328d871
Tox fix
May 10, 2017
d86c923
Fix outputdir
May 10, 2017
cf5cad3
Tox 2
May 10, 2017
8ff477c
Tox 3
May 10, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cwltool/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from .pathmapper import PathMapper, normalizeFilesDirs, get_listing, visit_class
from .stdfsaccess import StdFsAccess
from .utils import aslist
from .mutation import MutationManager

CONTENT_LIMIT = 64 * 1024

Expand Down Expand Up @@ -41,6 +42,7 @@ def __init__(self): # type: () -> None
self.make_fs_access = None # type: Type[StdFsAccess]
self.build_job_script = None # type: Callable[[List[str]], Text]
self.debug = False # type: bool
self.mutation_manager = None # type: MutationManager

# One of "no_listing", "shallow_listing", "deep_listing"
# Will be default "no_listing" for CWL v1.1
Expand Down
74 changes: 58 additions & 16 deletions cwltool/draft2tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@
import shellescape
from schema_salad.ref_resolver import file_uri, uri_file_path
from schema_salad.sourceline import SourceLine, indent
from typing import Any, Callable, cast, Generator, Text, Union
from typing import Any, Callable, cast, Generator, Text, Union, Dict

from .builder import CONTENT_LIMIT, substitute, Builder
from .pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from .errors import WorkflowException
from .job import CommandLineJob
from .job import JobBase, CommandLineJob, DockerCommandLineJob
from .pathmapper import PathMapper, get_listing, trim_listing
from .process import Process, shortname, uniquename, normalizeFilesDirs, compute_checksums
from .process import Process, shortname, uniquename, normalizeFilesDirs, compute_checksums, _logger_validation_warnings
from .stdfsaccess import StdFsAccess
from .utils import aslist

Expand Down Expand Up @@ -148,8 +148,9 @@ def run(self, **kwargs):

# map files to assigned path inside a container. We need to also explicitly
# walk over input as implicit reassignment doesn't reach everything in builder.bindings
def check_adjust(builder, f):
# type: (Builder, Dict[Text, Any]) -> Dict[Text, Any]
def check_adjust(builder, stepname, f):
# type: (Builder, Text, Dict[Text, Any]) -> Dict[Text, Any]

f["path"] = builder.pathmapper.mapper(f["location"])[1]
f["dirname"], f["basename"] = os.path.split(f["path"])
if f["class"] == "File":
Expand All @@ -171,20 +172,23 @@ def __init__(self, toolpath_object, **kwargs):
# type: (Dict[Text, Any], **Any) -> None
super(CommandLineTool, self).__init__(toolpath_object, **kwargs)

def makeJobRunner(self): # type: () -> CommandLineJob
return CommandLineJob()
def makeJobRunner(self): # type: () -> JobBase
dockerReq, _ = self.get_requirement("DockerRequirement")
if dockerReq:
return DockerCommandLineJob()
else:
return CommandLineJob()

def makePathMapper(self, reffiles, stagedir, **kwargs):
# type: (List[Any], Text, **Any) -> PathMapper
dockerReq, _ = self.get_requirement("DockerRequirement")
return PathMapper(reffiles, kwargs["basedir"], stagedir)

def job(self,
job_order, # type: Dict[Text, Text]
output_callbacks, # type: Callable[[Any, Any], Any]
**kwargs # type: Any
):
# type: (...) -> Generator[Union[CommandLineJob, CallbackJob], None, None]
# type: (...) -> Generator[Union[JobBase, CallbackJob], None, None]

jobname = uniquename(kwargs.get("name", shortname(self.tool.get("id", "job"))))

Expand All @@ -199,9 +203,9 @@ def job(self,
cachebuilder.stagedir,
separateDirs=False)
_check_adjust = partial(check_adjust, cachebuilder)

visit_class([cachebuilder.files, cachebuilder.bindings],
("File", "Directory"), _check_adjust)

cmdline = flatten(map(cachebuilder.generate_arg, cachebuilder.bindings))
(docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
if docker_req and kwargs.get("use_container") is not False:
Expand Down Expand Up @@ -296,7 +300,7 @@ def rm_pending_output_callback(output_callbacks, jobcachepending,
_logger.debug(u"[job %s] path mappings is %s", j.name,
json.dumps({p: builder.pathmapper.mapper(p) for p in builder.pathmapper.files()}, indent=4))

_check_adjust = partial(check_adjust, builder)
_check_adjust = partial(check_adjust, builder, jobname)

visit_class([builder.files, builder.bindings], ("File", "Directory"), _check_adjust)

Expand Down Expand Up @@ -368,8 +372,38 @@ def rm_pending_output_callback(output_callbacks, jobcachepending,
ls[i] = t["entry"]
j.generatefiles[u"listing"] = ls

inplaceUpdateReq = self.get_requirement("http://commonwl.org/cwltool#InplaceUpdateRequirement")[0]

if inplaceUpdateReq:
j.inplace_update = inplaceUpdateReq["inplaceUpdate"]
normalizeFilesDirs(j.generatefiles)

readers = {}
muts = set()

def register_mut(f):
muts.add(f["location"])
builder.mutation_manager.register_mutation(j.name, f)

def register_reader(f):
if f["location"] not in muts:
builder.mutation_manager.register_reader(j.name, f)
readers[f["location"]] = f

for li in j.generatefiles["listing"]:
li = cast(Dict[Text, Any], li)
if li.get("writable") and j.inplace_update:
adjustFileObjs(li, register_mut)
adjustDirObjs(li, register_mut)
else:
adjustFileObjs(li, register_reader)
adjustDirObjs(li, register_reader)

adjustFileObjs(builder.files, register_reader)
adjustFileObjs(builder.bindings, register_reader)
adjustDirObjs(builder.files, register_reader)
adjustDirObjs(builder.bindings, register_reader)

j.environment = {}
evr = self.get_requirement("EnvVarRequirement")[0]
if evr:
Expand All @@ -391,16 +425,17 @@ def rm_pending_output_callback(output_callbacks, jobcachepending,
j.pathmapper = builder.pathmapper
j.collect_outputs = partial(
self.collect_output_ports, self.tool["outputs"], builder,
compute_checksum=kwargs.get("compute_checksum", True))
compute_checksum=kwargs.get("compute_checksum", True),
jobname=jobname,
readers=readers)
j.output_callback = output_callbacks

yield j

def collect_output_ports(self, ports, builder, outdir, compute_checksum=True):
# type: (Set[Dict[Text, Any]], Builder, Text, bool) -> Dict[Text, Union[Text, List[Any], Dict[Text, Any]]]
def collect_output_ports(self, ports, builder, outdir, compute_checksum=True, jobname="", readers=None):
# type: (Set[Dict[Text, Any]], Builder, Text, bool, Text, Dict[Text, Any]) -> Dict[Text, Union[Text, List[Any], Dict[Text, Any]]]
ret = {} # type: Dict[Text, Union[Text, List[Any], Dict[Text, Any]]]
try:

fs_access = builder.make_fs_access(outdir)
custom_output = fs_access.join(outdir, "cwl.output.json")
if fs_access.exists(custom_output):
Expand Down Expand Up @@ -429,14 +464,21 @@ def collect_output_ports(self, ports, builder, outdir, compute_checksum=True):
visit_class(ret, ("File", "Directory"), cast(Callable[[Any], Any], revmap))
visit_class(ret, ("File", "Directory"), remove_path)
normalizeFilesDirs(ret)
adjustFileObjs(ret, builder.mutation_manager.set_generation)
visit_class(ret, ("File", "Directory"), partial(check_valid_locations, fs_access))

if compute_checksum:
adjustFileObjs(ret, partial(compute_checksums, fs_access))

validate.validate_ex(self.names.get_name("outputs_record_schema", ""), ret)
validate.validate_ex(self.names.get_name("outputs_record_schema", ""), ret,
strict=False, logger=_logger_validation_warnings)
return ret if ret is not None else {}
except validate.ValidationException as e:
raise WorkflowException("Error validating output record. " + Text(e) + "\n in " + json.dumps(ret, indent=4))
finally:
if readers:
for r in readers.values():
builder.mutation_manager.release_reader(jobname, r)

def collect_output(self, schema, builder, outdir, fs_access, compute_checksum=True):
# type: (Dict[Text, Any], Builder, Text, StdFsAccess, bool) -> Union[Dict[Text, Any], List[Union[Dict[Text, Any], Text]]]
Expand Down
16 changes: 15 additions & 1 deletion cwltool/extensions.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,18 @@ $graph:
type:
- type: enum
name: LoadListingEnum
symbols: [no_listing, shallow_listing, deep_listing]
symbols: [no_listing, shallow_listing, deep_listing]

- name: InplaceUpdateRequirement
type: record
inVocab: false
extends: cwl:ProcessRequirement
fields:
class:
type: string
doc: "Always 'InplaceUpdateRequirement'"
jsonldPredicate:
"_id": "@type"
"_type": "@vocab"
inplaceUpdate:
type: boolean
Loading