Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[NEW] Schedule workers on consolidated offers #154

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
5dcd80c
New package for Resource Collection to address the following problems…
May 11, 2016
42e5bc8
Trio coding changes for improving allSlotsAvailableForScheduling
Jun 1, 2016
8c5ac41
Remove additional logs for testing plus and optimize import statements
Jun 6, 2016
dfade0a
Using java8 specific function String.join causes travis to fail with the
Jun 6, 2016
d7a0274
Addressing First Round of code review comments from Erik Weathers and…
Jun 10, 2016
aa5ed42
Addressed the following comments:
Jun 13, 2016
42e5738
Missed a couple of comments on MesosNimbusTest
Jun 13, 2016
1392a95
Addressing Erik's comments - Second round
Jun 16, 2016
1d212fd
Addressing Jessica's comments
Jun 16, 2016
b1c875d
Fixing range addition as per Jessica's comment
Jun 17, 2016
ea55da6
Renaming OfferResources -> AggregatedOffers
Jun 17, 2016
a65ce91
Addressing Erik's comment regarding 'avaliable' typo and removing TODO
Jun 18, 2016
531c46a
make important log about aggregated resources more readable
erikdw Jun 20, 2016
bfe032a
Including worker ports into log lines related to fit and failure to l…
Jun 23, 2016
782e489
Adding in some comments for clarity
Jun 24, 2016
b0f8af9
Adding in an assert for combining offers to make sure they're from th…
Jun 24, 2016
b239dd2
Fix removeAndGet to no longer modify a list we are iterating over
Jun 24, 2016
69d63b9
Scheduling improvements for spreading work across executors:
erikdw Jun 30, 2016
b5cd2cf
Improve few existing comments, add more comments for existing functio…
Jul 20, 2016
1032e58
adjusts tests to account for some corner cases that exist for the tim…
erikdw Jul 22, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
796 changes: 309 additions & 487 deletions storm/src/main/storm/mesos/MesosNimbus.java

Large diffs are not rendered by default.

6 changes: 1 addition & 5 deletions storm/src/main/storm/mesos/MesosSupervisor.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import backtype.storm.scheduler.ISupervisor;
import backtype.storm.utils.Utils;
import clojure.lang.PersistentVector;

import org.apache.mesos.Executor;
import org.apache.mesos.ExecutorDriver;
import org.apache.mesos.MesosExecutorDriver;
Expand All @@ -35,11 +34,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.mesos.logviewer.LogViewerController;
import storm.mesos.shims.ILocalStateShim;
import storm.mesos.shims.LocalStateShim;
import storm.mesos.util.MesosCommon;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
Expand Down Expand Up @@ -158,7 +154,7 @@ public void killedWorker(int port) {
}

protected boolean startLogViewer(Map conf) {
return MesosCommon.startLogViewer(conf);
return MesosCommon.autoStartLogViewer(conf);
}

