diff --git a/engine/src/main/resources/swagger/cromwell.yaml b/engine/src/main/resources/swagger/cromwell.yaml index 59a202df393..8f4f94fbdd6 100644 --- a/engine/src/main/resources/swagger/cromwell.yaml +++ b/engine/src/main/resources/swagger/cromwell.yaml @@ -556,7 +556,8 @@ paths: in: query - name: expandSubWorkflows description: > - When true, metadata for sub workflows will be fetched and inserted automatically in the metadata response. + When true, metadata for sub workflows will be fetched and inserted automatically in the metadata response. + Requires the subWorkflowId key to be present in the metadata results (either explicitly included or not excluded). required: false type: boolean in: query diff --git a/engine/src/main/scala/cromwell/webservice/routes/MetadataRouteSupport.scala b/engine/src/main/scala/cromwell/webservice/routes/MetadataRouteSupport.scala index afa12d871dd..ee2b4cdd483 100644 --- a/engine/src/main/scala/cromwell/webservice/routes/MetadataRouteSupport.scala +++ b/engine/src/main/scala/cromwell/webservice/routes/MetadataRouteSupport.scala @@ -125,17 +125,8 @@ trait MetadataRouteSupport extends HttpInstrumentation { }, path("workflows" / Segment / Segment / "cost") { (_, possibleWorkflowId) => get { - parameters( - (Symbol("includeTaskBreakdown").as[Boolean].?, Symbol("includeSubworkflowBreakdown").as[Boolean].?) 
- ) { (includeTaskBreakdownOption, includeSubworkflowBreakdownOption) => - val includeTaskBreakdown = includeTaskBreakdownOption.getOrElse(false) - val includeSubworkflowBreakdown = includeSubworkflowBreakdownOption.getOrElse(false) - - metadataLookup( - possibleWorkflowId, - (w: WorkflowId) => GetCost(w, includeTaskBreakdown, includeSubworkflowBreakdown), - serviceRegistryActor - ) + instrumentRequest { + metadataLookup(possibleWorkflowId, (w: WorkflowId) => GetCost(w), serviceRegistryActor) } } }, diff --git a/engine/src/test/scala/cromwell/webservice/MetadataBuilderActorSpec.scala b/engine/src/test/scala/cromwell/webservice/MetadataBuilderActorSpec.scala index 2851f36e013..02ffaefa3b7 100644 --- a/engine/src/test/scala/cromwell/webservice/MetadataBuilderActorSpec.scala +++ b/engine/src/test/scala/cromwell/webservice/MetadataBuilderActorSpec.scala @@ -1,14 +1,17 @@ package cromwell.webservice -import java.time.{OffsetDateTime, ZoneOffset} +import akka.actor.Props +import java.time.{OffsetDateTime, ZoneOffset} import akka.pattern.ask import akka.testkit._ import akka.util.Timeout +import cats.implicits.catsSyntaxValidatedId import cromwell.core._ import cromwell.services._ import cromwell.services.metadata.MetadataService._ import cromwell.services.metadata._ +import cromwell.services.metadata.impl.ReadDatabaseMetadataWorkerActor import cromwell.services.metadata.impl.builder.MetadataBuilderActor import cromwell.util.AkkaTestUtil.EnhancedTestProbe import cromwell.webservice.MetadataBuilderActorSpec._ @@ -18,6 +21,8 @@ import org.scalatest.prop.TableDrivenPropertyChecks import org.scalatest.{Assertion, Succeeded} import spray.json._ +import java.util.UUID +import scala.BigDecimal import scala.concurrent.Future import scala.concurrent.duration._ import scala.util.Random @@ -181,26 +186,119 @@ class MetadataBuilderActorSpec assertMetadataResponse(queryAction, mdQuery, events, expectedRes, metadataBuilderActorName) } - it should "build workflow cost response" in { - 
// Note that this is testing dummy behavior and should be updated and expanded when we add real logic + it should "build a basic workflow cost response" in { val workflowId = WorkflowId.randomId() val workflowState = WorkflowSucceeded val events = Seq( - MetadataEvent(MetadataKey(workflowId, Option(MetadataJobKey("callA", None, 1)), "NOT_CHECKED"), - MetadataValue("NOT_CHECKED") + MetadataEvent(MetadataKey(workflowId, Option(MetadataJobKey("callA", None, 1)), CallMetadataKeys.VmStartTime), + MetadataValue("2023-10-30T09:00:00Z") + ), + MetadataEvent(MetadataKey(workflowId, Option(MetadataJobKey("callA", None, 1)), CallMetadataKeys.VmEndTime), + MetadataValue("2023-10-30T11:00:00Z") + ), + MetadataEvent(MetadataKey(workflowId, Option(MetadataJobKey("callA", None, 1)), CallMetadataKeys.VmCostPerHour), + MetadataValue(BigDecimal(3.5)) ), - MetadataEvent(MetadataKey(workflowId, None, "status"), MetadataValue("Succeeded")) + MetadataEvent(MetadataKey(workflowId, Option(MetadataJobKey("callB", None, 1)), CallMetadataKeys.VmStartTime), + MetadataValue("2023-10-30T11:01:00Z") + ), + MetadataEvent(MetadataKey(workflowId, Option(MetadataJobKey("callB", None, 1)), CallMetadataKeys.VmEndTime), + MetadataValue("2023-10-30T12:31:00Z") + ), + MetadataEvent(MetadataKey(workflowId, Option(MetadataJobKey("callB", None, 1)), CallMetadataKeys.VmCostPerHour), + MetadataValue(BigDecimal(0.5)) + ) ) + val query = MetadataQuery(workflowId, None, None, None, None, expandSubWorkflows = false) val expectedRes = s"""{ - |"cost": 3.5, + |"cost": 7.75, |"currency": "USD", |"id": "${workflowId}", - |"status": "${workflowState.toString}" + |"status": "${workflowState.toString}", + |"errors": [] |}""".stripMargin - val action = GetCost(workflowId, false, false) + val action = GetCost(workflowId) + + val mockReadMetadataWorkerActor = TestProbe("mockReadMetadataWorkerActor") + def readMetadataWorkerMaker = () => mockReadMetadataWorkerActor.props + + val mba = system.actorOf( + props = 
MetadataBuilderActor.props(readMetadataWorkerMaker, 1000000), + name = "mba-cost-builder" + ) + + val response = mba.ask(action).mapTo[MetadataJsonResponse] + mockReadMetadataWorkerActor.expectMsg(defaultTimeout, action) + mockReadMetadataWorkerActor.reply( + CostResponse(workflowId, workflowState, MetadataLookupResponse(query, events)) + ) + response map { r => r shouldBe a[SuccessfulMetadataJsonResponse] } + response.mapTo[SuccessfulMetadataJsonResponse] map { b => b.responseJson shouldBe expectedRes.parseJson } + } + + it should "build a workflow cost response with an error" in { + val workflowId = WorkflowId.randomId() + val workflowState = WorkflowSucceeded + val events = Seq( + MetadataEvent(MetadataKey(workflowId, Option(MetadataJobKey("callA", None, 1)), CallMetadataKeys.VmStartTime), + MetadataValue("2023-10-30T09:00:00Z") + ), + MetadataEvent(MetadataKey(workflowId, Option(MetadataJobKey("callA", None, 1)), CallMetadataKeys.VmEndTime), + MetadataValue("2023-10-30T11:00:00Z") + ), + MetadataEvent( + MetadataKey(workflowId, Option(MetadataJobKey("callA", None, 1)), CallMetadataKeys.VmCostPerHour), + MetadataValue(BigDecimal(-1)) // indicates an error when computing vmCostPerHour + ), + MetadataEvent(MetadataKey(workflowId, Option(MetadataJobKey("callB", None, 1)), CallMetadataKeys.VmStartTime), + MetadataValue("2023-10-30T11:01:00Z") + ), + MetadataEvent(MetadataKey(workflowId, Option(MetadataJobKey("callB", None, 1)), CallMetadataKeys.VmEndTime), + MetadataValue("2023-10-30T12:31:00Z") + ), + MetadataEvent(MetadataKey(workflowId, Option(MetadataJobKey("callB", None, 1)), CallMetadataKeys.VmCostPerHour), + MetadataValue(BigDecimal(0.5)) + ) + ) + val query = MetadataQuery(workflowId, None, None, None, None, expandSubWorkflows = false) + + val expectedRes = + s"""{ + |"cost": 0.75, + |"currency": "USD", + |"id": "${workflowId}", + |"status": "${workflowState.toString}", + |"errors": ["Couldn't find valid vmCostPerHour for callA.-1.1"] + |}""".stripMargin + + 
val action = GetCost(workflowId) + + val mockReadMetadataWorkerActor = TestProbe("mockReadMetadataWorkerActor") + def readMetadataWorkerMaker = () => mockReadMetadataWorkerActor.props + + val mba = system.actorOf( + props = MetadataBuilderActor.props(readMetadataWorkerMaker, 1000000), + name = "mba-cost-builder" + ) + + val response = mba.ask(action).mapTo[MetadataJsonResponse] + mockReadMetadataWorkerActor.expectMsg(defaultTimeout, action) + mockReadMetadataWorkerActor.reply( + CostResponse(workflowId, workflowState, MetadataLookupResponse(query, events)) + ) + response map { r => r shouldBe a[SuccessfulMetadataJsonResponse] } + response.mapTo[SuccessfulMetadataJsonResponse] map { b => b.responseJson shouldBe expectedRes.parseJson } + } + + it should "build an error workflow cost response" in { + val workflowId = WorkflowId.randomId() + + val action = GetCost(workflowId) + + val expectedException = new Exception("Oh nooooo :(") val mockReadMetadataWorkerActor = TestProbe("mockReadMetadataWorkerActor") def readMetadataWorkerMaker = () => mockReadMetadataWorkerActor.props @@ -213,12 +311,171 @@ class MetadataBuilderActorSpec val response = mba.ask(action).mapTo[MetadataJsonResponse] mockReadMetadataWorkerActor.expectMsg(defaultTimeout, action) mockReadMetadataWorkerActor.reply( - CostResponse(workflowId, workflowState, events, false, false) + CostFailure(workflowId, expectedException) + ) + response map { r => r shouldBe a[FailedMetadataJsonResponse] } + response.mapTo[FailedMetadataJsonResponse] map { b => + b.reason.getClass shouldBe expectedException.getClass + b.reason.getMessage shouldBe expectedException.getMessage + } + } + + def costMetadataEvents(wfId: WorkflowId, + callName: String, + shardIndex: Option[Int] = None, + attempt: Int = 1, + costPerHour: BigDecimal = BigDecimal(1) + ) = List( + MetadataEvent( + MetadataKey(wfId, Option(MetadataJobKey(callName, shardIndex, attempt)), CallMetadataKeys.VmStartTime), + MetadataValue("2023-10-30T09:00:00Z") + ), + 
MetadataEvent(MetadataKey(wfId, Option(MetadataJobKey(callName, shardIndex, attempt)), CallMetadataKeys.VmEndTime), + MetadataValue("2023-10-30T10:00:00Z") + ), + MetadataEvent( + MetadataKey(wfId, Option(MetadataJobKey(callName, shardIndex, attempt)), CallMetadataKeys.VmCostPerHour), + MetadataValue(costPerHour) ) + ) + + it should "build a cost response with subworkflow data" in { + // hard-coding UUIDs to make test failures easier to diagnose + val mainWorkflowId = WorkflowId(UUID.fromString("00000000-f76d-4af3-b371-5ba580916729")) + val subWorkflow1Id = WorkflowId(UUID.fromString("11111111-f76d-4af3-b371-5ba580916729")) + val subWorkflow2Id = WorkflowId(UUID.fromString("22222222-f76d-4af3-b371-5ba580916729")) + val subWorkflow1aId = WorkflowId(UUID.fromString("1a1a1a1a-f76d-4af3-b371-5ba580916729")) + val subWorkflow1bId = WorkflowId(UUID.fromString("1b1b1b1b-f76d-4af3-b371-5ba580916729")) + + val workflowState = WorkflowSucceeded + + val mainEvents = List( + MetadataEvent(MetadataKey(mainWorkflowId, Option(MetadataJobKey("wfMain", None, 1)), "subWorkflowId"), + MetadataValue(subWorkflow1Id) + ), + MetadataEvent(MetadataKey(mainWorkflowId, Option(MetadataJobKey("wfMain", None, 1)), "subWorkflowId"), + MetadataValue(subWorkflow2Id) + ) + ) ++ + costMetadataEvents(mainWorkflowId, "callA", Option(0), 1) ++ + costMetadataEvents(mainWorkflowId, "callA", Option(1), 1) + + val sub1Events = List( + MetadataEvent(MetadataKey(subWorkflow1Id, Option(MetadataJobKey("wfSub1", None, 1)), "subWorkflowId"), + MetadataValue(subWorkflow1aId) + ), + MetadataEvent(MetadataKey(subWorkflow1Id, Option(MetadataJobKey("wfSub1", None, 1)), "subWorkflowId"), + MetadataValue(subWorkflow1bId) + ) + ) + + val sub2Events = + costMetadataEvents(subWorkflow2Id, "call2A") ++ + costMetadataEvents(subWorkflow2Id, "call2B") ++ + costMetadataEvents(subWorkflow2Id, "call2C") + val sub1aEvents = costMetadataEvents(subWorkflow1aId, "call1aA", costPerHour = BigDecimal(-1)) + val sub1bEvents = + 
costMetadataEvents(subWorkflow1bId, "call1bA", attempt = 1) ++ + costMetadataEvents(subWorkflow1bId, "call1bA", attempt = 2) + + val mainQuery = MetadataQuery(mainWorkflowId, None, None, None, None, expandSubWorkflows = true) + val sub1Query = MetadataQuery(subWorkflow1Id, None, None, None, None, expandSubWorkflows = true) + val sub2Query = MetadataQuery(subWorkflow2Id, None, None, None, None, expandSubWorkflows = true) + val sub1aQuery = MetadataQuery(subWorkflow1aId, None, None, None, None, expandSubWorkflows = true) + val sub1bQuery = MetadataQuery(subWorkflow1bId, None, None, None, None, expandSubWorkflows = true) + val mainAction = GetCost(mainWorkflowId) + + // Mock ReadDatabaseMetadataWorkerActor to return the expected metadata results for each query. + // Would normally have done this with expect/reply on a TestProbe, but that required the messages to + // be sent in a deterministic order, which is not the case here. + class TestReadDatabaseMetadataWorkerActorForCost extends ReadDatabaseMetadataWorkerActor(defaultTimeout, 1000000) { + override def receive: Receive = { + case GetCost(wfId) if wfId == mainWorkflowId => + sender() ! CostResponse(mainWorkflowId, workflowState, MetadataLookupResponse(mainQuery, mainEvents)) + () + case GetCost(wfId) if wfId == subWorkflow1Id => + sender() ! CostResponse(subWorkflow1Id, workflowState, MetadataLookupResponse(sub1Query, sub1Events)) + () + case GetCost(wfId) if wfId == subWorkflow2Id => + sender() ! CostResponse(subWorkflow2Id, workflowState, MetadataLookupResponse(sub2Query, sub2Events)) + () + case GetCost(wfId) if wfId == subWorkflow1aId => + sender() ! CostResponse(subWorkflow1aId, workflowState, MetadataLookupResponse(sub1aQuery, sub1aEvents)) + () + case GetCost(wfId) if wfId == subWorkflow1bId => + sender() ! 
CostResponse(subWorkflow1bId, workflowState, MetadataLookupResponse(sub1bQuery, sub1bEvents)) + () + case _ => () + } + } + + val expectedRes = + s"""{ + |"cost": 7, + |"currency": "USD", + |"id": "${mainWorkflowId}", + |"status": "${workflowState.toString}", + |"errors": ["Couldn't find valid vmCostPerHour for call1aA.-1.1"] + |}""".stripMargin + + def readMetadataWorkerMaker = () => Props(new TestReadDatabaseMetadataWorkerActorForCost) + val mba = system.actorOf( + props = MetadataBuilderActor.props(readMetadataWorkerMaker, 1000000), + name = "mba-cost-builder" + ) + + val response = mba.ask(mainAction).mapTo[MetadataJsonResponse] response map { r => r shouldBe a[SuccessfulMetadataJsonResponse] } response.mapTo[SuccessfulMetadataJsonResponse] map { b => b.responseJson shouldBe expectedRes.parseJson } } + it should "compute cost for calls" in { + val costJsValRows = Table( + ("jsInput", "expected"), + ("""{"attempt": 1, "shardIndex": -1, "vmStartTime": "2023-10-30T04:00:00Z", "vmEndTime": "2023-10-30T05:00:00Z", "vmCostPerHour": 0.0567}""", + BigDecimal(0.0567).validNel + ), + ("""{"attempt": 1, "shardIndex": -1, "vmStartTime": "2023-10-30T04:00:00Z", "vmEndTime": "2023-10-30T04:30:00Z", "vmCostPerHour": 0.0567}""", + BigDecimal(0.02835).validNel + ), + ("""{"attempt": 1, "shardIndex": -1, "vmEndTime": "2023-10-30T05:00:00Z", "vmCostPerHour": 0.0567}""", + BigDecimal(0).validNel + ), + ("""{"attempt": 1, "shardIndex": -1, "vmEndTime": "2023-10-30T05:00:00Z", "vmEndTime": "2023-10-30T04:30:00Z"}""", + BigDecimal(0).validNel + ), + (s"""{"attempt": 1, "shardIndex": -1, "vmStartTime": "2023-10-30 05:00:00Z", "vmCostPerHour": 0.0567}""", + "Text '2023-10-30 05:00:00Z' could not be parsed at index 10".invalidNel + ), + (s"""{"attempt": 1, "shardIndex": -1, "vmStartTime": "2023-10-30T05:00:00Z", "vmEndTime": "2023-10-30T04:30:00Z", "vmCostPerHour": -1}""", + "Couldn't find valid vmCostPerHour for foo.-1.1".invalidNel + ) + ) + + forAll(costJsValRows) { case (jsInput: 
String, expected) => + val jsVal = jsInput.parseJson + val retVal = MetadataBuilderActor.computeCost("foo", jsVal) + retVal shouldBe expected + } + } + + it should "compute cost for calls that are still running" in { + val inputJsVal = + s"""{ + |"attempt": 1, + |"shardIndex": -1, + |"vmStartTime": "${OffsetDateTime.now().minusMinutes(5)}", + |"vmCostPerHour": 12 + |}""".stripMargin.parseJson + + val retVal = MetadataBuilderActor.computeCost("foo", inputJsVal) + + // Just test that the cost is approximately 1 - we don't know + // exactly how long the test takes to run + retVal.getOrElse(BigDecimal(0)) should be > BigDecimal(0.9) + retVal.getOrElse(BigDecimal(100)) should be < BigDecimal(1.1) + } + it should "build the call list for failed tasks when prompted" in { def makeEvent(workflow: WorkflowId, key: Option[MetadataJobKey]) = diff --git a/engine/src/test/scala/cromwell/webservice/routes/CromwellApiServiceSpec.scala b/engine/src/test/scala/cromwell/webservice/routes/CromwellApiServiceSpec.scala index 1ee62199168..7900c02a0c9 100644 --- a/engine/src/test/scala/cromwell/webservice/routes/CromwellApiServiceSpec.scala +++ b/engine/src/test/scala/cromwell/webservice/routes/CromwellApiServiceSpec.scala @@ -725,19 +725,16 @@ object CromwellApiServiceSpec { ) ) sender() ! SuccessfulMetadataJsonResponse(request, MetadataBuilderActor.processOutputsResponse(id, event)) - case request @ GetCost(id, includeTaskBreakdown, includeSubworkflowBreakdown) => + case request @ GetCost(id) => sender() ! 
SuccessfulMetadataJsonResponse( request, - MetadataBuilderActor.processCostResponse( - id, - WorkflowSucceeded, - Vector( - MetadataEvent(MetadataKey(id, None, "outputs:test.hello.salutation"), - MetadataValue("Hello foo!", MetadataString) - ) - ), - includeTaskBreakdown, - includeSubworkflowBreakdown + JsObject( + Map( + WorkflowMetadataKeys.Id -> JsString(id.toString), + WorkflowMetadataKeys.Status -> JsString(WorkflowSucceeded.toString), + "currency" -> JsString("USD"), + "cost" -> JsNumber(3.5) + ) ) ) case request @ GetLogs(id) => diff --git a/engine/src/test/scala/cromwell/webservice/routes/MetadataRouteSupportSpec.scala b/engine/src/test/scala/cromwell/webservice/routes/MetadataRouteSupportSpec.scala index 3bbb5674793..2cd8b64517a 100644 --- a/engine/src/test/scala/cromwell/webservice/routes/MetadataRouteSupportSpec.scala +++ b/engine/src/test/scala/cromwell/webservice/routes/MetadataRouteSupportSpec.scala @@ -171,26 +171,6 @@ class MetadataRouteSupportSpec extends AsyncFlatSpec with ScalatestRouteTest wit costResponse.fields("currency") should be(JsString("USD")) costResponse.fields("cost") should be(JsNumber(3.5)) costResponse.fields("status") should be(JsString("Succeeded")) - costResponse.fields.keys should not contain "taskBreakdown" - costResponse.fields.keys should not contain "subworkflowBreakdown" - } - } - - it should "return 200 with basic cost info with task and subworkflow breakdowns" in { - Get( - s"/workflows/$version/${CromwellApiServiceSpec.ExistingWorkflowId}/cost?includeTaskBreakdown=true&includeSubworkflowBreakdown=true" - ) ~> - akkaHttpService.metadataRoutes ~> - check { - status should be(StatusCodes.OK) - - val costResponse = responseAs[JsObject] - costResponse.fields("id") should be(JsString(CromwellApiServiceSpec.ExistingWorkflowId.toString)) - costResponse.fields("currency") should be(JsString("USD")) - costResponse.fields("cost") should be(JsNumber(3.5)) - costResponse.fields("status") should be(JsString("Succeeded")) - 
costResponse.fields("taskBreakdown") should be(JsObject(Map("foo.bar" -> JsNumber(3.5)))) - costResponse.fields("subworkflowBreakdown") should be(JsObject(Map("foo.baz" -> JsNumber(3.5)))) } } diff --git a/services/src/main/scala/cromwell/services/metadata/MetadataQuery.scala b/services/src/main/scala/cromwell/services/metadata/MetadataQuery.scala index c88a142605b..a8cf1a6fca7 100644 --- a/services/src/main/scala/cromwell/services/metadata/MetadataQuery.scala +++ b/services/src/main/scala/cromwell/services/metadata/MetadataQuery.scala @@ -62,7 +62,7 @@ object MetadataValue { case WomOptionalValue(_, None) => new MetadataValue("", MetadataNull) case value: WomValue => new MetadataValue(value.valueString, MetadataString) case _: Int | Long | _: java.lang.Long | _: java.lang.Integer => new MetadataValue(value.toString, MetadataInt) - case _: Double | Float | _: java.lang.Double | _: java.lang.Float => + case _: Double | Float | _: BigDecimal | _: java.lang.Double | _: java.lang.Float => new MetadataValue(value.toString, MetadataNumber) case _: Boolean | _: java.lang.Boolean => new MetadataValue(value.toString, MetadataBoolean) case offsetDateTime: OffsetDateTime => new MetadataValue(offsetDateTime.toUtcMilliString, MetadataString) diff --git a/services/src/main/scala/cromwell/services/metadata/MetadataService.scala b/services/src/main/scala/cromwell/services/metadata/MetadataService.scala index c7439d5560a..6c8bd4c7f59 100644 --- a/services/src/main/scala/cromwell/services/metadata/MetadataService.scala +++ b/services/src/main/scala/cromwell/services/metadata/MetadataService.scala @@ -126,8 +126,7 @@ object MetadataService { extends BuildMetadataJsonAction final case class WorkflowOutputs(workflowId: WorkflowId) extends BuildWorkflowMetadataJsonWithOverridableSourceAction final case class GetLogs(workflowId: WorkflowId) extends BuildWorkflowMetadataJsonWithOverridableSourceAction - final case class GetCost(workflowId: WorkflowId, includeTaskBreakdown: Boolean, 
includeSubworkflowBreakdown: Boolean) - extends BuildWorkflowMetadataJsonWithOverridableSourceAction + final case class GetCost(workflowId: WorkflowId) extends BuildWorkflowMetadataJsonWithOverridableSourceAction case object RefreshSummary extends MetadataServiceAction case object SendMetadataTableSizeMetrics extends MetadataServiceAction @@ -178,12 +177,8 @@ object MetadataService { final case class LogsResponse(id: WorkflowId, logs: Seq[MetadataEvent]) extends MetadataServiceResponse final case class LogsFailure(id: WorkflowId, reason: Throwable) extends MetadataServiceFailure - final case class CostResponse(id: WorkflowId, - status: WorkflowState, - costMetadata: Seq[MetadataEvent], - includeTaskBreakdown: Boolean, - includeSubworkflowBreakdown: Boolean - ) extends MetadataServiceResponse + final case class CostResponse(id: WorkflowId, status: WorkflowState, metadataResponse: MetadataLookupResponse) + extends MetadataServiceResponse final case class CostFailure(id: WorkflowId, reason: Throwable) extends MetadataServiceFailure final case class MetadataWriteSuccess(events: Iterable[MetadataEvent]) extends MetadataServiceResponse diff --git a/services/src/main/scala/cromwell/services/metadata/impl/MetadataDatabaseAccess.scala b/services/src/main/scala/cromwell/services/metadata/impl/MetadataDatabaseAccess.scala index 7c061637b24..e0b547370d3 100644 --- a/services/src/main/scala/cromwell/services/metadata/impl/MetadataDatabaseAccess.scala +++ b/services/src/main/scala/cromwell/services/metadata/impl/MetadataDatabaseAccess.scala @@ -269,15 +269,6 @@ trait MetadataDatabaseAccess { metadataToMetadataEvents(id) } - def queryCost(id: WorkflowId, timeout: Duration)(implicit - ec: ExecutionContext - ): Future[Seq[MetadataEvent]] = { - val keys = List("taskStartTime", "taskEndTime", "vmCostPerHour") - metadataDatabaseInterface - .queryMetadataEntryWithKeyConstraints(id.toString, keys, List.empty, CallOrWorkflowQuery, timeout) - .map(metadataToMetadataEvents(id)) - } - def 
refreshWorkflowMetadataSummaries(limit: Int)(implicit ec: ExecutionContext): Future[SummaryResult] = for { increasingProcessed <- metadataDatabaseInterface.summarizeIncreasing( diff --git a/services/src/main/scala/cromwell/services/metadata/impl/ReadDatabaseMetadataWorkerActor.scala b/services/src/main/scala/cromwell/services/metadata/impl/ReadDatabaseMetadataWorkerActor.scala index 1392f7988b6..68bbc594356 100644 --- a/services/src/main/scala/cromwell/services/metadata/impl/ReadDatabaseMetadataWorkerActor.scala +++ b/services/src/main/scala/cromwell/services/metadata/impl/ReadDatabaseMetadataWorkerActor.scala @@ -2,11 +2,12 @@ package cromwell.services.metadata.impl import java.sql.SQLTimeoutException import akka.actor.{Actor, ActorLogging, ActorRef, PoisonPill, Props} +import cats.data.NonEmptyList import cromwell.core.Dispatcher.ServiceDispatcher import cromwell.core.{WorkflowId, WorkflowSubmitted} import cromwell.services.MetadataServicesStore import cromwell.services.metadata.MetadataService._ -import cromwell.services.metadata.{MetadataEvent, MetadataQuery, WorkflowQueryParameters} +import cromwell.services.metadata.{CallMetadataKeys, MetadataEvent, MetadataQuery, WorkflowQueryParameters} import scala.concurrent.Future import scala.concurrent.duration.Duration @@ -38,10 +39,8 @@ class ReadDatabaseMetadataWorkerActor(metadataReadTimeout: Duration, metadataRea case GetRootAndSubworkflowLabels(rootWorkflowId: WorkflowId) => evaluateRespondAndStop(sender(), queryRootAndSubworkflowLabelsAndRespond(rootWorkflowId)) case GetLogs(workflowId) => evaluateRespondAndStop(sender(), queryLogsAndRespond(workflowId)) - case GetCost(workflowId, includeTaskBreakdown, includeSubworkflowBreakdown) => - evaluateRespondAndStop(sender(), - queryCostAndRespond(workflowId, includeTaskBreakdown, includeSubworkflowBreakdown) - ) + case GetCost(workflowId) => + evaluateRespondAndStop(sender(), queryCostAndRespond(workflowId)) case QueryForWorkflowsMatchingParameters(parameters) => 
evaluateRespondAndStop(sender(), queryWorkflowsAndRespond(parameters)) case WorkflowOutputs(id) => evaluateRespondAndStop(sender(), queryWorkflowOutputsAndRespond(id)) @@ -160,21 +159,28 @@ class ReadDatabaseMetadataWorkerActor(metadataReadTimeout: Duration, metadataRea LogsFailure(id, t) } - private def queryCostAndRespond(id: WorkflowId, - includeTaskBreakdown: Boolean, - includeSubworkflowBreakdown: Boolean - ): Future[MetadataServiceResponse] = { + private def queryCostAndRespond(id: WorkflowId): Future[MetadataServiceResponse] = { + + val keys = NonEmptyList.of(CallMetadataKeys.VmStartTime, + CallMetadataKeys.VmEndTime, + CallMetadataKeys.VmCostPerHour, + CallMetadataKeys.SubWorkflowId + ) + val metadataQuery = MetadataQuery(id, None, None, Option(keys), None, expandSubWorkflows = true) + val results = for { status <- getWorkflowStatus(id) - costEvents <- queryCost(id, metadataReadTimeout) + costEvents <- queryMetadata(metadataQuery) } yield (status, costEvents) results.map { case (s, m) => (s, m) match { - case (Some(wfState), metadataEvents) => - CostResponse(id, wfState, metadataEvents, includeTaskBreakdown, includeSubworkflowBreakdown) - // TODO should this be a failure? 
- case (None, _) => CostFailure(id, new Exception("Couldn't find workflow status")) + case (Some(wfState), resp: MetadataLookupResponse) => + CostResponse(id, wfState, resp) + case (None, resp: MetadataLookupResponse) => + // See note in getStatus above - if we can't find status, it's Submitted + CostResponse(id, WorkflowSubmitted, resp) + case (_, errorMetadataResponse) => errorMetadataResponse } } recover { case t => CostFailure(id, t) diff --git a/services/src/main/scala/cromwell/services/metadata/impl/builder/MetadataBuilderActor.scala b/services/src/main/scala/cromwell/services/metadata/impl/builder/MetadataBuilderActor.scala index 0a0b7cff6ea..c337676e5b7 100644 --- a/services/src/main/scala/cromwell/services/metadata/impl/builder/MetadataBuilderActor.scala +++ b/services/src/main/scala/cromwell/services/metadata/impl/builder/MetadataBuilderActor.scala @@ -2,9 +2,12 @@ package cromwell.services.metadata.impl.builder import java.time.OffsetDateTime import java.util.concurrent.atomic.AtomicLong - import akka.actor.{ActorRef, LoggingFSM, PoisonPill, Props} +import cats.data.Validated.{Invalid, Valid} +import cats.implicits.catsSyntaxValidatedId import common.collections.EnhancedCollections._ +import common.validation.ErrorOr +import common.validation.ErrorOr._ import cromwell.services.metadata.impl.builder.MetadataComponent._ import cromwell.core.ExecutionIndex.ExecutionIndex import cromwell.core._ @@ -16,6 +19,8 @@ import mouse.all._ import org.slf4j.LoggerFactory import spray.json._ +import java.time.temporal.ChronoUnit +import java.util.Currency import scala.language.postfixOps object MetadataBuilderActor { @@ -23,6 +28,7 @@ object MetadataBuilderActor { case object Idle extends MetadataBuilderActorState case object WaitingForMetadataService extends MetadataBuilderActorState case object WaitingForSubWorkflows extends MetadataBuilderActorState + case object WaitingForSubWorkflowCost extends MetadataBuilderActorState sealed trait MetadataBuilderActorData @@ 
-53,6 +59,8 @@ object MetadataBuilderActor { private val AttemptKey = "attempt" private val ShardKey = "shardIndex" + final private val DefaultCurrency = Currency.getInstance("USD") + /** * Metadata for a call attempt */ @@ -63,6 +71,23 @@ object MetadataBuilderActor { */ private case class MetadataForIndex(index: Int, metadata: List[JsObject]) + /** + * Extract the list of subworkflow ids from a list of metadata events + */ + private def extractSubworkflowIds(events: Seq[MetadataEvent]): Seq[String] = + events + .collect { + case MetadataEvent(key, value, _) if key.key.endsWith(CallMetadataKeys.SubWorkflowId) => value map { _.value } + } + .flatten + .distinct + + private def extractFromJsAs[A: Manifest](js: JsObject, fieldName: String): Option[A] = + js.fields.get(fieldName) match { + case Some(a: A) => Some(a) + case _ => None + } + private def eventsToAttemptMetadata( subWorkflowMetadata: Map[String, JsValue] )(attempt: Int, events: Seq[MetadataEvent]) = { @@ -275,28 +300,6 @@ object MetadataBuilderActor { workflowMetadataResponse(id, updatedEvents, includeCallsIfEmpty = false, Map.empty) } - def processCostResponse(id: WorkflowId, - status: WorkflowState, - eventsList: Seq[MetadataEvent], - includeTaskBreakdown: Boolean, - includeSubworkflowBreakdown: Boolean - ): JsObject = { - // !! add logic to compute real cost here !! 
- val taskMap = if (includeTaskBreakdown) Map("taskBreakdown" -> JsObject(Map("foo.bar" -> JsNumber(3.5)))) else Map() - val subworkflowMap = - if (includeSubworkflowBreakdown) Map("subworkflowBreakdown" -> JsObject(Map("foo.baz" -> JsNumber(3.5)))) - else Map() - - JsObject( - Map( - WorkflowMetadataKeys.Id -> JsString(id.toString), - WorkflowMetadataKeys.Status -> JsString(status.toString), - "currency" -> JsString("USD"), - "cost" -> JsNumber(3.5) - ) ++ taskMap ++ subworkflowMap - ) - } - def workflowMetadataResponse(workflowId: WorkflowId, eventsList: Seq[MetadataEvent], includeCallsIfEmpty: Boolean, @@ -307,6 +310,49 @@ object MetadataBuilderActor { .parseWorkflowEvents(includeCallsIfEmpty, expandedValues)(eventsList) .fields + ("id" -> JsString(workflowId.toString)) ) + + /* + * Attempt to compute the cost of a single call attempt from its metadata json, assuming the correct metadata + * exists to allow that. We depend on vmStartTime and vmCostPerHour being present. We also use vmEndTime, + * or fall back to the current time if it is absent. + * + * If the metadata needed to compute cost is missing, return 0. If the VM cost or any of the dates + * can't be parsed, return an error. We will also return an error if we find a negative value for + * vmCostPerHour - this indicates an error when generating the cost. 
+ */ + def computeCost(callName: String, jsVal: JsValue): ErrorOr[BigDecimal] = + jsVal match { + case jsCall: JsObject => + extractFromJsAs[JsString](jsCall, CallMetadataKeys.VmStartTime) map { startTimeVal => + val startTimeErrorOr = ErrorOr(OffsetDateTime.parse(startTimeVal.value)) + + val endTimeErrorOr = + extractFromJsAs[JsString](jsCall, CallMetadataKeys.VmEndTime) + .map(v => ErrorOr(OffsetDateTime.parse(v.value))) + .getOrElse(OffsetDateTime.now().validNel) + + val rawCostPerHour = extractFromJsAs[JsNumber](jsCall, CallMetadataKeys.VmCostPerHour).map(_.value) + val costPerHourErrorOr = rawCostPerHour match { + case Some(c: BigDecimal) if c >= 0 => c.validNel + // A costPerHour < 0 indicates an error + case Some(_: BigDecimal) => + val index = + extractFromJsAs[JsNumber](jsCall, "shardIndex").map(_.value.toString).getOrElse("-1") + val attempt = + extractFromJsAs[JsNumber](jsCall, "attempt").map(_.value.toString).getOrElse("1") + s"Couldn't find valid vmCostPerHour for ${List(callName, index, attempt).mkString(".")}".invalidNel + case None => BigDecimal(0).validNel + } + + for { + start <- startTimeErrorOr + end <- endTimeErrorOr + costPerHour <- costPerHourErrorOr + vmRuntimeInMillis = start.until(end, ChronoUnit.MILLIS).toDouble + } yield (vmRuntimeInMillis / (1000 * 60 * 60)) * costPerHour + } getOrElse (BigDecimal(0).validNel) + case _ => BigDecimal(0).validNel + } } class MetadataBuilderActor(readMetadataWorkerMaker: () => Props, @@ -347,9 +393,8 @@ class MetadataBuilderActor(readMetadataWorkerMaker: () => Props, workflowMetadataResponse(w, l, includeCallsIfEmpty = false, Map.empty) ) allDone() - case Event(CostResponse(w, s, m, t, b), HasWorkData(target, originalRequest)) => - target ! 
SuccessfulMetadataJsonResponse(originalRequest, processCostResponse(w, s, m, t, b)) - allDone() + case Event(CostResponse(w, s, m), HasWorkData(target, originalRequest)) => + processCostResponse(w, s, m, target, originalRequest) case Event(MetadataLookupResponse(query, metadata), HasWorkData(target, originalRequest)) => processMetadataResponse(query, metadata, target, originalRequest) case Event(FetchFailedJobsMetadataLookupResponse(metadata), HasWorkData(target, originalRequest)) => @@ -386,6 +431,14 @@ class MetadataBuilderActor(readMetadataWorkerMaker: () => Props, allDone() } + when(WaitingForSubWorkflowCost) { + case Event(mbr: MetadataJsonResponse, data: HasReceivedEventsData) => + processSubWorkflowCost(mbr, data) + case Event(failure: MetadataServiceFailure, data: HasReceivedEventsData) => + data.target ! FailedMetadataJsonResponse(data.originalRequest, failure.reason) + allDone() + } + whenUnhandled { case Event(message, IdleData) => log.error(s"Received unexpected message $message in state $stateName with $IdleData") @@ -429,6 +482,37 @@ class MetadataBuilderActor(readMetadataWorkerMaker: () => Props, failAndDie(new Exception(message), data.target, data.originalRequest) } + def processSubWorkflowCost(metadataResponse: MetadataJsonResponse, data: HasReceivedEventsData) = + metadataResponse match { + case SuccessfulMetadataJsonResponse(GetCost(workflowId), js) => + val subId: WorkflowId = workflowId + val newData = data.withSubWorkflow(subId.toString, js) + + if (newData.isComplete) { + buildCostAndStop( + data.originalQuery.workflowId, + extractFromJsAs[JsString](js, "status").map(_.value).getOrElse(""), // should never be empty + data.originalEvents, + newData.subWorkflowsMetadata, + data.target, + data.originalRequest + ) + } else { + stay() using newData + } + case FailedMetadataJsonResponse(originalRequest, e) => + failAndDie(new RuntimeException(s"Failed to retrieve cost for a sub workflow ($originalRequest)", e), + data.target, + data.originalRequest 
+ ) + + case other => + val message = + s"Programmer Error: MetadataBuilderActor expected subworkflow metadata response type but got ${other.getClass.getSimpleName}" + log.error(message) + failAndDie(new Exception(message), data.target, data.originalRequest) + } + def failAndDie(reason: Throwable, target: ActorRef, originalRequest: BuildMetadataJsonAction) = { target ! FailedMetadataJsonResponse(originalRequest, reason) context stop self @@ -454,12 +538,7 @@ class MetadataBuilderActor(readMetadataWorkerMaker: () => Props, ) = if (query.expandSubWorkflows) { // Scan events for sub workflow ids - val subWorkflowIds = eventsList - .collect { - case MetadataEvent(key, value, _) if key.key.endsWith(CallMetadataKeys.SubWorkflowId) => value map { _.value } - } - .flatten - .distinct + val subWorkflowIds = extractSubworkflowIds(eventsList) // If none is found just proceed to build metadata if (subWorkflowIds.isEmpty) buildAndStop(query, eventsList, Map.empty, target, originalRequest) @@ -488,6 +567,94 @@ class MetadataBuilderActor(readMetadataWorkerMaker: () => Props, buildAndStop(query, eventsList, Map.empty, target, originalRequest) } + def processCostResponse(id: WorkflowId, + status: WorkflowState, + metadataResponse: MetadataLookupResponse, + target: ActorRef, + originalRequest: BuildMetadataJsonAction + ): State = { + + // Always expand subworkflows for cost + val subWorkflowIds = extractSubworkflowIds(metadataResponse.eventList) + + if (subWorkflowIds.isEmpty) + // If no subworkflows found, just build cost data + buildCostAndStop(id, status.toString, metadataResponse.eventList, Map.empty, target, originalRequest) + else { + // Otherwise spin up a metadata builder actor for each sub workflow + subWorkflowIds foreach { subId => + val subMetadataBuilder = context.actorOf(MetadataBuilderActor.props(readMetadataWorkerMaker, + metadataReadRowNumberSafetyThreshold, + isForSubworkflows = true + ), + uniqueActorName(subId) + ) + subMetadataBuilder ! 
GetCost(WorkflowId.fromString(subId)) + } + goto(WaitingForSubWorkflowCost) using HasReceivedEventsData(target, + originalRequest, + metadataResponse.query, + metadataResponse.eventList, + Map.empty, + subWorkflowIds.size + ) + } + } + + def buildCostAndStop(id: WorkflowId, + status: String, + eventsList: Seq[MetadataEvent], + expandedValues: Map[String, JsValue], + target: ActorRef, + originalRequest: BuildMetadataJsonAction + ): State = { + + val metadataEvents = MetadataBuilderActor.parse(groupEvents(eventsList), expandedValues) + + // Walk the structured metadata to attempt to compute cost for each call/shard/attempt in this workflow + val callCostsWithErrors: List[ErrorOr[BigDecimal]] = for { + wfObj <- extractFromJsAs[JsObject](metadataEvents, id.toString).toList + callsObj <- extractFromJsAs[JsObject](wfObj, "calls").toList + singleCallName <- callsObj.fields.keys.toList + singleCallAttemptList <- extractFromJsAs[JsArray](callsObj, singleCallName).toList + singleCallAttempt <- singleCallAttemptList.elements.toList + singleCallAttemptCost = computeCost(singleCallName, singleCallAttempt) + } yield singleCallAttemptCost + + val callCost: BigDecimal = callCostsWithErrors.collect { case (Valid(b)) => b }.sum + val costErrors: Vector[JsString] = + callCostsWithErrors + .collect { case (Invalid(e)) => e.toList.mkString(", ") } + .map(JsString(_)) + .toVector + + // Get the cost and error collection for any subworkflows that are part of this workflow + val subworkflowCost = expandedValues.values.map { + case o: JsObject => + extractFromJsAs[JsNumber](o, "cost").map(_.value).getOrElse(BigDecimal(0)) + case _ => BigDecimal(0) + }.sum + + val subworkflowErrors = expandedValues.values.flatMap { + case o: JsObject => + extractFromJsAs[JsArray](o, "errors").getOrElse(JsArray.empty).elements + case _ => JsArray.empty.elements + } + + val resp = JsObject( + Map( + WorkflowMetadataKeys.Id -> JsString(id.toString), + WorkflowMetadataKeys.Status -> JsString(status), + 
"currency" -> JsString(DefaultCurrency.getCurrencyCode), + "cost" -> JsNumber(callCost + subworkflowCost), + "errors" -> JsArray(costErrors ++ subworkflowErrors) + ) + ) + + target ! SuccessfulMetadataJsonResponse(originalRequest, resp) + allDone() + } + def processFailedJobsMetadataResponse(eventsList: Seq[MetadataEvent], target: ActorRef, originalRequest: BuildMetadataJsonAction