Skip to content

Commit

Permalink
create unique task ID per launch of a task onto a slot
Browse files Browse the repository at this point in the history
This is needed to avoid a problem with mesos-slave recovery resulting in LOST tasks.

i.e., we discovered that if you relaunch a topology's task onto the same worker slot (so there are 2 different instances with the same "task ID" that have run), then when the mesos-slave process is recovering, it terminates the task upon finding a "terminal" update in the recorded state of the task. The terminal state having been recorded the 1st time the task with that task ID stopped.

To solve this we ensure all task IDs are unique, by adding a milisecond-granularity timestamp onto the task IDs.
  • Loading branch information
erikdw committed Feb 22, 2016
1 parent 9bd5a3e commit a6f1d7e
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 11 deletions.
35 changes: 31 additions & 4 deletions storm/src/main/storm/mesos/MesosCommon.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,15 @@ public static String getMesosComponentNameDelimiter(Map conf, TopologyDetails in
.or(DEFAULT_MESOS_COMPONENT_NAME_DELIMITER);
}

public static String timestampMillis() {
long now = System.currentTimeMillis();
long sec = now / 1000L;
long msec = now % 1000L;
return String.valueOf(sec) + "." + String.valueOf(msec);
}

public static String taskId(String nodeid, int port) {
return nodeid + "-" + port;
return nodeid + "-" + port + "-" + timestampMillis();
}

public static String supervisorId(String nodeid, String topologyId) {
Expand All @@ -85,9 +92,29 @@ public static boolean startLogViewer(Map conf) {
}

public static int portFromTaskId(String taskId) {
int last = taskId.lastIndexOf("-");
String port = taskId.substring(last + 1);
return Integer.parseInt(port);
String[] parts = taskId.trim().split("-");
if (parts.length < 3) {
throw new IllegalArgumentException("TaskID " + taskId.trim() + " is invalid. " +
"Number of dash-delimited components (" + parts.length + ") is less than expected. " +
"Expected format is HOSTNAME-PORT-TIMESTAMP");
}

// TaskID format: HOSTNAME-PORT-TIMESTAMP. Notably, HOSTNAME can have dashes too,
// so the port is the 2nd-to-last part after splitting on dash.
String portString = parts[parts.length - 2];
int port;
try {
port = Integer.parseInt(portString);
} catch (NumberFormatException e) {
LOG.error("Failed to parse string (" + portString + ") that was supposed to contain a port.");
throw e;
}

if (port < 0 || port > 0xFFFF) {
throw new IllegalArgumentException(port + " is not a valid port number");
}

return port;
}

public static int getSuicideTimeout(Map conf) {
Expand Down
57 changes: 50 additions & 7 deletions storm/src/main/storm/mesos/MesosSupervisor.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
*/
package storm.mesos;



import backtype.storm.generated.JavaObject;
import backtype.storm.generated.JavaObjectArg;
import backtype.storm.scheduler.ISupervisor;
Expand Down Expand Up @@ -94,6 +92,11 @@ public void prepare(Map conf, String localDir) {
suicide.start();
}

/**
* Called by supervisor core to determine if the port is assigned to this
* supervisor, and thus whether a corresponding worker process should
* be killed or started.
*/
@Override
public boolean confirmAssigned(int port) {
String val = _state.get(Integer.toString(port));
Expand All @@ -107,7 +110,7 @@ public Object getMetadata() {
for (int i = 0; i < ports.length; i++) {
p[i] = Integer.parseInt((String) ports[i]);
}
return PersistentVector.create(p);
return PersistentVector.create((Object[]) p);
}

@Override
Expand All @@ -122,10 +125,12 @@ public String getAssignmentId() {

@Override
public void killedWorker(int port) {
LOG.info("killedWorker: removing port " + port + " from the 'assigned port state'");
String taskId = _state.get(Integer.toString(port));
_state.remove(Integer.toString(port));
TaskStatus status = TaskStatus.newBuilder()
.setState(TaskState.TASK_FINISHED)
.setTaskId(TaskID.newBuilder().setValue(MesosCommon.taskId(_assignmentId, port)))
.setTaskId(TaskID.newBuilder().setValue(taskId))
.build();
_driver.sendStatusUpdate(status);
}
Expand Down Expand Up @@ -156,19 +161,54 @@ public void registered(ExecutorDriver driver, ExecutorInfo executorInfo, Framewo

@Override
public void launchTask(ExecutorDriver driver, TaskInfo task) {
int port = MesosCommon.portFromTaskId(task.getTaskId().getValue());
LOG.info("Received task assignment for port " + port);
_state.put(Integer.toString(port), Boolean.TRUE.toString());
int port = 0;
try {
port = MesosCommon.portFromTaskId(task.getTaskId().getValue());
} catch (IllegalArgumentException e) {
String msg = "launchTask: failed to extract port from TaskID: " +
task.getTaskId().getValue() + ". Halting supervisor process.";
LOG.error(msg);
TaskStatus status = TaskStatus.newBuilder()
.setState(TaskState.TASK_FAILED)
.setTaskId(task.getTaskId())
.setMessage(msg)
.build();
driver.sendStatusUpdate(status);
Runtime.getRuntime().halt(1);
}

LOG.info("Received task assignment for port " + port + ". Mesos TaskID: " +
task.getTaskId().getValue());
// Record TaskID to be used later for sending a TASK_FINISHED update
// when the worker process is killed.
_state.put(Integer.toString(port), task.getTaskId().getValue());
TaskStatus status = TaskStatus.newBuilder()
.setState(TaskState.TASK_RUNNING)
.setTaskId(task.getTaskId())
.build();
driver.sendStatusUpdate(status);
}

/**
* If a failure occurs we halt this process, to avoid having an inconsistency
* between Mesos's view of the running tasks and which processes are actually
* running.
* Killing this supervisor process also kills any child worker processes.
*/
@Override
public void killTask(ExecutorDriver driver, TaskID id) {
int port = 0;
try {
port = MesosCommon.portFromTaskId(id.getValue());
} catch (IllegalArgumentException e) {
LOG.error("killTask: Halting executor process because we had a problem" +
" extracting the port from the TaskID: " + id.getValue(), e);
Runtime.getRuntime().halt(1);
}

LOG.info("killTask: killing task " + id.getValue() +
" which is running on port " + port);
_state.remove(Integer.toString(port));
}

@Override
Expand All @@ -177,6 +217,7 @@ public void frameworkMessage(ExecutorDriver driver, byte[] data) {

@Override
public void shutdown(ExecutorDriver driver) {
LOG.info("executor is being shutdown");
}

@Override
Expand All @@ -187,10 +228,12 @@ public void error(ExecutorDriver driver, String msg) {

@Override
public void reregistered(ExecutorDriver driver, SlaveInfo slaveInfo) {
LOG.info("executor has reregistered with the mesos-slave");
}

@Override
public void disconnected(ExecutorDriver driver) {
LOG.info("executor has disconnected from the mesos-slave");
}

}
Expand Down

0 comments on commit a6f1d7e

Please sign in to comment.