From 6b3110817875a096e4b8bf3c16143ecd8de3092b Mon Sep 17 00:00:00 2001 From: Erik Weathers Date: Sat, 2 Apr 2016 00:03:52 -0700 Subject: [PATCH] Fix MesosSupervisor.getMetadata This is intended to fix issue #119. With the introduction of the TaskAssignments refactor for creating unique task IDs (#106), I introduced a couple of bugs in the implementation of MesosSupervisor.getMetadata: 1. The slot counts in the Storm UI were broken -- the return from getMetadata was always a single element vector, due to using a Set instead of a java array previously. This was causing the PersistentVector.create(Object ... object) method to be matched, which just puts the passed objects into a vector without iterating over their constituent elements. Since we are passing a single Set object, we are getting a single element in the resultant vector. So the fix is to just create a List and pass that to PersistentVector.create(). 2. The returned Object must be serializable. Depending on the build and runtime environment, the serialization done by the storm supervisor during initialization will fail, crashing the supervisor. That was happening because we were passing back the ConcurrentHashMap$KeySetView object, which is not serializable. Here too, the fix is to just create a List and pass that to PersistentVector.create(). NOTE: I haven't been able to reproduce problem 2. unfortunately. --- storm/src/main/storm/mesos/MesosSupervisor.java | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/storm/src/main/storm/mesos/MesosSupervisor.java b/storm/src/main/storm/mesos/MesosSupervisor.java index e4cca161f..6459873d6 100644 --- a/storm/src/main/storm/mesos/MesosSupervisor.java +++ b/storm/src/main/storm/mesos/MesosSupervisor.java @@ -40,8 +40,10 @@ import storm.mesos.util.MesosCommon; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; @@ -108,7 +110,20 @@ public boolean confirmAssigned(int port) { @Override public Object getMetadata() { - Set ports = _taskAssignments.getAssignedPorts(); + /* + * Convert obtained Set into a List for 2 reasons: + * (1) Ensure returned object is serializable as required by storm's serialization + * of the SupervisorInfo while heartbeating. + * Previous to this change we were returning a ConcurrentHashMap$KeySetView, + * which is not necessarily serializable: + * http://bugs.java.com/bugdatabase/view_bug.do?bug_id=4756277 + * (2) Ensure we properly instantiate the PersistentVector with all ports. + * If given a Set the PersistentVector.create() method will simply wrap the passed + * Set as a single element in the new vector. Whereas if you pass a java.util.List + * or clojure.lang.ISeq, then you get a true vector composed of the elements of the + * List or ISeq you passed. + */ + List ports = new ArrayList(_taskAssignments.getAssignedPorts()); if (ports == null) { return null; }