Skip to content

Commit

Permalink
fix(java): JsiiRuntime.ErrorStreamSink does not respond to being inte…
Browse files Browse the repository at this point in the history
…rrupted (#2540)

Use `zt-exec` to launch the `jsii-runtime` process, as it offers better semantics
for IO redirection than Java's standard `ProcessBuilder` API (that it wraps). This
includes ways to pipe the standard outputs (in particular, `STDERR`) though a 
line handler, which is *exactly* what we need to achieve in this particular case.

Since we no longer manage an error stream sink thread, we can hope that this
solves the issue of the thread not responding to interruptions correctly.

Fixes #2533
  • Loading branch information
RomainMuller authored Feb 25, 2021
1 parent 67cd3ce commit 6e74bf9
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 88 deletions.
8 changes: 8 additions & 0 deletions packages/@jsii/java-runtime/pom.xml.t.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ process.stdout.write(`<?xml version="1.0" encoding="UTF-8"?>
<jetbrains-annotations.version>[13.0.0,20.0-a0)</jetbrains-annotations.version>
<junit.version>[5.7.0,5.8-a0)</junit.version>
<mockito.version>[3.5.13,4.0-a0)</mockito.version>
<zt-exec.version>[1.12,2.0-a0)</zt-exec.version>
</properties>
<dependencies>
Expand Down Expand Up @@ -127,6 +128,13 @@ process.stdout.write(`<?xml version="1.0" encoding="UTF-8"?>
<artifactId>javax.annotation-api</artifactId>
<version>\${javax-annotations.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.zeroturnaround/zt-exec -->
<dependency>
<groupId>org.zeroturnaround</groupId>
<artifactId>zt-exec</artifactId>
<version>\${zt-exec.version}</version>
</dependency>
</dependencies>
<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,25 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.jetbrains.annotations.Nullable;
import org.zeroturnaround.exec.ProcessExecutor;
import org.zeroturnaround.exec.StartedProcess;
import org.zeroturnaround.exec.stream.LogOutputStream;
import software.amazon.jsii.api.Callback;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;

import java.io.*;
import java.lang.reflect.InvocationTargetException;
import java.nio.channels.Channels;
import java.nio.channels.Pipe;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

import static software.amazon.jsii.JsiiVersion.JSII_RUNTIME_VERSION;
Expand Down Expand Up @@ -41,7 +48,7 @@ public final class JsiiRuntime {
/**
* The child procesds.
*/
private Process childProcess;
private StartedProcess childProcess;

/**
* Child's standard output.
Expand All @@ -51,7 +58,7 @@ public final class JsiiRuntime {
/**
* Child's standard input.
*/
private BufferedWriter stdin;
private Writer stdin;

/**
* Handler for synchronous callbacks. Must be set using setCallbackHandler.
Expand Down Expand Up @@ -173,45 +180,53 @@ protected void finalize() throws Throwable {
}

synchronized void terminate() {
try {
// The jsii Kernel process exists after having received the "exit" message
if (stdin != null) {
// The jsii Kernel process exists after having received the "exit" message
if (stdin != null) {
try {
stdin.write("{\"exit\":0}\n");
stdin.close();
} catch (final IOException ioe) {
// Ignore - the stream might have already been closed, if the child exited already.
} finally {
stdin = null;
}
}

if (childProcess != null) {
// Wait for the child process to complete
try {
// Giving the process up to 5 seconds to clean up and exit
if (!childProcess.waitFor(5, TimeUnit.SECONDS)) {
// If it's still not done, forcibly terminate it at this point.
childProcess.destroy();
}
} catch (final InterruptedException ie) {
throw new RuntimeException(ie);
if (childProcess != null) {
// Wait for the child process to complete
try {
// Giving the process up to 5 seconds to clean up and exit
if (!childProcess.getProcess().waitFor(5, TimeUnit.SECONDS)) {
// If it's still not done, forcibly terminate it at this point.
childProcess.getProcess().destroyForcibly();
}
} catch (final InterruptedException ie) {
throw new RuntimeException(ie);
} finally {
childProcess = null;
}
}

// Cleaning up stdout (ensuring buffers are flushed, etc...)
if (stdout != null) {
// Cleaning up stdout (ensuring buffers are flushed, etc...)
if (stdout != null) {
try {
stdout.close();
} catch (final IOException ioe) {
// Ignore - the stream might have already been closed.
} finally {
stdout = null;
}
}

// We shut down already, no need for the shutdown hook anymore
if (this.shutdownHook != null) {
try {
Runtime.getRuntime().removeShutdownHook(this.shutdownHook);
} catch (final IllegalStateException ise) {
// VM Shutdown is in progress, removal is now impossible (and unnecessary)
}
// We shut down already, no need for the shutdown hook anymore
if (this.shutdownHook != null) {
try {
Runtime.getRuntime().removeShutdownHook(this.shutdownHook);
} catch (final IllegalStateException ise) {
// VM Shutdown is in progress, removal is now impossible (and unnecessary)
} finally {
this.shutdownHook = null;
}
} catch (final IOException ioe) {
throw new UncheckedIOException(ioe);
}
}

Expand All @@ -224,49 +239,44 @@ private synchronized void startRuntimeIfNeeded() {
}

// If JSII_DEBUG is set, enable traces.
String jsiiDebug = System.getenv("JSII_DEBUG");
boolean traceEnabled = jsiiDebug != null
final String jsiiDebug = System.getenv("JSII_DEBUG");
final boolean traceEnabled = jsiiDebug != null
&& !jsiiDebug.isEmpty()
&& !jsiiDebug.equalsIgnoreCase("false")
&& !jsiiDebug.equalsIgnoreCase("0");

// If JSII_RUNTIME is set, use it to find the jsii-server executable
// otherwise, we default to "jsii-runtime" from PATH.
String jsiiRuntimeExecutable = System.getenv("JSII_RUNTIME");
if (jsiiRuntimeExecutable == null) {
jsiiRuntimeExecutable = BundledRuntime.extract(getClass());
}
final String jsiiRuntimeEnv = System.getenv("JSII_RUNTIME");
final List<String> jsiiRuntimeCommand = jsiiRuntimeEnv == null
? Arrays.asList("node", BundledRuntime.extract(getClass()))
: Collections.singletonList(jsiiRuntimeEnv);

if (traceEnabled) {
System.err.println("jsii-runtime: " + jsiiRuntimeExecutable);
System.err.println("jsii-runtime: " + String.join(" ", jsiiRuntimeCommand));
}

ProcessBuilder pb = new ProcessBuilder("node", jsiiRuntimeExecutable)
.redirectInput(ProcessBuilder.Redirect.PIPE)
.redirectOutput(ProcessBuilder.Redirect.PIPE)
.redirectError(ProcessBuilder.Redirect.PIPE);
try {
final Pipe stdin = Pipe.open();
this.stdin = Channels.newWriter(stdin.sink(), StandardCharsets.UTF_8.newEncoder(), -1);

if (traceEnabled) {
pb.environment().put("JSII_DEBUG", "1");
}
final Pipe stdout = Pipe.open();
this.stdout = new BufferedReader(Channels.newReader(stdout.source(), StandardCharsets.UTF_8.newDecoder(), -1));

pb.environment().put("JSII_AGENT", "Java/" + System.getProperty("java.version"));
final ProcessExecutor executor = new ProcessExecutor(jsiiRuntimeCommand)
.environment("JSII_AGENT", String.format("Java/%s", System.getProperty("java.version")))
.environment("JSII_DEBUG", jsiiDebug)
.redirectInput(Channels.newInputStream(stdin.source()))
.redirectOutput(Channels.newOutputStream(stdout.sink()))
.redirectError(new ErrorStreamSink());

try {
this.childProcess = pb.start();
this.shutdownHook = new Thread(this::terminate, "Terminate jsii client");
Runtime.getRuntime().addShutdownHook(this.shutdownHook);
} catch (IOException e) {
throw new JsiiException("Cannot find the 'jsii-runtime' executable (JSII_RUNTIME or PATH)", e);
this.childProcess = executor.start();
} catch (final IOException ioe) {
throw new UncheckedIOException(ioe);
}

OutputStreamWriter stdinStream = new OutputStreamWriter(this.childProcess.getOutputStream(), StandardCharsets.UTF_8);
InputStreamReader stdoutStream = new InputStreamReader(this.childProcess.getInputStream(), StandardCharsets.UTF_8);

new ErrorStreamSink(this.childProcess.getErrorStream()).start();

this.stdout = new BufferedReader(stdoutStream);
this.stdin = new BufferedWriter(stdinStream);
this.shutdownHook = new Thread(this::terminate, "Terminate jsii client");
Runtime.getRuntime().addShutdownHook(this.shutdownHook);

handshake();

Expand Down Expand Up @@ -346,40 +356,22 @@ private static void notifyInspector(final JsonNode message, final MessageInspect
inspector.inspect(message, type);
}

private static final class ErrorStreamSink extends Thread {
private final InputStream inputStream;

public ErrorStreamSink(final InputStream inputStream) {
super("JsiiRuntime.ErrorStreamSink");
// This is a daemon thread, shouldn't keep the VM alive.
this.setDaemon(true);
private static final class ErrorStreamSink extends LogOutputStream {
private final ObjectMapper objectMapper = new ObjectMapper();

this.inputStream = inputStream;
}

public void run() {
try (final InputStreamReader inputStreamReader = new InputStreamReader(this.inputStream);
final BufferedReader reader = new BufferedReader(inputStreamReader)) {
String line;
final ObjectMapper objectMapper = new ObjectMapper();
while ((line = reader.readLine()) != null) {
try {
final JsonNode tree = objectMapper.readTree(line);
final ConsoleOutput consoleOutput = objectMapper.treeToValue(tree, ConsoleOutput.class);
if (consoleOutput.stderr != null) {
System.err.write(consoleOutput.stderr, 0, consoleOutput.stderr.length);
}
if (consoleOutput.stdout != null) {
System.out.write(consoleOutput.stdout, 0, consoleOutput.stdout.length);
}
} catch (final JsonParseException | JsonMappingException exception) {
// If not JSON, then this goes straight to stderr without touches...
System.err.println(line);
}
public void processLine(final String line) {
try {
final JsonNode tree = objectMapper.readTree(line);
final ConsoleOutput consoleOutput = objectMapper.treeToValue(tree, ConsoleOutput.class);
if (consoleOutput.stderr != null) {
System.err.write(consoleOutput.stderr, 0, consoleOutput.stderr.length);
}
if (consoleOutput.stdout != null) {
System.out.write(consoleOutput.stdout, 0, consoleOutput.stdout.length);
}
} catch (final IOException error) {
System.err.printf("I/O Error reading jsii Kernel's STDERR: %s%n", error);
throw new UncheckedIOException(error);
} catch (final JsonProcessingException exception) {
// If not JSON, then this goes straight to stderr without touches...
System.err.println(line);
}
}
}
Expand Down

0 comments on commit 6e74bf9

Please sign in to comment.