diff --git a/datalad_next/runners/run.py b/datalad_next/runners/run.py new file mode 100644 index 000000000..80fc5e606 --- /dev/null +++ b/datalad_next/runners/run.py @@ -0,0 +1,213 @@ +from __future__ import annotations + +import subprocess +from collections import deque +from collections.abc import Generator +from contextlib import contextmanager +from pathlib import Path +from queue import Queue +from subprocess import DEVNULL +from typing import ( + Any, + IO, +) + +from datalad_next.runners import ( + GeneratorMixIn, + Protocol, + ThreadedRunner, +) + + +class _ProtocolShell: + def __init__(self, + base_class: type[Protocol], + base_kwargs: dict, + introduced_timeout: bool, + terminate_time: int | None, + kill_time: int | None, + armed: bool + ) -> None: + + self.base = base_class(**base_kwargs) + self.introduced_timeout = introduced_timeout + self.terminate_time = terminate_time + self.kill_time = ( + ((terminate_time or 0) + kill_time) + if kill_time is not None + else kill_time + ) + self.process: subprocess.Popen | None = None + self.armed = armed + self.kill_counter = 0 + + def arm(self) -> None: + self.kill_counter = 0 + self.armed = True + + def __getattr__(self, item: Any): + """ Forward instance attribute access to the base object """ + try: + return self.__getattribute__(item) + except AttributeError: + return self.base.__getattribute__(item) + + def connection_made(self, process: subprocess.Popen) -> None: + self.process = process + self.base.connection_made(process) + + def timeout(self, fd: int | None) -> bool: + if self.armed: + self.kill_counter += 1 + if self.kill_time and self.kill_counter >= self.kill_time: + self.process.kill() + self.kill_time = None + if self.terminate_time and self.kill_counter > self.terminate_time: + self.process.terminate() + self.terminate_time = None + if self.introduced_timeout: + return False + return self.base.timeout(fd) + + +class _GeneratorProtocolShell(_ProtocolShell, GeneratorMixIn): + def __init__(self, + base_class: type[Protocol], + base_kwargs: dict, + introduced_timeout: bool, + terminate_time: int | None, + kill_time: int | None, + armed: bool, + ) -> None: + + GeneratorMixIn.__init__(self) + _ProtocolShell.__init__( + self, + base_class, + base_kwargs, + introduced_timeout, + terminate_time, + kill_time, + armed, + ) + + @property + def result_queue(self) -> deque: + return self.base.result_queue + + + +@contextmanager +def run( + cmd: list, + protocol_class = type[Protocol], + *, + cwd: Path | None = None, + input: int | IO | bytes | Queue[bytes | None] | None = None, + timeout: float | None = None, + terminate_time: int | None = None, + kill_time: int | None = None, +) -> dict | Generator: + + introduces_timeout = False + if timeout is None: + introduces_timeout = True + timeout = 1.0 + + runner_protocol_class, armed = ( + (_GeneratorProtocolShell, False) + if issubclass(protocol_class, GeneratorMixIn) + else (_ProtocolShell, True) + ) + + # This is a little bit ugly, implement class-attribute forwarding instead + runner_protocol_class.proc_out = protocol_class.proc_out + runner_protocol_class.proc_err = protocol_class.proc_err + + runner = ThreadedRunner( + cmd=cmd, + protocol_class=runner_protocol_class, + stdin=DEVNULL if input is None else input, + protocol_kwargs=dict( + base_class=protocol_class, + base_kwargs=dict(), + introduced_timeout=introduces_timeout, + terminate_time=terminate_time, + kill_time=kill_time, + armed=armed, + ), + timeout=timeout, + exception_on_error=False, + cwd=cwd, + ) + result = runner.run() + if isinstance(result, dict): + try: + yield result + finally: + pass + else: + try: + yield result + finally: + runner.protocol.arm() + tuple(result) + + +x = ''' +with run(cmd=['find', '/home/cristian/datalad/longnow-podcasts'], + protocol_class=StdOutCaptureGeneratorProtocol, + terminate_time=10, + kill_time=5) as r: + for line in r: + print(line) + +print(r.return_code) +''' + +from datalad_next.runners import StdOutCaptureGeneratorProtocol + +with run(cmd=['sleep', '100'], + protocol_class=StdOutCaptureGeneratorProtocol, + terminate_time=3, + kill_time=3) as r: + pass + +print(r.return_code) + + + +with run(cmd=['sleep', '100'], + protocol_class=StdOutCaptureGeneratorProtocol, + terminate_time=3, + kill_time=3) as r: + pass + +print(r.return_code) + + + +py_prog = ''' +import sys +import time + +i = 0 +while True: + try: + print(i, flush=True) + i += 1 + time.sleep(1) + except BaseException as e: + pass +''' + +import sys + +with run(cmd=[sys.executable, '-c', py_prog], + protocol_class=StdOutCaptureGeneratorProtocol, + terminate_time=3, + kill_time=3) as r: + print(next(r)) + print(next(r)) + +print(r.return_code) diff --git a/docs/source/developer_guide/generator-runner.rst b/docs/source/developer_guide/generator-runner.rst index 200e61530..203dc6f84 100644 --- a/docs/source/developer_guide/generator-runner.rst +++ b/docs/source/developer_guide/generator-runner.rst @@ -107,9 +107,23 @@ The elements that a caller would read from the generator would then be tuples wh .. note:: - Remark: you might not want to inherit from any of the ``datalad_next.runners.Protocol`` subclasses, because they contain code that is never used during asynchronous runner execution. + Remark: you might not want to inherit from any of the ``datalad_next.runners.Protocol`` subclasses, because they contain code that is never used during asynchronous runner execution + Nevertheless, if you use your own class with the callbacks defined in ``datalad.next.runners.Protocol``, you will have to add the two class variables: ``proc_out``, and ``proc_err`` and set them to ``True``, if you want stdout-output and stderr-output to be sent to the "From Process Queue" and eventually to the user code. Programming examples ==================== -TODO + +Simplest line reading from a subprocess +--------------------------------------- + +.. code-block:: python + + from datalad_next.runners import Runner, StdOutCaptureGeneratorProtocol as prot + + for line in Runner().run(cmd=['ls', '-l', '/etc'], protocol=prot): + print(line.decode()) + + +Create a context-manager +--------------------------------- \ No newline at end of file