Skip to content

Commit

Permalink
Merge branch 'master' into pack
Browse files Browse the repository at this point in the history
Conflicts:
	cwltool/load_tool.py
	cwltool/main.py
  • Loading branch information
Peter Amstutz committed May 10, 2016
2 parents 8dd0cf3 + 01e6bda commit cfdff3b
Show file tree
Hide file tree
Showing 8 changed files with 244 additions and 156 deletions.
6 changes: 3 additions & 3 deletions cwltool/cwlrdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from typing import Any, Union, Dict, IO

def makerdf(workflow, wf, ctx):
# type: (str, Dict[str,Any], Loader.ContextType) -> Graph
# type: (Union[str, unicode], Dict[str,Any], Loader.ContextType) -> Graph
prefixes = {}
for k,v in ctx.iteritems():
if isinstance(v, dict):
Expand All @@ -31,7 +31,7 @@ def makerdf(workflow, wf, ctx):
return g

def printrdf(workflow, wf, ctx, sr, stdout):
# type: (str, Dict[str,Any], Loader.ContextType, str, IO[Any]) -> None
# type: (Union[str, unicode], Dict[str, Any], Loader.ContextType, str, IO[Any]) -> None
stdout.write(makerdf(workflow, wf, ctx).serialize(format=sr))

def lastpart(uri): # type: (Any) -> str
Expand Down Expand Up @@ -172,7 +172,7 @@ def dot_without_parameters(g, stdout): # type: (Graph, IO[Any]) -> None


def printdot(workflow, wf, ctx, stdout, include_parameters=False):
# type: (str, Dict[str,Any], Loader.ContextType, Any, bool) -> None
# type: (Union[str, unicode], Dict[str, Any], Loader.ContextType, Any, bool) -> None
g = makerdf(workflow, wf, ctx)

