Skip to content

Commit

Permalink
Fix deadlock when ryuk does not acknowledge filters (#843)
Browse files Browse the repository at this point in the history
* Fix deadlock when ryuk does not acknowledge filters

* Extract FilterRegistry in ResourceReaper for unit-testability

* Use explicit scoping for FilterRegistry#register

* Restructure tests

* Update CHANGELOG.md
  • Loading branch information
thammerl authored and bsideup committed Sep 5, 2018
1 parent f928c4d commit 7ea1483
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 20 deletions.
76 changes: 56 additions & 20 deletions core/src/main/java/org/testcontainers/utility/ResourceReaper.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.github.dockerjava.api.model.Network;
import com.github.dockerjava.api.model.Ports;
import com.github.dockerjava.api.model.Volume;
import com.google.common.annotations.VisibleForTesting;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.client.utils.URLEncodedUtils;
Expand All @@ -23,6 +24,7 @@

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.Socket;
Expand Down Expand Up @@ -117,8 +119,7 @@ public static String start(String hostIpAddress, DockerClient client, boolean wi
while (true) {
int index = 0;
try(Socket clientSocket = new Socket(hostIpAddress, ryukPort)) {
OutputStream out = clientSocket.getOutputStream();
BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
FilterRegistry registry = new FilterRegistry(clientSocket.getInputStream(), clientSocket.getOutputStream());

synchronized (DEATH_NOTE) {
while (true) {
Expand All @@ -131,24 +132,14 @@ public static String start(String hostIpAddress, DockerClient client, boolean wi
}
}
List<Map.Entry<String, String>> filters = DEATH_NOTE.get(index);

String query = URLEncodedUtils.format(
filters.stream()
.map(it -> new BasicNameValuePair(it.getKey(), it.getValue()))
.collect(Collectors.toList()),
(String) null
);

log.debug("Sending '{}' to Ryuk", query);
out.write(query.getBytes());
out.write('\n');
out.flush();

while (!"ACK".equalsIgnoreCase(in.readLine())) {
boolean isAcknowledged = registry.register(filters);
if (isAcknowledged) {
log.debug("Received 'ACK' from Ryuk");
ryukScheduledLatch.countDown();
index++;
} else {
log.debug("Didn't receive 'ACK' from Ryuk. Will retry to send filters.");
}

ryukScheduledLatch.countDown();
index++;
}
}
} catch (IOException e) {
Expand Down Expand Up @@ -179,7 +170,6 @@ public synchronized static ResourceReaper instance() {

/**
* Perform a cleanup.
*
*/
public synchronized void performCleanup() {
registeredContainers.forEach(this::stopContainer);
Expand Down Expand Up @@ -362,4 +352,50 @@ private void setHook() {
Runtime.getRuntime().addShutdownHook(new Thread(DockerClientFactory.TESTCONTAINERS_THREAD_GROUP, this::performCleanup));
}
}

static class FilterRegistry {

@VisibleForTesting
static final String ACKNOWLEDGMENT = "ACK";

private final BufferedReader in;
private final OutputStream out;

FilterRegistry(InputStream ryukInputStream, OutputStream ryukOutputStream) {
this.in = new BufferedReader(new InputStreamReader(ryukInputStream));
this.out = ryukOutputStream;
}

/**
* Registers the given filters with Ryuk
*
* @param filters the filter to register
* @return true if the filters have been registered successfuly, false otherwise
* @throws IOException if communication with Ryuk fails
*/
protected boolean register(List<Map.Entry<String, String>> filters) throws IOException {
String query = URLEncodedUtils.format(
filters.stream()
.map(it -> new BasicNameValuePair(it.getKey(), it.getValue()))
.collect(Collectors.toList()),
(String) null
);

log.debug("Sending '{}' to Ryuk", query);
out.write(query.getBytes());
out.write('\n');
out.flush();

return waitForAcknowledgment(in);
}

private static boolean waitForAcknowledgment(BufferedReader in) throws IOException {
String line = in.readLine();
while (line != null && !ACKNOWLEDGMENT.equalsIgnoreCase(line)) {
line = in.readLine();
}
return ACKNOWLEDGMENT.equalsIgnoreCase(line);
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package org.testcontainers.utility;

import org.junit.Test;
import org.testcontainers.utility.ResourceReaper.FilterRegistry;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.AbstractMap.SimpleEntry;
import java.util.List;
import java.util.Map.Entry;

import static java.util.Arrays.asList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

public class FilterRegistryTest {

private static final List<Entry<String, String>> FILTERS = asList(
new SimpleEntry<>("key1!", "value2?"), new SimpleEntry<>("key2#", "value2%")
);
private static final String URL_ENCODED_FILTERS = "key1%21=value2%3F&key2%23=value2%25";
private static final byte[] ACKNOWLEDGEMENT = FilterRegistry.ACKNOWLEDGMENT.getBytes();
private static final byte[] NO_ACKNOWLEDGEMENT = "".getBytes();
private static final String NEW_LINE = "\n";

@Test
public void registerReturnsTrueIfAcknowledgementIsReadFromInputStream() throws IOException {
FilterRegistry registry = new FilterRegistry(inputStream(ACKNOWLEDGEMENT), anyOutputStream());

boolean successful = registry.register(FILTERS);

assertTrue(successful);
}

@Test
public void registerReturnsFalseIfNoAcknowledgementIsReadFromInputStream() throws IOException {
FilterRegistry registry = new FilterRegistry(inputStream(NO_ACKNOWLEDGEMENT), anyOutputStream());

boolean successful = registry.register(FILTERS);

assertFalse(successful);
}

@Test
public void registerWritesUrlEncodedFiltersAndNewlineToOutputStream() throws IOException {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
FilterRegistry registry = new FilterRegistry(anyInputStream(), outputStream);

registry.register(FILTERS);

assertEquals(URL_ENCODED_FILTERS + NEW_LINE, new String(outputStream.toByteArray()));
}

private static InputStream inputStream(byte[] bytes) {
return new ByteArrayInputStream(bytes);
}

private static InputStream anyInputStream() {
return inputStream(ACKNOWLEDGEMENT);
}

private static OutputStream anyOutputStream() {
return new ByteArrayOutputStream();
}

}

0 comments on commit 7ea1483

Please sign in to comment.