Skip to content

Commit

Permalink
WX-1784 Compute cost from metadata (#7537)
Browse files Browse the repository at this point in the history
Co-authored-by: Tom Wiseman <twiseman@broadinstitute.org>
  • Loading branch information
jgainerdewar and THWiseman committed Sep 18, 2024
1 parent 80fbf59 commit dde735a
Show file tree
Hide file tree
Showing 10 changed files with 502 additions and 117 deletions.
3 changes: 2 additions & 1 deletion engine/src/main/resources/swagger/cromwell.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,8 @@ paths:
in: query
- name: expandSubWorkflows
description: >
When true, metadata for sub workflows will be fetched and inserted automatically in the metadata response.
When true, metadata for sub workflows will be fetched and inserted automatically in the metadata response.
Requires inclusion of key subworkflowId in metadata results (either included or not excluded).
required: false
type: boolean
in: query
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,17 +125,8 @@ trait MetadataRouteSupport extends HttpInstrumentation {
},
path("workflows" / Segment / Segment / "cost") { (_, possibleWorkflowId) =>
get {
parameters(
(Symbol("includeTaskBreakdown").as[Boolean].?, Symbol("includeSubworkflowBreakdown").as[Boolean].?)
) { (includeTaskBreakdownOption, includeSubworkflowBreakdownOption) =>
val includeTaskBreakdown = includeTaskBreakdownOption.getOrElse(false)
val includeSubworkflowBreakdown = includeSubworkflowBreakdownOption.getOrElse(false)

metadataLookup(
possibleWorkflowId,
(w: WorkflowId) => GetCost(w, includeTaskBreakdown, includeSubworkflowBreakdown),
serviceRegistryActor
)
instrumentRequest {
metadataLookup(possibleWorkflowId, (w: WorkflowId) => GetCost(w), serviceRegistryActor)
}
}
},
Expand Down
277 changes: 267 additions & 10 deletions engine/src/test/scala/cromwell/webservice/MetadataBuilderActorSpec.scala
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package cromwell.webservice

import java.time.{OffsetDateTime, ZoneOffset}
import akka.actor.Props

import java.time.{OffsetDateTime, ZoneOffset}
import akka.pattern.ask
import akka.testkit._
import akka.util.Timeout
import cats.implicits.catsSyntaxValidatedId
import cromwell.core._
import cromwell.services._
import cromwell.services.metadata.MetadataService._
import cromwell.services.metadata._
import cromwell.services.metadata.impl.ReadDatabaseMetadataWorkerActor
import cromwell.services.metadata.impl.builder.MetadataBuilderActor
import cromwell.util.AkkaTestUtil.EnhancedTestProbe
import cromwell.webservice.MetadataBuilderActorSpec._
Expand All @@ -18,6 +21,8 @@ import org.scalatest.prop.TableDrivenPropertyChecks
import org.scalatest.{Assertion, Succeeded}
import spray.json._

import java.util.UUID
import scala.BigDecimal
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.Random
Expand Down Expand Up @@ -181,26 +186,119 @@ class MetadataBuilderActorSpec
assertMetadataResponse(queryAction, mdQuery, events, expectedRes, metadataBuilderActorName)
}

// Happy-path cost test. The metadata describes two calls with VM start/end times and hourly rates:
// callA runs 09:00-11:00 at 3.5 per hour and callB runs 11:01-12:31 at 0.5 per hour.
// The expected total is 7.75 (2h x 3.5 + 1.5h x 0.5) with an empty error list.
it should "build a basic workflow cost response" in {
  val workflowId = WorkflowId.randomId()
  val workflowState = WorkflowSucceeded
  val events = Seq(
    MetadataEvent(MetadataKey(workflowId, Option(MetadataJobKey("callA", None, 1)), CallMetadataKeys.VmStartTime),
                  MetadataValue("2023-10-30T09:00:00Z")
    ),
    MetadataEvent(MetadataKey(workflowId, Option(MetadataJobKey("callA", None, 1)), CallMetadataKeys.VmEndTime),
                  MetadataValue("2023-10-30T11:00:00Z")
    ),
    MetadataEvent(MetadataKey(workflowId, Option(MetadataJobKey("callA", None, 1)), CallMetadataKeys.VmCostPerHour),
                  MetadataValue(BigDecimal(3.5))
    ),
    MetadataEvent(MetadataKey(workflowId, Option(MetadataJobKey("callB", None, 1)), CallMetadataKeys.VmStartTime),
                  MetadataValue("2023-10-30T11:01:00Z")
    ),
    MetadataEvent(MetadataKey(workflowId, Option(MetadataJobKey("callB", None, 1)), CallMetadataKeys.VmEndTime),
                  MetadataValue("2023-10-30T12:31:00Z")
    ),
    MetadataEvent(MetadataKey(workflowId, Option(MetadataJobKey("callB", None, 1)), CallMetadataKeys.VmCostPerHour),
                  MetadataValue(BigDecimal(0.5))
    )
  )
  val query = MetadataQuery(workflowId, None, None, None, None, expandSubWorkflows = false)

  val expectedRes =
    s"""{
       |"cost": 7.75,
       |"currency": "USD",
       |"id": "${workflowId}",
       |"status": "${workflowState.toString}",
       |"errors": []
       |}""".stripMargin

  val action = GetCost(workflowId)

  val mockReadMetadataWorkerActor = TestProbe("mockReadMetadataWorkerActor")
  def readMetadataWorkerMaker = () => mockReadMetadataWorkerActor.props

  // Each cost test gives its builder actor a distinct name so that a builder from an earlier test
  // that has not finished terminating cannot cause a name collision on the shared actor system.
  val mba = system.actorOf(
    props = MetadataBuilderActor.props(readMetadataWorkerMaker, 1000000),
    name = "mba-cost-builder-basic"
  )

  val response = mba.ask(action).mapTo[MetadataJsonResponse]
  mockReadMetadataWorkerActor.expectMsg(defaultTimeout, action)
  mockReadMetadataWorkerActor.reply(
    CostResponse(workflowId, workflowState, MetadataLookupResponse(query, events))
  )

  // Single returned Future: both the response type and its JSON payload are checked, and an
  // unexpected response type fails with a readable message instead of a ClassCastException.
  response map {
    case success: SuccessfulMetadataJsonResponse => success.responseJson shouldBe expectedRes.parseJson
    case other => fail(s"Expected a SuccessfulMetadataJsonResponse but got: $other")
  }
}

// Partial-failure cost test. callA carries a vmCostPerHour of -1, which this test uses to represent
// a failed cost lookup, while callB is valid (11:01-12:31 at 0.5 per hour).
// The expected response still succeeds: cost 0.75 from callB only, plus one error naming callA.
it should "build a workflow cost response with an error" in {
  val workflowId = WorkflowId.randomId()
  val workflowState = WorkflowSucceeded
  val events = Seq(
    MetadataEvent(MetadataKey(workflowId, Option(MetadataJobKey("callA", None, 1)), CallMetadataKeys.VmStartTime),
                  MetadataValue("2023-10-30T09:00:00Z")
    ),
    MetadataEvent(MetadataKey(workflowId, Option(MetadataJobKey("callA", None, 1)), CallMetadataKeys.VmEndTime),
                  MetadataValue("2023-10-30T11:00:00Z")
    ),
    MetadataEvent(
      MetadataKey(workflowId, Option(MetadataJobKey("callA", None, 1)), CallMetadataKeys.VmCostPerHour),
      MetadataValue(BigDecimal(-1)) // indicates an error when computing vmCostPerHour
    ),
    MetadataEvent(MetadataKey(workflowId, Option(MetadataJobKey("callB", None, 1)), CallMetadataKeys.VmStartTime),
                  MetadataValue("2023-10-30T11:01:00Z")
    ),
    MetadataEvent(MetadataKey(workflowId, Option(MetadataJobKey("callB", None, 1)), CallMetadataKeys.VmEndTime),
                  MetadataValue("2023-10-30T12:31:00Z")
    ),
    MetadataEvent(MetadataKey(workflowId, Option(MetadataJobKey("callB", None, 1)), CallMetadataKeys.VmCostPerHour),
                  MetadataValue(BigDecimal(0.5))
    )
  )
  val query = MetadataQuery(workflowId, None, None, None, None, expandSubWorkflows = false)

  val expectedRes =
    s"""{
       |"cost": 0.75,
       |"currency": "USD",
       |"id": "${workflowId}",
       |"status": "${workflowState.toString}",
       |"errors": ["Couldn't find valid vmCostPerHour for callA.-1.1"]
       |}""".stripMargin

  val action = GetCost(workflowId)

  val mockReadMetadataWorkerActor = TestProbe("mockReadMetadataWorkerActor")
  def readMetadataWorkerMaker = () => mockReadMetadataWorkerActor.props

  // Each cost test gives its builder actor a distinct name so that a builder from an earlier test
  // that has not finished terminating cannot cause a name collision on the shared actor system.
  val mba = system.actorOf(
    props = MetadataBuilderActor.props(readMetadataWorkerMaker, 1000000),
    name = "mba-cost-builder-call-error"
  )

  val response = mba.ask(action).mapTo[MetadataJsonResponse]
  mockReadMetadataWorkerActor.expectMsg(defaultTimeout, action)
  mockReadMetadataWorkerActor.reply(
    CostResponse(workflowId, workflowState, MetadataLookupResponse(query, events))
  )

  // Single returned Future: both the response type and its JSON payload are checked, and an
  // unexpected response type fails with a readable message instead of a ClassCastException.
  response map {
    case success: SuccessfulMetadataJsonResponse => success.responseJson shouldBe expectedRes.parseJson
    case other => fail(s"Expected a SuccessfulMetadataJsonResponse but got: $other")
  }
}

it should "build an error workflow cost response" in {
val workflowId = WorkflowId.randomId()

val action = GetCost(workflowId)

val expectedException = new Exception("Oh nooooo :(")

val mockReadMetadataWorkerActor = TestProbe("mockReadMetadataWorkerActor")
def readMetadataWorkerMaker = () => mockReadMetadataWorkerActor.props
Expand All @@ -213,12 +311,171 @@ class MetadataBuilderActorSpec
val response = mba.ask(action).mapTo[MetadataJsonResponse]
mockReadMetadataWorkerActor.expectMsg(defaultTimeout, action)
mockReadMetadataWorkerActor.reply(
CostResponse(workflowId, workflowState, events, false, false)
CostFailure(workflowId, expectedException)
)
response map { r => r shouldBe a[FailedMetadataJsonResponse] }
response.mapTo[FailedMetadataJsonResponse] map { b =>
b.reason.getClass shouldBe expectedException.getClass
b.reason.getMessage shouldBe expectedException.getMessage
}
}

/**
  * Builds the three cost-related metadata events (VM start time, VM end time, VM cost per hour)
  * for a single call attempt. The start and end times are fixed at 09:00 and 10:00 on 2023-10-30,
  * i.e. exactly one hour apart.
  *
  * @param wfId        workflow the events belong to
  * @param callName    call name used in the job key
  * @param shardIndex  optional shard index used in the job key
  * @param attempt     attempt number used in the job key
  * @param costPerHour value recorded under the VM cost-per-hour key
  * @return the start, end and cost-per-hour events, in that order
  */
def costMetadataEvents(wfId: WorkflowId,
                       callName: String,
                       shardIndex: Option[Int] = None,
                       attempt: Int = 1,
                       costPerHour: BigDecimal = BigDecimal(1)
) = {
  // All three events describe the same call attempt, so they share one job key.
  val jobKey = Option(MetadataJobKey(callName, shardIndex, attempt))

  val vmStarted = MetadataEvent(
    MetadataKey(wfId, jobKey, CallMetadataKeys.VmStartTime),
    MetadataValue("2023-10-30T09:00:00Z")
  )
  val vmEnded = MetadataEvent(
    MetadataKey(wfId, jobKey, CallMetadataKeys.VmEndTime),
    MetadataValue("2023-10-30T10:00:00Z")
  )
  val vmHourlyCost = MetadataEvent(
    MetadataKey(wfId, jobKey, CallMetadataKeys.VmCostPerHour),
    MetadataValue(costPerHour)
  )

  List(vmStarted, vmEnded, vmHourlyCost)
}

// Cost test over a workflow tree: main -> (sub1 -> (sub1a, sub1b), sub2).
// Every call built by costMetadataEvents runs for one hour, at 1 per hour unless overridden, so the
// expected total of 7 is: 2 (main, callA shards 0 and 1) + 3 (sub2) + 2 (sub1b, attempts 1 and 2).
// sub1a's only call has a cost per hour of -1 and is expected to surface as an error instead.
it should "build a cost response with subworkflow data" in {
  // hard-coding UUIDs to make test failures easier to diagnose
  val mainWorkflowId = WorkflowId(UUID.fromString("00000000-f76d-4af3-b371-5ba580916729"))
  val subWorkflow1Id = WorkflowId(UUID.fromString("11111111-f76d-4af3-b371-5ba580916729"))
  val subWorkflow2Id = WorkflowId(UUID.fromString("22222222-f76d-4af3-b371-5ba580916729"))
  val subWorkflow1aId = WorkflowId(UUID.fromString("1a1a1a1a-f76d-4af3-b371-5ba580916729"))
  val subWorkflow1bId = WorkflowId(UUID.fromString("1b1b1b1b-f76d-4af3-b371-5ba580916729"))

  val workflowState = WorkflowSucceeded

  val mainEvents = List(
    MetadataEvent(MetadataKey(mainWorkflowId, Option(MetadataJobKey("wfMain", None, 1)), "subWorkflowId"),
                  MetadataValue(subWorkflow1Id)
    ),
    MetadataEvent(MetadataKey(mainWorkflowId, Option(MetadataJobKey("wfMain", None, 1)), "subWorkflowId"),
                  MetadataValue(subWorkflow2Id)
    )
  ) ++
    costMetadataEvents(mainWorkflowId, "callA", Option(0), 1) ++
    costMetadataEvents(mainWorkflowId, "callA", Option(1), 1)

  val sub1Events = List(
    MetadataEvent(MetadataKey(subWorkflow1Id, Option(MetadataJobKey("wfSub1", None, 1)), "subWorkflowId"),
                  MetadataValue(subWorkflow1aId)
    ),
    MetadataEvent(MetadataKey(subWorkflow1Id, Option(MetadataJobKey("wfSub1", None, 1)), "subWorkflowId"),
                  MetadataValue(subWorkflow1bId)
    )
  )

  val sub2Events =
    costMetadataEvents(subWorkflow2Id, "call2A") ++
      costMetadataEvents(subWorkflow2Id, "call2B") ++
      costMetadataEvents(subWorkflow2Id, "call2C")
  val sub1aEvents = costMetadataEvents(subWorkflow1aId, "call1aA", costPerHour = BigDecimal(-1))
  val sub1bEvents =
    costMetadataEvents(subWorkflow1bId, "call1bA", attempt = 1) ++
      costMetadataEvents(subWorkflow1bId, "call1bA", attempt = 2)

  val mainQuery = MetadataQuery(mainWorkflowId, None, None, None, None, expandSubWorkflows = true)
  val sub1Query = MetadataQuery(subWorkflow1Id, None, None, None, None, expandSubWorkflows = true)
  val sub2Query = MetadataQuery(subWorkflow2Id, None, None, None, None, expandSubWorkflows = true)
  val sub1aQuery = MetadataQuery(subWorkflow1aId, None, None, None, None, expandSubWorkflows = true)
  val sub1bQuery = MetadataQuery(subWorkflow1bId, None, None, None, None, expandSubWorkflows = true)
  val mainAction = GetCost(mainWorkflowId)

  // Canned metadata lookup result for every workflow in the tree, keyed by workflow ID.
  val cannedLookups = Map(
    mainWorkflowId -> MetadataLookupResponse(mainQuery, mainEvents),
    subWorkflow1Id -> MetadataLookupResponse(sub1Query, sub1Events),
    subWorkflow2Id -> MetadataLookupResponse(sub2Query, sub2Events),
    subWorkflow1aId -> MetadataLookupResponse(sub1aQuery, sub1aEvents),
    subWorkflow1bId -> MetadataLookupResponse(sub1bQuery, sub1bEvents)
  )

  // Mock ReadDatabaseMetadataWorkerActor to return the expected metadata results for each query.
  // Would normally have done this with expect/reply on a TestProbe, but that required the messages to
  // be sent in a deterministic order, which is not the case here.
  class TestReadDatabaseMetadataWorkerActorForCost extends ReadDatabaseMetadataWorkerActor(defaultTimeout, 1000000) {
    override def receive: Receive = {
      // Known workflow IDs get their canned response; unknown IDs are silently ignored.
      case GetCost(wfId) =>
        cannedLookups.get(wfId).foreach(lookup => sender() ! CostResponse(wfId, workflowState, lookup))
      case _ => ()
    }
  }

  val expectedRes =
    s"""{
       |"cost": 7,
       |"currency": "USD",
       |"id": "${mainWorkflowId}",
       |"status": "${workflowState.toString}",
       |"errors": ["Couldn't find valid vmCostPerHour for call1aA.-1.1"]
       |}""".stripMargin

  def readMetadataWorkerMaker = () => Props(new TestReadDatabaseMetadataWorkerActorForCost)

  // Each cost test gives its builder actor a distinct name so that a builder from an earlier test
  // that has not finished terminating cannot cause a name collision on the shared actor system.
  val mba = system.actorOf(
    props = MetadataBuilderActor.props(readMetadataWorkerMaker, 1000000),
    name = "mba-cost-builder-subworkflows"
  )

  val response = mba.ask(mainAction).mapTo[MetadataJsonResponse]

  // Single returned Future: both the response type and its JSON payload are checked, and an
  // unexpected response type fails with a readable message instead of a ClassCastException.
  response map {
    case success: SuccessfulMetadataJsonResponse => success.responseJson shouldBe expectedRes.parseJson
    case other => fail(s"Expected a SuccessfulMetadataJsonResponse but got: $other")
  }
}

// Table-driven check of MetadataBuilderActor.computeCost: each row pairs a JSON call fragment with
// the validated result (a cost, or an error message) that computeCost is expected to return for it.
it should "compute cost for calls" in {
  val costCases = Table(
    ("jsInput", "expected"),
    // One full hour at 0.0567 per hour.
    ("""{"attempt": 1, "shardIndex": -1, "vmStartTime": "2023-10-30T04:00:00Z", "vmEndTime": "2023-10-30T05:00:00Z", "vmCostPerHour": 0.0567}""",
     BigDecimal(0.0567).validNel
    ),
    // Half an hour at the same rate.
    ("""{"attempt": 1, "shardIndex": -1, "vmStartTime": "2023-10-30T04:00:00Z", "vmEndTime": "2023-10-30T04:30:00Z", "vmCostPerHour": 0.0567}""",
     BigDecimal(0.02835).validNel
    ),
    // No vmStartTime: expected to cost nothing.
    ("""{"attempt": 1, "shardIndex": -1, "vmEndTime": "2023-10-30T05:00:00Z", "vmCostPerHour": 0.0567}""",
     BigDecimal(0).validNel
    ),
    // NOTE(review): this row repeats the vmEndTime key and has neither vmStartTime nor vmCostPerHour -
    // confirm whether one of the keys was meant to be something else.
    ("""{"attempt": 1, "shardIndex": -1, "vmEndTime": "2023-10-30T05:00:00Z", "vmEndTime": "2023-10-30T04:30:00Z"}""",
     BigDecimal(0).validNel
    ),
    // Malformed start time (space instead of 'T'): expected to surface the parse error.
    (s"""{"attempt": 1, "shardIndex": -1, "vmStartTime": "2023-10-30 05:00:00Z", "vmCostPerHour": 0.0567}""",
     "Text '2023-10-30 05:00:00Z' could not be parsed at index 10".invalidNel
    ),
    // Cost per hour of -1: expected to be reported as a missing/invalid rate for call "foo".
    (s"""{"attempt": 1, "shardIndex": -1, "vmStartTime": "2023-10-30T05:00:00Z", "vmEndTime": "2023-10-30T04:30:00Z", "vmCostPerHour": -1}""",
     "Couldn't find valid vmCostPerHour for foo.-1.1".invalidNel
    )
  )

  forAll(costCases) { (callJson, expectedCost) =>
    MetadataBuilderActor.computeCost("foo", callJson.parseJson) shouldBe expectedCost
  }
}

// A call with a start time but no end time: the input started five minutes ago at 12 per hour,
// so the computed cost is expected to be close to 1.
it should "compute cost for calls that are still running" in {
  val fiveMinutesAgo = OffsetDateTime.now().minusMinutes(5)
  val runningCallJson =
    s"""{
       |"attempt": 1,
       |"shardIndex": -1,
       |"vmStartTime": "${fiveMinutesAgo}",
       |"vmCostPerHour": 12
       |}""".stripMargin.parseJson

  val computedCost = MetadataBuilderActor.computeCost("foo", runningCallJson)

  // Just test that the cost is approximately 1 - we don't know
  // exactly how long the test takes to run.
  // The fallback values are chosen so that an invalid result fails the corresponding bound.
  computedCost.getOrElse(BigDecimal(0)) should be > BigDecimal(0.9)
  computedCost.getOrElse(BigDecimal(100)) should be < BigDecimal(1.1)
}

it should "build the call list for failed tasks when prompted" in {

def makeEvent(workflow: WorkflowId, key: Option[MetadataJobKey]) =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -725,19 +725,16 @@ object CromwellApiServiceSpec {
)
)
sender() ! SuccessfulMetadataJsonResponse(request, MetadataBuilderActor.processOutputsResponse(id, event))
case request @ GetCost(id, includeTaskBreakdown, includeSubworkflowBreakdown) =>
case request @ GetCost(id) =>
sender() ! SuccessfulMetadataJsonResponse(
request,
MetadataBuilderActor.processCostResponse(
id,
WorkflowSucceeded,
Vector(
MetadataEvent(MetadataKey(id, None, "outputs:test.hello.salutation"),
MetadataValue("Hello foo!", MetadataString)
)
),
includeTaskBreakdown,
includeSubworkflowBreakdown
JsObject(
Map(
WorkflowMetadataKeys.Id -> JsString(id.toString),
WorkflowMetadataKeys.Status -> JsString(WorkflowSucceeded.toString),
"currency" -> JsString("USD"),
"cost" -> JsNumber(3.5)
)
)
)
case request @ GetLogs(id) =>
Expand Down
Loading

0 comments on commit dde735a

Please sign in to comment.