stdout.write("digraph {")
Expand Down
10 changes: 6 additions & 4 deletions cwltool/factory.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from . import main
from . import load_tool
from . import workflow
import os
from .process import Process
Expand All @@ -23,8 +24,9 @@ def __init__(self, makeTool=workflow.defaultMakeTool,
self.executor = executor
self.execkwargs = execkwargs

def make(self, cwl, frag=None, debug=False):
l = main.load_tool(cwl, self.makeTool)
if type(l) == int:
def make(self, cwl):
"""Instantiate a CWL object from a CWl document."""
load = load_tool.load_tool(cwl, self.makeTool)
if isinstance(load, int):
raise Exception("Error loading tool")
return Callable(l, self)
return Callable(load, self)
85 changes: 51 additions & 34 deletions cwltool/load_tool.py
Original file line number Diff line number Diff line change
@@ -1,70 +1,81 @@
# pylint: disable=unused-import
"""Loads a CWL document."""

import os
import logging
import re
import urlparse
import sys
import json
from schema_salad.ref_resolver import Loader
import schema_salad.validate as validate
import schema_salad.schema as schema
from avro.schema import Names
from . import update
from . import process
from .process import Process, shortname
from .errors import WorkflowException
from typing import Any, Callable, cast, Dict, Tuple, Union

_logger = logging.getLogger("cwltool")

def fetch_document(argsworkflow):
# type: (Union[str, unicode, dict[unicode, Any]]) -> Tuple[Loader, Dict[unicode, Any], unicode]
"""Retrieve a CWL document."""
document_loader = Loader({"cwl": "https://w3id.org/cwl/cwl#", "id": "@id"})

jobobj = None
uri = None # type: str
uri = None # type: unicode
workflowobj = None # type: Dict[unicode, Any]
if isinstance(argsworkflow, basestring):
if isinstance(argsworkflow, (str, unicode)):
split = urlparse.urlsplit(argsworkflow)
if split.scheme:
uri = argsworkflow
else:
uri = "file://" + os.path.abspath(argsworkflow)
fileuri, urifrag = urlparse.urldefrag(uri)
fileuri = urlparse.urldefrag(uri)[0]
workflowobj = document_loader.fetch(fileuri)
elif isinstance(argsworkflow, dict):
workflowobj = argsworkflow
uri = "#" + str(id(argsworkflow))
else:
raise validate.ValidationException("Must be URI or object: '%s'" % argsworkflow)
raise validate.ValidationException(
"Must be URI or object: '%s'" % argsworkflow)

return document_loader, workflowobj, uri


def validate_document(document_loader, workflowobj, uri,
enable_dev=False, strict=True, preprocess_only=False):
# type: (Loader, Dict[unicode, Any], unicode, bool, bool, bool) -> Tuple[Loader, Names, Any, Dict[str, str], unicode]
"""Validate a CWL document."""
jobobj = None
if "cwl:tool" in workflowobj:
jobobj = workflowobj
uri = urlparse.urljoin(uri, jobobj["cwl:tool"])
del jobobj["cwl:tool"]
workflowobj = fetch_document(uri)
workflowobj = fetch_document(uri)[1]

if isinstance(workflowobj, list):
workflowobj = {
"$graph": workflowobj
}

fileuri, urifrag = urlparse.urldefrag(uri)
fileuri = urlparse.urldefrag(uri)[0]

if "cwlVersion" in workflowobj:
workflowobj["cwlVersion"] = re.sub(r"^(?:cwl:|https://w3id.org/cwl/cwl#)", "", workflowobj["cwlVersion"])
workflowobj["cwlVersion"] = re.sub(
r"^(?:cwl:|https://w3id.org/cwl/cwl#)", "",
workflowobj["cwlVersion"])
else:
_logger.warn("No cwlVersion found, treating this file as draft-2.")
workflowobj["cwlVersion"] = "draft-2"

if workflowobj["cwlVersion"] == "draft-2":
workflowobj = update._draft2toDraft3dev1(workflowobj, document_loader, uri, updateSteps=False)
workflowobj = update._draft2toDraft3dev1(
workflowobj, document_loader, uri, update_steps=False)
if "@graph" in workflowobj:
workflowobj["$graph"] = workflowobj["@graph"]
del workflowobj["@graph"]

(document_loader, avsc_names, schema_metadata, schema_loader) = process.get_schema(workflowobj["cwlVersion"])
(document_loader, avsc_names) = \
process.get_schema(workflowobj["cwlVersion"])[:2]

if isinstance(avsc_names, Exception):
raise avsc_names
Expand All @@ -83,26 +94,33 @@ def validate_document(document_loader, workflowobj, uri,
document_loader.validate_links(processobj)
schema.validate_doc(avsc_names, processobj, document_loader, strict)

if metadata.get("cwlVersion") != update.latest:
processobj = update.update(processobj, document_loader, fileuri, enable_dev, metadata)
if metadata.get("cwlVersion") != update.LATEST:
processobj = update.update(
processobj, document_loader, fileuri, enable_dev, metadata)

if jobobj:
metadata["cwl:defaults"] = jobobj

return document_loader, avsc_names, processobj, metadata, uri


def make_tool(document_loader, avsc_names, processobj, metadata, uri, makeTool, kwargs):
processobj, _ = document_loader.resolve_ref(uri)
def make_tool(document_loader, avsc_names, processobj, metadata, uri, makeTool,
kwargs):
# type: (Loader, Names, Dict[str, Any], Dict[str, Any], unicode, Callable[..., Process], Dict[str, Any]) -> Process
"""Make a Python CWL object."""
resolveduri = document_loader.resolve_ref(uri)[0]

if isinstance(processobj, list):
if 1 == len(processobj):
processobj = processobj[0]
if isinstance(resolveduri, list):
if len(resolveduri) == 1:
processobj = resolveduri[0]
else:
raise WorkflowException(u"Tool file contains graph of multiple objects, "
"must specify one of #%s" %
", #".join(urlparse.urldefrag(i["id"])[1]
for i in processobj if "id" in i))
raise WorkflowException(
u"Tool file contains graph of multiple objects, must specify "
"one of #%s" % ", #".join(
urlparse.urldefrag(i["id"])[1] for i in resolveduri
if "id" in i))
else:
processobj = cast(Dict[str, Any], resolveduri)

kwargs = kwargs.copy()
kwargs.update({
Expand All @@ -111,25 +129,24 @@ def make_tool(document_loader, avsc_names, processobj, metadata, uri, makeTool,
"avsc_names": avsc_names,
"metadata": metadata
})
t = makeTool(processobj, **kwargs)
tool = makeTool(processobj, **kwargs)

if "cwl:defaults" in metadata:
jobobj = metadata["cwl:defaults"]
for inp in t.tool["inputs"]:
for inp in tool.tool["inputs"]:
if shortname(inp["id"]) in jobobj:
inp["default"] = jobobj[shortname(inp["id"])]

return t
return tool


def load_tool(argsworkflow, makeTool, kwargs=None,
enable_dev=False,
strict=True):

# type: (Union[str,unicode,dict[unicode,Any]], Callable[...,Process], Dict[str, Any], bool, bool) -> Any
document_loader, workflowobj, uri = fetch_document(argsworkflow)
document_loader, avsc_names, processobj, metadata, uri = validate_document(document_loader,
workflowobj,
uri,
enable_dev=enable_dev,
strict=strict)
return make_tool(document_loader, avsc_names, processobj, metadata, uri, makeTool, kwargs if kwargs else {})
document_loader, avsc_names, processobj, metadata, uri = validate_document(
document_loader, workflowobj, uri, enable_dev=enable_dev,
strict=strict)
return make_tool(document_loader, avsc_names, processobj, metadata, uri,
makeTool, kwargs if kwargs else {})
89 changes: 51 additions & 38 deletions cwltool/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,19 @@
import copy
from . import workflow
from .errors import WorkflowException
from . import process
from .cwlrdf import printrdf, printdot
from .process import shortname, Process
from .load_tool import fetch_document, validate_document, make_tool
import schema_salad.validate as validate
import tempfile
import schema_salad.jsonld_context
import schema_salad.makedoc
import yaml
import urlparse
from . import process
from . import job
from .cwlrdf import printrdf, printdot
import pkg_resources # part of setuptools
from . import update
from .process import shortname, Process
import rdflib
from load_tool import load_tool, fetch_document, validate_document, make_tool
import hashlib
from .utils import aslist
from typing import Union, Any, cast, Callable, Dict, Tuple, IO

_logger = logging.getLogger("cwltool")
Expand Down Expand Up @@ -136,14 +133,16 @@ def arg_parser(): # type: () -> argparse.ArgumentParser
parser.add_argument("--relative-deps", choices=['primary', 'cwd'], default="primary",
help="When using --print-deps, print paths relative to primary file or current working directory.")

parser.add_argument("--enable-dev", action="store_true", help="Allow loading and running development versions of CWL spec.", default=False)
parser.add_argument("--enable-dev", action="store_true",
help="Allow loading and running development versions "
"of CWL spec.", default=False)

parser.add_argument("--enable-net", action="store_true",
help="Use docker's default networking for containers; the default is "
"to disable networking.")
help="Use docker's default networking for containers;"
" the default is to disable networking.")
parser.add_argument("--custom-net", type=str,
help="Will be passed to `docker run` as the '--net' parameter. "
"Implies '--enable-net'.")
help="Will be passed to `docker run` as the '--net' "
"parameter. Implies '--enable-net'.")

parser.add_argument("workflow", type=str, nargs="?", default=None)
parser.add_argument("job_order", nargs=argparse.REMAINDER)
Expand Down Expand Up @@ -291,8 +290,13 @@ def generate_parser(toolparser, tool, namemap):
_logger.debug(u"Can't make command line argument from %s", inptype)
return None

if inptype != "boolean":
typekw = { 'type': atype }
else:
typekw = {}

toolparser.add_argument(flag + name, required=required,
help=ahelp, action=action, type=atype, default=default)
help=ahelp, action=action, default=default, **typekw)

