Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added proper SIGTERM/SIGKILL handling for sub-processes #286

Merged
merged 10 commits into from
Aug 5, 2024
4 changes: 3 additions & 1 deletion build.sc
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ trait SafeDeps extends ScalaModule {
trait MiMaChecks extends Mima {
def mimaPreviousVersions = Seq("0.9.0", "0.9.1", "0.9.2", "0.9.3", "0.10.0")
override def mimaBinaryIssueFilters: T[Seq[ProblemFilter]] = Seq(
ProblemFilter.exclude[ReversedMissingMethodProblem]("os.PathConvertible.isCustomFs")
ProblemFilter.exclude[ReversedMissingMethodProblem]("os.PathConvertible.isCustomFs"),
// this is fine, because ProcessLike is sealed (and its subclasses should be final)
ProblemFilter.exclude[ReversedMissingMethodProblem]("os.ProcessLike.joinPumperThreadsHook")
)
}

Expand Down
136 changes: 103 additions & 33 deletions os/src/ProcessOps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,25 @@ case class proc(command: Shellable*) {
* `call` provides a number of parameters that let you configure how the subprocess
* is run:
*
* @param cwd the working directory of the subprocess
* @param env any additional environment variables you wish to set in the subprocess
* @param stdin any data you wish to pass to the subprocess's standard input
* @param stdout How the process's output stream is configured.
* @param stderr How the process's error stream is configured.
* @param mergeErrIntoOut merges the subprocess's stderr stream into its stdout
* @param timeout how long to wait in milliseconds for the subprocess to complete
* @param check disable this to avoid throwing an exception if the subprocess
* fails with a non-zero exit code
* @param propagateEnv disable this to avoid passing in this parent process's
* environment variables to the subprocess
* @param cwd the working directory of the subprocess
* @param env any additional environment variables you wish to set in the subprocess
* @param stdin any data you wish to pass to the subprocess's standard input
* @param stdout How the process's output stream is configured.
* @param stderr How the process's error stream is configured.
* @param mergeErrIntoOut merges the subprocess's stderr stream into its stdout
* @param timeout how long to wait in milliseconds for the subprocess to complete
* (-1 for no timeout)
* @param check disable this to avoid throwing an exception if the subprocess
* fails with a non-zero exit code
* @param propagateEnv disable this to avoid passing in this parent process's
* environment variables to the subprocess
* @param timeoutGracePeriod if the timeout is enabled, how long in milliseconds for the
* subprocess to gracefully terminate before attempting to
* forcibly kill it
* (-1 for no kill, 0 for always kill immediately)
*
* @note the issuing of `SIGTERM` instead of `SIGKILL` is implementation dependent on your JVM version. Pre-Java 9, no `SIGTERM` may be
* issued. Check the documentation for your JDK's `Process.destroy`.
*/
def call(
cwd: Path = null,
Expand All @@ -66,7 +74,9 @@ case class proc(command: Shellable*) {
mergeErrIntoOut: Boolean = false,
timeout: Long = -1,
check: Boolean = true,
propagateEnv: Boolean = true
propagateEnv: Boolean = true,
// this cannot be next to `timeout` as this will introduce a bin-compat break (default arguments are numbered in the bytecode)
timeoutGracePeriod: Long = 100
): CommandResult = {

val chunks = new java.util.concurrent.ConcurrentLinkedQueue[Either[geny.Bytes, geny.Bytes]]
Expand All @@ -87,14 +97,38 @@ case class proc(command: Shellable*) {
propagateEnv
)

sub.join(timeout)
sub.join(timeout, timeoutGracePeriod)

val chunksSeq = chunks.iterator.asScala.toIndexedSeq
val res = CommandResult(commandChunks, sub.exitCode(), chunksSeq)
if (res.exitCode == 0 || !check) res
else throw SubprocessException(res)
}

// Overload without `timeoutGracePeriod`: passes every argument through to the full
// `call` and fixes the grace period at 100 milliseconds.
private[os] def call(
    cwd: Path,
    env: Map[String, String],
    stdin: ProcessInput,
    stdout: ProcessOutput,
    stderr: ProcessOutput,
    mergeErrIntoOut: Boolean,
    timeout: Long,
    check: Boolean,
    propagateEnv: Boolean
): CommandResult =
  call(
    cwd = cwd,
    env = env,
    stdin = stdin,
    stdout = stdout,
    stderr = stderr,
    mergeErrIntoOut = mergeErrIntoOut,
    timeout = timeout,
    check = check,
    propagateEnv = propagateEnv,
    timeoutGracePeriod = 100
  )

/**
* The most flexible of the [[os.proc]] calls, `os.proc.spawn` simply configures
* and starts a subprocess, and returns it as a `java.lang.Process` for you to
Expand Down Expand Up @@ -181,24 +215,31 @@ case class ProcGroup private[os] (commands: Seq[proc]) {
* `call` provides a number of parameters that let you configure how the pipeline
* is run:
*
* @param cwd the working directory of the pipeline
* @param env any additional environment variables you wish to set in the pipeline
* @param stdin any data you wish to pass to the pipeline's standard input (to the first process)
* @param stdout How the pipeline's output stream is configured (the last process stdout)
* @param stderr How the process's error stream is configured (set for all processes)
* @param mergeErrIntoOut merges the pipeline's stderr stream into its stdout. Note that then the
* stderr will be forwarded with stdout to subsequent processes in the pipeline.
* @param timeout how long to wait in milliseconds for the pipeline to complete
* @param check disable this to avoid throwing an exception if the pipeline
* fails with a non-zero exit code
* @param propagateEnv disable this to avoid passing in this parent process's
* environment variables to the pipeline
* @param pipefail if true, the pipeline's exitCode will be the exit code of the first
* failing process. If no process fails, the exit code will be 0.
* @param handleBrokenPipe if true, every [[java.io.IOException]] when redirecting output of a process
* will be caught and handled by killing the writing process. This behaviour
* is consistent with handlers of SIGPIPE signals in most programs
* supporting interruptable piping. Disabled by default on Windows.
* @param cwd the working directory of the pipeline
* @param env any additional environment variables you wish to set in the pipeline
* @param stdin any data you wish to pass to the pipeline's standard input (to the first process)
* @param stdout How the pipeline's output stream is configured (the last process stdout)
* @param stderr How the process's error stream is configured (set for all processes)
* @param mergeErrIntoOut merges the pipeline's stderr stream into its stdout. Note that then the
* stderr will be forwarded with stdout to subsequent processes in the pipeline.
* @param timeout how long to wait in milliseconds for the pipeline to complete
* @param check disable this to avoid throwing an exception if the pipeline
* fails with a non-zero exit code
* @param propagateEnv disable this to avoid passing in this parent process's
* environment variables to the pipeline
* @param pipefail if true, the pipeline's exitCode will be the exit code of the first
* failing process. If no process fails, the exit code will be 0.
* @param handleBrokenPipe if true, every [[java.io.IOException]] when redirecting output of a process
* will be caught and handled by killing the writing process. This behaviour
* is consistent with handlers of SIGPIPE signals in most programs
* supporting interruptable piping. Disabled by default on Windows.
* @param timeoutGracePeriod if the timeout is enabled, how long in milliseconds for the
* subprocess to gracefully terminate before attempting to
* forcibly kill it
* (-1 for no kill, 0 for always kill immediately)
*
* @note the issuing of `SIGTERM` instead of `SIGKILL` is implementation dependent on your JVM version. Pre-Java 9, no `SIGTERM` may be
* issued. Check the documentation for your JDK's `Process.destroy`.
*/
def call(
cwd: Path = null,
Expand All @@ -211,7 +252,9 @@ case class ProcGroup private[os] (commands: Seq[proc]) {
check: Boolean = true,
propagateEnv: Boolean = true,
pipefail: Boolean = true,
handleBrokenPipe: Boolean = !isWindows
handleBrokenPipe: Boolean = !isWindows,
// this cannot be next to `timeout` as this will introduce a bin-compat break (default arguments are numbered in the bytecode)
timeoutGracePeriod: Long = 100
): CommandResult = {
val chunks = new java.util.concurrent.ConcurrentLinkedQueue[Either[geny.Bytes, geny.Bytes]]

Expand All @@ -232,7 +275,7 @@ case class ProcGroup private[os] (commands: Seq[proc]) {
pipefail
)

sub.join(timeout)
sub.join(timeout, timeoutGracePeriod)

val chunksSeq = chunks.iterator.asScala.toIndexedSeq
val res =
Expand All @@ -241,6 +284,33 @@ case class ProcGroup private[os] (commands: Seq[proc]) {
else throw SubprocessException(res)
}

// Overload without `timeoutGracePeriod`: passes every argument through to the full
// pipeline `call` and fixes the grace period at 100 milliseconds.
private[os] def call(
    cwd: Path,
    env: Map[String, String],
    stdin: ProcessInput,
    stdout: ProcessOutput,
    stderr: ProcessOutput,
    mergeErrIntoOut: Boolean,
    timeout: Long,
    check: Boolean,
    propagateEnv: Boolean,
    pipefail: Boolean,
    handleBrokenPipe: Boolean
): CommandResult =
  call(
    cwd = cwd,
    env = env,
    stdin = stdin,
    stdout = stdout,
    stderr = stderr,
    mergeErrIntoOut = mergeErrIntoOut,
    timeout = timeout,
    check = check,
    propagateEnv = propagateEnv,
    pipefail = pipefail,
    handleBrokenPipe = handleBrokenPipe,
    timeoutGracePeriod = 100
  )

/**
* The most flexible of the [[os.ProcGroup]] calls. It sets-up a pipeline of processes,
* and returns a [[ProcessPipeline]] for you to interact with however you like.
Expand Down
101 changes: 66 additions & 35 deletions os/src/SubProcess.scala
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,55 @@ sealed trait ProcessLike extends java.lang.AutoCloseable {
* Wait up to `timeout` milliseconds for the [[ProcessLike]] to terminate and all stdout and stderr
* from the subprocess to be handled. By default waits indefinitely; if a time
* limit is given, explicitly destroys the [[ProcessLike]] if it has not completed by
* the time the timeout has occurred
* the time the timeout has occurred.
*
* By default, a process is destroyed by sending a `SIGTERM` signal, which allows an opportunity
* for it to clean up any resources it was using. If the process is unresponsive to this, a
* `SIGKILL` signal is sent `timeoutGracePeriod` milliseconds later. If `timeoutGracePeriod` is
* `0`, then there is no `SIGTERM`; if it is `-1`, there is no `SIGKILL` sent.
*
* @return `true` when the process did not require explicit termination by either `SIGTERM` or `SIGKILL` and `false` otherwise.
* @note the issuing of `SIGTERM` instead of `SIGKILL` is implementation dependent on your JVM version. Pre-Java 9, no `SIGTERM` may be
* issued. Check the documentation for your JDK's `Process.destroy`.
*/
def join(timeout: Long = -1, timeoutGracePeriod: Long = 100): Boolean = {
  // `false` here means `waitFor` gave up before the process finished.
  val finishedInTime = waitFor(timeout)
  if (!finishedInTime) {
    assume(
      timeout != -1,
      "if the waitFor does not complete cleanly, this implies there is a timeout imposed, so the grace period is applicable"
    )
    timeoutGracePeriod match {
      // -1: only request termination, never escalate to a forcible kill
      case -1L => destroy()
      // 0: skip the graceful request and kill straight away
      case 0L => destroyForcibly()
      // otherwise: request termination, then escalate if the grace period elapses
      case gracePeriod =>
        destroy()
        val terminatedGracefully = waitFor(gracePeriod)
        if (!terminatedGracefully) destroyForcibly()
    }
    // block until the process has actually gone away
    waitFor(-1)
  }
  joinPumperThreadsHook()
  finishedInTime
}

j-mie6 marked this conversation as resolved.
Show resolved Hide resolved
/**
 * Single-argument overload of `join`, which forwards to the two-argument overload
 * with a `timeoutGracePeriod` of 100 milliseconds.
 *
 * NOTE(review): presumably retained so existing callers of `join(timeout)` keep
 * working after the grace-period parameter was added; confirm against the
 * binary-compatibility checks.
 *
 * @param timeout how long to wait in milliseconds (-1 for no timeout)
 * @return the result of the two-argument `join`
 */
@deprecatedOverriding("this method is now a forwarder, and should not be overridden", "0.10.4")
private[os] def join(timeout: Long): Boolean = join(timeout, timeoutGracePeriod = 100)

/**
* A hook method used by `join` to close the input and output streams associated with the process, not for public consumption.
*/
def join(timeout: Long = -1): Boolean
private[os] def joinPumperThreadsHook(): Unit
}

/**
* Represents a spawn subprocess that has started and may or may not have
* completed.
*/
@deprecatedInheritance(
"this class will be made final: if you are using it be aware that `join` has a new overloading",
"0.10.4"
)
class SubProcess(
val wrapped: java.lang.Process,
val inputPumperThread: Option[Thread],
Expand Down Expand Up @@ -114,22 +154,9 @@ class SubProcess(
}
}

/**
* Wait up to `millis` for the subprocess to terminate and all stdout and stderr
* from the subprocess to be handled. By default waits indefinitely; if a time
* limit is given, explicitly destroys the subprocess if it has not completed by
* the time the timeout has occurred
*/
def join(timeout: Long = -1): Boolean = {
val exitedCleanly = waitFor(timeout)
if (!exitedCleanly) {
destroy()
destroyForcibly()
waitFor(-1)
}
private[os] def joinPumperThreadsHook(): Unit = {
outputPumperThread.foreach(_.join())
errorPumperThread.foreach(_.join())
exitedCleanly
}
}

Expand Down Expand Up @@ -222,6 +249,10 @@ object SubProcess {
}
}

@deprecatedInheritance(
"this class will be made final: if you are using it be aware that `join` has a new overloading",
"0.10.4"
)
class ProcessPipeline(
val processes: Seq[SubProcess],
pipefail: Boolean,
Expand Down Expand Up @@ -312,12 +343,12 @@ class ProcessPipeline(
}

/**
* Wait up to `millis` for the [[ProcessPipeline]] to terminate, by default waits
* Wait up to `timeout` for the [[ProcessPipeline]] to terminate, by default waits
* indefinitely. Returns `true` if the [[ProcessPipeline]] has terminated by the time
* this method returns.
*
* Waits for each process one by one, while aggregating the total time waited. If
* [[timeout]] has passed before all processes have terminated, returns `false`.
* `timeout` has passed before all processes have terminated, returns `false`.
*/
override def waitFor(timeout: Long = -1): Boolean = {
@tailrec
Expand All @@ -340,28 +371,28 @@ class ProcessPipeline(
}

/**
* Wait up to `millis` for the [[ProcessPipeline]] to terminate all the processes
* Wait up to `timeout` for the [[ProcessPipeline]] to terminate all the processes
* in pipeline. By default waits indefinitely; if a time limit is given, explicitly
* destroys each process if it has not completed by the time the timeout has occurred.
*
* By default, the processes are destroyed by sending `SIGTERM` signals, which allows an opportunity
* for them to clean up any resources they were using. If any process is unresponsive to this, a
* `SIGKILL` signal is sent `timeoutGracePeriod` milliseconds later. If `timeoutGracePeriod` is
* `0`, then there is no `SIGTERM`; if it is `-1`, there is no `SIGKILL` sent.
*
* @return `true` when the processes did not require explicit termination by either `SIGTERM` or `SIGKILL` and `false` otherwise.
* @note the issuing of `SIGTERM` instead of `SIGKILL` is implementation dependent on your JVM version. Pre-Java 9, no `SIGTERM` may be
* issued. Check the documentation for your JDK's `Process.destroy`.
*/
override def join(timeout: Long = -1): Boolean = {
@tailrec
def joinRec(startedAt: Long, processesLeft: Seq[SubProcess], result: Boolean): Boolean =
processesLeft match {
case Nil => result
case head :: tail =>
val elapsed = System.currentTimeMillis() - startedAt
val timeoutLeft = Math.max(0, timeout - elapsed)
val exitedCleanly = head.join(timeoutLeft)
joinRec(startedAt, tail, result && exitedCleanly)
}

override def join(timeout: Long = -1, timeoutGracePeriod: Long = 100): Boolean = {
// in this case, the grace period does not apply, so fine
if (timeout == -1) {
processes.forall(_.join())
} else {
val timeNow = System.currentTimeMillis()
joinRec(timeNow, processes, true)
}
} else super.join(timeout, timeoutGracePeriod)
}

// Runs the pumper-thread hook of each process in the pipeline, in pipeline order.
private[os] def joinPumperThreadsHook(): Unit =
  for (process <- processes) process.joinPumperThreadsHook()
}

Expand Down