Skip to content

Commit

Permalink
fix(discovery): should send notifications on credential store update (#…
Browse files Browse the repository at this point in the history
…1327)

* fix(discovery): should send notifications on credential store update

* feat(discovery): extract updated nodes and send MODIFIED notifications

Signed-off-by: Thuan Vo <thvo@redhat.com>

* deps(core): bump to v2.18.0

* chore(discovery): fix tests and refactor

* chore(discovery): refactor utilities

* chore(serviceref): use builder pattern

* fix(serviceref): fix spotbugs warnings

* chore(ref): refactor some helper utils

Signed-off-by: Thuan Vo <thvo@redhat.com>
Co-authored-by: Andrew Azores <aazores@redhat.com>
  • Loading branch information
Thuan Vo and andrewazores committed Jan 16, 2023
1 parent 29c4667 commit 5a1f5af
Show file tree
Hide file tree
Showing 7 changed files with 166 additions and 71 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@
<com.google.dagger.version>2.44.2</com.google.dagger.version>
<com.google.dagger.compiler.version>2.26</com.google.dagger.compiler.version>

<io.cryostat.core.version>2.17.1</io.cryostat.core.version>
<io.cryostat.core.version>2.18.0</io.cryostat.core.version>

<org.openjdk.nashorn.core.version>15.4</org.openjdk.nashorn.core.version>
<org.apache.commons.lang3.version>3.12.0</org.apache.commons.lang3.version>
Expand Down
39 changes: 18 additions & 21 deletions src/main/java/io/cryostat/discovery/DiscoveryStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public void start(Promise<Void> future) throws Exception {
gson.fromJson(
plugin.getSubtree(),
EnvironmentNode.class);
update(id, original.getChildren(), false);
update(id, original.getChildren());
}
} catch (JsonSyntaxException | ScriptException e) {
throw new RuntimeException(e);
Expand Down Expand Up @@ -318,38 +318,29 @@ private List<AbstractNode> modifyChildrenWithJvmIds(

public List<? extends AbstractNode> update(
UUID id, Collection<? extends AbstractNode> children) {
return update(id, children, true);
}

public List<? extends AbstractNode> update(
UUID id, Collection<? extends AbstractNode> children, boolean notify) {
var updatedChildren =
modifyChildrenWithJvmIds(id, Objects.requireNonNull(children, "children"));

PluginInfo plugin = dao.get(id).orElseThrow(() -> new NotFoundException(id));

EnvironmentNode original = gson.fromJson(plugin.getSubtree(), EnvironmentNode.class);
EnvironmentNode originalTree = gson.fromJson(plugin.getSubtree(), EnvironmentNode.class);
plugin = dao.update(id, updatedChildren);
logger.trace("Discovery Update {} ({}): {}", id, plugin.getRealm(), updatedChildren);
EnvironmentNode currentTree = gson.fromJson(plugin.getSubtree(), EnvironmentNode.class);

if (notify) {
Set<TargetNode> previousLeaves = findLeavesFrom(original);
Set<TargetNode> currentLeaves = findLeavesFrom(currentTree);
Set<ServiceRef> previousRefs = getRefsFromLeaves(findLeavesFrom(originalTree));
Set<ServiceRef> currentRefs = getRefsFromLeaves(findLeavesFrom(currentTree));

Set<TargetNode> added = new HashSet<>(currentLeaves);
added.removeAll(previousLeaves);
ServiceRef.compare(previousRefs).to(currentRefs).updated().stream()
.forEach(sr -> notifyAsyncTargetDiscovery(EventKind.MODIFIED, sr));

Set<TargetNode> removed = new HashSet<>(previousLeaves);
removed.removeAll(currentLeaves);
ServiceRef.compare(previousRefs).to(currentRefs).added().stream()
.forEach(sr -> notifyAsyncTargetDiscovery(EventKind.FOUND, sr));

ServiceRef.compare(previousRefs).to(currentRefs).removed().stream()
.forEach(sr -> notifyAsyncTargetDiscovery(EventKind.LOST, sr));
;

removed.stream()
.map(TargetNode::getTarget)
.forEach(sr -> notifyAsyncTargetDiscovery(EventKind.LOST, sr));
added.stream()
.map(TargetNode::getTarget)
.forEach(sr -> notifyAsyncTargetDiscovery(EventKind.FOUND, sr));
}
return currentTree.getChildren();
}

Expand Down Expand Up @@ -407,6 +398,12 @@ private Set<TargetNode> findLeavesFrom(AbstractNode node) {
throw new IllegalArgumentException(node.getClass().getCanonicalName());
}

public Set<ServiceRef> getRefsFromLeaves(Set<TargetNode> leaves) {
final Set<ServiceRef> refs = new HashSet<>();
leaves.stream().map(TargetNode::getTarget).forEach(r -> refs.add(r));
return refs;
}

public static class NotFoundException extends RuntimeException {
NotFoundException(UUID id) {
super(String.format("Unknown registration id: [%s]", id.toString()));
Expand Down
69 changes: 69 additions & 0 deletions src/main/java/io/cryostat/platform/ServiceRef.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,11 @@
package io.cryostat.platform;

import java.net.URI;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -218,4 +220,71 @@ public enum AnnotationKey {
REALM,
;
}

public static Compare compare(Collection<ServiceRef> src) {
return new Compare(src);
}

public static class Compare {
private Collection<ServiceRef> previous, current;

public Compare(Collection<ServiceRef> previous) {
this.previous = new HashSet<>(previous);
}

public Compare to(Collection<ServiceRef> current) {
this.current = new HashSet<>(current);
return this;
}

public Collection<ServiceRef> added() {
return removeAllUpdatedRefs(addedOrUpdatedRefs(), updated());
}

public Collection<ServiceRef> removed() {
return removeAllUpdatedRefs(removedOrUpdatedRefs(), updated());
}

public Collection<ServiceRef> updated() {
Collection<ServiceRef> updated = new HashSet<>();
intersection(removedOrUpdatedRefs(), addedOrUpdatedRefs(), false)
.forEach((ref) -> updated.add(ref));
return updated;
}

private Collection<ServiceRef> addedOrUpdatedRefs() {
Collection<ServiceRef> added = new HashSet<>(current);
added.removeAll(previous);
return added;
}

private Collection<ServiceRef> removedOrUpdatedRefs() {
Collection<ServiceRef> removed = new HashSet<>(previous);
removed.removeAll(current);
return removed;
}

private Collection<ServiceRef> removeAllUpdatedRefs(
Collection<ServiceRef> src, Collection<ServiceRef> updated) {
Collection<ServiceRef> tnSet = new HashSet<>(src);
intersection(src, updated, true).stream().forEach((ref) -> tnSet.remove(ref));
return tnSet;
}

private Collection<ServiceRef> intersection(
Collection<ServiceRef> src, Collection<ServiceRef> other, boolean keepOld) {
final Collection<ServiceRef> intersection = new HashSet<>();

// Manual removal since ServiceRef also compares jvmId
for (ServiceRef srcRef : src) {
for (ServiceRef otherRef : other) {
if (Objects.equals(srcRef.getServiceUri(), otherRef.getServiceUri())) {
intersection.add(keepOld ? srcRef : otherRef);
}
}
}

return intersection;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -348,21 +348,21 @@ public void onAdd(Endpoints endpoints) {

@Override
public void onUpdate(Endpoints oldEndpoints, Endpoints newEndpoints) {
List<ServiceRef> previousRefs = getServiceRefs(oldEndpoints);
List<ServiceRef> currentRefs = getServiceRefs(newEndpoints);
Set<ServiceRef> previousRefs = new HashSet<>(getServiceRefs(oldEndpoints));
Set<ServiceRef> currentRefs = new HashSet<>(getServiceRefs(newEndpoints));

if (previousRefs.equals(currentRefs)) {
return;
}

Set<ServiceRef> added = new HashSet<>(currentRefs);
added.removeAll(previousRefs);
ServiceRef.compare(previousRefs).to(currentRefs).updated().stream()
.forEach(sr -> notifyAsyncTargetDiscovery(EventKind.MODIFIED, sr));

Set<ServiceRef> removed = new HashSet<>(previousRefs);
removed.removeAll(currentRefs);
ServiceRef.compare(previousRefs).to(currentRefs).added().stream()
.forEach(sr -> notifyAsyncTargetDiscovery(EventKind.FOUND, sr));

removed.stream().forEach(sr -> notifyAsyncTargetDiscovery(EventKind.LOST, sr));
added.stream().forEach(sr -> notifyAsyncTargetDiscovery(EventKind.FOUND, sr));
ServiceRef.compare(previousRefs).to(currentRefs).removed().stream()
.forEach(sr -> notifyAsyncTargetDiscovery(EventKind.LOST, sr));
}

@Override
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/io/cryostat/rules/RuleProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,8 @@ public synchronized void accept(TargetDiscoveryEvent tde) {
case LOST:
deactivate(null, tde.getServiceRef());
break;
case MODIFIED:
break;
default:
throw new UnsupportedOperationException(tde.getEventKind().toString());
}
Expand Down
86 changes: 58 additions & 28 deletions src/test/java/io/cryostat/discovery/DiscoveryStorageTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -454,10 +454,7 @@ void updatesDaoAndEmitsFoundAndLostNotifications() throws Exception {
new EnvironmentNode("prev", BaseNodeType.REALM, Map.of(), Set.of(prevTarget));

ServiceRef nextServiceRef =
new ServiceRef(
"id",
URI.create("service:jmx:rmi:///jndi/rmi://localhost/jmxrmi"),
"nextServiceRef");
new ServiceRef("new_id", prevServiceRef.getServiceUri(), "nextServiceRef");
TargetNode nextTarget = new TargetNode(BaseNodeType.JVM, nextServiceRef);
EnvironmentNode next =
new EnvironmentNode("next", BaseNodeType.REALM, Map.of(), Set.of(nextTarget));
Expand All @@ -480,14 +477,12 @@ void updatesDaoAndEmitsFoundAndLostNotifications() throws Exception {
List<? extends AbstractNode> updatedChildren = storage.update(id, List.of(nextTarget));

MatcherAssert.assertThat(updatedChildren, Matchers.equalTo(List.of(nextTarget)));
MatcherAssert.assertThat(discoveryEvents, Matchers.hasSize(2));
MatcherAssert.assertThat(discoveryEvents, Matchers.hasSize(1));

TargetDiscoveryEvent lostEvent =
new TargetDiscoveryEvent(EventKind.LOST, prevServiceRef);
TargetDiscoveryEvent foundEvent =
new TargetDiscoveryEvent(EventKind.FOUND, nextServiceRef);
MatcherAssert.assertThat(
discoveryEvents, Matchers.containsInRelativeOrder(lostEvent, foundEvent));
TargetDiscoveryEvent modifiedEvent =
new TargetDiscoveryEvent(EventKind.MODIFIED, nextServiceRef);
System.out.println(modifiedEvent.getServiceRef());
MatcherAssert.assertThat(discoveryEvents, Matchers.contains(modifiedEvent));
}
}

Expand Down Expand Up @@ -687,21 +682,41 @@ public ServiceRef answer(InvocationOnMock invocation) throws Throwable {
null,
URI.create("service:jmx:rmi:///jndi/rmi://localhost:1/jmxrmi"),
"serviceRef1");
ServiceRef updatedServiceRef1 =
new ServiceRef(
serviceRef1.getAlias().get(),
serviceRef1.getServiceUri(),
serviceRef1.getAlias().get());
ServiceRef serviceRef2 =
new ServiceRef(
null,
URI.create("service:jmx:rmi:///jndi/rmi://localhost:2/jmxrmi"),
"serviceRef2");
ServiceRef updatedServiceRef2 =
new ServiceRef(
serviceRef2.getAlias().get(),
serviceRef2.getServiceUri(),
serviceRef2.getAlias().get());
ServiceRef serviceRef3 =
new ServiceRef(
null,
URI.create("service:jmx:rmi:///jndi/rmi://localhost:3/jmxrmi"),
"serviceRef3");
ServiceRef updatedServiceRef3 =
new ServiceRef(
serviceRef3.getAlias().get(),
serviceRef3.getServiceUri(),
serviceRef3.getAlias().get());
ServiceRef serviceRef4 =
new ServiceRef(
null,
URI.create("service:jmx:rmi:///jndi/rmi://localhost:4/jmxrmi"),
"serviceRef4");
ServiceRef updatedServiceRef4 =
new ServiceRef(
serviceRef4.getAlias().get(),
serviceRef4.getServiceUri(),
serviceRef4.getAlias().get());
TargetNode target1 = new TargetNode(BaseNodeType.JVM, serviceRef1);
TargetNode target2 = new TargetNode(BaseNodeType.JVM, serviceRef2);
TargetNode target3 = new TargetNode(BaseNodeType.JVM, serviceRef3);
Expand All @@ -713,7 +728,10 @@ public ServiceRef answer(InvocationOnMock invocation) throws Throwable {
new EnvironmentNode("next", BaseNodeType.REALM, Map.of(), Set.of(target4));
EnvironmentNode realm2 =
new EnvironmentNode(
"next", BaseNodeType.REALM, Map.of(), Set.of(target1, target2, agent));
"next",
BaseNodeType.REALM,
Map.of(),
Set.of(target4, target1, target2, agent));

PluginInfo prevPlugin =
new PluginInfo("test-realm", URI.create("http://example.com"), gson.toJson(realm1));
Expand All @@ -735,36 +753,48 @@ public PluginInfo answer(InvocationOnMock invocation) throws Throwable {
}
});

List<TargetDiscoveryEvent> discoveryEvents = new ArrayList<>();
storage.addTargetDiscoveryListener(discoveryEvents::add);

var updatedSubtree = storage.update(id, List.of(realm2));
MatcherAssert.assertThat(updatedSubtree, Matchers.notNullValue());
MatcherAssert.assertThat(updatedSubtree, Matchers.hasSize(1));
for (AbstractNode node : updatedSubtree) {

if (node instanceof TargetNode) {
TargetNode target = (TargetNode) node;
AbstractNode node = updatedSubtree.get(0); // realm2
MatcherAssert.assertThat(node, Matchers.instanceOf(EnvironmentNode.class));

for (AbstractNode childNode : ((EnvironmentNode) node).getChildren()) {
if (childNode instanceof TargetNode) {
TargetNode target = (TargetNode) childNode;
MatcherAssert.assertThat(
target.getTarget().getAlias().isPresent(), Matchers.is(true));
MatcherAssert.assertThat(target.getTarget().getJvmId(), Matchers.notNullValue());
MatcherAssert.assertThat(
target.getTarget().getJvmId(),
Matchers.equalTo(target.getTarget().getAlias().get()));
} else {
MatcherAssert.assertThat(node, Matchers.instanceOf(EnvironmentNode.class));
EnvironmentNode env = (EnvironmentNode) node;
for (AbstractNode nested : env.getChildren()) {
if (nested instanceof TargetNode) {
TargetNode target = (TargetNode) nested;
MatcherAssert.assertThat(
target.getTarget().getAlias().isPresent(), Matchers.is(true));
MatcherAssert.assertThat(
target.getTarget().getJvmId(), Matchers.notNullValue());
MatcherAssert.assertThat(
target.getTarget().getJvmId(),
Matchers.equalTo(target.getTarget().getAlias().get()));
}
MatcherAssert.assertThat(childNode, Matchers.instanceOf(EnvironmentNode.class));
for (AbstractNode nestedNode : ((EnvironmentNode) childNode).getChildren()) {
TargetNode target = (TargetNode) nestedNode;
MatcherAssert.assertThat(
target.getTarget().getAlias().isPresent(), Matchers.is(true));
MatcherAssert.assertThat(
target.getTarget().getJvmId(), Matchers.notNullValue());
MatcherAssert.assertThat(
target.getTarget().getJvmId(),
Matchers.equalTo(target.getTarget().getAlias().get()));
}
}
}

MatcherAssert.assertThat(discoveryEvents, Matchers.hasSize(4));
MatcherAssert.assertThat(
discoveryEvents,
Matchers.containsInAnyOrder(
new TargetDiscoveryEvent(EventKind.FOUND, updatedServiceRef1),
new TargetDiscoveryEvent(EventKind.FOUND, updatedServiceRef2),
new TargetDiscoveryEvent(EventKind.FOUND, updatedServiceRef3),
new TargetDiscoveryEvent(EventKind.MODIFIED, updatedServiceRef4)));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -660,8 +660,8 @@ public JMXServiceURL answer(InvocationOnMock args) throws Throwable {
.endSubset()
.build();

CountDownLatch latch = new CountDownLatch(3);
Queue<TargetDiscoveryEvent> events = new ArrayDeque<>(3);
CountDownLatch latch = new CountDownLatch(2);
Queue<TargetDiscoveryEvent> events = new ArrayDeque<>(2);
platformClient.addTargetDiscoveryListener(
tde -> {
events.add(tde);
Expand Down Expand Up @@ -702,7 +702,7 @@ public JMXServiceURL answer(InvocationOnMock args) throws Throwable {
latch.await();
Thread.sleep(100); // to ensure no more events are coming

MatcherAssert.assertThat(events, Matchers.hasSize(3));
MatcherAssert.assertThat(events, Matchers.hasSize(2));

ServiceRef original =
new ServiceRef(
Expand Down Expand Up @@ -740,16 +740,13 @@ public JMXServiceURL answer(InvocationOnMock args) throws Throwable {
AnnotationKey.POD_NAME,
"modifiedTarget"));

TargetDiscoveryEvent found = events.remove();
MatcherAssert.assertThat(found.getEventKind(), Matchers.equalTo(EventKind.FOUND));
MatcherAssert.assertThat(found.getServiceRef(), Matchers.equalTo(original));
TargetDiscoveryEvent foundEvent = events.remove();
MatcherAssert.assertThat(foundEvent.getEventKind(), Matchers.equalTo(EventKind.FOUND));
MatcherAssert.assertThat(foundEvent.getServiceRef(), Matchers.equalTo(original));

TargetDiscoveryEvent lost = events.remove();
MatcherAssert.assertThat(lost.getEventKind(), Matchers.equalTo(EventKind.LOST));
MatcherAssert.assertThat(lost.getServiceRef(), Matchers.equalTo(original));

TargetDiscoveryEvent refound = events.remove();
MatcherAssert.assertThat(refound.getEventKind(), Matchers.equalTo(EventKind.FOUND));
MatcherAssert.assertThat(refound.getServiceRef(), Matchers.equalTo(modified));
TargetDiscoveryEvent modifiedEvent = events.remove();
MatcherAssert.assertThat(
modifiedEvent.getEventKind(), Matchers.equalTo(EventKind.MODIFIED));
MatcherAssert.assertThat(modifiedEvent.getServiceRef(), Matchers.equalTo(modified));
}
}

0 comments on commit 5a1f5af

Please sign in to comment.