return toolparser

Expand Down Expand Up @@ -538,11 +542,10 @@ def main(argsl=None,
printdeps(workflowobj, document_loader, stdout, args.relative_deps)
return 0

document_loader, avsc_names, processobj, metadata, uri = validate_document(document_loader,
workflowobj, uri,
enable_dev=args.enable_dev,
strict=args.strict,
preprocess_only=args.print_pre or args.pack)
document_loader, avsc_names, processobj, metadata, uri \
= validate_document(document_loader, workflowobj, uri,
enable_dev=args.enable_dev, strict=args.strict,
preprocess_only=args.print_pre or args.pack)

if args.pack:
stdout.write(print_pack(document_loader, processobj, uri, metadata))
Expand All @@ -560,22 +563,26 @@ def main(argsl=None,
printdot(uri, processobj, document_loader.ctx, stdout)
return 0

t = make_tool(document_loader, avsc_names, processobj,
metadata, uri, makeTool, {})
except (validate.ValidationException) as e:
_logger.error(u"Tool definition failed validation:\n%s", e, exc_info=(e if args.debug else False))
tool = make_tool(document_loader, avsc_names, processobj, metadata,
uri, makeTool, {})
except (validate.ValidationException) as exc:
_logger.error(u"Tool definition failed validation:\n%s", exc,
exc_info=(exc if args.debug else False))
return 1
except (RuntimeError, WorkflowException) as e:
_logger.error(u"Tool definition failed initialization:\n%s", e, exc_info=(e if args.debug else False))
except (RuntimeError, WorkflowException) as exc:
_logger.error(u"Tool definition failed initialization:\n%s", exc,
exc_info=(exc if args.debug else False))
return 1
except Exception as e:
_logger.error(u"I'm sorry, I couldn't load this CWL file%s",
", try again with --debug for more information.\nThe error was: %s" % e if not args.debug else ". The error was:",
exc_info=(e if args.debug else False))
except Exception as exc:
_logger.error(
u"I'm sorry, I couldn't load this CWL file%s",
", try again with --debug for more information.\nThe error was: "
"%s" % exc if not args.debug else ". The error was:",
exc_info=(exc if args.debug else False))
return 1

if isinstance(t, int):
return t
if isinstance(tool, int):
return tool

if args.tmp_outdir_prefix != 'tmp':
# Use user defined temp directory (if it exists)
Expand All @@ -591,7 +598,7 @@ def main(argsl=None,
_logger.error("Temporary directory prefix doesn't exist.")
return 1

job_order_object = load_job_order(args, t, parser, stdin,
job_order_object = load_job_order(args, tool, parser, stdin,
print_input_deps=args.print_input_deps,
relative_deps=args.relative_deps,
stdout=stdout)
Expand All @@ -604,7 +611,7 @@ def main(argsl=None,
args.move_outputs = False

try:
out = executor(t, job_order_object[0],
out = executor(tool, job_order_object[0],
job_order_object[1], args,
conformance_test=args.conformance_test,
dry_run=args.dry_run,
Expand Down Expand Up @@ -633,14 +640,20 @@ def main(argsl=None,
stdout.flush()
else:
return 1
except (validate.ValidationException) as e:
_logger.error(u"Input object failed validation:\n%s", e, exc_info=(e if args.debug else False))
except (validate.ValidationException) as exc:
_logger.error(
u"Input object failed validation:\n%s", exc,
exc_info=(exc if args.debug else False))
return 1
except WorkflowException as e:
_logger.error(u"Workflow error, try again with --debug for more information:\n %s", e, exc_info=(e if args.debug else False))
except WorkflowException as exc:
_logger.error(
u"Workflow error, try again with --debug for more "
"information:\n %s", exc, exc_info=(exc if args.debug else False))
return 1
except Exception as e:
_logger.error(u"Unhandled error, try again with --debug for more information:\n %s", e, exc_info=(e if args.debug else False))
except Exception as exc:
_logger.error(
u"Unhandled error, try again with --debug for more information:\n"
" %s", exc, exc_info=(exc if args.debug else False))
return 1

return 0
Expand Down
Loading

0 comments on commit cfdff3b

Please sign in to comment.