
Commit d2fd566

Merge branch 'master' of github.com:apache/spark into python3
Conflicts:
	python/pyspark/sql/dataframe.py
Davies Liu committed Mar 26, 2015
2 parents a716d34 + 71a0d40 commit d2fd566
Showing 147 changed files with 2,830 additions and 1,334 deletions.
@@ -45,8 +45,7 @@ class SparkStatusTracker private[spark] (sc: SparkContext) {
    */
   def getJobIdsForGroup(jobGroup: String): Array[Int] = {
     jobProgressListener.synchronized {
-      val jobData = jobProgressListener.jobIdToData.valuesIterator
-      jobData.filter(_.jobGroup.orNull == jobGroup).map(_.jobId).toArray
+      jobProgressListener.jobGroupToJobIds.getOrElse(jobGroup, Seq.empty).toArray
     }
   }

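The getJobIdsForGroup change above swaps a filter over every tracked job for a lookup in the listener's jobGroupToJobIds index. A minimal sketch of that pattern, using a hypothetical class rather than Spark's JobProgressListener:

import scala.collection.mutable

// Keep a reverse index from job group to job ids, so a group query is a single map
// lookup instead of a scan over all tracked jobs.
class JobGroupIndexSketch {
  private val jobGroupToJobIds = mutable.HashMap.empty[String, mutable.HashSet[Int]]

  def onJobStart(jobGroup: String, jobId: Int): Unit = synchronized {
    jobGroupToJobIds.getOrElseUpdate(jobGroup, mutable.HashSet.empty[Int]) += jobId
  }

  def getJobIdsForGroup(jobGroup: String): Array[Int] = synchronized {
    // Cost is proportional to the size of the group, not the total number of jobs.
    jobGroupToJobIds.get(jobGroup).map(_.toArray).getOrElse(Array.empty[Int])
  }
}
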
@@ -101,6 +101,8 @@ private[deploy] object DeployMessages {
   case class RegisterApplication(appDescription: ApplicationDescription)
     extends DeployMessage
 
+  case class UnregisterApplication(appId: String)
+
   case class MasterChangeAcknowledged(appId: String)
 
   // Master to AppClient

@@ -157,6 +157,7 @@ private[spark] class AppClient(
 
       case StopAppClient =>
         markDead("Application has been stopped.")
+        master ! UnregisterApplication(appId)
         sender ! true
         context.stop(self)
     }

@@ -91,7 +91,7 @@ private[deploy] class ApplicationInfo(
     }
   }
 
-  private[master] val requestedCores = desc.maxCores.getOrElse(defaultCores)
+  private val requestedCores = desc.maxCores.getOrElse(defaultCores)
 
   private[master] def coresLeft: Int = requestedCores - coresGranted
 
@@ -111,6 +111,10 @@ private[deploy] class ApplicationInfo(
     endTime = System.currentTimeMillis()
   }
 
+  private[master] def isFinished: Boolean = {
+    state != ApplicationState.WAITING && state != ApplicationState.RUNNING
+  }
+
   def duration: Long = {
     if (endTime != -1) {
       endTime - startTime

core/src/main/scala/org/apache/spark/deploy/master/Master.scala (10 changes: 9 additions & 1 deletion)
@@ -339,7 +339,11 @@ private[master] class Master(
         if (ExecutorState.isFinished(state)) {
           // Remove this executor from the worker and app
           logInfo(s"Removing executor ${exec.fullId} because it is $state")
-          appInfo.removeExecutor(exec)
+          // If an application has already finished, preserve its
+          // state to display its information properly on the UI
+          if (!appInfo.isFinished) {
+            appInfo.removeExecutor(exec)
+          }
           exec.worker.removeExecutor(exec)
 
           val normalExit = exitStatus == Some(0)
@@ -428,6 +432,10 @@
         if (canCompleteRecovery) { completeRecovery() }
       }
 
+    case UnregisterApplication(applicationId) =>
+      logInfo(s"Received unregister request from application $applicationId")
+      idToApp.get(applicationId).foreach(finishApplication)
+
     case DisassociatedEvent(_, address, _) => {
       // The disconnected client could've been either a worker or an app; remove whichever it was
       logInfo(s"$address got disassociated, removing it.")

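A trimmed-down, hypothetical sketch of the unregister flow these files add (not the real Master actor): the client sends UnregisterApplication when it stops, and the master finishes the application only if the id is known.

import scala.collection.mutable

object UnregisterSketch {
  case class UnregisterApplication(appId: String)

  // Stand-in for the master's idToApp map of ApplicationInfo objects (example id).
  private val idToApp = mutable.HashMap("app-20150326-0001" -> "my-app")

  private def finishApplication(appName: String): Unit =
    println(s"Finishing application $appName")

  def receive(msg: Any): Unit = msg match {
    case UnregisterApplication(applicationId) =>
      println(s"Received unregister request from application $applicationId")
      idToApp.get(applicationId).foreach(finishApplication) // unknown ids are ignored
    case _ => // other master messages elided
  }

  def main(args: Array[String]): Unit = {
    receive(UnregisterApplication("app-20150326-0001"))
    receive(UnregisterApplication("app-unknown")) // no-op
  }
}
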
@@ -75,16 +75,12 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
     val workers = state.workers.sortBy(_.id)
     val workerTable = UIUtils.listingTable(workerHeaders, workerRow, workers)
 
-    val activeAppHeaders = Seq("Application ID", "Name", "Cores in Use",
-      "Cores Requested", "Memory per Node", "Submitted Time", "User", "State", "Duration")
+    val appHeaders = Seq("Application ID", "Name", "Cores", "Memory per Node", "Submitted Time",
+      "User", "State", "Duration")
     val activeApps = state.activeApps.sortBy(_.startTime).reverse
-    val activeAppsTable = UIUtils.listingTable(activeAppHeaders, activeAppRow, activeApps)
-
-    val completedAppHeaders = Seq("Application ID", "Name", "Cores Requested", "Memory per Node",
-      "Submitted Time", "User", "State", "Duration")
+    val activeAppsTable = UIUtils.listingTable(appHeaders, appRow, activeApps)
     val completedApps = state.completedApps.sortBy(_.endTime).reverse
-    val completedAppsTable = UIUtils.listingTable(completedAppHeaders, completeAppRow,
-      completedApps)
+    val completedAppsTable = UIUtils.listingTable(appHeaders, appRow, completedApps)
 
     val driverHeaders = Seq("Submission ID", "Submitted Time", "Worker", "State", "Cores",
       "Memory", "Main Class")
@@ -191,7 +187,7 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
     </tr>
   }
 
-  private def appRow(app: ApplicationInfo, active: Boolean): Seq[Node] = {
+  private def appRow(app: ApplicationInfo): Seq[Node] = {
     val killLink = if (parent.killEnabled &&
       (app.state == ApplicationState.RUNNING || app.state == ApplicationState.WAITING)) {
       val killLinkUri = s"app/kill?id=${app.id}&terminate=true"
@@ -201,7 +197,6 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
         (<a href={killLinkUri} onclick={confirm}>kill</a>)
       </span>
     }
-
     <tr>
       <td>
         <a href={"app?appId=" + app.id}>{app.id}</a>
@@ -210,15 +205,8 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
       <td>
         <a href={app.desc.appUiUrl}>{app.desc.name}</a>
       </td>
-      {
-        if (active) {
-          <td>
-            {app.coresGranted}
-          </td>
-        }
-      }
       <td>
-        {if (app.requestedCores == Int.MaxValue) "*" else app.requestedCores}
+        {app.coresGranted}
       </td>
       <td sorttable_customkey={app.desc.memoryPerSlave.toString}>
         {Utils.megabytesToString(app.desc.memoryPerSlave)}
@@ -230,14 +218,6 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
     </tr>
   }
 
-  private def activeAppRow(app: ApplicationInfo): Seq[Node] = {
-    appRow(app, active = true)
-  }
-
-  private def completeAppRow(app: ApplicationInfo): Seq[Node] = {
-    appRow(app, active = false)
-  }
-
   private def driverRow(driver: DriverInfo): Seq[Node] = {
     val killLink = if (parent.killEnabled &&
       (driver.state == DriverState.RUNNING ||

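Since active and completed applications now share one header list and one row renderer, a single generic listing helper serves both tables. A small sketch with hypothetical helpers (not Spark's UIUtils):

object ListingTableSketch {
  case class AppSummary(id: String, name: String, coresGranted: Int)

  val appHeaders: Seq[String] = Seq("Application ID", "Name", "Cores")

  def appRow(app: AppSummary): String =
    s"<tr><td>${app.id}</td><td>${app.name}</td><td>${app.coresGranted}</td></tr>"

  // One generic table builder, parameterized by the row function, instead of
  // separate activeAppRow/completeAppRow variants.
  def listingTable[T](headers: Seq[String], row: T => String, items: Seq[T]): String =
    headers.mkString("<table><tr><th>", "</th><th>", "</th></tr>") +
      items.map(row).mkString + "</table>"

  def main(args: Array[String]): Unit = {
    val active = Seq(AppSummary("app-1", "job A", 8))
    val completed = Seq(AppSummary("app-0", "job B", 4))
    println(listingTable(appHeaders, appRow, active))     // same renderer for both tables
    println(listingTable(appHeaders, appRow, completed))
  }
}
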
core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala (20 changes: 9 additions & 11 deletions)
@@ -191,25 +191,23 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
       }
     }
     // Determine the bucket function in constant time. Requires that buckets are evenly spaced
-    def fastBucketFunction(min: Double, increment: Double, count: Int)(e: Double): Option[Int] = {
+    def fastBucketFunction(min: Double, max: Double, count: Int)(e: Double): Option[Int] = {
       // If our input is not a number unless the increment is also NaN then we fail fast
-      if (e.isNaN()) {
-        return None
-      }
-      val bucketNumber = (e - min)/(increment)
-      // We do this rather than buckets.lengthCompare(bucketNumber)
-      // because Array[Double] fails to override it (for now).
-      if (bucketNumber > count || bucketNumber < 0) {
+      if (e.isNaN || e < min || e > max) {
         None
       } else {
-        Some(bucketNumber.toInt.min(count - 1))
+        // Compute ratio of e's distance along range to total range first, for better precision
+        val bucketNumber = (((e - min) / (max - min)) * count).toInt
+        // should be less than count, but will equal count if e == max, in which case
+        // it's part of the last end-range-inclusive bucket, so return count-1
+        Some(math.min(bucketNumber, count - 1))
      }
    }
    // Decide which bucket function to pass to histogramPartition. We decide here
-    // rather than having a general function so that the decission need only be made
+    // rather than having a general function so that the decision need only be made
    // once rather than once per shard
    val bucketFunction = if (evenBuckets) {
-      fastBucketFunction(buckets(0), buckets(1)-buckets(0), buckets.length-1) _
+      fastBucketFunction(buckets.head, buckets.last, buckets.length - 1) _
    } else {
      basicBucketFunction _
    }

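The rewritten fastBucketFunction maps a value to a bucket by scaling its relative position in [min, max] instead of dividing by a precomputed increment, which behaves better numerically near bucket boundaries and makes the top bucket end-inclusive. A self-contained sketch of that logic with example values (hypothetical object, not the RDD code):

object HistogramBucketSketch {
  // count evenly spaced buckets covering [min, max]; returns the bucket index, or None if
  // e is NaN or outside the range.
  def bucketOf(min: Double, max: Double, count: Int)(e: Double): Option[Int] = {
    if (e.isNaN || e < min || e > max) {
      None
    } else {
      // Ratio first, then scale: (e - min) / (max - min) is in [0.0, 1.0].
      val bucketNumber = (((e - min) / (max - min)) * count).toInt
      // e == max would map to `count`, so clamp it into the last (end-inclusive) bucket.
      Some(math.min(bucketNumber, count - 1))
    }
  }

  def main(args: Array[String]): Unit = {
    val f = bucketOf(0.0, 10.0, 5) _   // buckets of width 2.0 over [0, 10]
    println(f(0.0))    // Some(0)
    println(f(9.99))   // Some(4)
    println(f(10.0))   // Some(4): the top bucket is end-inclusive
    println(f(10.5))   // None: out of range
  }
}
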
core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala (8 changes: 5 additions & 3 deletions)
@@ -81,9 +81,11 @@ class TaskInfo(
 
   def status: String = {
     if (running) {
-      "RUNNING"
-    } else if (gettingResult) {
-      "GET RESULT"
+      if (gettingResult) {
+        "GET RESULT"
+      } else {
+        "RUNNING"
+      }
     } else if (failed) {
       "FAILED"
     } else if (successful) {

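A minimal sketch of the corrected precedence: a task that is still marked running but is already fetching its result should report GET RESULT rather than RUNNING. The failed/successful branches of the real TaskInfo.status are elided here.

object TaskStatusSketch {
  def status(running: Boolean, gettingResult: Boolean): String =
    if (running) {
      if (gettingResult) "GET RESULT" else "RUNNING"
    } else {
      "NOT RUNNING"
    }

  def main(args: Array[String]): Unit = {
    // Before the fix, this combination reported RUNNING because the running check won.
    println(status(running = true, gettingResult = true)) // GET RESULT
  }
}
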
@@ -112,6 +112,7 @@ class FileShuffleBlockManager(conf: SparkConf)
       private val shuffleState = shuffleStates(shuffleId)
       private var fileGroup: ShuffleFileGroup = null
 
+      val openStartTime = System.nanoTime
       val writers: Array[BlockObjectWriter] = if (consolidateShuffleFiles) {
         fileGroup = getUnusedFileGroup()
         Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
@@ -135,6 +136,9 @@ class FileShuffleBlockManager(conf: SparkConf)
           blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize, writeMetrics)
         }
       }
+      // Creating the file to write to and creating a disk writer both involve interacting with
+      // the disk, so should be included in the shuffle write time.
+      writeMetrics.incShuffleWriteTime(System.nanoTime - openStartTime)
 
       override def releaseWriters(success: Boolean) {
         if (consolidateShuffleFiles) {

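A sketch of the accounting pattern this adds, with a hypothetical metrics class standing in for Spark's ShuffleWriteMetrics: time the creation of the shuffle files and writers with System.nanoTime and fold it into the shuffle write time, since it is real disk work.

object WriteTimingSketch {
  class WriteMetrics {
    private var shuffleWriteTimeNanos = 0L
    def incShuffleWriteTime(nanos: Long): Unit = shuffleWriteTimeNanos += nanos
    def shuffleWriteTime: Long = shuffleWriteTimeNanos
  }

  // Run the writer-opening block and charge its elapsed time to the metrics.
  def openWritersTimed[T](metrics: WriteMetrics)(openWriters: => T): T = {
    val openStartTime = System.nanoTime
    try openWriters
    finally metrics.incShuffleWriteTime(System.nanoTime - openStartTime)
  }

  def main(args: Array[String]): Unit = {
    val metrics = new WriteMetrics
    // Stand-in for creating the per-bucket disk writers.
    val writers = openWritersTimed(metrics) { Array.fill(4)(new StringBuilder) }
    println(s"opened ${writers.length} writers in ${metrics.shuffleWriteTime} ns")
  }
}
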
@@ -63,6 +63,9 @@ private[spark] class SortShuffleWriter[K, V, C](
       sorter.insertAll(records)
     }
 
+    // Don't bother including the time to open the merged output file in the shuffle write time,
+    // because it just opens a single file, so is typically too fast to measure accurately
+    // (see SPARK-3570).
     val outputFile = shuffleBlockManager.getDataFile(dep.shuffleId, mapId)
     val blockId = shuffleBlockManager.consolidateId(dep.shuffleId, mapId)
     val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile)

core/src/main/scala/org/apache/spark/storage/BlockManager.scala (23 changes: 18 additions & 5 deletions)
@@ -535,9 +535,14 @@ private[spark] class BlockManager(
             /* We'll store the bytes in memory if the block's storage level includes
              * "memory serialized", or if it should be cached as objects in memory
              * but we only requested its serialized bytes. */
-            val copyForMemory = ByteBuffer.allocate(bytes.limit)
-            copyForMemory.put(bytes)
-            memoryStore.putBytes(blockId, copyForMemory, level)
+            memoryStore.putBytes(blockId, bytes.limit, () => {
+              // https://issues.apache.org/jira/browse/SPARK-6076
+              // If the file size is bigger than the free memory, OOM will happen. So if we cannot
+              // put it into MemoryStore, copyForMemory should not be created. That's why this
+              // action is put into a `() => ByteBuffer` and created lazily.
+              val copyForMemory = ByteBuffer.allocate(bytes.limit)
+              copyForMemory.put(bytes)
+            })
             bytes.rewind()
           }
           if (!asBlockResult) {
@@ -991,15 +996,23 @@
     putIterator(blockId, Iterator(value), level, tellMaster)
   }
 
+  def dropFromMemory(
+      blockId: BlockId,
+      data: Either[Array[Any], ByteBuffer]): Option[BlockStatus] = {
+    dropFromMemory(blockId, () => data)
+  }
+
   /**
    * Drop a block from memory, possibly putting it on disk if applicable. Called when the memory
    * store reaches its limit and needs to free up space.
    *
+   * If `data` is not put on disk, it won't be created.
+   *
    * Return the block status if the given block has been updated, else None.
    */
   def dropFromMemory(
       blockId: BlockId,
-      data: Either[Array[Any], ByteBuffer]): Option[BlockStatus] = {
+      data: () => Either[Array[Any], ByteBuffer]): Option[BlockStatus] = {
 
     logInfo(s"Dropping block $blockId from memory")
     val info = blockInfo.get(blockId).orNull
@@ -1023,7 +1036,7 @@
     // Drop to disk, if storage level requires
     if (level.useDisk && !diskStore.contains(blockId)) {
       logInfo(s"Writing block $blockId to disk")
-      data match {
+      data() match {
         case Left(elements) =>
           diskStore.putArray(blockId, elements, level, returnValues = false)
         case Right(bytes) =>

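The putBytes and dropFromMemory changes share one idea from SPARK-6076: pass the expensive value as a () => thunk so it is only materialized when it will actually be used. A sketch with a hypothetical store (not Spark's MemoryStore):

import java.nio.ByteBuffer

class MemoryStoreSketch(freeMemory: Long) {
  // Callers pass the size plus a thunk; the copy is only built once we know there is room.
  def putBytes(blockId: String, size: Long, makeBytes: () => ByteBuffer): Boolean = {
    if (size > freeMemory) {
      false // not enough room: the thunk is never invoked, so no large copy is allocated
    } else {
      val bytes = makeBytes() // allocate only on the success path
      // ... store `bytes` under `blockId` here ...
      true
    }
  }
}

The dropFromMemory change applies the same trick: the Either is wrapped in a () => so the data is only produced if the block really has to be written to disk.
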
core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala (32 changes: 18 additions & 14 deletions)
@@ -47,6 +47,8 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
     logError("Failed to create any local dir.")
     System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR)
   }
+  // The content of subDirs is immutable but the content of subDirs(i) is mutable. And the content
+  // of subDirs(i) is protected by the lock of subDirs(i)
   private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))
 
   private val shutdownHook = addShutdownHook()
@@ -61,20 +63,17 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
     val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
 
     // Create the subdirectory if it doesn't already exist
-    var subDir = subDirs(dirId)(subDirId)
-    if (subDir == null) {
-      subDir = subDirs(dirId).synchronized {
-        val old = subDirs(dirId)(subDirId)
-        if (old != null) {
-          old
-        } else {
-          val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
-          if (!newDir.exists() && !newDir.mkdir()) {
-            throw new IOException(s"Failed to create local dir in $newDir.")
-          }
-          subDirs(dirId)(subDirId) = newDir
-          newDir
-        }
+    val subDir = subDirs(dirId).synchronized {
+      val old = subDirs(dirId)(subDirId)
+      if (old != null) {
+        old
+      } else {
+        val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
+        if (!newDir.exists() && !newDir.mkdir()) {
+          throw new IOException(s"Failed to create local dir in $newDir.")
+        }
+        subDirs(dirId)(subDirId) = newDir
+        newDir
      }
    }
 
@@ -91,7 +90,12 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
   /** List all the files currently stored on disk by the disk manager. */
   def getAllFiles(): Seq[File] = {
     // Get all the files inside the array of array of directories
-    subDirs.flatten.filter(_ != null).flatMap { dir =>
+    subDirs.flatMap { dir =>
+      dir.synchronized {
+        // Copy the content of dir because it may be modified in other threads
+        dir.clone()
+      }
+    }.filter(_ != null).flatMap { dir =>
       val files = dir.listFiles()
       if (files != null) files else Seq.empty
     }

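The getFile fix moves both the read and the write of the sub-directory slot inside the lock of the owning sub-array, so two threads can no longer both observe null and race to create (or overwrite) the directory; getAllFiles snapshots each sub-array under the same lock before reading it. A consolidated, hypothetical sketch of that locking discipline (not Spark's DiskBlockManager):

import java.io.{File, IOException}

class SubDirsSketch(localDirs: Array[File], subDirsPerLocalDir: Int) {
  // Each inner array is guarded by its own monitor.
  private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))

  def getSubDir(dirId: Int, subDirId: Int): File = {
    subDirs(dirId).synchronized {
      val old = subDirs(dirId)(subDirId)
      if (old != null) {
        old
      } else {
        val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
        if (!newDir.exists() && !newDir.mkdir()) {
          throw new IOException(s"Failed to create local dir in $newDir.")
        }
        subDirs(dirId)(subDirId) = newDir // write happens under the same lock as the read
        newDir
      }
    }
  }

  // Copy each sub-array under its lock before reading it from another thread.
  def allSubDirs: Seq[File] =
    subDirs.flatMap { dir => dir.synchronized { dir.clone() } }.filter(_ != null).toSeq
}
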