GCP Batch: LogsPolicy.PATH now streams the logs to GCS
Instead of pushing the log file after the job completes, the logs are now streamed to GCS.
AlexITC committed Sep 5, 2024
1 parent 9b2e3c6 commit 87f2e56
Showing 7 changed files with 61 additions and 38 deletions.
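
The mechanism pairs two Batch API features: the execution bucket is mounted into the job VM as a GCS volume, and the job's `LogsPolicy` points at a file under that mount, so the Batch agent's writes land in the bucket while the job runs. Below is a minimal sketch of that pairing using the `com.google.cloud.batch.v1` client; the bucket name and paths are placeholders, whereas Cromwell derives the real values from the workflow root as the diff below shows. The feature is enabled by setting `backend.providers.batch.config.batch.logs-policy = "PATH"`.

```scala
import com.google.cloud.batch.v1.{GCS, LogsPolicy, Volume}
import com.google.cloud.batch.v1.LogsPolicy.Destination

// Placeholder values; Cromwell derives the real ones from the workflow's GCS root.
val bucket = "my-execution-bucket"
val mountPoint = "/mnt/disks/gcs" // GcpBatchAttachedDisk.GcsMountPoint in this commit
val logPathOnDisk = s"$mountPoint/my-workflow/call-task/batch.log"

// 1) Mount the execution bucket into the Batch VM as a GCS volume.
val logVolume = Volume.newBuilder
  .setGcs(GCS.newBuilder().setRemotePath(bucket)) // bucket name only, no gs:// scheme
  .setMountPath(mountPoint)
  .build()

// 2) Write the job logs to a file under that mount instead of Cloud Logging;
// the writes are persisted to the bucket as the job runs.
val logsPolicy = LogsPolicy.newBuilder
  .setDestination(Destination.PATH)
  .setLogsPath(logPathOnDisk)
  .build()
```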
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -26,7 +26,7 @@ be found [here](https://cromwell.readthedocs.io/en/stable/backends/HPC/#optional
- Fixes the "retry with more memory" feature.
- Fixes pulling Docker image metadata from private GCR repositories.
- Fixed `google_project` and `google_compute_service_account` workflow options not taking effect when using GCP Batch backend
- Added a way to use a custom LogsPolicy for the job execution, setting `backend.providers.batch.config.batch.logs-policy` to "CLOUD_LOGGING" (default) keeps the current behavior, or, set it to "PATH" to save the logs into the the mounted disk, at the end, this log file gets copied to the google cloud storage bucket with "task.log" as the name.
- Added a way to use a custom LogsPolicy for the job execution, setting `backend.providers.batch.config.batch.logs-policy` to "CLOUD_LOGGING" (default) keeps the current behavior, or, set it to "PATH" to stream the logs to Google Cloud Storage.

### Improved handling of Life Sciences API quota errors

supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActor.scala
@@ -603,14 +603,39 @@ class GcpBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
// not the `stderr` file contained memory retry error keys
val retryWithMoreMemoryKeys: Option[List[String]] = memoryRetryFactor.flatMap(_ => memoryRetryErrorKeys)

val targetLogFile = batchAttributes.logsPolicy match {
  case GcpBatchLogsPolicy.CloudLogging => None
  case GcpBatchLogsPolicy.Path =>
    DefaultPathBuilder.build(
      gcpBatchLogPath.pathAsString.replace(
        gcpBatchLogPath.root.pathAsString,
        GcpBatchAttachedDisk.GcsMountPoint + "/"
      )
    ) match {
      case Failure(exception) =>
        throw new RuntimeException(
          "Unable to use GcpBatchLogsPolicy.Path because the destination path could not be built, this is likely a programming error and a bug must be reported",
          exception
        )
      case Success(path) =>
        // remove trailing slash
        val bucket = workflowPaths.workflowRoot.root.pathWithoutScheme.replace("/", "")

        log.info(s"Batch logs for workflow $workflowId will be streamed to GCS at: $gcpBatchLogPath")

        Some(
          GcpBatchLogFile(gcsBucket = bucket, mountPath = GcpBatchAttachedDisk.GcsMountPoint, diskPath = path)
        )
    }
}
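
A worked example of the rewrite above, with hypothetical paths: the root of the cloud log path (the `gs://` bucket prefix) is swapped for the bucket's mount point, so the resulting disk path names the same object that `gcpBatchLogPath` does.

```scala
// Hypothetical values; the real ones come from gcpBatchCallPaths.batchLogPath.
val cloudLogPath = "gs://my-execution-bucket/my-workflow/call-task/batch.log" // gcpBatchLogPath.pathAsString
val cloudRoot = "gs://my-execution-bucket/" // gcpBatchLogPath.root.pathAsString
val mountPoint = "/mnt/disks/gcs" // GcpBatchAttachedDisk.GcsMountPoint

cloudLogPath.replace(cloudRoot, mountPoint + "/")
// => "/mnt/disks/gcs/my-workflow/call-task/batch.log"
// With the bucket mounted at the mount point, writes to this file stream to
// gs://my-execution-bucket/my-workflow/call-task/batch.log.
```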

CreateBatchJobParameters(
jobDescriptor = jobDescriptor,
runtimeAttributes = runtimeAttributes,
dockerImage = jobDockerImage,
cloudWorkflowRoot = workflowPaths.workflowRoot,
cloudCallRoot = callRootPath,
commandScriptContainerPath = cmdInput.containerPath,
logGcsPath = gcpBatchLogPath,
inputOutputParameters = inputOutputParameters,
projectId = googleProject(jobDescriptor.workflowDescriptor),
computeServiceAccount = computeServiceAccount(jobDescriptor.workflowDescriptor),
@@ -628,7 +653,8 @@ class GcpBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
checkpointingConfiguration,
enableSshAccess = enableSshAccess,
vpcNetworkAndSubnetworkProjectLabels = data.vpcNetworkAndSubnetworkProjectLabels,
dockerhubCredentials = dockerhubCredentials
dockerhubCredentials = dockerhubCredentials,
targetLogFile = targetLogFile
)
case Some(other) =>
throw new RuntimeException(s"Unexpected initialization data: $other")
@@ -834,16 +860,6 @@ class GcpBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
contentType = plainTextContentType
)

val logFileOutput = GcpBatchFileOutput(
logFilename,
logGcsPath,
DefaultPathBuilder.get(logFilename),
workingDisk,
optional = true,
secondary = false,
contentType = plainTextContentType
)

val memoryRetryRCFileOutput = GcpBatchFileOutput(
memoryRetryRCFilename,
memoryRetryRCGcsPath,
@@ -884,8 +900,7 @@ class GcpBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
DetritusOutputParameters(
monitoringScriptOutputParameter = monitoringOutput,
rcFileOutputParameter = rcFileOutput,
memoryRetryRCFileOutputParameter = memoryRetryRCFileOutput,
logFileOutputParameter = logFileOutput
memoryRetryRCFileOutputParameter = memoryRetryRCFileOutput
),
List.empty
)
@@ -904,10 +919,7 @@ class GcpBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
runtimeAttributes = runtimeAttributes,
batchAttributes = batchAttributes,
projectId = batchAttributes.project,
region = batchAttributes.location,
logfile = createParameters.commandScriptContainerPath.sibling(
batchParameters.detritusOutputParameters.logFileOutputParameter.name
)
region = batchAttributes.location
)

drsLocalizationManifestCloudPath = jobPaths.callExecutionRoot / GcpBatchJobPaths.DrsLocalizationManifestName
GcpBatchJobCachingActorHelper.scala
@@ -33,10 +33,6 @@ trait GcpBatchJobCachingActorHelper extends StandardCachingActorHelper {
lazy val gcpBatchLogPath: Path = gcpBatchCallPaths.batchLogPath
lazy val memoryRetryRCFilename: String = gcpBatchCallPaths.memoryRetryRCFilename
lazy val memoryRetryRCGcsPath: Path = gcpBatchCallPaths.memoryRetryRC

lazy val logFilename: String = "task.log"
lazy val logGcsPath: Path = gcpBatchCallPaths.callExecutionRoot.resolve(logFilename)

lazy val batchAttributes: GcpBatchConfigurationAttributes = batchConfiguration.batchAttributes

lazy val defaultLabels: Labels = {
GcpBatchRequestFactory.scala
@@ -41,11 +41,9 @@ object GcpBatchRequestFactory {
case class DetritusOutputParameters(
monitoringScriptOutputParameter: Option[GcpBatchFileOutput],
rcFileOutputParameter: GcpBatchFileOutput,
memoryRetryRCFileOutputParameter: GcpBatchFileOutput,
logFileOutputParameter: GcpBatchFileOutput
memoryRetryRCFileOutputParameter: GcpBatchFileOutput
) {
def all: List[GcpBatchFileOutput] = memoryRetryRCFileOutputParameter ::
logFileOutputParameter ::
rcFileOutputParameter ::
monitoringScriptOutputParameter.toList
}
@@ -68,13 +66,21 @@

case class CreateBatchDockerKeyAndToken(key: String, encryptedToken: String)

/**
* Defines the values used for streaming the job logs to GCS.
*
* @param gcsBucket the Cloud Storage bucket where the log file should be streamed to.
* @param mountPath the path where the Cloud Storage bucket will be mounted to.
* @param diskPath the path in the mounted disk where the log file should be written to.
*/
case class GcpBatchLogFile(gcsBucket: String, mountPath: String, diskPath: Path)
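
For illustration, a hypothetical construction of this value, mirroring what the executor actor builds from the workflow paths above:

```scala
import cromwell.backend.google.batch.io.GcpBatchAttachedDisk
import cromwell.core.path.DefaultPathBuilder

// Hypothetical example; the actor derives all three fields from the workflow paths.
val logFile = GcpBatchLogFile(
  gcsBucket = "my-execution-bucket",
  mountPath = GcpBatchAttachedDisk.GcsMountPoint, // "/mnt/disks/gcs"
  diskPath = DefaultPathBuilder.get("/mnt/disks/gcs/my-workflow/call-task/batch.log")
)
```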

case class CreateBatchJobParameters(jobDescriptor: BackendJobDescriptor,
runtimeAttributes: GcpBatchRuntimeAttributes,
dockerImage: String,
cloudWorkflowRoot: Path,
cloudCallRoot: Path,
commandScriptContainerPath: Path,
logGcsPath: Path,
inputOutputParameters: InputOutputParameters,
projectId: String,
computeServiceAccount: String,
Expand All @@ -92,7 +98,8 @@ object GcpBatchRequestFactory {
checkpointingConfiguration: CheckpointingConfiguration,
enableSshAccess: Boolean,
vpcNetworkAndSubnetworkProjectLabels: Option[VpcAndSubnetworkProjectLabelValues],
dockerhubCredentials: (String, String)
dockerhubCredentials: (String, String),
targetLogFile: Option[GcpBatchLogFile]
) {
def literalInputs = inputOutputParameters.literalInputParameters

GcpBatchRequestFactoryImpl.scala
@@ -7,6 +7,7 @@ import com.google.cloud.batch.v1.{
ComputeResource,
CreateJobRequest,
DeleteJobRequest,
GCS,
GetJobRequest,
Job,
JobName,
@@ -20,7 +21,7 @@
import com.google.protobuf.Duration
import cromwell.backend.google.batch.io.GcpBatchAttachedDisk
import cromwell.backend.google.batch.models.GcpBatchConfigurationAttributes.GcsTransferConfiguration
import cromwell.backend.google.batch.models.{GcpBatchLogsPolicy, GcpBatchRequest, VpcAndSubnetworkProjectLabelValues}
import cromwell.backend.google.batch.models.{GcpBatchRequest, VpcAndSubnetworkProjectLabelValues}
import cromwell.backend.google.batch.runnable._
import cromwell.backend.google.batch.util.{BatchUtilityConversions, GcpBatchMachineConstraints}
import cromwell.core.logging.JobLogger
@@ -197,7 +198,12 @@ class GcpBatchRequestFactoryImpl()(implicit gcsTransferConfiguration: GcsTransfe
val networkInterface = createNetwork(data = data)
val networkPolicy = createNetworkPolicy(networkInterface.build())
val allDisks = toDisks(allDisksToBeMounted)
val allVolumes = toVolumes(allDisksToBeMounted)
val allVolumes = toVolumes(allDisksToBeMounted) ::: createParameters.targetLogFile.map { targetLogFile =>
Volume.newBuilder
.setGcs(GCS.newBuilder().setRemotePath(targetLogFile.gcsBucket))
.setMountPath(targetLogFile.mountPath)
.build()
}.toList

val containerSetup: List[Runnable] = containerSetupRunnables(allVolumes)
val localization: List[Runnable] = localizeRunnables(createParameters, allVolumes)
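
Note that the GCS volume is only appended when `targetLogFile` is defined, so jobs using the default CLOUD_LOGGING policy get no extra mount; GCP Batch mounts GCS volumes with Cloud Storage FUSE, which is what makes writes under the mount path appear in the bucket during execution. The `Option`-to-`List` idiom in isolation:

```scala
// Appending an optional element to a list: None contributes nothing.
val base = List("working-disk-volume")
val extra: Option[String] = None
base ::: extra.toList // => List("working-disk-volume"); Some(...) would append it
```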
@@ -238,13 +244,14 @@ class GcpBatchRequestFactoryImpl()(implicit gcsTransferConfiguration: GcsTransfe
val locationPolicy = LocationPolicy.newBuilder.addAllowedLocations(zones).build
val allocationPolicy =
createAllocationPolicy(data, locationPolicy, instancePolicy.build, networkPolicy, gcpSa, accelerators)
val logsPolicy = data.gcpBatchParameters.batchAttributes.logsPolicy match {
case GcpBatchLogsPolicy.CloudLogging =>
LogsPolicy.newBuilder.setDestination(Destination.CLOUD_LOGGING).build
case GcpBatchLogsPolicy.Path =>

val logsPolicy = data.createParameters.targetLogFile match {
case None => LogsPolicy.newBuilder.setDestination(Destination.CLOUD_LOGGING).build

case Some(targetLogFile) =>
LogsPolicy.newBuilder
.setDestination(Destination.PATH)
.setLogsPath(data.gcpBatchParameters.logfile.toString)
.setLogsPath(targetLogFile.diskPath.pathAsString)
.build
}

GcpBatchAttachedDisk.scala
@@ -13,6 +13,9 @@ import wom.values._
import scala.util.Try

object GcpBatchAttachedDisk {
// The mount point for the Cloud Storage bucket
val GcsMountPoint = "/mnt/disks/gcs"

def parse(s: String): Try[GcpBatchAttachedDisk] = {

def sizeGbValidation(sizeGbString: String): ErrorOr[Int] = validateLong(sizeGbString).map(_.toInt)
CreateGcpBatchParameters.scala
@@ -1,12 +1,10 @@
package cromwell.backend.google.batch.models

import cromwell.backend.BackendJobDescriptor
import cromwell.core.path.Path

case class CreateGcpBatchParameters(jobDescriptor: BackendJobDescriptor,
runtimeAttributes: GcpBatchRuntimeAttributes,
batchAttributes: GcpBatchConfigurationAttributes,
projectId: String,
region: String,
logfile: Path
region: String
)