class StormExecutor implements Executor {
Expand Down
7 changes: 3 additions & 4 deletions storm/src/main/storm/mesos/TaskAssignments.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,14 @@
*/
package storm.mesos;

import org.apache.mesos.Protos.TaskID;
import storm.mesos.util.MesosCommon;

import java.io.Serializable;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.mesos.Protos.TaskID;

import storm.mesos.util.MesosCommon;

/**
* Tracks the Mesos Tasks / Storm Worker Processes that have been assigned
* to this MesosSupervisor instance.
Expand Down
194 changes: 194 additions & 0 deletions storm/src/main/storm/mesos/resources/AggregatedOffers.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package storm.mesos.resources;

import backtype.storm.scheduler.TopologyDetails;
import org.apache.mesos.Protos;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.mesos.util.MesosCommon;
import storm.mesos.util.PrettyProtobuf;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AggregatedOffers {

private final Logger log = LoggerFactory.getLogger(AggregatedOffers.class);
private Map<ResourceType, Resource> availableResources;

private List<Protos.Offer> offerList = new ArrayList<Protos.Offer>();

private final String hostname;

private Protos.SlaveID slaveID;

private void initializeAvailableResources() {
availableResources = new HashMap<>();
availableResources.put(ResourceType.CPU, new ScalarResource(ResourceType.CPU));
availableResources.put(ResourceType.MEM, new ScalarResource(ResourceType.MEM));
availableResources.put(ResourceType.PORTS, new RangeResource(ResourceType.PORTS));
}

public AggregatedOffers(Protos.Offer offer) {
initializeAvailableResources();
this.slaveID = offer.getSlaveId();
this.hostname = offer.getHostname();
add(offer);
}

public String getHostname() {
return hostname;
}

public void add(Protos.Offer offer) {
// We are unable to aggregate offers if they are from different mesos slaves/workers/agents
assert offer.getSlaveId().equals(slaveID) && offer.getHostname().equals(hostname);
offerList.add(offer);

for (Protos.Resource r : offer.getResourcesList()) {
ResourceType resourceType = ResourceType.of(r.getName());
ReservationType reservationType = (r.getRole().equals("*")) ?
ReservationType.UNRESERVED : ReservationType.STATIC;

if (r.hasReservation()) {
// skip resources with dynamic reservations
continue;
}

switch (resourceType) {
case CPU:
case MEM:
ResourceEntries.ScalarResourceEntry scalarResourceEntry = new ResourceEntries.ScalarResourceEntry(reservationType, r.getScalar().getValue());
availableResources.get(resourceType).add(scalarResourceEntry, reservationType);
break;
case PORTS:
for (Protos.Value.Range range : r.getRanges().getRangeList()) {
ResourceEntries.RangeResourceEntry rangeResourceEntry = new ResourceEntries.RangeResourceEntry(reservationType, range.getBegin(), range.getEnd());
availableResources.get(resourceType).add(rangeResourceEntry, reservationType);
}
break;
case DISK:
// TODO: Support disk resource isolation (https://github.com/mesos/storm/issues/147)
break;
default:
log.warn(String.format("Found unsupported resourceType '%s' while adding offer %s", resourceType, PrettyProtobuf.offerToString(offer)));
}
}
}

public boolean isAvailable(ResourceType resourceType, ResourceEntry<?> resource) {
return availableResources.get(resourceType).isAvailable(resource);
}

/**
* Unused Method - Exists for the purpose of facilitating support of reservations.
* TODO: Support reservations (https://github.com/mesos/storm/issues/148)
* For more information about why this unused code exists, see discussion: https://github.com/mesos/storm/pull/146#issuecomment-225496075
*/
public boolean isAvailable(ResourceType resourceType, ReservationType reservationType, ResourceEntry<?> resource) {
return availableResources.get(resourceType).isAvailable(resource, reservationType);
}

public <T extends ResourceEntry> List<T> getAllAvailableResources(ResourceType resourceType) {
return availableResources.get(resourceType).getAllAvailableResources();
}

/**
* Unused Method - Exists for the purpose of facilitating support of reservations.
* TODO: Support reservations (https://github.com/mesos/storm/issues/148)
* For more information about why this unused code exists, see discussion: https://github.com/mesos/storm/pull/146#issuecomment-225496075
*/
public <T extends ResourceEntry> List<T> getAllAvailableResources(ResourceType resourceType, ReservationType reservationType) {
return availableResources.get(resourceType).getAllAvailableResources(reservationType);
}

public void reserve(ResourceType resourceType, ResourceEntry<?> resource) throws ResourceNotAvailableException {
if (availableResources.get(resourceType).isAvailable(resource)) {
availableResources.get(resourceType).removeAndGet(resource);
}
}

public List<ResourceEntry> reserveAndGet(ResourceType resourceType, ResourceEntry<?> resource) throws ResourceNotAvailableException {
if (availableResources.get(resourceType).isAvailable(resource)) {
return availableResources.get(resourceType).removeAndGet(resource);
}
return new ArrayList<>();
}

/**
* Unused Method - Exists for the purpose of facilitating support of reservations.
* TODO: Support reservations (https://github.com/mesos/storm/issues/148)
* For more information about why this unused code exists, see discussion: https://github.com/mesos/storm/pull/146#issuecomment-225496075
*/
public List<ResourceEntry> reserveAndGet(ResourceType resourceType, ReservationType reservationType, ResourceEntry<?> resource) throws
ResourceNotAvailableException {
if (availableResources.get(resourceType).isAvailable(resource, reservationType)) {
return availableResources.get(resourceType).removeAndGet(resource, reservationType);
}
return new ArrayList<>();
}

public List<Protos.OfferID> getOfferIDList() {
List<Protos.OfferID> offerIDList = new ArrayList<>();
for (Protos.Offer offer: offerList) {
offerIDList.add(offer.getId());
}
return offerIDList;
}

public Protos.SlaveID getSlaveID() {
return slaveID;
}

@Override
public String toString() {
return String.format("%s, %s, %s",
availableResources.get(ResourceType.CPU),
availableResources.get(ResourceType.MEM),
availableResources.get(ResourceType.PORTS));
}


public boolean isFit(Map mesosStormConf, TopologyDetails topologyDetails, boolean supervisorExists) {
double requestedWorkerCpu = MesosCommon.topologyWorkerCpu(mesosStormConf, topologyDetails);
double requestedWorkerMem = MesosCommon.topologyWorkerMem(mesosStormConf, topologyDetails);

requestedWorkerCpu += supervisorExists ? 0 : MesosCommon.executorCpu(mesosStormConf);
requestedWorkerMem += supervisorExists ? 0 : MesosCommon.executorMem(mesosStormConf);

return (isAvailable(ResourceType.CPU, new ResourceEntries.ScalarResourceEntry(requestedWorkerCpu)) &&
isAvailable(ResourceType.MEM, new ResourceEntries.ScalarResourceEntry(requestedWorkerMem)) &&
!getAllAvailableResources(ResourceType.PORTS).isEmpty());
}

public boolean isFit(Map mesosStormConf, TopologyDetails topologyDetails, Long port, boolean supervisorExists) {
double requestedWorkerCpu = MesosCommon.topologyWorkerCpu(mesosStormConf, topologyDetails);
double requestedWorkerMem = MesosCommon.topologyWorkerMem(mesosStormConf, topologyDetails);

requestedWorkerCpu += supervisorExists ? 0 : MesosCommon.executorCpu(mesosStormConf);
requestedWorkerMem += supervisorExists ? 0 : MesosCommon.executorMem(mesosStormConf);

return (isAvailable(ResourceType.CPU, new ResourceEntries.ScalarResourceEntry(requestedWorkerCpu)) &&
isAvailable(ResourceType.MEM, new ResourceEntries.ScalarResourceEntry(requestedWorkerMem)) &&
isAvailable(ResourceType.PORTS, new ResourceEntries.RangeResourceEntry(port, port)));
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package storm.mesos.resources;

import java.util.Comparator;

public class DefaultReservationTypeComparator implements Comparator<ReservationType> {

public int compare(ReservationType left, ReservationType right) {
if (left.equals(right)) {
return 0;
} else if (left.equals(ReservationType.UNRESERVED)) {
return 1;
}
return -1;
}
}
Loading