Skip to content

Commit

Permalink
Fix MesosSupervisor.getMetadata
Browse files Browse the repository at this point in the history
This is intended to fix issue #119.

With the introduction of the TaskAssignments refactor for creating unique
task IDs (#106), I introduced a couple of bugs in the implementation of
MesosSupervisor.getMetadata:

1. The slot counts in the Storm UI were broken -- the return from
   getMetadata was always a single element vector, due to using a
   Set instead of a java array previously. This was causing the
   PersistentVector.create(Object ... object) method to be matched,
   which just puts the passed objects into a vector without iterating
   over their constituent elements.  Since we are passing a single
   Set object, we are getting a single element in the resultant vector.
   So the fix is to just create a List and pass that to
   PersistentVector.create().
2. The returned Object must be serializable.  Depending on the
   build and runtime environment, the serialization done by the storm
   supervisor during initialization will fail, crashing the supervisor.
   That was happening because we were passing back the
   ConcurrentHashMap$KeySetView object, which is not serializable.
   Here too, the fix is to just create a List and pass that to
   PersistentVector.create().

NOTE: I haven't been able to reproduce problem 2. unfortunately.
  • Loading branch information
erikdw committed Apr 2, 2016
1 parent b6f232a commit 6b31108
Showing 1 changed file with 16 additions and 1 deletion.
17 changes: 16 additions & 1 deletion storm/src/main/storm/mesos/MesosSupervisor.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@
import storm.mesos.util.MesosCommon;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -108,7 +110,20 @@ public boolean confirmAssigned(int port) {

@Override
public Object getMetadata() {
Set<Integer> ports = _taskAssignments.getAssignedPorts();
/*
* Convert obtained Set into a List for 2 reasons:
* (1) Ensure returned object is serializable as required by storm's serialization
* of the SupervisorInfo while heartbeating.
* Previous to this change we were returning a ConcurrentHashMap$KeySetView,
* which is not necessarily serializable:
* http://bugs.java.com/bugdatabase/view_bug.do?bug_id=4756277
* (2) Ensure we properly instantiate the PersistentVector with all ports.
* If given a Set the PersistentVector.create() method will simply wrap the passed
* Set as a single element in the new vector. Whereas if you pass a java.util.List
* or clojure.lang.ISeq, then you get a true vector composed of the elements of the
* List or ISeq you passed.
*/
List ports = new ArrayList(_taskAssignments.getAssignedPorts());
if (ports == null) {
return null;
}
Expand Down

0 comments on commit 6b31108

Please sign in to comment.