From a6f1d7e1426a2c86999df81188ff389425630ad3 Mon Sep 17 00:00:00 2001 From: Erik Weathers Date: Sun, 21 Feb 2016 23:59:43 -0800 Subject: [PATCH] create unique task ID per launch of a task onto a slot This is needed to avoid a problem with mesos-slave recovery resulting in LOST tasks. i.e., we discovered that if you relaunch a topology's task onto the same worker slot (so there are 2 different instances with the same "task ID" that have run), then when the mesos-slave process is recovering, it terminates the task upon finding a "terminal" update in the recorded state of the task. The terminal state having been recorded the 1st time the task with that task ID stopped. To solve this we ensure all task IDs are unique, by adding a millisecond-granularity timestamp onto the task IDs. --- storm/src/main/storm/mesos/MesosCommon.java | 35 ++++++++++-- .../src/main/storm/mesos/MesosSupervisor.java | 57 ++++++++++++++++--- 2 files changed, 81 insertions(+), 11 deletions(-) diff --git a/storm/src/main/storm/mesos/MesosCommon.java b/storm/src/main/storm/mesos/MesosCommon.java index 344192775..f0470787f 100644 --- a/storm/src/main/storm/mesos/MesosCommon.java +++ b/storm/src/main/storm/mesos/MesosCommon.java @@ -72,8 +72,15 @@ public static String getMesosComponentNameDelimiter(Map conf, TopologyDetails in .or(DEFAULT_MESOS_COMPONENT_NAME_DELIMITER); } + public static String timestampMillis() { + long now = System.currentTimeMillis(); + long sec = now / 1000L; + long msec = now % 1000L; + return String.valueOf(sec) + "." 
+ String.valueOf(msec); + } + public static String taskId(String nodeid, int port) { - return nodeid + "-" + port; + return nodeid + "-" + port + "-" + timestampMillis(); } public static String supervisorId(String nodeid, String topologyId) { @@ -85,9 +92,29 @@ public static boolean startLogViewer(Map conf) { } public static int portFromTaskId(String taskId) { - int last = taskId.lastIndexOf("-"); - String port = taskId.substring(last + 1); - return Integer.parseInt(port); + String[] parts = taskId.trim().split("-"); + if (parts.length < 3) { + throw new IllegalArgumentException("TaskID " + taskId.trim() + " is invalid. " + + "Number of dash-delimited components (" + parts.length + ") is less than expected. " + + "Expected format is HOSTNAME-PORT-TIMESTAMP"); + } + + // TaskID format: HOSTNAME-PORT-TIMESTAMP. Notably, HOSTNAME can have dashes too, + // so the port is the 2nd-to-last part after splitting on dash. + String portString = parts[parts.length - 2]; + int port; + try { + port = Integer.parseInt(portString); + } catch (NumberFormatException e) { + LOG.error("Failed to parse string (" + portString + ") that was supposed to contain a port."); + throw e; + } + + if (port < 0 || port > 0xFFFF) { + throw new IllegalArgumentException(port + " is not a valid port number"); + } + + return port; } public static int getSuicideTimeout(Map conf) { diff --git a/storm/src/main/storm/mesos/MesosSupervisor.java b/storm/src/main/storm/mesos/MesosSupervisor.java index 446205e9f..8ffe3c14e 100644 --- a/storm/src/main/storm/mesos/MesosSupervisor.java +++ b/storm/src/main/storm/mesos/MesosSupervisor.java @@ -17,8 +17,6 @@ */ package storm.mesos; - - import backtype.storm.generated.JavaObject; import backtype.storm.generated.JavaObjectArg; import backtype.storm.scheduler.ISupervisor; @@ -94,6 +92,11 @@ public void prepare(Map conf, String localDir) { suicide.start(); } + /** + * Called by supervisor core to determine if the port is assigned to this + * supervisor, and thus 
whether a corresponding worker process should + * be killed or started. + */ @Override public boolean confirmAssigned(int port) { String val = _state.get(Integer.toString(port)); @@ -107,7 +110,7 @@ public Object getMetadata() { for (int i = 0; i < ports.length; i++) { p[i] = Integer.parseInt((String) ports[i]); } - return PersistentVector.create(p); + return PersistentVector.create((Object[]) p); } @Override @@ -122,10 +125,12 @@ public String getAssignmentId() { @Override public void killedWorker(int port) { + LOG.info("killedWorker: removing port " + port + " from the 'assigned port state'"); + String taskId = _state.get(Integer.toString(port)); _state.remove(Integer.toString(port)); TaskStatus status = TaskStatus.newBuilder() .setState(TaskState.TASK_FINISHED) - .setTaskId(TaskID.newBuilder().setValue(MesosCommon.taskId(_assignmentId, port))) + .setTaskId(TaskID.newBuilder().setValue(taskId)) .build(); _driver.sendStatusUpdate(status); } @@ -156,10 +161,27 @@ public void registered(ExecutorDriver driver, ExecutorInfo executorInfo, Framewo @Override public void launchTask(ExecutorDriver driver, TaskInfo task) { - int port = MesosCommon.portFromTaskId(task.getTaskId().getValue()); - LOG.info("Received task assignment for port " + port); - _state.put(Integer.toString(port), Boolean.TRUE.toString()); + int port = 0; + try { + port = MesosCommon.portFromTaskId(task.getTaskId().getValue()); + } catch (IllegalArgumentException e) { + String msg = "launchTask: failed to extract port from TaskID: " + + task.getTaskId().getValue() + ". Halting supervisor process."; + LOG.error(msg); + TaskStatus status = TaskStatus.newBuilder() + .setState(TaskState.TASK_FAILED) + .setTaskId(task.getTaskId()) + .setMessage(msg) + .build(); + driver.sendStatusUpdate(status); + Runtime.getRuntime().halt(1); + } + LOG.info("Received task assignment for port " + port + ". 
Mesos TaskID: " + + task.getTaskId().getValue()); + // Record TaskID to be used later for sending a TASK_FINISHED update + // when the worker process is killed. + _state.put(Integer.toString(port), task.getTaskId().getValue()); TaskStatus status = TaskStatus.newBuilder() .setState(TaskState.TASK_RUNNING) .setTaskId(task.getTaskId()) @@ -167,8 +189,26 @@ public void launchTask(ExecutorDriver driver, TaskInfo task) { driver.sendStatusUpdate(status); } + /** + * If a failure occurs we halt this process, to avoid having an inconsistency + * between Mesos's view of the running tasks and which processes are actually + * running. + * Killing this supervisor process also kills any child worker processes. + */ @Override public void killTask(ExecutorDriver driver, TaskID id) { + int port = 0; + try { + port = MesosCommon.portFromTaskId(id.getValue()); + } catch (IllegalArgumentException e) { + LOG.error("killTask: Halting executor process because we had a problem" + + " extracting the port from the TaskID: " + id.getValue(), e); + Runtime.getRuntime().halt(1); + } + + LOG.info("killTask: killing task " + id.getValue() + + " which is running on port " + port); + _state.remove(Integer.toString(port)); } @Override @@ -177,6 +217,7 @@ public void frameworkMessage(ExecutorDriver driver, byte[] data) { @Override public void shutdown(ExecutorDriver driver) { + LOG.info("executor is being shutdown"); } @Override @@ -187,10 +228,12 @@ public void error(ExecutorDriver driver, String msg) { @Override public void reregistered(ExecutorDriver driver, SlaveInfo slaveInfo) { + LOG.info("executor has reregistered with the mesos-slave"); } @Override public void disconnected(ExecutorDriver driver) { + LOG.info("executor has disconnected from the mesos-slave"); } }