From d9f788fc7999b45ef14e55dc521087173221212c Mon Sep 17 00:00:00 2001 From: Karthick Duraisamy Soundararaj Date: Wed, 20 Jul 2016 11:24:25 -0700 Subject: [PATCH] Improve few existing comments, add more comments for existing functions, delete few unnecessary/irrelevant methods --- storm/src/main/storm/mesos/MesosNimbus.java | 30 ++++---- .../mesos/resources/AggregatedOffers.java | 38 +++++----- .../storm/mesos/resources/RangeResource.java | 10 ++- .../main/storm/mesos/resources/Resource.java | 10 +++ .../mesos/resources/ResourceEntries.java | 5 ++ .../storm/mesos/resources/ScalarResource.java | 17 ++--- .../mesos/schedulers/DefaultScheduler.java | 71 ++++++++++++++----- .../mesos/schedulers/MesosWorkerSlot.java | 6 +- .../mesos/schedulers/SchedulerUtils.java | 10 ++- .../main/storm/mesos/util/MesosCommon.java | 2 +- .../main/storm/mesos/util/PrettyProtobuf.java | 2 +- .../mesos/resources/AggregatedOffersTest.java | 4 +- 12 files changed, 137 insertions(+), 68 deletions(-) diff --git a/storm/src/main/storm/mesos/MesosNimbus.java b/storm/src/main/storm/mesos/MesosNimbus.java index 1fda1d65f..95a94b6a2 100644 --- a/storm/src/main/storm/mesos/MesosNimbus.java +++ b/storm/src/main/storm/mesos/MesosNimbus.java @@ -93,6 +93,7 @@ import java.util.concurrent.TimeUnit; import static storm.mesos.util.PrettyProtobuf.offerIDListToString; +import static storm.mesos.util.PrettyProtobuf.offerToString; import static storm.mesos.util.PrettyProtobuf.offerMapToString; import static storm.mesos.util.PrettyProtobuf.taskInfoListToString; @@ -308,16 +309,14 @@ public void resourceOffers(SchedulerDriver driver, List offers) { for (Protos.Offer offer : offers) { if (isHostAccepted(offer.getHostname())) { // TODO(ksoundararaj): Should we record the following as info instead of debug - LOG.debug("resourceOffers: Recording offer from host: {}, offerId: {}", - offer.getHostname(), offer.getId().getValue()); + LOG.info("resourceOffers: Recording offer: {}", offerToString(offer)); _offers.put(offer.getId(), offer); } else { - LOG.debug("resourceOffers: Declining offer from host: {}, offerId: {}", - offer.getHostname(), offer.getId().getValue()); + LOG.info("resourceOffers: Declining offer: {}", offerToString(offer)); driver.declineOffer(offer.getId()); } } - LOG.debug("resourceOffers: After processing offers, now have {} offers buffered: {}", + LOG.info("resourceOffers: After processing offers, now have {} offers buffered: {}", _offers.size(), offerMapToString(_offers)); } } @@ -385,10 +384,6 @@ public Collection allSlotsAvailableForScheduling( } } - private String getLogViewerConfig() { - return String.format(" -c %s=true", MesosCommon.AUTO_START_LOGVIEWER_CONF); - } - /** * This method is invoked after IScheduler.schedule assigns the worker slots to the topologies that need assignments * @@ -399,17 +394,22 @@ private String getLogViewerConfig() { @Override public void assignSlots(Topologies topologies, Map> slotsForTopologiesNeedingAssignments) { if (slotsForTopologiesNeedingAssignments.isEmpty()) { - LOG.debug("assignSlots: no slots passed in, nothing to do"); + LOG.info("assignSlots: no slots passed in, nothing to do"); return; } // This is purely to print the debug information. Otherwise, the following for loop is unnecessary. for (Map.Entry> topologyToSlots : slotsForTopologiesNeedingAssignments.entrySet()) { String topologyId = topologyToSlots.getKey(); + List topologySlotAssignmentStrings = new ArrayList(); + String info = "assignSlots: " + topologyId + " being assigned to " + topologyToSlots.getValue().size() + " slots (worker:port, cpu, mem) as follows: "; for (WorkerSlot slot : topologyToSlots.getValue()) { TopologyDetails details = topologies.getById(topologyId); - LOG.debug("assignSlots: topologyId: {} worker being assigned to slot: {} with workerCpu: {} workerMem: {}", - topologyId, slot, MesosCommon.topologyWorkerCpu(mesosStormConf, details), MesosCommon.topologyWorkerMem(mesosStormConf, details)); + topologySlotAssignmentStrings.add("(" + slot + ", " + MesosCommon.topologyWorkerCpu(mesosStormConf, details) + ", " + MesosCommon.topologyWorkerMem(mesosStormConf, details) + ")"); + } + if (!topologyToSlots.getValue().isEmpty()) { + info += StringUtils.join(topologySlotAssignmentStrings, ", "); + LOG.info(info); } } @@ -427,7 +427,7 @@ public void assignSlots(Topologies topologies, Map offerIDList = aggregatedOffersPerNode.get(node).getOfferIDList(); List taskInfoList = tasksToLaunchPerNode.get(node); - LOG.info("Using offerIDs: " + offerIDListToString(offerIDList) + " on host: " + node + " to launch tasks: " + taskInfoListToString(taskInfoList)); + LOG.info("Using offerIDs: {} on host: {} to launch tasks: {}", offerIDListToString(offerIDList), node, taskInfoListToString(taskInfoList)); _driver.launchTasks(offerIDList, taskInfoList); for (OfferID offerID: offerIDList) { @@ -601,8 +601,8 @@ public Map> getTasksToLaunch(Topologies topologies, String extraConfig = ""; if (!aggregatedOffers.isFit(mesosStormConf, topologyDetails, workerPort, hostsWithSupervisors.contains(workerHost))) { - LOG.error(String.format("Unable to launch worker %s. Required cpu: %f, Required mem: %f, Required port: %d. Available aggregatedOffers : %s", - workerHost, requiredCpu, requiredMem, workerPort, aggregatedOffers)); + LOG.error(String.format("Unable to launch worker %s for topology %s. Required cpu: %f, Required mem: %f, Required port: %d. Available aggregatedOffers : %s", + workerHost, topologyDetails.getId(), requiredCpu, requiredMem, workerPort, aggregatedOffers)); continue; } diff --git a/storm/src/main/storm/mesos/resources/AggregatedOffers.java b/storm/src/main/storm/mesos/resources/AggregatedOffers.java index 645dcae77..e05da36b1 100644 --- a/storm/src/main/storm/mesos/resources/AggregatedOffers.java +++ b/storm/src/main/storm/mesos/resources/AggregatedOffers.java @@ -36,7 +36,7 @@ public class AggregatedOffers { private List offerList = new ArrayList(); - private final String hostName; + private final String hostname; private Protos.SlaveID slaveID; @@ -47,25 +47,20 @@ private void initializeAvailableResources() { availableResources.put(ResourceType.PORTS, new RangeResource(ResourceType.PORTS)); } - public AggregatedOffers(String hostName) { - this.hostName = hostName; - initializeAvailableResources(); - } - public AggregatedOffers(Protos.Offer offer) { initializeAvailableResources(); this.slaveID = offer.getSlaveId(); - this.hostName = offer.getHostname(); + this.hostname = offer.getHostname(); add(offer); } - public String getHostName() { - return hostName; + public String getHostname() { + return hostname; } public void add(Protos.Offer offer) { - // We are unable to aggregate offers if they are from different workers - assert offer.getSlaveId().equals(slaveID) && offer.getHostname().equals(hostName); + // We are unable to aggregate offers if they are from different mesos slaves/workers/agents + assert offer.getSlaveId().equals(slaveID) && offer.getHostname().equals(hostname); offerList.add(offer); for (Protos.Resource r : offer.getResourcesList()) { @@ -103,6 +98,11 @@ public boolean isAvailable(ResourceType resourceType, ResourceEntry resource) return availableResources.get(resourceType).isAvailable(resource); } + /** + * Unused Method - Exists for the purpose of facilitating support of reservations. + * TODO: Support reservations (https://github.com/mesos/storm/issues/148) + * For more information about why this unused code exists, see discussion: https://github.com/mesos/storm/pull/146#issuecomment-225496075 + */ public boolean isAvailable(ResourceType resourceType, ReservationType reservationType, ResourceEntry resource) { return availableResources.get(resourceType).isAvailable(resource, reservationType); } @@ -111,6 +111,11 @@ public List getAllAvailableResources(ResourceType r return availableResources.get(resourceType).getAllAvailableResources(); } + /** + * Unused Method - Exists for the purpose of facilitating support of reservations. + * TODO: Support reservations (https://github.com/mesos/storm/issues/148) + * For more information about why this unused code exists, see discussion: https://github.com/mesos/storm/pull/146#issuecomment-225496075 + */ public List getAllAvailableResources(ResourceType resourceType, ReservationType reservationType) { return availableResources.get(resourceType).getAllAvailableResources(reservationType); } @@ -128,6 +133,11 @@ public List reserveAndGet(ResourceType resourceType, ResourceEntr return new ArrayList<>(); } + /** + * Unused Method - Exists for the purpose of facilitating support of reservations. + * TODO: Support reservations (https://github.com/mesos/storm/issues/148) + * For more information about why this unused code exists, see discussion: https://github.com/mesos/storm/pull/146#issuecomment-225496075 + */ public List reserveAndGet(ResourceType resourceType, ReservationType reservationType, ResourceEntry resource) throws ResourceNotAvailableException { if (availableResources.get(resourceType).isAvailable(resource, reservationType)) { @@ -136,10 +146,6 @@ public List reserveAndGet(ResourceType resourceType, ReservationT return new ArrayList<>(); } - public List getOfferList() { - return offerList; - } - public List getOfferIDList() { List offerIDList = new ArrayList<>(); for (Protos.Offer offer: offerList) { @@ -162,7 +168,6 @@ public String toString() { public boolean isFit(Map mesosStormConf, TopologyDetails topologyDetails, boolean supervisorExists) { - double requestedWorkerCpu = MesosCommon.topologyWorkerCpu(mesosStormConf, topologyDetails); double requestedWorkerMem = MesosCommon.topologyWorkerMem(mesosStormConf, topologyDetails); @@ -175,7 +180,6 @@ public boolean isFit(Map mesosStormConf, TopologyDetails topologyDetails, boolea } public boolean isFit(Map mesosStormConf, TopologyDetails topologyDetails, Long port, boolean supervisorExists) { - double requestedWorkerCpu = MesosCommon.topologyWorkerCpu(mesosStormConf, topologyDetails); double requestedWorkerMem = MesosCommon.topologyWorkerMem(mesosStormConf, topologyDetails); diff --git a/storm/src/main/storm/mesos/resources/RangeResource.java b/storm/src/main/storm/mesos/resources/RangeResource.java index aada9589d..685893123 100644 --- a/storm/src/main/storm/mesos/resources/RangeResource.java +++ b/storm/src/main/storm/mesos/resources/RangeResource.java @@ -108,11 +108,15 @@ public List removeAndGet(RangeResourceEntry rangeResourceEntry) t /** + * Unused Method - Exists for the purpose of facilitating support of reservations. + * TODO: Support reservations (https://github.com/mesos/storm/issues/148) + * For more information about why this unused code exists, see discussion: https://github.com/mesos/storm/pull/146#issuecomment-225496075 + * * Remove/Reserve range from available ranges. * {@param rangeResourceEntry} range resource to removeAndGet * {@param reservationType} reservation type of resource that needs to be removed. If the resource represented by rangeResourceEntry * of the reservation type specified by this parameter is not available, then {@link ResourceNotAvailableException} - * is thrown + * is thrown. */ @Override public List removeAndGet(RangeResourceEntry rangeResourceEntry, ReservationType reservationType) throws ResourceNotAvailableException { @@ -129,6 +133,10 @@ public List removeAndGet(RangeResourceEntry rangeResourceEntry, R } /** + * Unused Method - Exists for the purpose of facilitating support of reservations. + * TODO: Support reservations (https://github.com/mesos/storm/issues/148) + * For more information about why this unused code exists, see discussion: https://github.com/mesos/storm/pull/146#issuecomment-225496075 + * * Remove/Reserve range from available ranges * {@param rangeResourceEntry} range resource to removeAndGet * {@param reservationTypeComparator} comparator like {@link storm.mesos.resources.DefaultReservationTypeComparator} diff --git a/storm/src/main/storm/mesos/resources/Resource.java b/storm/src/main/storm/mesos/resources/Resource.java index 2e5931570..d9073f27a 100644 --- a/storm/src/main/storm/mesos/resources/Resource.java +++ b/storm/src/main/storm/mesos/resources/Resource.java @@ -38,8 +38,18 @@ public interface Resource> { public List removeAndGet(T resourceEntry) throws ResourceNotAvailableException; + /** + * Unused Method - Exists for the purpose of facilitating support of reservations. + * TODO: Support reservations (https://github.com/mesos/storm/issues/148) + * For more information about why this unused code exists, see discussion: https://github.com/mesos/storm/pull/146#issuecomment-225496075 + */ public List removeAndGet(T value, ReservationType reservationType) throws ResourceNotAvailableException; + /** + * Unused Method - Exists for the purpose of facilitating support of reservations. + * TODO: Support reservations (https://github.com/mesos/storm/issues/148) + * For more information about why this unused code exists, see discussion: https://github.com/mesos/storm/pull/146#issuecomment-225496075 + */ public List removeAndGet(T value, Comparator reservationTypeComparator) throws ResourceNotAvailableException; } diff --git a/storm/src/main/storm/mesos/resources/ResourceEntries.java b/storm/src/main/storm/mesos/resources/ResourceEntries.java index f71473451..72b6bac7a 100644 --- a/storm/src/main/storm/mesos/resources/ResourceEntries.java +++ b/storm/src/main/storm/mesos/resources/ResourceEntries.java @@ -83,6 +83,8 @@ public ReservationType getReservationType() { } /** + * Unused Method - Exists for the sake of completeness in terms of implementing ResourceEntry. + * * Lets say, we have a range [u,v]. Using this add function, we can expand the range to [w,x] if and * only if one of the following conditions are satisfied * `w < u` @@ -107,6 +109,9 @@ public RangeResourceEntry add(ResourceEntry resourceEntry) { return this; } + /** + * Unused Method - Exists for the sake of completeness in terms of implementing ResourceEntry. + */ public RangeResourceEntry remove(ResourceEntry resourceEntry) { RangeResourceEntry rangeResourceEntry = (RangeResourceEntry) resourceEntry; if (this.begin < rangeResourceEntry.getBegin()) { diff --git a/storm/src/main/storm/mesos/resources/ScalarResource.java b/storm/src/main/storm/mesos/resources/ScalarResource.java index 3ca357734..42c24a734 100644 --- a/storm/src/main/storm/mesos/resources/ScalarResource.java +++ b/storm/src/main/storm/mesos/resources/ScalarResource.java @@ -52,6 +52,11 @@ public boolean isAvailable(ScalarResourceEntry scalarResourceEntry, ReservationT return (availableResourcesByReservationType.get(reservationType).getValue() >= scalarResourceEntry.getValue()); } + /** + * Unused Method - Exists for the purpose of facilitating support of reservations. + * TODO: Support reservations (https://github.com/mesos/storm/issues/148) + * For more information about why this unused code exists, see discussion: https://github.com/mesos/storm/pull/146#issuecomment-225496075 + */ public Double getTotalAvailableResource(ReservationType reservationType) { return availableResourcesByReservationType.get(reservationType).getValue(); } @@ -82,14 +87,6 @@ public List removeAndGet(ScalarResourceEntry scalarResourceEntry) return removeAndGet(scalarResourceEntry, availableResourcesByReservationType.keySet()); } - public List reserveScalarResource(ResourceType resourceType, ScalarResourceEntry requiredValue) throws ResourceNotAvailableException { - if (totalAvailableResource < requiredValue.getValue()) { - throw new ResourceNotAvailableException(String.format("resourceType: {} is not available. Requested {} Available {}", - resourceType, requiredValue, totalAvailableResource)); - } - return removeAndGet(requiredValue); - } - /** * Removes/Reserves scalar resource from available resources. * {@param scalarResourceEntry} amount of scalar resource to removeAndGet/decrement @@ -128,10 +125,10 @@ public List removeAndGet(ScalarResourceEntry scalarResourceEntry, public String toString() { List availableResourcesByResourceTypeList = new ArrayList<>(); for (Map.Entry entry: availableResourcesByReservationType.entrySet()) { - availableResourcesByResourceTypeList.add(String.format("%s: %f", entry.getKey(), entry.getValue().getValue())); + availableResourcesByResourceTypeList.add(String.format("%s: %s", entry.getKey(), Double.toString(entry.getValue().getValue()))); } String tmp = StringUtils.join(availableResourcesByResourceTypeList, ", "); - return String.format("%s: %f (%s)", resourceType.toString(), totalAvailableResource, tmp); + return String.format("%s: %s (%s)", resourceType.toString(), Double.toString(totalAvailableResource), tmp); } private List removeAndGet(ScalarResourceEntry scalarResourceEntry, Collection reservationTypesListByPriority) throws diff --git a/storm/src/main/storm/mesos/schedulers/DefaultScheduler.java b/storm/src/main/storm/mesos/schedulers/DefaultScheduler.java index c5dbf48e7..a91cf4ac4 100644 --- a/storm/src/main/storm/mesos/schedulers/DefaultScheduler.java +++ b/storm/src/main/storm/mesos/schedulers/DefaultScheduler.java @@ -18,6 +18,7 @@ package storm.mesos.schedulers; import backtype.storm.scheduler.*; +import org.apache.commons.lang3.StringUtils; import org.apache.mesos.Protos; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,6 +57,7 @@ private List getMesosWorkerSlots(Map double requestedWorkerCpu = MesosCommon.topologyWorkerCpu(mesosStormConf, topologyDetails); double requestedWorkerMem = MesosCommon.topologyWorkerMem(mesosStormConf, topologyDetails); + int requestedWorkerMemInt = (int) requestedWorkerMem; List mesosWorkerSlots = new ArrayList<>(); boolean slotFound = false; @@ -82,13 +84,13 @@ private List getMesosWorkerSlots(Map boolean supervisorExists = nodesWithExistingSupervisors.contains(currentNode); if (!aggregatedOffers.isFit(mesosStormConf, topologyDetails, supervisorExists)) { - log.info("{} is not a fit for {} requestedWorkerCpu: {} requestedWorkerMem: {}", - aggregatedOffers.toString(), topologyDetails.getId(), requestedWorkerCpu, requestedWorkerMem); + log.info("{} with requestedWorkerCpu {} and requestedWorkerMem {} does not fit onto {} with resources {}", + topologyDetails.getId(), requestedWorkerCpu, requestedWorkerMemInt, aggregatedOffers.getHostname(), aggregatedOffers.toString()); continue; } - log.info("{} is a fit for {} requestedWorkerCpu: {} requestedWorkerMem: {}", aggregatedOffers.toString(), - topologyDetails.getId(), requestedWorkerCpu, requestedWorkerMem); + log.info("{} with requestedWorkerCpu {} and requestedWorkerMem {} does fit onto {} with resources {}", + topologyDetails.getId(), requestedWorkerCpu, requestedWorkerMemInt, aggregatedOffers.getHostname(), aggregatedOffers.toString()); MesosWorkerSlot mesosWorkerSlot; try { mesosWorkerSlot = SchedulerUtils.createMesosWorkerSlot(mesosStormConf, aggregatedOffers, topologyDetails, supervisorExists); @@ -163,10 +165,14 @@ public List allSlotsAvailableForScheduling(RotatingMap slotsStrings = new ArrayList(); + for (WorkerSlot slot : allSlots) { + slotsStrings.add("" + slot.getNodeId() + ":" + slot.getPort()); + } + log.info("allSlotsAvailableForScheduling: {} available slots: [{}]", allSlots.size(), StringUtils.join(slotsStrings, ", ")); return allSlots; } @@ -196,18 +202,18 @@ Map> getMesosWorkerSlotPerTopology(List> executorsPerWorkerList(Cluster cluster, TopologyDetails topologyDetails, int slotsRequested, int slotsAssigned, int slotsAvailable) { Collection executors = cluster.getUnassignedExecutors(topologyDetails); + String topologyId = topologyDetails.getId(); // Check if we don't actually need to schedule any executors because all requested slots are assigned already. if (slotsRequested == slotsAssigned) { if (executors.isEmpty()) { // TODO: print executors list cleanly in a single line - String msg = String.format("executorsPerWorkerList - slotsRequested: %d == slotsAssigned: %d, BUT there are unassigned executors which is nonsensical", - slotsRequested, slotsAssigned); + String msg = String.format("executorsPerWorkerList: for %s, slotsRequested: %d == slotsAssigned: %d, BUT there are unassigned executors which is nonsensical", + topologyId, slotsRequested, slotsAssigned); log.error(msg); throw new RuntimeException(msg); } - // TODO: switch from info to debug - log.info("executorsPerWorkerList - slotsRequested: {} == slotsAssigned: {}, so no need to schedule any executors", slotsRequested, slotsAssigned); + log.debug("executorsPerWorkerList: for {}, slotsRequested: {} == slotsAssigned: {}, so no need to schedule any executors", topologyId, slotsRequested, slotsAssigned); return null; } @@ -216,17 +222,17 @@ List> executorsPerWorkerList(Cluster cluster, TopologyDeta // If there are not any unassigned executors, we need to re-distribute all currently assigned executors across workers if (executors.isEmpty()) { if (slotsAssigned < slotsAvailable) { - log.info("All executors are already assigned, but only onto {} slots. Redistributing all assigned executors to new set of {} slots.", slotsAssigned, slotsAvailable); - SchedulerAssignment schedulerAssignment = cluster.getAssignmentById(topologyDetails.getId()); + log.info("All executors are already assigned for {}, but only onto {} slots. Redistributing all assigned executors to new set of {} slots.", + topologyId, slotsAssigned, slotsAvailable); + SchedulerAssignment schedulerAssignment = cluster.getAssignmentById(topologyId); // Un-assign them int slotsFreed = schedulerAssignment.getSlots().size(); cluster.freeSlots(schedulerAssignment.getSlots()); - log.info("executorsPerWorkerList - slotsAvailable: {}, slotsAssigned: {}, slotsFreed: {}", slotsAvailable, slotsAssigned, slotsFreed); + log.info("executorsPerWorkerList: for {}, slotsAvailable: {}, slotsAssigned: {}, slotsFreed: {}", topologyId, slotsAvailable, slotsAssigned, slotsFreed); executors = cluster.getUnassignedExecutors(topologyDetails); - // Need to instead wipe *everything* and just use the slotsAvailable slotsToUse = slotsAvailable; } else { - log.info("All executors are already assigned. Not going to redistribute work because slotsToUse is {} and slotsAssigned is {}", slotsAvailable, slotsAssigned); + log.info("All executors are already assigned for {}. Not going to redistribute work because slotsAvailable is {} and slotsAssigned is {}", topologyId, slotsAvailable, slotsAssigned); return null; } } else { @@ -247,9 +253,9 @@ List> executorsPerWorkerList(Cluster cluster, TopologyDeta // Notably, if slotsAssigned was 0, then this would be a full rebalance onto less workers than requested, // and hence wouldn't lead to an imbalance. if (slotsToUse + slotsAssigned < slotsRequested && slotsAssigned != 0) { - log.warn("Assigning storm executors for {} onto less slots ({}) than requested ({}), " + + log.warn("For {}, assigning {} storm executors onto {} new slots when we already have {} executors assigned to {} slots, " + "this may lead to executor imbalance.", - topologyDetails.getId(), slotsToUse + slotsAssigned, slotsRequested); + topologyId, executors.size(), slotsToUse, cluster.getAssignmentById(topologyId).getExecutors().size(), slotsAssigned); } } @@ -258,7 +264,7 @@ List> executorsPerWorkerList(Cluster cluster, TopologyDeta for (ExecutorDetails exec : executors) { executorsStrings.add(exec.toString()); } - String info = String.format("executorsPerWorkerList - available executors for topology %s: %s", topologyDetails.getId(), String.join(", ", executorsStrings)); + String info = String.format("executorsPerWorkerList: available executors for %s: %s", topologyId, StringUtils.join(executorsStrings, ", ")); for (int i = 0; i < slotsToUse; i++) { executorsPerWorkerList.add(new ArrayList()); } @@ -291,9 +297,30 @@ public int compare(ExecutorDetails e1, ExecutorDetails e2) { @Override public void schedule(Topologies topologies, Cluster cluster) { List workerSlots = cluster.getAvailableSlots(); + String info = "Scheduling the following worker slots from cluster.getAvailableSlots: "; + if (workerSlots.isEmpty()) { + info += "[]"; + } else { + List workerSlotsStrings = new ArrayList(); + for (WorkerSlot ws : workerSlots) { + workerSlotsStrings.add(ws.toString()); + } + info += String.format("[%s]", StringUtils.join(workerSlotsStrings, ", ")); + } + log.info(info); + Map> perTopologySlotList = getMesosWorkerSlotPerTopology(workerSlots); + info = "Schedule the per-topology slots:"; + for (String topo : perTopologySlotList.keySet()) { + List mwsAssignments = new ArrayList<>(); + for (MesosWorkerSlot mws : perTopologySlotList.get(topo)) { + mwsAssignments.add(mws.getNodeId() + ":" + mws.getPort()); + } + info += String.format(" {%s, [%s]}", topo, StringUtils.join(mwsAssignments, ", ")); + } + log.info(info); - // So far we know how many MesosSlots each of the topologies have got. Lets assign executors for each of them + // So far we know how many MesosSlots each of the topologies have got. Let's assign executors for each of them for (String topologyId : perTopologySlotList.keySet()) { TopologyDetails topologyDetails = topologies.getById(topologyId); List mesosWorkerSlots = perTopologySlotList.get(topologyId); @@ -325,6 +352,12 @@ public void schedule(Topologies topologies, Cluster cluster) { iterator.remove(); cluster.assign(mesosWorkerSlots.remove(0), topologyId, executorsPerWorker); } + if (slotsAvailable == 0) { + info += "[]"; + } else { + info += StringUtils.join(slotAssignmentStrings, ", "); + } + log.info(info); } mesosWorkerSlotMap.clear(); } diff --git a/storm/src/main/storm/mesos/schedulers/MesosWorkerSlot.java b/storm/src/main/storm/mesos/schedulers/MesosWorkerSlot.java index 842fb3273..9a587c892 100644 --- a/storm/src/main/storm/mesos/schedulers/MesosWorkerSlot.java +++ b/storm/src/main/storm/mesos/schedulers/MesosWorkerSlot.java @@ -31,8 +31,12 @@ public String getTopologyId() { return this.topologyId; } + public String getAssignmentAsString() { + return String.format("%s:%s", super.getNodeId(), super.getPort()); + } + @Override public String toString() { - return String.format("%s:%s topologyId:%s", super.getNodeId(), super.getPort(), topologyId); + return String.format("%s:%s topologyId: %s", super.getNodeId(), super.getPort(), topologyId); } } diff --git a/storm/src/main/storm/mesos/schedulers/SchedulerUtils.java b/storm/src/main/storm/mesos/schedulers/SchedulerUtils.java index 4b1c36479..fc426e203 100644 --- a/storm/src/main/storm/mesos/schedulers/SchedulerUtils.java +++ b/storm/src/main/storm/mesos/schedulers/SchedulerUtils.java @@ -17,6 +17,9 @@ */ package storm.mesos.schedulers; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import backtype.storm.scheduler.SupervisorDetails; import backtype.storm.scheduler.TopologyDetails; import storm.mesos.resources.AggregatedOffers; @@ -34,6 +37,8 @@ public class SchedulerUtils { + private static final Logger log = LoggerFactory.getLogger(SchedulerUtils.class); + public static List getPorts(AggregatedOffers aggregatedOffers, int requiredCount) { List retVal = new ArrayList<>(); List resourceEntryList = aggregatedOffers.getAllAvailableResources(ResourceType.PORTS); @@ -46,6 +51,9 @@ public static List getPorts(AggregatedOffers aggregatedOffer ++begin; --requiredCount; } + if (requiredCount <= 0) { + break; + } } return retVal; } @@ -70,7 +78,7 @@ public static MesosWorkerSlot createMesosWorkerSlot(Map mesosStormConf, } aggregatedOffers.reserve(ResourceType.PORTS, ports.get(0)); - return new MesosWorkerSlot(aggregatedOffers.getHostName(), ports.get(0).getBegin(), topologyDetails.getId()); + return new MesosWorkerSlot(aggregatedOffers.getHostname(), ports.get(0).getBegin(), topologyDetails.getId()); } /** diff --git a/storm/src/main/storm/mesos/util/MesosCommon.java b/storm/src/main/storm/mesos/util/MesosCommon.java index ae9946ee8..ec4d81240 100644 --- a/storm/src/main/storm/mesos/util/MesosCommon.java +++ b/storm/src/main/storm/mesos/util/MesosCommon.java @@ -154,7 +154,7 @@ public static Map getAggregatedOffersPerNode(RotatingM } for (AggregatedOffers aggregatedOffers : aggregatedOffersPerNode.values()) { - LOG.info("Available resources at {}: {}", aggregatedOffers.getHostName(), aggregatedOffers.toString()); + LOG.info("Available resources at {}: {}", aggregatedOffers.getHostname(), aggregatedOffers.toString()); } return aggregatedOffersPerNode; } diff --git a/storm/src/main/storm/mesos/util/PrettyProtobuf.java b/storm/src/main/storm/mesos/util/PrettyProtobuf.java index d03f9b161..59af7f068 100644 --- a/storm/src/main/storm/mesos/util/PrettyProtobuf.java +++ b/storm/src/main/storm/mesos/util/PrettyProtobuf.java @@ -116,7 +116,7 @@ public static String offerListToString(List offers) { */ public static String taskInfoListToString(List tasks) { List tasksAsStrings = Lists.transform(tasks, taskInfoToStringTransform); - return String.format("[\n%s]", StringUtils.join(tasksAsStrings, ",\n")); + return String.format("[%s]", StringUtils.join(tasksAsStrings, ", ")); } /** diff --git a/storm/src/test/storm/mesos/resources/AggregatedOffersTest.java b/storm/src/test/storm/mesos/resources/AggregatedOffersTest.java index e63b00a6c..16d4980d3 100644 --- a/storm/src/test/storm/mesos/resources/AggregatedOffersTest.java +++ b/storm/src/test/storm/mesos/resources/AggregatedOffersTest.java @@ -45,14 +45,14 @@ public void testToIgnoreDynamicResources() { assertEquals(0, TestUtils.calculateAllAvailableScalarResources(aggregatedOffers, ResourceType.CPU), DELTA_FOR_DOUBLE_COMPARISON); assertEquals(0, TestUtils.calculateAllAvailableScalarResources(aggregatedOffers, ResourceType.MEM), DELTA_FOR_DOUBLE_COMPARISON); - assertTrue(aggregatedOffers.getHostName().equals(offer.getHostname())); + assertTrue(aggregatedOffers.getHostname().equals(offer.getHostname())); assertTrue(aggregatedOffers.getSlaveID().equals(offer.getSlaveId())); offer = TestUtils.buildOfferWithReservation("offer1", "h1", 2, 1000, 6, 1000); aggregatedOffers = new AggregatedOffers(offer); assertEquals(8, TestUtils.calculateAllAvailableScalarResources(aggregatedOffers, ResourceType.CPU), DELTA_FOR_DOUBLE_COMPARISON); assertEquals(2000, TestUtils.calculateAllAvailableScalarResources(aggregatedOffers, ResourceType.MEM), DELTA_FOR_DOUBLE_COMPARISON); - assertTrue(aggregatedOffers.getHostName().equals(offer.getHostname())); + assertTrue(aggregatedOffers.getHostname().equals(offer.getHostname())); assertTrue(aggregatedOffers.getSlaveID().equals(offer.getSlaveId())); offer = TestUtils.buildOfferWithPorts("offer1", "h1", 2.0, 2000, 3000, 3100);