From 2f5f1ab69f91f19d268f010b557b90ac1973cd99 Mon Sep 17 00:00:00 2001 From: Anton Khodak Date: Thu, 25 Jan 2018 18:33:12 +0200 Subject: [PATCH] Create separate classes for docker & singularity command line jobs --- cwltool/docker.py | 323 ++++++++++++++++++++++++++++------------- cwltool/draft2tool.py | 9 +- cwltool/job.py | 213 +++------------------------ cwltool/singularity.py | 196 ++++++++++++++++++------- 4 files changed, 388 insertions(+), 353 deletions(-) diff --git a/cwltool/docker.py b/cwltool/docker.py index 9db4dc1e23..c25b5ca0ae 100644 --- a/cwltool/docker.py +++ b/cwltool/docker.py @@ -1,124 +1,241 @@ from __future__ import absolute_import + import logging import os import re +import shutil import subprocess import sys import tempfile from io import open -from typing import Dict, List, Text import requests +from typing import (Dict, List, Text) +from .docker_id import docker_vm_id from .errors import WorkflowException +from .job import ContainerCommandLineJob +from .pathmapper import PathMapper, ensure_writable +from .utils import docker_windows_path_adjust, onWindows _logger = logging.getLogger("cwltool") -def get_image(dockerRequirement, pull_image, dry_run=False): - # type: (Dict[Text, Text], bool, bool) -> bool - found = False - - if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement: - dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"] - - for ln in subprocess.check_output( - ["docker", "images", "--no-trunc", "--all"]).decode('utf-8').splitlines(): - try: - m = re.match(r"^([^ ]+)\s+([^ ]+)\s+([^ ]+)", ln) - sp = dockerRequirement["dockerImageId"].split(":") - if len(sp) == 1: - sp.append("latest") - elif len(sp) == 2: - # if sp[1] doesn't match valid tag names, it is a part of repository - if not re.match(r'[\w][\w.-]{0,127}', sp[1]): - sp[0] = sp[0] + ":" + sp[1] - sp[1] = "latest" - elif len(sp) == 3: - if re.match(r'[\w][\w.-]{0,127}', sp[2]): - sp[0] = sp[0] + ":" + sp[1] - sp[1] = sp[2] - del sp[2] - - # check for repository:tag match or image id match - if ((sp[0] == m.group(1) and sp[1] == m.group(2)) or dockerRequirement["dockerImageId"] == m.group(3)): - found = True - break - except ValueError: - pass - - if not found and pull_image: - cmd = [] # type: List[Text] - if "dockerPull" in dockerRequirement: - cmd = ["docker", "pull", str(dockerRequirement["dockerPull"])] - _logger.info(Text(cmd)) - if not dry_run: - subprocess.check_call(cmd, stdout=sys.stderr) - found = True - elif "dockerFile" in dockerRequirement: - dockerfile_dir = str(tempfile.mkdtemp()) - with open(os.path.join(dockerfile_dir, "Dockerfile"), "wb") as df: - df.write(dockerRequirement["dockerFile"].encode('utf-8')) - cmd = ["docker", "build", "--tag=%s" % - str(dockerRequirement["dockerImageId"]), dockerfile_dir] - _logger.info(Text(cmd)) - if not dry_run: - subprocess.check_call(cmd, stdout=sys.stderr) - found = True - elif "dockerLoad" in dockerRequirement: - cmd = ["docker", "load"] - _logger.info(Text(cmd)) - if not dry_run: - if os.path.exists(dockerRequirement["dockerLoad"]): - _logger.info(u"Loading docker image from %s", dockerRequirement["dockerLoad"]) - with open(dockerRequirement["dockerLoad"], "rb") as f: - loadproc = subprocess.Popen(cmd, stdin=f, stdout=sys.stderr) +class DockerCommandLineJob(ContainerCommandLineJob): + + @staticmethod + def get_image(dockerRequirement, pull_image, dry_run=False): + # type: (Dict[Text, Text], bool, bool) -> bool + found = False + + if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement: + dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"] + + for ln in subprocess.check_output( + ["docker", "images", "--no-trunc", "--all"]).decode('utf-8').splitlines(): + try: + m = re.match(r"^([^ ]+)\s+([^ ]+)\s+([^ ]+)", ln) + sp = dockerRequirement["dockerImageId"].split(":") + if len(sp) == 1: + sp.append("latest") + elif len(sp) == 2: + # if sp[1] doesn't match valid tag names, it is a part of repository + if not re.match(r'[\w][\w.-]{0,127}', sp[1]): + sp[0] = sp[0] + ":" + sp[1] + sp[1] = "latest" + elif len(sp) == 3: + if re.match(r'[\w][\w.-]{0,127}', sp[2]): + sp[0] = sp[0] + ":" + sp[1] + sp[1] = sp[2] + del sp[2] + + # check for repository:tag match or image id match + if ((sp[0] == m.group(1) and sp[1] == m.group(2)) or dockerRequirement["dockerImageId"] == m.group(3)): + found = True + break + except ValueError: + pass + + if not found and pull_image: + cmd = [] # type: List[Text] + if "dockerPull" in dockerRequirement: + cmd = ["docker", "pull", str(dockerRequirement["dockerPull"])] + _logger.info(Text(cmd)) + if not dry_run: + subprocess.check_call(cmd, stdout=sys.stderr) + found = True + elif "dockerFile" in dockerRequirement: + dockerfile_dir = str(tempfile.mkdtemp()) + with open(os.path.join(dockerfile_dir, "Dockerfile"), "wb") as df: + df.write(dockerRequirement["dockerFile"].encode('utf-8')) + cmd = ["docker", "build", "--tag=%s" % + str(dockerRequirement["dockerImageId"]), dockerfile_dir] + _logger.info(Text(cmd)) + if not dry_run: + subprocess.check_call(cmd, stdout=sys.stderr) + found = True + elif "dockerLoad" in dockerRequirement: + cmd = ["docker", "load"] + _logger.info(Text(cmd)) + if not dry_run: + if os.path.exists(dockerRequirement["dockerLoad"]): + _logger.info(u"Loading docker image from %s", dockerRequirement["dockerLoad"]) + with open(dockerRequirement["dockerLoad"], "rb") as f: + loadproc = subprocess.Popen(cmd, stdin=f, stdout=sys.stderr) + else: + loadproc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=sys.stderr) + _logger.info(u"Sending GET request to %s", dockerRequirement["dockerLoad"]) + req = requests.get(dockerRequirement["dockerLoad"], stream=True) + n = 0 + for chunk in req.iter_content(1024 * 1024): + n += len(chunk) + _logger.info("\r%i bytes" % (n)) + loadproc.stdin.write(chunk) + loadproc.stdin.close() + rcode = loadproc.wait() + if rcode != 0: + raise WorkflowException("Docker load returned non-zero exit status %i" % (rcode)) + found = True + elif "dockerImport" in dockerRequirement: + cmd = ["docker", "import", str(dockerRequirement["dockerImport"]), + str(dockerRequirement["dockerImageId"])] + _logger.info(Text(cmd)) + if not dry_run: + subprocess.check_call(cmd, stdout=sys.stderr) + found = True + + return found + + def get_from_requirements(self, r, req, pull_image, dry_run=False): + # type: (Dict[Text, Text], bool, bool, bool) -> Text + if r: + errmsg = None + try: + subprocess.check_output(["docker", "version"]) + except subprocess.CalledProcessError as e: + errmsg = "Cannot communicate with docker daemon: " + Text(e) + except OSError as e: + errmsg = "'docker' executable not found: " + Text(e) + + if errmsg: + if req: + raise WorkflowException(errmsg) else: - loadproc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=sys.stderr) - _logger.info(u"Sending GET request to %s", dockerRequirement["dockerLoad"]) - req = requests.get(dockerRequirement["dockerLoad"], stream=True) - n = 0 - for chunk in req.iter_content(1024 * 1024): - n += len(chunk) - _logger.info("\r%i bytes" % (n)) - loadproc.stdin.write(chunk) - loadproc.stdin.close() - rcode = loadproc.wait() - if rcode != 0: - raise WorkflowException("Docker load returned non-zero exit status %i" % (rcode)) - found = True - elif "dockerImport" in dockerRequirement: - cmd = ["docker", "import", str(dockerRequirement["dockerImport"]), - str(dockerRequirement["dockerImageId"])] - _logger.info(Text(cmd)) - if not dry_run: - subprocess.check_call(cmd, stdout=sys.stderr) - found = True - - return found - - -def get_from_requirements(r, req, pull_image, dry_run=False): - # type: (Dict[Text, Text], bool, bool, bool) -> Text - if r: - errmsg = None - try: - subprocess.check_output(["docker", "version"]) - except subprocess.CalledProcessError as e: - errmsg = "Cannot communicate with docker daemon: " + Text(e) - except OSError as e: - errmsg = "'docker' executable not found: " + Text(e) - - if errmsg: - if req: - raise WorkflowException(errmsg) + return None + + if self.get_image(r, pull_image, dry_run): + return r["dockerImageId"] else: - return None + if req: + raise WorkflowException(u"Docker image %s not found" % r["dockerImageId"]) + + return None + + def add_volumes(self, pathmapper, runtime): + # type: (PathMapper, List[Text]) -> None + + host_outdir = self.outdir + container_outdir = self.builder.outdir + for src, vol in pathmapper.items(): + if not vol.staged: + continue + if vol.target.startswith(container_outdir+"/"): + host_outdir_tgt = os.path.join( + host_outdir, vol.target[len(container_outdir)+1:]) + else: + host_outdir_tgt = None + if vol.type in ("File", "Directory"): + if not vol.resolved.startswith("_:"): + runtime.append(u"--volume=%s:%s:ro" % ( + docker_windows_path_adjust(vol.resolved), + docker_windows_path_adjust(vol.target))) + elif vol.type == "WritableFile": + if self.inplace_update: + runtime.append(u"--volume=%s:%s:rw" % ( + docker_windows_path_adjust(vol.resolved), + docker_windows_path_adjust(vol.target))) + else: + shutil.copy(vol.resolved, host_outdir_tgt) + ensure_writable(host_outdir_tgt) + elif vol.type == "WritableDirectory": + if vol.resolved.startswith("_:"): + os.makedirs(vol.target, 0o0755) + else: + if self.inplace_update: + runtime.append(u"--volume=%s:%s:rw" % ( + docker_windows_path_adjust(vol.resolved), + docker_windows_path_adjust(vol.target))) + else: + shutil.copytree(vol.resolved, host_outdir_tgt) + ensure_writable(host_outdir_tgt) + elif vol.type == "CreateFile": + if host_outdir_tgt: + with open(host_outdir_tgt, "wb") as f: + f.write(vol.resolved.encode("utf-8")) + else: + fd, createtmp = tempfile.mkstemp(dir=self.tmpdir) + with os.fdopen(fd, "wb") as f: + f.write(vol.resolved.encode("utf-8")) + runtime.append(u"--volume=%s:%s:rw" % ( + docker_windows_path_adjust(createtmp), + docker_windows_path_adjust(vol.target))) - if get_image(r, pull_image, dry_run): - return r["dockerImageId"] + def create_runtime(self, env, rm_container=True, **kwargs): + user_space_docker_cmd = kwargs.get("user_space_docker_cmd") + if user_space_docker_cmd: + runtime = [user_space_docker_cmd, u"run"] else: - if req: - raise WorkflowException(u"Docker image %s not found" % r["dockerImageId"]) + runtime = [u"docker", u"run", u"-i"] + + runtime.append(u"--volume=%s:%s:rw" % ( + docker_windows_path_adjust(os.path.realpath(self.outdir)), + self.builder.outdir)) + runtime.append(u"--volume=%s:%s:rw" % ( + docker_windows_path_adjust(os.path.realpath(self.tmpdir)), "/tmp")) + + self.add_volumes(self.pathmapper, runtime) + if self.generatemapper: + self.add_volumes(self.generatemapper, runtime) + + if user_space_docker_cmd: + runtime = [x.replace(":ro", "") for x in runtime] + runtime = [x.replace(":rw", "") for x in runtime] + + runtime.append(u"--workdir=%s" % ( + docker_windows_path_adjust(self.builder.outdir))) + if not user_space_docker_cmd: + + if not kwargs.get("no_read_only"): + runtime.append(u"--read-only=true") + + if kwargs.get("custom_net", None) is not None: + runtime.append(u"--net={0}".format(kwargs.get("custom_net"))) + elif kwargs.get("disable_net", None): + runtime.append(u"--net=none") + + if self.stdout: + runtime.append("--log-driver=none") + + euid, egid = docker_vm_id() + if not onWindows(): + # MS Windows does not have getuid() or geteuid() functions + euid, egid = euid or os.geteuid(), egid or os.getgid() + + if kwargs.get("no_match_user", None) is False \ + and (euid, egid) != (None, None): + runtime.append(u"--user=%d:%d" % (euid, egid)) + + if rm_container: + runtime.append(u"--rm") + + runtime.append(u"--env=TMPDIR=/tmp") + + # spec currently says "HOME must be set to the designated output + # directory." but spec might change to designated temp directory. + # runtime.append("--env=HOME=/tmp") + runtime.append(u"--env=HOME=%s" % self.builder.outdir) + + for t, v in self.environment.items(): + runtime.append(u"--env=%s=%s" % (t, v)) - return None + return runtime diff --git a/cwltool/draft2tool.py b/cwltool/draft2tool.py index 5d572bb3a2..db2848ca5a 100644 --- a/cwltool/draft2tool.py +++ b/cwltool/draft2tool.py @@ -20,10 +20,12 @@ from schema_salad.sourceline import SourceLine, indent from six.moves import urllib +from cwltool.singularity import SingularityCommandLineJob from .builder import CONTENT_LIMIT, Builder, substitute +from .docker import DockerCommandLineJob from .errors import WorkflowException from .flatten import flatten -from .job import CommandLineJob, DockerCommandLineJob, JobBase +from .job import CommandLineJob, JobBase from .pathmapper import (PathMapper, adjustDirObjs, adjustFileObjs, get_listing, trim_listing, visit_class) from .process import (Process, UnsupportedRequirement, @@ -213,7 +215,10 @@ def makeJobRunner(self, use_container=True, **kwargs): # type: (Optional[bool], _logger.warning(DEFAULT_CONTAINER_MSG % (windows_default_container_id, windows_default_container_id)) if dockerReq and use_container: - return DockerCommandLineJob() + if kwargs.get('singularity'): + return SingularityCommandLineJob() + else: + return DockerCommandLineJob() else: for t in reversed(self.requirements): if t["class"] == "DockerRequirement": diff --git a/cwltool/job.py b/cwltool/job.py index ca8f5746a0..f0e972880c 100644 --- a/cwltool/job.py +++ b/cwltool/job.py @@ -1,4 +1,5 @@ from __future__ import absolute_import + import codecs import functools import io @@ -11,22 +12,20 @@ import subprocess import sys import tempfile +from abc import ABCMeta, abstractmethod from io import open -from typing import (IO, Any, Callable, Dict, Iterable, List, MutableMapping, Text, - Tuple, Union, cast) import shellescape +from typing import (IO, Any, Callable, Dict, Iterable, List, MutableMapping, Text, + Union, cast) -from cwltool import singularity -from .utils import copytree_with_merge, docker_windows_path_adjust, onWindows -from . import docker from .builder import Builder -from .docker_id import docker_vm_id from .errors import WorkflowException -from .pathmapper import PathMapper, ensure_writable -from .process import (UnsupportedRequirement, empty_subtree, get_feature, +from .pathmapper import PathMapper +from .process import (UnsupportedRequirement, get_feature, stageFiles) from .utils import bytes2str_in_dicts +from .utils import copytree_with_merge, onWindows _logger = logging.getLogger("cwltool") @@ -312,100 +311,16 @@ def run(self, pull_image=True, rm_container=True, self._execute([], env, rm_tmpdir=rm_tmpdir, move_outputs=move_outputs) -class DockerCommandLineJob(JobBase): +class ContainerCommandLineJob(JobBase): + __metaclass__ = ABCMeta - def add_volumes_docker(self, pathmapper, runtime): - # type: (PathMapper, List[Text]) -> None + @abstractmethod + def get_from_requirements(self, r, req, pull_image, dry_run=False): + pass - host_outdir = self.outdir - container_outdir = self.builder.outdir - for src, vol in pathmapper.items(): - if not vol.staged: - continue - if vol.target.startswith(container_outdir+"/"): - host_outdir_tgt = os.path.join( - host_outdir, vol.target[len(container_outdir)+1:]) - else: - host_outdir_tgt = None - if vol.type in ("File", "Directory"): - if not vol.resolved.startswith("_:"): - runtime.append(u"--volume=%s:%s:ro" % ( - docker_windows_path_adjust(vol.resolved), - docker_windows_path_adjust(vol.target))) - elif vol.type == "WritableFile": - if self.inplace_update: - runtime.append(u"--volume=%s:%s:rw" % ( - docker_windows_path_adjust(vol.resolved), - docker_windows_path_adjust(vol.target))) - else: - shutil.copy(vol.resolved, host_outdir_tgt) - ensure_writable(host_outdir_tgt) - elif vol.type == "WritableDirectory": - if vol.resolved.startswith("_:"): - os.makedirs(vol.target, 0o0755) - else: - if self.inplace_update: - runtime.append(u"--volume=%s:%s:rw" % ( - docker_windows_path_adjust(vol.resolved), - docker_windows_path_adjust(vol.target))) - else: - shutil.copytree(vol.resolved, host_outdir_tgt) - ensure_writable(host_outdir_tgt) - elif vol.type == "CreateFile": - if host_outdir_tgt: - with open(host_outdir_tgt, "wb") as f: - f.write(vol.resolved.encode("utf-8")) - else: - fd, createtmp = tempfile.mkstemp(dir=self.tmpdir) - with os.fdopen(fd, "wb") as f: - f.write(vol.resolved.encode("utf-8")) - runtime.append(u"--volume=%s:%s:rw" % ( - docker_windows_path_adjust(createtmp), - docker_windows_path_adjust(vol.target))) - - def add_volumes_singularity(self, pathmapper, runtime, stage_output): - # type: (PathMapper, List[Text], bool) -> None - - host_outdir = self.outdir - container_outdir = self.builder.outdir - for src, vol in pathmapper.items(): - if not vol.staged: - continue - if stage_output: - containertgt = container_outdir + vol.target[len(host_outdir):] - else: - containertgt = vol.target - if vol.target.startswith(container_outdir+"/"): - host_outdir_tgt = os.path.join( - host_outdir, vol.target[len(container_outdir)+1:]) - else: - host_outdir_tgt = None - if vol.type in ("File", "Directory"): - if not vol.resolved.startswith("_:"): - runtime.append(u"--bind") - runtime.append("%s:%s:ro" % (docker_windows_path_adjust(vol.resolved), docker_windows_path_adjust(containertgt))) - elif vol.type == "WritableFile": - if self.inplace_update: - runtime.append(u"--bind") - runtime.append("%s:%s:rw" % (docker_windows_path_adjust(vol.resolved), docker_windows_path_adjust(containertgt))) - else: - shutil.copy(vol.resolved, host_outdir_tgt) - ensure_writable(host_outdir_tgt) - elif vol.type == "WritableDirectory": - if vol.resolved.startswith("_:"): - os.makedirs(vol.target, 0o0755) - else: - if self.inplace_update: - runtime.append(u"--bind") - runtime.append("%s:%s:rw" % (docker_windows_path_adjust(vol.resolved), docker_windows_path_adjust(containertgt))) - else: - shutil.copytree(vol.resolved, vol.target) - elif vol.type == "CreateFile": - createtmp = os.path.join(host_outdir, os.path.basename(vol.target)) - with open(createtmp, "wb") as f: - f.write(vol.resolved.encode("utf-8")) - runtime.append(u"--bind") - runtime.append("%s:%s:ro" % (docker_windows_path_adjust(createtmp), docker_windows_path_adjust(vol.target))) + @abstractmethod + def create_runtime(self, env, rm_container=True, **kwargs): + pass def run(self, pull_image=True, rm_container=True, rm_tmpdir=True, move_outputs="move", **kwargs): @@ -416,7 +331,6 @@ def run(self, pull_image=True, rm_container=True, img_id = None env = None # type: MutableMapping[Text, Text] user_space_docker_cmd = kwargs.get("user_space_docker_cmd") - use_singularity = kwargs.get("singularity") if docker_req and user_space_docker_cmd: # For user-space docker implementations, a local image name or ID # takes precedence over a network pull @@ -432,12 +346,7 @@ def run(self, pull_image=True, rm_container=True, try: env = cast(MutableMapping[Text, Text], os.environ) if docker_req and kwargs.get("use_container"): - if use_singularity: - img_id = str(singularity.get_from_requirements( - docker_req, True, pull_image)) - else: - img_id = str(docker.get_from_requirements( - docker_req, True, pull_image)) + img_id = self.get_from_requirements(docker_req, True, pull_image) if img_id is None: if self.builder.find_default_container: default_container = self.builder.find_default_container() @@ -448,7 +357,7 @@ def run(self, pull_image=True, rm_container=True, if docker_req and img_id is None and kwargs.get("use_container"): raise Exception("Docker image not available") except Exception as e: - container = "Singularity" if use_singularity else "Docker" + container = "Singularity" if kwargs.get("singularity") else "Docker" _logger.debug("%s error" % container, exc_info=True) if docker_is_req: raise UnsupportedRequirement( @@ -461,94 +370,10 @@ def run(self, pull_image=True, rm_container=True, "--user-space-docker-cmd.: {1}".format(container, e)) self._setup(kwargs) - - if use_singularity: - runtime = [u"singularity", u"--quiet", u"exec"] - - runtime.append(u"--bind") - runtime.append( - u"%s:%s:rw" % (docker_windows_path_adjust(os.path.realpath(self.outdir)), self.builder.outdir)) - runtime.append(u"--bind") - runtime.append(u"%s:%s:rw" % (docker_windows_path_adjust(os.path.realpath(self.tmpdir)), "/tmp")) - - self.add_volumes_singularity(self.pathmapper, runtime, stage_output=False) - if self.generatemapper: - self.add_volumes_singularity(self.generatemapper, runtime, stage_output=True) - - runtime.append(u"--pwd") - runtime.append("%s" % (docker_windows_path_adjust(self.builder.outdir))) - # runtime.append(u"--read-only=true") # true by default for Singularity images - - if kwargs.get("custom_net", None) is not None: - raise UnsupportedRequirement( - "Singularity implementation does not support networking") - - env["SINGULARITYENV_TMPDIR"] = "/tmp" - env["SINGULARITYENV_HOME"] = self.builder.outdir - - for t, v in self.environment.items(): - env["SINGULARITYENV_" + t] = v - else: - if user_space_docker_cmd: - runtime = [user_space_docker_cmd, u"run"] - else: - runtime = [u"docker", u"run", u"-i"] - - runtime.append(u"--volume=%s:%s:rw" % ( - docker_windows_path_adjust(os.path.realpath(self.outdir)), - self.builder.outdir)) - runtime.append(u"--volume=%s:%s:rw" % ( - docker_windows_path_adjust(os.path.realpath(self.tmpdir)), "/tmp")) - - self.add_volumes_docker(self.pathmapper, runtime) - if self.generatemapper: - self.add_volumes_docker(self.generatemapper, runtime) - - if user_space_docker_cmd: - runtime = [x.replace(":ro", "") for x in runtime] - runtime = [x.replace(":rw", "") for x in runtime] - - runtime.append(u"--workdir=%s" % ( - docker_windows_path_adjust(self.builder.outdir))) - if not user_space_docker_cmd: - - if not kwargs.get("no_read_only"): - runtime.append(u"--read-only=true") - - if kwargs.get("custom_net", None) is not None: - runtime.append(u"--net={0}".format(kwargs.get("custom_net"))) - elif kwargs.get("disable_net", None): - runtime.append(u"--net=none") - - if self.stdout: - runtime.append("--log-driver=none") - - euid, egid = docker_vm_id() - if not onWindows(): - # MS Windows does not have getuid() or geteuid() functions - euid, egid = euid or os.geteuid(), egid or os.getgid() - - if kwargs.get("no_match_user", None) is False \ - and (euid, egid) != (None, None): - runtime.append(u"--user=%d:%d" % (euid, egid)) - - if rm_container: - runtime.append(u"--rm") - - runtime.append(u"--env=TMPDIR=/tmp") - - # spec currently says "HOME must be set to the designated output - # directory." but spec might change to designated temp directory. - # runtime.append("--env=HOME=/tmp") - runtime.append(u"--env=HOME=%s" % self.builder.outdir) - - for t, v in self.environment.items(): - runtime.append(u"--env=%s=%s" % (t, v)) - + runtime = self.create_runtime(env, rm_container, **kwargs) runtime.append(img_id) - self._execute( - runtime, env, rm_tmpdir=rm_tmpdir, move_outputs=move_outputs) + self._execute(runtime, env, rm_tmpdir=rm_tmpdir, move_outputs=move_outputs) def _job_popen( diff --git a/cwltool/singularity.py b/cwltool/singularity.py index bf88517288..59b2f055f6 100644 --- a/cwltool/singularity.py +++ b/cwltool/singularity.py @@ -1,69 +1,157 @@ from __future__ import absolute_import + import logging import os import re +import shutil import subprocess import sys -from typing import Dict, List, Text +from io import open + +from typing import (Dict, List, Text) from .errors import WorkflowException +from .job import ContainerCommandLineJob +from .pathmapper import PathMapper, ensure_writable +from .process import (UnsupportedRequirement) +from .utils import docker_windows_path_adjust _logger = logging.getLogger("cwltool") -def get_image(dockerRequirement, pull_image, dry_run=False): - # type: (Dict[Text, Text], bool, bool) -> bool - found = False - - if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement: - match = re.search(pattern=r'([a-z]*://)',string=dockerRequirement["dockerPull"]) - if match: - dockerRequirement["dockerImageId"] = re.sub(pattern=r'([a-z]*://)', repl=r'', string=dockerRequirement["dockerPull"]) - dockerRequirement["dockerImageId"] = re.sub(pattern=r'[:/]', repl=r'-', string=dockerRequirement["dockerImageId"]) + ".img" - else: - dockerRequirement["dockerImageId"] = re.sub(pattern=r'[:/]', repl=r'-', string=dockerRequirement["dockerPull"]) + ".img" - dockerRequirement["dockerPull"] = "docker://" + dockerRequirement["dockerPull"] - - # check to see if the Singularity container is already downloaded - if os.path.isfile(dockerRequirement["dockerImageId"]): - _logger.info("Using local copy of Singularity image") - found = True - - # if the .img file is not already present, pull the image - elif pull_image: - cmd = [] # type: List[Text] - if "dockerPull" in dockerRequirement: - cmd = ["singularity", "pull", "--name", str(dockerRequirement["dockerImageId"]), str(dockerRequirement["dockerPull"])] - _logger.info(Text(cmd)) - if not dry_run: - subprocess.check_call(cmd, stdout=sys.stderr) - found = True - - return found - - -def get_from_requirements(r, req, pull_image, dry_run=False): - # type: (Dict[Text, Text], bool, bool, bool) -> Text - # returns the filename of the Singularity image (e.g. hello-world-latest.img) - if r: - errmsg = None - try: - subprocess.check_output(["singularity", "--version"]) - except subprocess.CalledProcessError as e: - errmsg = "Cannot execute 'singularity --version' " + Text(e) - except OSError as e: - errmsg = "'singularity' executable not found: " + Text(e) - - if errmsg: - if req: - raise WorkflowException(errmsg) +class SingularityCommandLineJob(ContainerCommandLineJob): + @staticmethod + def get_image(dockerRequirement, pull_image, dry_run=False): + # type: (Dict[Text, Text], bool, bool) -> bool + found = False + + if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement: + match = re.search(pattern=r'([a-z]*://)', string=dockerRequirement["dockerPull"]) + if match: + dockerRequirement["dockerImageId"] = re.sub(pattern=r'([a-z]*://)', repl=r'', + string=dockerRequirement["dockerPull"]) + dockerRequirement["dockerImageId"] = re.sub(pattern=r'[:/]', repl=r'-', + string=dockerRequirement["dockerImageId"]) + ".img" + else: + dockerRequirement["dockerImageId"] = re.sub(pattern=r'[:/]', repl=r'-', + string=dockerRequirement["dockerPull"]) + ".img" + dockerRequirement["dockerPull"] = "docker://" + dockerRequirement["dockerPull"] + + # check to see if the Singularity container is already downloaded + if os.path.isfile(dockerRequirement["dockerImageId"]): + _logger.info("Using local copy of Singularity image") + found = True + + # if the .img file is not already present, pull the image + elif pull_image: + cmd = [] # type: List[Text] + if "dockerPull" in dockerRequirement: + cmd = ["singularity", "pull", "--name", str(dockerRequirement["dockerImageId"]), + str(dockerRequirement["dockerPull"])] + _logger.info(Text(cmd)) + if not dry_run: + subprocess.check_call(cmd, stdout=sys.stderr) + found = True + + return found + + def get_from_requirements(self, r, req, pull_image, dry_run=False): + # type: (Dict[Text, Text], bool, bool, bool) -> Text + # returns the filename of the Singularity image (e.g. hello-world-latest.img) + if r: + errmsg = None + try: + subprocess.check_output(["singularity", "--version"]) + except subprocess.CalledProcessError as e: + errmsg = "Cannot execute 'singularity --version' " + Text(e) + except OSError as e: + errmsg = "'singularity' executable not found: " + Text(e) + + if errmsg: + if req: + raise WorkflowException(errmsg) + else: + return None + + if self.get_image(r, pull_image, dry_run): + return os.path.abspath(r["dockerImageId"]) else: - return None + if req: + raise WorkflowException(u"Container image %s not found" % r["dockerImageId"]) + + return None + + def add_volumes(self, pathmapper, runtime, stage_output): + # type: (PathMapper, List[Text], bool) -> None + + host_outdir = self.outdir + container_outdir = self.builder.outdir + for src, vol in pathmapper.items(): + if not vol.staged: + continue + if stage_output: + containertgt = container_outdir + vol.target[len(host_outdir):] + else: + containertgt = vol.target + if vol.target.startswith(container_outdir + "/"): + host_outdir_tgt = os.path.join( + host_outdir, vol.target[len(container_outdir) + 1:]) + else: + host_outdir_tgt = None + if vol.type in ("File", "Directory"): + if not vol.resolved.startswith("_:"): + runtime.append(u"--bind") + runtime.append("%s:%s:ro" % ( + docker_windows_path_adjust(vol.resolved), docker_windows_path_adjust(containertgt))) + elif vol.type == "WritableFile": + if self.inplace_update: + runtime.append(u"--bind") + runtime.append("%s:%s:rw" % ( + docker_windows_path_adjust(vol.resolved), docker_windows_path_adjust(containertgt))) + else: + shutil.copy(vol.resolved, host_outdir_tgt) + ensure_writable(host_outdir_tgt) + elif vol.type == "WritableDirectory": + if vol.resolved.startswith("_:"): + os.makedirs(vol.target, 0o0755) + else: + if self.inplace_update: + runtime.append(u"--bind") + runtime.append("%s:%s:rw" % ( + docker_windows_path_adjust(vol.resolved), docker_windows_path_adjust(containertgt))) + else: + shutil.copytree(vol.resolved, vol.target) + elif vol.type == "CreateFile": + createtmp = os.path.join(host_outdir, os.path.basename(vol.target)) + with open(createtmp, "wb") as f: + f.write(vol.resolved.encode("utf-8")) + runtime.append(u"--bind") + runtime.append( + "%s:%s:ro" % (docker_windows_path_adjust(createtmp), docker_windows_path_adjust(vol.target))) + + def create_runtime(self, env, rm_container=True, **kwargs): + runtime = [u"singularity", u"--quiet", u"exec"] + + runtime.append(u"--bind") + runtime.append( + u"%s:%s:rw" % (docker_windows_path_adjust(os.path.realpath(self.outdir)), self.builder.outdir)) + runtime.append(u"--bind") + runtime.append(u"%s:%s:rw" % (docker_windows_path_adjust(os.path.realpath(self.tmpdir)), "/tmp")) + + self.add_volumes(self.pathmapper, runtime, stage_output=False) + if self.generatemapper: + self.add_volumes(self.generatemapper, runtime, stage_output=True) + + runtime.append(u"--pwd") + runtime.append("%s" % (docker_windows_path_adjust(self.builder.outdir))) + + if kwargs.get("custom_net", None) is not None: + raise UnsupportedRequirement( + "Singularity implementation does not support networking") - if get_image(r, pull_image, dry_run): - return os.path.abspath(r["dockerImageId"]) - else: - if req: - raise WorkflowException(u"Container image %s not found" % r["dockerImageId"]) + env["SINGULARITYENV_TMPDIR"] = "/tmp" + env["SINGULARITYENV_HOME"] = self.builder.outdir - return None + for t, v in self.environment.items(): + env["SINGULARITYENV_" + t] = v + return runtime