Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unique TaskIDs #106

Merged
merged 2 commits into from
Mar 24, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 57 additions & 27 deletions storm/src/main/storm/mesos/MesosSupervisor.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
*/
package storm.mesos;


import backtype.storm.scheduler.ISupervisor;
import backtype.storm.utils.Utils;
import clojure.lang.PersistentVector;
Expand Down Expand Up @@ -51,13 +50,16 @@
public class MesosSupervisor implements ISupervisor {
public static final Logger LOG = LoggerFactory.getLogger(MesosSupervisor.class);

volatile String _id = null;
volatile String _executorId = null;
volatile String _supervisorId = null;
volatile String _assignmentId = null;
volatile ExecutorDriver _driver;
StormExecutor _executor;
ILocalStateShim _state;
Map _conf;
AtomicReference<Set<Integer>> _myassigned = new AtomicReference<Set<Integer>>(new HashSet<Integer>());
// Store state on port assignments arriving from MesosNimbus as task-launching requests.
private static final TaskAssignments _taskAssignments = TaskAssignments.getInstance();
// What is the storm-core supervisor's view of the assigned ports?
AtomicReference<Set<Integer>> _supervisorViewOfAssignedPorts = new AtomicReference<Set<Integer>>(new HashSet<Integer>());

public static void main(String[] args) {
backtype.storm.daemon.supervisor.launch(new MesosSupervisor());
Expand All @@ -66,16 +68,11 @@ public static void main(String[] args) {
@Override
public void assigned(Collection<Integer> ports) {
if (ports == null) ports = new HashSet<>();
_myassigned.set(new HashSet<>(ports));
_supervisorViewOfAssignedPorts.set(new HashSet<>(ports));
}

@Override
public void prepare(Map conf, String localDir) {
try {
_state = new LocalStateShim(localDir);
} catch (IOException e) {
throw new RuntimeException(e);
}
_executor = new StormExecutor();
_driver = new MesosExecutorDriver(_executor);
_driver.start();
Expand All @@ -99,25 +96,28 @@ public void prepare(Map conf, String localDir) {
suicide.start();
}

/**
* Called by storm-core supervisor to determine if the port is assigned to this
* supervisor, and thus whether a corresponding worker process should be
* killed or started.
*/
@Override
public boolean confirmAssigned(int port) {
String val = _state.get(Integer.toString(port));
return val != null;
return _taskAssignments.confirmAssigned(port);
}

@Override
public Object getMetadata() {
Object[] ports = _state.snapshot().keySet().toArray();
Integer[] p = new Integer[ports.length];
for (int i = 0; i < ports.length; i++) {
p[i] = Integer.parseInt((String) ports[i]);
Set<Integer> ports = _taskAssignments.getAssignedPorts();
if (ports == null) {
return null;
}
return PersistentVector.create(p);
return PersistentVector.create(ports);
}

@Override
public String getSupervisorId() {
return _id;
return _supervisorId;
}

@Override
Expand All @@ -127,10 +127,17 @@ public String getAssignmentId() {

@Override
public void killedWorker(int port) {
_state.remove(Integer.toString(port));
LOG.info("killedWorker: executor {} removing port {} assignment and sending " +
"TASK_FINISHED update to Mesos", _executorId, port);
TaskID taskId = _taskAssignments.deregister(port);
if (taskId == null) {
LOG.error("killedWorker: Executor {} failed to find TaskID for port {}, so not " +
"issuing TaskStatus update to Mesos for this dead task.", _executorId, port);
return;
}
TaskStatus status = TaskStatus.newBuilder()
.setState(TaskState.TASK_FINISHED)
.setTaskId(TaskID.newBuilder().setValue(MesosCommon.taskId(_assignmentId, port)))
.setTaskId(taskId)
.build();
_driver.sendStatusUpdate(status);
}
Expand All @@ -150,9 +157,10 @@ public void waitUntilRegistered() throws InterruptedException {
public void registered(ExecutorDriver driver, ExecutorInfo executorInfo, FrameworkInfo frameworkInfo, SlaveInfo slaveInfo) {
LOG.info("Received executor data <{}>", executorInfo.getData().toStringUtf8());
Map ids = (Map) JSONValue.parse(executorInfo.getData().toStringUtf8());
_id = (String) ids.get(MesosCommon.SUPERVISOR_ID);
_executorId = executorInfo.getExecutorId().getValue();
_supervisorId = (String) ids.get(MesosCommon.SUPERVISOR_ID);
_assignmentId = (String) ids.get(MesosCommon.ASSIGNMENT_ID);
LOG.info("Registered supervisor with Mesos: {}, {} ", _id, _assignmentId);
LOG.info("Registered supervisor with Mesos: {}, {} ", _supervisorId, _assignmentId);

// Completed registration, let anything waiting for us to do so continue
_registeredLatch.countDown();
Expand All @@ -161,10 +169,26 @@ public void registered(ExecutorDriver driver, ExecutorInfo executorInfo, Framewo

@Override
public void launchTask(ExecutorDriver driver, TaskInfo task) {
int port = MesosCommon.portFromTaskId(task.getTaskId().getValue());
LOG.info("Received task assignment for port {}", port);
_state.put(Integer.toString(port), Boolean.TRUE.toString());

try {
int port = _taskAssignments.register(task.getTaskId());
LOG.info("Executor {} received task assignment for port {}. Mesos TaskID: {}",
_executorId, port, task.getTaskId().getValue());
} catch (IllegalArgumentException e) {
String msg =
String.format("launchTask: failed to register task. " +
"Exception: %s Halting supervisor process.",
e.getMessage());
LOG.error(msg);
TaskStatus status = TaskStatus.newBuilder()
.setState(TaskState.TASK_FAILED)
.setTaskId(task.getTaskId())
.setMessage(msg)
.build();
driver.sendStatusUpdate(status);
Runtime.getRuntime().halt(1);
}
LOG.info("Received task assignment for TaskID: {} ",
task.getTaskId().getValue());
TaskStatus status = TaskStatus.newBuilder()
.setState(TaskState.TASK_RUNNING)
.setTaskId(task.getTaskId())
Expand All @@ -174,6 +198,8 @@ public void launchTask(ExecutorDriver driver, TaskInfo task) {

@Override
public void killTask(ExecutorDriver driver, TaskID id) {
LOG.warn("killTask not implemented in executor {}, so " +
"cowardly refusing to kill task {}", _executorId, id.getValue());
}

@Override
Expand All @@ -182,6 +208,8 @@ public void frameworkMessage(ExecutorDriver driver, byte[] data) {

@Override
public void shutdown(ExecutorDriver driver) {
LOG.warn("shutdown not implemented in executor {}, so " +
"cowardly refusing to kill tasks", _executorId);
}

@Override
Expand All @@ -192,10 +220,12 @@ public void error(ExecutorDriver driver, String msg) {

@Override
public void reregistered(ExecutorDriver driver, SlaveInfo slaveInfo) {
LOG.info("executor has reregistered with the mesos-slave");
}

@Override
public void disconnected(ExecutorDriver driver) {
LOG.info("executor has disconnected from the mesos-slave");
}

}
Expand All @@ -213,7 +243,7 @@ public void run() {
try {
while (true) {
long now = System.currentTimeMillis();
if (!_myassigned.get().isEmpty()) {
if (!_supervisorViewOfAssignedPorts.get().isEmpty()) {
_lastTime = now;
}
if ((now - _lastTime) > 1000L * _timeoutSecs) {
Expand Down
148 changes: 148 additions & 0 deletions storm/src/main/storm/mesos/TaskAssignments.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package storm.mesos;

import java.io.Serializable;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.mesos.Protos.TaskID;

import storm.mesos.util.MesosCommon;

/**
* Tracks the Mesos Tasks / Storm Worker Processes that have been assigned
* to this MesosSupervisor instance.
*
* Serves as a bridge between the storm-core supervisor logic and the
* mesos executor logic.
*
* Tasks are assigned or removed via either:
* Storm (storm-core supervisor) calling MesosSupervisor's ISupervisor interfaces
* Mesos (mesos executor driver) calling MesosSupervisor's Executor interfaces
*
* This abstraction allows the MesosSupervisor code to stay a bit tighter and cleaner,
* while still giving the power to track the TaskID across deactivation of a port.
* That allows calls through the Mesos interfaces to deactivate a port and then have
* the storm-core supervisor see that the port is no longer active (via false return from
* ISupervisor.confirmAssigned), and thus drives the storm-core supervisor to kill the
* worker process associated with the port.
*/
public enum TaskAssignments {
INSTANCE; // This is a singleton class; see Effective Java (2nd Edition): Item 3

// Alternative to a constructor for this enum-based singleton
public static TaskAssignments getInstance() {
return INSTANCE;
}

private enum TaskState {
ACTIVE,
INACTIVE
}

// NOTE: this doesn't *really* need to be Serializable -- this is only done to avoid
// an exception thrown during mk-supervisor if the ISupervisor object isn't serializable.
private static class AssignmentInfo implements Serializable {
public final transient TaskID taskId;
public final transient TaskState taskState;

public AssignmentInfo(TaskID taskId, TaskState taskState) {
this.taskId = taskId;
this.taskState = taskState;
}
}

// Map of ports to their assigned tasks' info.
private final Map<Integer, AssignmentInfo> portAssignments = new ConcurrentHashMap<>();

public Set<Integer> getAssignedPorts() {
return portAssignments.keySet();
}

/**
* Signal to the storm-core supervisor that this task should be started.
*/
public int register(TaskID taskId) throws IllegalArgumentException {
int port = MesosCommon.portFromTaskId(taskId.getValue());
AssignmentInfo existingAssignment = portAssignments.get(port);
if (existingAssignment != null) {
throw new IllegalArgumentException("Refusing to register task " + taskId.getValue() +
" because its port " + port + " is already registered for task " +
existingAssignment.taskId.getValue());
}
portAssignments.put(port, new AssignmentInfo(taskId, TaskState.ACTIVE));
return port;
}

/**
* This task has been killed by the storm-core supervisor, ditch all
* knowledge of it.
*/
public TaskID deregister(int port) {
AssignmentInfo assignment = portAssignments.remove(port);
if (assignment != null) {
return assignment.taskId;
}
return null;
}

/**
* Is this port active? (And thus also registered?)
*
* Used by storm-core supervisor to determine if a worker process should be
* launched or killed on the specified port.
*/
public boolean confirmAssigned(int port) {
AssignmentInfo assignment = portAssignments.get(port);
if (assignment != null) {
return (assignment.taskState == TaskState.ACTIVE);
}
return false;
}

/**
* Signal to the storm-core supervisor that this task should be killed.
*
* This method provides a way for Mesos (well, calls routed through Mesos to the Executor
* interface implemented by MesosSupervisor) to kill the worker processes (mesos tasks).
*
* This works by changing the state of an existing assignment from active to inactive,
* thus signaling to the storm-core supervisor that the assignment is no longer active.
* And that works by the storm-core supervisor checking the active/inactive state via a
* periodic call to ISupervisor.confirmAssigned(), which calls TaskAssignments.confirmAssigned().
* Notably, we must retain *some* knowledge about the assignment despite it being deactivated,
* namely the TaskID, so that when the storm-core supervisor calls ISupervisor.killedWorker()
* we can send a TASK_FINISHED TaskStatus update to Mesos. That is required for Mesos to
* learn of the task having been killed, otherwise Mesos would think the task is still running.
*
* NOTE: This isn't used *yet*, as we haven't spent time to code and test
* MesosSupervisor.killTask() and MesosSupervisor.shutdown().
*/
public int deactivate(TaskID taskId) throws IllegalArgumentException {
int port = MesosCommon.portFromTaskId(taskId.getValue());
AssignmentInfo assignment = portAssignments.get(port);
if (assignment != null) {
portAssignments.put(port, new AssignmentInfo(taskId, TaskState.INACTIVE));
return port;
}
return 0;
}

}
37 changes: 33 additions & 4 deletions storm/src/main/storm/mesos/util/MesosCommon.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class MesosCommon {
public static final Logger LOG = LoggerFactory.getLogger(MesosCommon.class);
Expand Down Expand Up @@ -74,8 +75,15 @@ public static String getMesosComponentNameDelimiter(Map conf, TopologyDetails in
.or(DEFAULT_MESOS_COMPONENT_NAME_DELIMITER);
}

public static String timestampMillis() {
long now = System.currentTimeMillis();
long secs = TimeUnit.MILLISECONDS.toSeconds(now);
long msecs = now - TimeUnit.SECONDS.toMillis(secs);
return String.format("%d.%03d", secs, msecs);
}

public static String taskId(String nodeid, int port) {
return nodeid + "-" + port;
return nodeid + "-" + port + "-" + timestampMillis();
}

public static String supervisorId(String nodeid, String topologyId) {
Expand All @@ -87,9 +95,30 @@ public static boolean startLogViewer(Map conf) {
}

public static int portFromTaskId(String taskId) {
int last = taskId.lastIndexOf("-");
String port = taskId.substring(last + 1);
return Integer.parseInt(port);
String[] parts = taskId.trim().split("-");
if (parts.length < 3) {
throw new IllegalArgumentException(String.format("TaskID %s is invalid. " +
"Number of dash-delimited components (%d) is less than expected. " +
"Expected format is HOSTNAME-PORT-TIMESTAMP", taskId.trim(), parts.length));
}

// TaskID format: HOSTNAME-PORT-TIMESTAMP. Notably, HOSTNAME can have dashes too,
// so the port is the 2nd-to-last part after splitting on dash.
String portString = parts[parts.length - 2];
int port;
try {
port = Integer.parseInt(portString);
} catch (NumberFormatException e) {
LOG.error(String.format("Failed to parse string (%s) that was supposed to contain a port.",
portString));
throw e;
}

if (port < 0 || port > 0xFFFF) {
throw new IllegalArgumentException(String.format("%d is not a valid port number.", port));
}

return port;
}

public static int getSuicideTimeout(Map conf) {
Expand Down