From fbb297b90911033a2f1c5c46bd5450aab503e443 Mon Sep 17 00:00:00 2001 From: Joanne Wang Date: Tue, 2 Jul 2024 12:04:33 -0700 Subject: [PATCH] [Backport 2.x] Add support for remote monitors (#694) * changes to add support for remote monitors in alerting (#661) Signed-off-by: Subhobrata Dey * changes to add support for remote monitors in alerting (#662) * changes to add support for remote monitors in alerting Signed-off-by: Subhobrata Dey * add tests for moved classes Signed-off-by: Subhobrata Dey --------- Signed-off-by: Subhobrata Dey * changes to support generic inputs and triggers in remote monitors (#664) Signed-off-by: Subhobrata Dey * add remote doc level monitor input (#665) Signed-off-by: Subhobrata Dey * fix serde of RemoteDocLevelMonitorInput (#666) Signed-off-by: Subhobrata Dey * fix serde for monitor (#692) Signed-off-by: Subhobrata Dey --------- Signed-off-by: Subhobrata Dey Co-authored-by: Subhobrata Dey --- .../action/DocLevelMonitorFanOutAction.kt | 15 + .../action/DocLevelMonitorFanOutRequest.kt | 101 +++++++ .../action/DocLevelMonitorFanOutResponse.kt | 92 ++++++ .../model/BucketLevelTriggerRunResult.kt | 57 ++++ .../model/ChainedAlertTriggerRunResult.kt | 69 +++++ .../model/ClusterMetricsTriggerRunResult.kt | 110 +++++++ .../model/DocumentLevelTriggerRunResult.kt | 82 ++++++ .../alerting/model/IndexExecutionContext.kt | 66 +++++ .../commons/alerting/model/Input.kt | 16 +- .../commons/alerting/model/Monitor.kt | 32 ++- .../commons/alerting/model/MonitorMetadata.kt | 197 +++++++++++++ .../alerting/model/MonitorRunResult.kt | 215 ++++++++++++++ .../model/QueryLevelTriggerRunResult.kt | 66 +++++ .../commons/alerting/model/Trigger.kt | 5 +- .../alerting/model/TriggerRunResult.kt | 55 ++++ .../alerting/model/WorkflowMetadata.kt | 106 +++++++ .../alerting/model/WorkflowRunContext.kt | 55 ++++ .../alerting/model/WorkflowRunResult.kt | 82 ++++++ .../monitors/RemoteDocLevelMonitorInput.kt | 81 ++++++ .../remote/monitors/RemoteMonitorInput.kt | 70 +++++ .../remote/monitors/RemoteMonitorTrigger.kt | 126 ++++++++ .../alerting/util/AlertingException.kt | 89 ++++++ .../commons/alerting/util/IndexUtils.kt | 19 +- .../commons/alerting/TestHelpers.kt | 139 ++++++++- .../DocLevelMonitorFanOutRequestTests.kt | 92 ++++++ .../DocLevelMonitorFanOutResponseTests.kt | 60 ++++ .../action/GetMonitorResponseTests.kt | 2 +- .../action/IndexMonitorResponseTests.kt | 2 +- .../commons/alerting/model/WriteableTests.kt | 268 ++++++++++++++++++ .../commons/alerting/model/XContentTests.kt | 70 ++++- 30 files changed, 2413 insertions(+), 26 deletions(-) create mode 100644 src/main/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutAction.kt create mode 100644 src/main/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequest.kt create mode 100644 src/main/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutResponse.kt create mode 100644 src/main/kotlin/org/opensearch/commons/alerting/model/BucketLevelTriggerRunResult.kt create mode 100644 src/main/kotlin/org/opensearch/commons/alerting/model/ChainedAlertTriggerRunResult.kt create mode 100644 src/main/kotlin/org/opensearch/commons/alerting/model/ClusterMetricsTriggerRunResult.kt create mode 100644 src/main/kotlin/org/opensearch/commons/alerting/model/DocumentLevelTriggerRunResult.kt create mode 100644 src/main/kotlin/org/opensearch/commons/alerting/model/IndexExecutionContext.kt create mode 100644 src/main/kotlin/org/opensearch/commons/alerting/model/MonitorMetadata.kt create mode 100644 src/main/kotlin/org/opensearch/commons/alerting/model/MonitorRunResult.kt create mode 100644 src/main/kotlin/org/opensearch/commons/alerting/model/QueryLevelTriggerRunResult.kt create mode 100644 src/main/kotlin/org/opensearch/commons/alerting/model/TriggerRunResult.kt create mode 100644 src/main/kotlin/org/opensearch/commons/alerting/model/WorkflowMetadata.kt create mode 100644 src/main/kotlin/org/opensearch/commons/alerting/model/WorkflowRunContext.kt create mode 100644 src/main/kotlin/org/opensearch/commons/alerting/model/WorkflowRunResult.kt create mode 100644 src/main/kotlin/org/opensearch/commons/alerting/model/remote/monitors/RemoteDocLevelMonitorInput.kt create mode 100644 src/main/kotlin/org/opensearch/commons/alerting/model/remote/monitors/RemoteMonitorInput.kt create mode 100644 src/main/kotlin/org/opensearch/commons/alerting/model/remote/monitors/RemoteMonitorTrigger.kt create mode 100644 src/main/kotlin/org/opensearch/commons/alerting/util/AlertingException.kt create mode 100644 src/test/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequestTests.kt create mode 100644 src/test/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutResponseTests.kt diff --git a/src/main/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutAction.kt b/src/main/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutAction.kt new file mode 100644 index 00000000..801edc47 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutAction.kt @@ -0,0 +1,15 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.alerting.action + +import org.opensearch.action.ActionType + +class DocLevelMonitorFanOutAction private constructor() : ActionType(NAME, ::DocLevelMonitorFanOutResponse) { + companion object { + val INSTANCE = DocLevelMonitorFanOutAction() + const val NAME = "cluster:admin/opensearch/alerting/monitor/doclevel/fanout" + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequest.kt b/src/main/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequest.kt new file mode 100644 index 00000000..fe5cfe29 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequest.kt @@ -0,0 +1,101 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.alerting.action + +import org.opensearch.action.ActionRequest +import org.opensearch.action.ActionRequestValidationException +import org.opensearch.commons.alerting.model.IndexExecutionContext +import org.opensearch.commons.alerting.model.Monitor +import org.opensearch.commons.alerting.model.MonitorMetadata +import org.opensearch.commons.alerting.model.WorkflowRunContext +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.index.shard.ShardId +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.ToXContentObject +import org.opensearch.core.xcontent.XContentBuilder +import java.io.IOException + +class DocLevelMonitorFanOutRequest : ActionRequest, ToXContentObject { + val monitor: Monitor + val dryRun: Boolean + val monitorMetadata: MonitorMetadata + val executionId: String + val indexExecutionContext: IndexExecutionContext? + val shardIds: List + val concreteIndicesSeenSoFar: List + val workflowRunContext: WorkflowRunContext? + + constructor( + monitor: Monitor, + dryRun: Boolean, + monitorMetadata: MonitorMetadata, + executionId: String, + indexExecutionContext: IndexExecutionContext?, + shardIds: List, + concreteIndicesSeenSoFar: List, + workflowRunContext: WorkflowRunContext? + ) : super() { + this.monitor = monitor + this.dryRun = dryRun + this.monitorMetadata = monitorMetadata + this.executionId = executionId + this.indexExecutionContext = indexExecutionContext + this.shardIds = shardIds + this.concreteIndicesSeenSoFar = concreteIndicesSeenSoFar + this.workflowRunContext = workflowRunContext + require(false == shardIds.isEmpty()) { } + } + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + monitor = Monitor.readFrom(sin)!!, + dryRun = sin.readBoolean(), + monitorMetadata = MonitorMetadata.readFrom(sin), + executionId = sin.readString(), + shardIds = sin.readList(::ShardId), + concreteIndicesSeenSoFar = sin.readStringList(), + workflowRunContext = if (sin.readBoolean()) { + WorkflowRunContext(sin) + } else { null }, + indexExecutionContext = IndexExecutionContext(sin) + ) + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + monitor.writeTo(out) + out.writeBoolean(dryRun) + monitorMetadata.writeTo(out) + out.writeString(executionId) + out.writeCollection(shardIds) + out.writeStringCollection(concreteIndicesSeenSoFar) + out.writeBoolean(workflowRunContext != null) + workflowRunContext?.writeTo(out) + indexExecutionContext?.writeTo(out) + } + + override fun validate(): ActionRequestValidationException? { + var actionValidationException: ActionRequestValidationException? = null + if (shardIds.isEmpty()) { + actionValidationException = ActionRequestValidationException() + actionValidationException.addValidationError("shard_ids is null or empty") + } + return actionValidationException + } + + @Throws(IOException::class) + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject() + .field("monitor", monitor) + .field("dry_run", dryRun) + .field("execution_id", executionId) + .field("index_execution_context", indexExecutionContext) + .field("shard_ids", shardIds) + .field("concrete_indices", concreteIndicesSeenSoFar) + .field("workflow_run_context", workflowRunContext) + return builder.endObject() + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutResponse.kt b/src/main/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutResponse.kt new file mode 100644 index 00000000..6e5cde55 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutResponse.kt @@ -0,0 +1,92 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.alerting.action + +import org.opensearch.commons.alerting.model.DocumentLevelTriggerRunResult +import org.opensearch.commons.alerting.model.InputRunResults +import org.opensearch.commons.alerting.util.AlertingException +import org.opensearch.core.action.ActionResponse +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.ToXContentObject +import org.opensearch.core.xcontent.XContentBuilder +import java.io.IOException + +class DocLevelMonitorFanOutResponse : ActionResponse, ToXContentObject { + val nodeId: String + val executionId: String + val monitorId: String + val lastRunContexts: MutableMap + val inputResults: InputRunResults + val triggerResults: Map + val exception: AlertingException? + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + nodeId = sin.readString(), + executionId = sin.readString(), + monitorId = sin.readString(), + lastRunContexts = sin.readMap()!! as MutableMap, + inputResults = InputRunResults.readFrom(sin), + triggerResults = suppressWarning(sin.readMap(StreamInput::readString, DocumentLevelTriggerRunResult::readFrom)), + exception = sin.readException() + ) + + constructor( + nodeId: String, + executionId: String, + monitorId: String, + lastRunContexts: MutableMap, + inputResults: InputRunResults = InputRunResults(), // partial, + triggerResults: Map = mapOf(), + exception: AlertingException? = null + ) : super() { + this.nodeId = nodeId + this.executionId = executionId + this.monitorId = monitorId + this.lastRunContexts = lastRunContexts + this.inputResults = inputResults + this.triggerResults = triggerResults + this.exception = exception + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeString(nodeId) + out.writeString(executionId) + out.writeString(monitorId) + out.writeMap(lastRunContexts) + inputResults.writeTo(out) + out.writeMap( + triggerResults, + StreamOutput::writeString, + { stream, stats -> stats.writeTo(stream) } + ) + out.writeException(exception) + } + + @Throws(IOException::class) + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject() + .field("node_id", nodeId) + .field("execution_id", executionId) + .field("monitor_id", monitorId) + .field("last_run_contexts", lastRunContexts) + .field("input_results", inputResults) + .field("trigger_results", triggerResults) + .field("exception", exception) + .endObject() + return builder + } + + companion object { + @Suppress("UNCHECKED_CAST") + fun suppressWarning(map: MutableMap?): Map { + return map as Map + } + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/BucketLevelTriggerRunResult.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/BucketLevelTriggerRunResult.kt new file mode 100644 index 00000000..34328ca2 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/BucketLevelTriggerRunResult.kt @@ -0,0 +1,57 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.alerting.model + +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.XContentBuilder +import java.io.IOException + +data class BucketLevelTriggerRunResult( + override var triggerName: String, + override var error: Exception? = null, + var aggregationResultBuckets: Map, + var actionResultsMap: MutableMap> = mutableMapOf() +) : TriggerRunResult(triggerName, error) { + + @Throws(IOException::class) + @Suppress("UNCHECKED_CAST") + constructor(sin: StreamInput) : this( + sin.readString(), + sin.readException() as Exception?, // error + sin.readMap(StreamInput::readString, ::AggregationResultBucket), + sin.readMap() as MutableMap> + ) + + override fun internalXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + return builder + .field(AGG_RESULT_BUCKETS, aggregationResultBuckets) + .field(ACTIONS_RESULTS, actionResultsMap as Map) + } + + @Throws(IOException::class) + @Suppress("UNCHECKED_CAST") + override fun writeTo(out: StreamOutput) { + super.writeTo(out) + out.writeMap(aggregationResultBuckets, StreamOutput::writeString) { + valueOut: StreamOutput, aggResultBucket: AggregationResultBucket -> + aggResultBucket.writeTo(valueOut) + } + out.writeMap(actionResultsMap as Map) + } + + companion object { + const val AGG_RESULT_BUCKETS = "agg_result_buckets" + const val ACTIONS_RESULTS = "action_results" + + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): TriggerRunResult { + return BucketLevelTriggerRunResult(sin) + } + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/ChainedAlertTriggerRunResult.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/ChainedAlertTriggerRunResult.kt new file mode 100644 index 00000000..015762cf --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/ChainedAlertTriggerRunResult.kt @@ -0,0 +1,69 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.alerting.model + +import org.opensearch.commons.alerting.alerts.AlertError +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.script.ScriptException +import java.io.IOException +import java.time.Instant + +data class ChainedAlertTriggerRunResult( + override var triggerName: String, + var triggered: Boolean, + override var error: Exception?, + var actionResults: MutableMap = mutableMapOf(), + val associatedAlertIds: Set +) : TriggerRunResult(triggerName, error) { + + @Throws(IOException::class) + @Suppress("UNCHECKED_CAST") + constructor(sin: StreamInput) : this( + triggerName = sin.readString(), + error = sin.readException(), + triggered = sin.readBoolean(), + actionResults = sin.readMap() as MutableMap, + associatedAlertIds = sin.readStringList().toSet() + ) + + override fun alertError(): AlertError? { + if (error != null) { + return AlertError(Instant.now(), "Failed evaluating trigger:\n${error!!.userErrorMessage()}") + } + for (actionResult in actionResults.values) { + if (actionResult.error != null) { + return AlertError(Instant.now(), "Failed running action:\n${actionResult.error.userErrorMessage()}") + } + } + return null + } + + override fun internalXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + if (error is ScriptException) error = Exception((error as ScriptException).toJsonString(), error) + return builder + .field("triggered", triggered) + .field("action_results", actionResults as Map) + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + super.writeTo(out) + out.writeBoolean(triggered) + out.writeMap(actionResults as Map) + out.writeStringCollection(associatedAlertIds) + } + + companion object { + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): TriggerRunResult { + return ChainedAlertTriggerRunResult(sin) + } + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/ClusterMetricsTriggerRunResult.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/ClusterMetricsTriggerRunResult.kt new file mode 100644 index 00000000..d3af9be3 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/ClusterMetricsTriggerRunResult.kt @@ -0,0 +1,110 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.alerting.model + +import org.opensearch.commons.alerting.alerts.AlertError +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.common.io.stream.Writeable +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.ToXContentObject +import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.script.ScriptException +import java.io.IOException +import java.time.Instant + +data class ClusterMetricsTriggerRunResult( + override var triggerName: String, + override var triggered: Boolean, + override var error: Exception?, + override var actionResults: MutableMap = mutableMapOf(), + var clusterTriggerResults: List = listOf() +) : QueryLevelTriggerRunResult( + triggerName = triggerName, + error = error, + triggered = triggered, + actionResults = actionResults +) { + + @Throws(IOException::class) + @Suppress("UNCHECKED_CAST") + constructor(sin: StreamInput) : this( + triggerName = sin.readString(), + error = sin.readException(), + triggered = sin.readBoolean(), + actionResults = sin.readMap() as MutableMap, + clusterTriggerResults = sin.readList((ClusterTriggerResult)::readFrom) + ) + + override fun alertError(): AlertError? { + if (error != null) { + return AlertError(Instant.now(), "Failed evaluating trigger:\n${error!!.userErrorMessage()}") + } + for (actionResult in actionResults.values) { + if (actionResult.error != null) { + return AlertError(Instant.now(), "Failed running action:\n${actionResult.error.userErrorMessage()}") + } + } + return null + } + + override fun internalXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + if (error is ScriptException) error = Exception((error as ScriptException).toJsonString(), error) + builder + .field(TRIGGERED_FIELD, triggered) + .field(ACTION_RESULTS_FIELD, actionResults as Map) + .startArray(CLUSTER_RESULTS_FIELD) + clusterTriggerResults.forEach { it.toXContent(builder, params) } + return builder.endArray() + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + super.writeTo(out) + out.writeBoolean(triggered) + out.writeMap(actionResults as Map) + clusterTriggerResults.forEach { it.writeTo(out) } + } + + companion object { + const val TRIGGERED_FIELD = "triggered" + const val ACTION_RESULTS_FIELD = "action_results" + const val CLUSTER_RESULTS_FIELD = "cluster_results" + } + + data class ClusterTriggerResult( + val cluster: String, + val triggered: Boolean + ) : ToXContentObject, Writeable { + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + cluster = sin.readString(), + triggered = sin.readBoolean() + ) + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + return builder.startObject() + .startObject(cluster) + .field(TRIGGERED_FIELD, triggered) + .endObject() + .endObject() + } + + override fun writeTo(out: StreamOutput) { + out.writeString(cluster) + out.writeBoolean(triggered) + } + + companion object { + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): ClusterTriggerResult { + return ClusterTriggerResult(sin) + } + } + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/DocumentLevelTriggerRunResult.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/DocumentLevelTriggerRunResult.kt new file mode 100644 index 00000000..1acb354b --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/DocumentLevelTriggerRunResult.kt @@ -0,0 +1,82 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.alerting.model + +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.script.ScriptException +import java.io.IOException + +data class DocumentLevelTriggerRunResult( + override var triggerName: String, + var triggeredDocs: List, + override var error: Exception?, + var actionResultsMap: MutableMap> = mutableMapOf() +) : TriggerRunResult(triggerName, error) { + + @Throws(IOException::class) + @Suppress("UNCHECKED_CAST") + constructor(sin: StreamInput) : this( + triggerName = sin.readString(), + error = sin.readException(), + triggeredDocs = sin.readStringList(), + actionResultsMap = readActionResults(sin) + ) + + override fun internalXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + if (error is ScriptException) error = Exception((error as ScriptException).toJsonString(), error) + return builder + .field("triggeredDocs", triggeredDocs as List) + .field("action_results", actionResultsMap as Map) + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + super.writeTo(out) + out.writeStringCollection(triggeredDocs) + out.writeInt(actionResultsMap.size) + actionResultsMap.forEach { (alert, actionResults) -> + out.writeString(alert) + out.writeInt(actionResults.size) + actionResults.forEach { (id, result) -> + out.writeString(id) + result.writeTo(out) + } + } + } + + companion object { + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): TriggerRunResult { + return DocumentLevelTriggerRunResult(sin) + } + + @JvmStatic + fun readActionResults(sin: StreamInput): MutableMap> { + val actionResultsMapReconstruct: MutableMap> = mutableMapOf() + val size = sin.readInt() + var idx = 0 + while (idx < size) { + val alert = sin.readString() + val actionResultsSize = sin.readInt() + val actionRunResultElem = mutableMapOf() + var i = 0 + while (i < actionResultsSize) { + val actionId = sin.readString() + val actionResult = ActionRunResult.readFrom(sin) + actionRunResultElem[actionId] = actionResult + ++i + } + actionResultsMapReconstruct[alert] = actionRunResultElem + ++idx + } + return actionResultsMapReconstruct + } + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/IndexExecutionContext.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/IndexExecutionContext.kt new file mode 100644 index 00000000..8872b525 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/IndexExecutionContext.kt @@ -0,0 +1,66 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.alerting.model + +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.common.io.stream.Writeable +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.XContentBuilder +import java.io.IOException + +data class IndexExecutionContext( + val queries: List, + val lastRunContext: MutableMap, // previous execution + val updatedLastRunContext: MutableMap, // without sequence numbers + val indexName: String, + val concreteIndexName: String, + val updatedIndexNames: List, + val concreteIndexNames: List, + val conflictingFields: List, + val docIds: List? = emptyList() +) : Writeable, ToXContent { + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + queries = sin.readList { DocLevelQuery(sin) }, + lastRunContext = sin.readMap() as MutableMap, + updatedLastRunContext = sin.readMap() as MutableMap, + indexName = sin.readString(), + concreteIndexName = sin.readString(), + updatedIndexNames = sin.readStringList(), + concreteIndexNames = sin.readStringList(), + conflictingFields = sin.readStringList(), + docIds = sin.readOptionalStringList() + ) + + override fun writeTo(out: StreamOutput?) { + out!!.writeCollection(queries) + out.writeMap(lastRunContext) + out.writeMap(updatedLastRunContext) + out.writeString(indexName) + out.writeString(concreteIndexName) + out.writeStringCollection(updatedIndexNames) + out.writeStringCollection(concreteIndexNames) + out.writeStringCollection(conflictingFields) + out.writeOptionalStringCollection(docIds) + } + + override fun toXContent(builder: XContentBuilder?, params: ToXContent.Params?): XContentBuilder { + builder!!.startObject() + .field("queries", queries) + .field("last_run_context", lastRunContext) + .field("updated_last_run_context", updatedLastRunContext) + .field("index_name", indexName) + .field("concrete_index_name", concreteIndexName) + .field("udpated_index_names", updatedIndexNames) + .field("concrete_index_names", concreteIndexNames) + .field("conflicting_fields", conflictingFields) + .field("doc_ids", docIds) + .endObject() + return builder + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/Input.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/Input.kt index b3472f8a..7c02420a 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/Input.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/Input.kt @@ -3,6 +3,10 @@ package org.opensearch.commons.alerting.model import org.opensearch.commons.alerting.model.ClusterMetricsInput.Companion.URI_FIELD import org.opensearch.commons.alerting.model.DocLevelMonitorInput.Companion.DOC_LEVEL_INPUT_FIELD import org.opensearch.commons.alerting.model.SearchInput.Companion.SEARCH_FIELD +import org.opensearch.commons.alerting.model.remote.monitors.RemoteDocLevelMonitorInput +import org.opensearch.commons.alerting.model.remote.monitors.RemoteDocLevelMonitorInput.Companion.REMOTE_DOC_LEVEL_MONITOR_INPUT_FIELD +import org.opensearch.commons.alerting.model.remote.monitors.RemoteMonitorInput +import org.opensearch.commons.alerting.model.remote.monitors.RemoteMonitorInput.Companion.REMOTE_MONITOR_INPUT_FIELD import org.opensearch.commons.notifications.model.BaseModel import org.opensearch.core.common.io.stream.StreamInput import org.opensearch.core.xcontent.XContentParser @@ -14,7 +18,9 @@ interface Input : BaseModel { enum class Type(val value: String) { DOCUMENT_LEVEL_INPUT(DOC_LEVEL_INPUT_FIELD), CLUSTER_METRICS_INPUT(URI_FIELD), - SEARCH_INPUT(SEARCH_FIELD); + SEARCH_INPUT(SEARCH_FIELD), + REMOTE_MONITOR_INPUT(REMOTE_MONITOR_INPUT_FIELD), + REMOTE_DOC_LEVEL_MONITOR_INPUT(REMOTE_DOC_LEVEL_MONITOR_INPUT_FIELD); override fun toString(): String { return value @@ -32,8 +38,12 @@ interface Input : BaseModel { SearchInput.parseInner(xcp) } else if (xcp.currentName() == Type.CLUSTER_METRICS_INPUT.value) { ClusterMetricsInput.parseInner(xcp) - } else { + } else if (xcp.currentName() == Type.DOCUMENT_LEVEL_INPUT.value) { DocLevelMonitorInput.parse(xcp) + } else if (xcp.currentName() == Type.REMOTE_MONITOR_INPUT.value) { + RemoteMonitorInput.parse(xcp) + } else { + RemoteDocLevelMonitorInput.parse(xcp) } XContentParserUtils.ensureExpectedToken(XContentParser.Token.END_OBJECT, xcp.nextToken(), xcp) return input @@ -46,6 +56,8 @@ interface Input : BaseModel { Type.DOCUMENT_LEVEL_INPUT -> DocLevelMonitorInput(sin) Type.CLUSTER_METRICS_INPUT -> ClusterMetricsInput(sin) Type.SEARCH_INPUT -> SearchInput(sin) + Type.REMOTE_MONITOR_INPUT -> RemoteMonitorInput(sin) + Type.REMOTE_DOC_LEVEL_MONITOR_INPUT -> RemoteDocLevelMonitorInput(sin) // This shouldn't be reachable but ensuring exhaustiveness as Kotlin warns // enum can be null in Java else -> throw IllegalStateException("Unexpected input [$type] when reading Trigger") diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/Monitor.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/Monitor.kt index b2099d93..322e4b30 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/Monitor.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/Monitor.kt @@ -1,6 +1,7 @@ package org.opensearch.commons.alerting.model import org.opensearch.common.CheckedFunction +import org.opensearch.commons.alerting.model.remote.monitors.RemoteMonitorTrigger import org.opensearch.commons.alerting.util.IndexUtils.Companion.MONITOR_MAX_INPUTS import org.opensearch.commons.alerting.util.IndexUtils.Companion.MONITOR_MAX_TRIGGERS import org.opensearch.commons.alerting.util.IndexUtils.Companion.NO_SCHEMA_VERSION @@ -22,7 +23,7 @@ import org.opensearch.core.xcontent.XContentParser import org.opensearch.core.xcontent.XContentParserUtils import java.io.IOException import java.time.Instant -import java.util.Locale +import java.util.regex.Pattern data class Monitor( override val id: String = NO_ID, @@ -34,7 +35,7 @@ data class Monitor( override val enabledTime: Instant?, // TODO: Check how this behaves during rolling upgrade/multi-version cluster // Can read/write and parsing break if it's done from an old -> new version of the plugin? - val monitorType: MonitorType, + val monitorType: String, val user: User?, val schemaVersion: Int = NO_SCHEMA_VERSION, val inputs: List, @@ -56,13 +57,13 @@ data class Monitor( require(triggerIds.add(trigger.id)) { "Duplicate trigger id: ${trigger.id}. Trigger ids must be unique." } // Verify Trigger type based on Monitor type when (monitorType) { - MonitorType.QUERY_LEVEL_MONITOR -> + MonitorType.QUERY_LEVEL_MONITOR.value -> require(trigger is QueryLevelTrigger) { "Incompatible trigger [${trigger.id}] for monitor type [$monitorType]" } - MonitorType.BUCKET_LEVEL_MONITOR -> + MonitorType.BUCKET_LEVEL_MONITOR.value -> require(trigger is BucketLevelTrigger) { "Incompatible trigger [${trigger.id}] for monitor type [$monitorType]" } - MonitorType.CLUSTER_METRICS_MONITOR -> + MonitorType.CLUSTER_METRICS_MONITOR.value -> require(trigger is QueryLevelTrigger) { "Incompatible trigger [${trigger.id}] for monitor type [$monitorType]" } - MonitorType.DOC_LEVEL_MONITOR -> + MonitorType.DOC_LEVEL_MONITOR.value -> require(trigger is DocumentLevelTrigger) { "Incompatible trigger [${trigger.id}] for monitor type [$monitorType]" } } } @@ -94,7 +95,7 @@ data class Monitor( schedule = Schedule.readFrom(sin), lastUpdateTime = sin.readInstant(), enabledTime = sin.readOptionalInstant(), - monitorType = sin.readEnum(MonitorType::class.java), + monitorType = sin.readString(), user = if (sin.readBoolean()) { User(sin) } else { @@ -179,7 +180,7 @@ data class Monitor( schedule.writeTo(out) out.writeInstant(lastUpdateTime) out.writeOptionalInstant(enabledTime) - out.writeEnum(monitorType) + out.writeString(monitorType) out.writeBoolean(user != null) user?.writeTo(out) out.writeInt(schemaVersion) @@ -188,8 +189,10 @@ data class Monitor( inputs.forEach { if (it is SearchInput) { out.writeEnum(Input.Type.SEARCH_INPUT) - } else { + } else if (it is DocLevelMonitorInput) { out.writeEnum(Input.Type.DOCUMENT_LEVEL_INPUT) + } else { + out.writeEnum(Input.Type.REMOTE_DOC_LEVEL_MONITOR_INPUT) } it.writeTo(out) } @@ -199,6 +202,7 @@ data class Monitor( when (it) { is BucketLevelTrigger -> out.writeEnum(Trigger.Type.BUCKET_LEVEL_TRIGGER) is DocumentLevelTrigger -> out.writeEnum(Trigger.Type.DOCUMENT_LEVEL_TRIGGER) + is RemoteMonitorTrigger -> out.writeEnum(Trigger.Type.REMOTE_MONITOR_TRIGGER) else -> out.writeEnum(Trigger.Type.QUERY_LEVEL_TRIGGER) } it.writeTo(out) @@ -227,6 +231,7 @@ data class Monitor( const val DATA_SOURCES_FIELD = "data_sources" const val ENABLED_TIME_FIELD = "enabled_time" const val OWNER_FIELD = "owner" + val MONITOR_TYPE_PATTERN = Pattern.compile("[a-zA-Z0-9_]{5,25}") // This is defined here instead of in ScheduledJob to avoid having the ScheduledJob class know about all // the different subclasses and creating circular dependencies @@ -265,9 +270,10 @@ data class Monitor( NAME_FIELD -> name = xcp.text() MONITOR_TYPE_FIELD -> { monitorType = xcp.text() - val allowedTypes = MonitorType.values().map { it.value } - if (!allowedTypes.contains(monitorType)) { - throw IllegalStateException("Monitor type should be one of $allowedTypes") + val matcher = MONITOR_TYPE_PATTERN.matcher(monitorType) + val find = matcher.matches() + if (!find) { + throw IllegalStateException("Monitor type should follow pattern ${MONITOR_TYPE_PATTERN.pattern()}") } } USER_FIELD -> user = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else User.parse(xcp) @@ -325,7 +331,7 @@ data class Monitor( requireNotNull(schedule) { "Monitor schedule is null" }, lastUpdateTime ?: Instant.now(), enabledTime, - MonitorType.valueOf(monitorType.uppercase(Locale.ROOT)), + monitorType, user, schemaVersion, inputs.toList(), diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/MonitorMetadata.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/MonitorMetadata.kt new file mode 100644 index 00000000..a90f3cc3 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/MonitorMetadata.kt @@ -0,0 +1,197 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.alerting.model + +import org.opensearch.commons.alerting.model.Monitor.Companion.NO_ID +import org.opensearch.commons.alerting.util.instant +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.common.io.stream.Writeable +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.core.xcontent.XContentParser +import org.opensearch.core.xcontent.XContentParserUtils +import org.opensearch.index.seqno.SequenceNumbers +import java.io.IOException +import java.time.Instant + +data class MonitorMetadata( + val id: String, + val seqNo: Long = SequenceNumbers.UNASSIGNED_SEQ_NO, + val primaryTerm: Long = SequenceNumbers.UNASSIGNED_PRIMARY_TERM, + val monitorId: String, + val lastActionExecutionTimes: List, + val lastRunContext: Map, + // Maps (sourceIndex + monitorId) --> concreteQueryIndex + val sourceToQueryIndexMapping: MutableMap = mutableMapOf() +) : Writeable, ToXContent { + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + id = sin.readString(), + seqNo = sin.readLong(), + primaryTerm = sin.readLong(), + monitorId = sin.readString(), + lastActionExecutionTimes = sin.readList(ActionExecutionTime.Companion::readFrom), + lastRunContext = Monitor.suppressWarning(sin.readMap()), + sourceToQueryIndexMapping = sin.readMap() as MutableMap + ) + + override fun writeTo(out: StreamOutput) { + out.writeString(id) + out.writeLong(seqNo) + out.writeLong(primaryTerm) + out.writeString(monitorId) + out.writeCollection(lastActionExecutionTimes) + out.writeMap(lastRunContext) + out.writeMap(sourceToQueryIndexMapping as MutableMap) + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject() + if (params.paramAsBoolean("with_type", false)) builder.startObject(METADATA) + builder.field(MONITOR_ID_FIELD, monitorId) + .field(LAST_ACTION_EXECUTION_FIELD, lastActionExecutionTimes.toTypedArray()) + if (lastRunContext.isNotEmpty()) builder.field(LAST_RUN_CONTEXT_FIELD, lastRunContext) + if (sourceToQueryIndexMapping.isNotEmpty()) { + builder.field(SOURCE_TO_QUERY_INDEX_MAP_FIELD, sourceToQueryIndexMapping as MutableMap) + } + if (params.paramAsBoolean("with_type", false)) builder.endObject() + return builder.endObject() + } + + companion object { + const val METADATA = "metadata" + const val MONITOR_ID_FIELD = "monitor_id" + const val LAST_ACTION_EXECUTION_FIELD = "last_action_execution_times" + const val LAST_RUN_CONTEXT_FIELD = "last_run_context" + const val SOURCE_TO_QUERY_INDEX_MAP_FIELD = "source_to_query_index_mapping" + + @JvmStatic + @JvmOverloads + @Throws(IOException::class) + fun parse( + xcp: XContentParser, + id: String = NO_ID, + seqNo: Long = SequenceNumbers.UNASSIGNED_SEQ_NO, + primaryTerm: Long = SequenceNumbers.UNASSIGNED_PRIMARY_TERM + ): MonitorMetadata { + lateinit var monitorId: String + val lastActionExecutionTimes = mutableListOf() + var lastRunContext: Map = mapOf() + var sourceToQueryIndexMapping: MutableMap = mutableMapOf() + + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + MONITOR_ID_FIELD -> monitorId = xcp.text() + LAST_ACTION_EXECUTION_FIELD -> { + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_ARRAY) { + lastActionExecutionTimes.add(ActionExecutionTime.parse(xcp)) + } + } + LAST_RUN_CONTEXT_FIELD -> lastRunContext = xcp.map() + SOURCE_TO_QUERY_INDEX_MAP_FIELD -> sourceToQueryIndexMapping = xcp.map() as MutableMap + } + } + + return MonitorMetadata( + if (id != NO_ID) id else "$monitorId-metadata", + seqNo = seqNo, + primaryTerm = primaryTerm, + monitorId = monitorId, + lastActionExecutionTimes = lastActionExecutionTimes, + lastRunContext = lastRunContext, + sourceToQueryIndexMapping = sourceToQueryIndexMapping + ) + } + + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): MonitorMetadata { + return MonitorMetadata(sin) + } + + /** workflowMetadataId is used as key for monitor metadata in the case when the workflow execution happens + so the monitor lastRunContext (in the case of doc level monitor) is not interfering with the monitor execution + WorkflowMetadataId will be either workflowId-metadata (when executing the workflow as it is scheduled) + or timestampWithUUID-metadata (when a workflow is executed in a dry-run mode) + In the case of temp workflow, doc level monitors must have lastRunContext created from scratch + That's why we are using workflowMetadataId - in order to ensure that the doc level monitor metadata is created from scratch + **/ + fun getId(monitor: Monitor, workflowMetadataId: String? = null): String { + return if (workflowMetadataId.isNullOrEmpty()) { "${monitor.id}-metadata" } + // WorkflowMetadataId already contains -metadata suffix + else { "$workflowMetadataId-${monitor.id}-metadata" } + } + } +} + +/** + * A value object containing action execution time. + */ +data class ActionExecutionTime( + val actionId: String, + val executionTime: Instant +) : Writeable, ToXContent { + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + sin.readString(), // actionId + sin.readInstant() // executionTime + ) + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + return builder.startObject() + .field(ACTION_ID_FIELD, actionId) + .field(EXECUTION_TIME_FIELD, executionTime) + .endObject() + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeString(actionId) + out.writeInstant(executionTime) + } + + companion object { + const val ACTION_ID_FIELD = "action_id" + const val EXECUTION_TIME_FIELD = "execution_time" + + @JvmStatic + @Throws(IOException::class) + fun parse(xcp: XContentParser): ActionExecutionTime { + lateinit var actionId: String + lateinit var executionTime: Instant + + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + ACTION_ID_FIELD -> actionId = xcp.text() + EXECUTION_TIME_FIELD -> executionTime = xcp.instant()!! + } + } + + return ActionExecutionTime( + actionId, + executionTime + ) + } + + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): ActionExecutionTime { + return ActionExecutionTime(sin) + } + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/MonitorRunResult.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/MonitorRunResult.kt new file mode 100644 index 00000000..d403313b --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/MonitorRunResult.kt @@ -0,0 +1,215 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.alerting.model + +import org.apache.logging.log4j.LogManager +import org.opensearch.OpenSearchException +import org.opensearch.commons.alerting.alerts.AlertError +import org.opensearch.commons.alerting.util.optionalTimeField +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.common.io.stream.Writeable +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.script.ScriptException +import java.io.IOException +import java.time.Instant + +data class MonitorRunResult( + val monitorName: String, + val periodStart: Instant, + val periodEnd: Instant, + val error: Exception? = null, + val inputResults: InputRunResults = InputRunResults(), + val triggerResults: Map = mapOf() +) : Writeable, ToXContent { + + @Throws(IOException::class) + @Suppress("UNCHECKED_CAST") + constructor(sin: StreamInput) : this( + sin.readString(), // monitorName + sin.readInstant(), // periodStart + sin.readInstant(), // periodEnd + sin.readException(), // error + InputRunResults.readFrom(sin), // inputResults + suppressWarning(sin.readMap()) as Map // triggerResults + ) + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + return builder.startObject() + .field("monitor_name", monitorName) + .optionalTimeField("period_start", periodStart) + .optionalTimeField("period_end", periodEnd) + .field("error", error?.message) + .field("input_results", inputResults) + .field("trigger_results", triggerResults) + .endObject() + } + + /** Returns error information to store in the Alert. Currently it's just the stack trace but it can be more */ + fun alertError(): AlertError? { + if (error != null) { + return AlertError(Instant.now(), "Failed running monitor:\n${error.userErrorMessage()}") + } + + if (inputResults.error != null) { + return AlertError(Instant.now(), "Failed fetching inputs:\n${inputResults.error.userErrorMessage()}") + } + return null + } + + fun scriptContextError(trigger: Trigger): Exception? { + return error ?: inputResults.error ?: triggerResults[trigger.id]?.error + } + + companion object { + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): MonitorRunResult { + return MonitorRunResult(sin) + } + + @Suppress("UNCHECKED_CAST") + fun suppressWarning(map: MutableMap?): Map { + return map as Map + } + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeString(monitorName) + out.writeInstant(periodStart) + out.writeInstant(periodEnd) + out.writeException(error) + inputResults.writeTo(out) + out.writeMap(triggerResults) + } +} + +data class InputRunResults( + val results: List> = listOf(), + val error: Exception? = null, + val aggTriggersAfterKey: MutableMap? = null +) : Writeable, ToXContent { + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + return builder.startObject() + .field("results", results) + .field("error", error?.message) + .endObject() + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeVInt(results.size) + for (map in results) { + out.writeMap(map) + } + out.writeException(error) + } + + companion object { + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): InputRunResults { + val count = sin.readVInt() // count + val list = mutableListOf>() + for (i in 0 until count) { + list.add(suppressWarning(sin.readMap())) // result(map) + } + val error = sin.readException() // error + return InputRunResults(list, error) + } + + @Suppress("UNCHECKED_CAST") + fun suppressWarning(map: MutableMap?): Map { + return map as Map + } + } + + fun afterKeysPresent(): Boolean { + aggTriggersAfterKey?.forEach { + if (it.value.afterKey != null && !it.value.lastPage) { + return true + } + } + return false + } +} + +data class TriggerAfterKey(val afterKey: Map?, val lastPage: Boolean) + +data class ActionRunResult( + val actionId: String, + val actionName: String, + val output: Map, + val throttled: Boolean = false, + val executionTime: Instant? = null, + val error: Exception? = null +) : Writeable, ToXContent { + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + sin.readString(), // actionId + sin.readString(), // actionName + suppressWarning(sin.readMap()), // output + sin.readBoolean(), // throttled + sin.readOptionalInstant(), // executionTime + sin.readException() // error + ) + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + return builder.startObject() + .field("id", actionId) + .field("name", actionName) + .field("output", output) + .field("throttled", throttled) + .optionalTimeField("executionTime", executionTime) + .field("error", error?.message) + .endObject() + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeString(actionId) + out.writeString(actionName) + out.writeMap(output) + out.writeBoolean(throttled) + out.writeOptionalInstant(executionTime) + out.writeException(error) + } + + companion object { + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): ActionRunResult { + return ActionRunResult(sin) + } + + @Suppress("UNCHECKED_CAST") + fun suppressWarning(map: MutableMap?): MutableMap { + return map as MutableMap + } + } +} + +private val logger = LogManager.getLogger(MonitorRunResult::class.java) + +/** Constructs an error message from an exception suitable for human consumption. */ +fun Throwable.userErrorMessage(): String { + return when { + this is ScriptException -> this.scriptStack.joinToString(separator = "\n", limit = 100) + this is OpenSearchException -> this.detailedMessage + this.message != null -> { + logger.info("Internal error: ${this.message}. See the opensearch.log for details", this) + this.message!! + } + else -> { + logger.info("Unknown Internal error. See the OpenSearch log for details.", this) + "Unknown Internal error. See the OpenSearch log for details." + } + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/QueryLevelTriggerRunResult.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/QueryLevelTriggerRunResult.kt new file mode 100644 index 00000000..101d0067 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/QueryLevelTriggerRunResult.kt @@ -0,0 +1,66 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.alerting.model + +import org.opensearch.commons.alerting.alerts.AlertError +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.script.ScriptException +import java.io.IOException +import java.time.Instant + +open class QueryLevelTriggerRunResult( + override var triggerName: String, + open var triggered: Boolean, + override var error: Exception?, + open var actionResults: MutableMap = mutableMapOf() +) : TriggerRunResult(triggerName, error) { + + @Throws(IOException::class) + @Suppress("UNCHECKED_CAST") + constructor(sin: StreamInput) : this( + triggerName = sin.readString(), + error = sin.readException(), + triggered = sin.readBoolean(), + actionResults = sin.readMap() as MutableMap + ) + + override fun alertError(): AlertError? { + if (error != null) { + return AlertError(Instant.now(), "Failed evaluating trigger:\n${error!!.userErrorMessage()}") + } + for (actionResult in actionResults.values) { + if (actionResult.error != null) { + return AlertError(Instant.now(), "Failed running action:\n${actionResult.error.userErrorMessage()}") + } + } + return null + } + + override fun internalXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + if (error is ScriptException) error = Exception((error as ScriptException).toJsonString(), error) + return builder + .field("triggered", triggered) + .field("action_results", actionResults as Map) + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + super.writeTo(out) + out.writeBoolean(triggered) + out.writeMap(actionResults as Map) + } + + companion object { + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): TriggerRunResult { + return QueryLevelTriggerRunResult(sin) + } + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/Trigger.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/Trigger.kt index 1834f3b7..7cfb9f41 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/Trigger.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/Trigger.kt @@ -1,6 +1,7 @@ package org.opensearch.commons.alerting.model import org.opensearch.commons.alerting.model.action.Action +import org.opensearch.commons.alerting.model.remote.monitors.RemoteMonitorTrigger import org.opensearch.commons.notifications.model.BaseModel import org.opensearch.core.common.io.stream.StreamInput import org.opensearch.core.xcontent.XContentParser @@ -14,7 +15,8 @@ interface Trigger : BaseModel { QUERY_LEVEL_TRIGGER(QueryLevelTrigger.QUERY_LEVEL_TRIGGER_FIELD), BUCKET_LEVEL_TRIGGER(BucketLevelTrigger.BUCKET_LEVEL_TRIGGER_FIELD), NOOP_TRIGGER(NoOpTrigger.NOOP_TRIGGER_FIELD), - CHAINED_ALERT_TRIGGER(ChainedAlertTrigger.CHAINED_ALERT_TRIGGER_FIELD); + CHAINED_ALERT_TRIGGER(ChainedAlertTrigger.CHAINED_ALERT_TRIGGER_FIELD), + REMOTE_MONITOR_TRIGGER(RemoteMonitorTrigger.REMOTE_MONITOR_TRIGGER_FIELD); override fun toString(): String { return value @@ -55,6 +57,7 @@ interface Trigger : BaseModel { Type.BUCKET_LEVEL_TRIGGER -> BucketLevelTrigger(sin) Type.DOCUMENT_LEVEL_TRIGGER -> DocumentLevelTrigger(sin) Type.CHAINED_ALERT_TRIGGER -> ChainedAlertTrigger(sin) + Type.REMOTE_MONITOR_TRIGGER -> RemoteMonitorTrigger(sin) // This shouldn't be reachable but ensuring exhaustiveness as Kotlin warns // enum can be null in Java else -> throw IllegalStateException("Unexpected input [$type] when reading Trigger") diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/TriggerRunResult.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/TriggerRunResult.kt new file mode 100644 index 00000000..84efde39 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/TriggerRunResult.kt @@ -0,0 +1,55 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.alerting.model + +import org.opensearch.commons.alerting.alerts.AlertError +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.common.io.stream.Writeable +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.XContentBuilder +import java.io.IOException +import java.time.Instant + +abstract class TriggerRunResult( + open var triggerName: String, + open var error: Exception? = null +) : Writeable, ToXContent { + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject() + .field("name", triggerName) + + internalXContent(builder, params) + val msg = error?.message + + builder.field("error", msg) + .endObject() + return builder + } + + abstract fun internalXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder + + /** Returns error information to store in the Alert. Currently it's just the stack trace but it can be more */ + open fun alertError(): AlertError? { + if (error != null) { + return AlertError(Instant.now(), "Failed evaluating trigger:\n${error!!.userErrorMessage()}") + } + return null + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeString(triggerName) + out.writeException(error) + } + + companion object { + @Suppress("UNCHECKED_CAST") + fun suppressWarning(map: MutableMap?): MutableMap { + return map as MutableMap + } + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/WorkflowMetadata.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/WorkflowMetadata.kt new file mode 100644 index 00000000..48deaed6 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/WorkflowMetadata.kt @@ -0,0 +1,106 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.alerting.model + +import org.opensearch.commons.alerting.util.instant +import org.opensearch.commons.alerting.util.optionalTimeField +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.common.io.stream.Writeable +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.core.xcontent.XContentParser +import org.opensearch.core.xcontent.XContentParserUtils +import java.io.IOException +import java.time.Instant + +data class WorkflowMetadata( + val id: String, + val workflowId: String, + val monitorIds: List, + val latestRunTime: Instant, + val latestExecutionId: String +) : Writeable, ToXContent { + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + id = sin.readString(), + workflowId = sin.readString(), + monitorIds = sin.readStringList(), + latestRunTime = sin.readInstant(), + latestExecutionId = sin.readString() + ) + + override fun writeTo(out: StreamOutput) { + out.writeString(id) + out.writeString(workflowId) + out.writeStringCollection(monitorIds) + out.writeInstant(latestRunTime) + out.writeString(latestExecutionId) + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject() + if (params.paramAsBoolean("with_type", false)) builder.startObject(METADATA) + builder.field(WORKFLOW_ID_FIELD, workflowId) + .field(MONITOR_IDS_FIELD, monitorIds) + .optionalTimeField(LATEST_RUN_TIME, latestRunTime) + .field(LATEST_EXECUTION_ID, latestExecutionId) + if (params.paramAsBoolean("with_type", false)) builder.endObject() + return builder.endObject() + } + + companion object { + const val METADATA = "workflow_metadata" + const val WORKFLOW_ID_FIELD = "workflow_id" + const val MONITOR_IDS_FIELD = "monitor_ids" + const val LATEST_RUN_TIME = "latest_run_time" + const val LATEST_EXECUTION_ID = "latest_execution_id" + + @JvmStatic + @JvmOverloads + @Throws(IOException::class) + fun parse(xcp: XContentParser): WorkflowMetadata { + lateinit var workflowId: String + var monitorIds = mutableListOf() + lateinit var latestRunTime: Instant + lateinit var latestExecutionId: String + + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + WORKFLOW_ID_FIELD -> workflowId = xcp.text() + MONITOR_IDS_FIELD -> { + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_ARRAY) { + monitorIds.add(xcp.text()) + } + } + LATEST_RUN_TIME -> latestRunTime = xcp.instant()!! + LATEST_EXECUTION_ID -> latestExecutionId = xcp.text() + } + } + return WorkflowMetadata( + id = "$workflowId-metadata", + workflowId = workflowId, + monitorIds = monitorIds, + latestRunTime = latestRunTime, + latestExecutionId = latestExecutionId + ) + } + + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): WorkflowMetadata { + return WorkflowMetadata(sin) + } + + fun getId(workflowId: String? = null) = "$workflowId-metadata" + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/WorkflowRunContext.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/WorkflowRunContext.kt new file mode 100644 index 00000000..d478315e --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/WorkflowRunContext.kt @@ -0,0 +1,55 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.alerting.model + +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.common.io.stream.Writeable +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.ToXContentObject +import org.opensearch.core.xcontent.XContentBuilder + +data class WorkflowRunContext( + // In case of dry run it's random generated id, while in other cases it's workflowId + val workflowId: String, + val workflowMetadataId: String, + val chainedMonitorId: String?, + val matchingDocIdsPerIndex: Map>, + val auditDelegateMonitorAlerts: Boolean +) : Writeable, ToXContentObject { + companion object { + fun readFrom(sin: StreamInput): WorkflowRunContext { + return WorkflowRunContext(sin) + } + } + + constructor(sin: StreamInput) : this( + sin.readString(), + sin.readString(), + sin.readOptionalString(), + sin.readMap() as Map>, + sin.readBoolean() + ) + + override fun writeTo(out: StreamOutput) { + out.writeString(workflowId) + out.writeString(workflowMetadataId) + out.writeOptionalString(chainedMonitorId) + out.writeMap(matchingDocIdsPerIndex) + out.writeBoolean(auditDelegateMonitorAlerts) + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params?): XContentBuilder { + builder.startObject() + .field("workflow_id", workflowId) + .field("workflow_metadata_id", workflowMetadataId) + .field("chained_monitor_id", chainedMonitorId) + .field("matching_doc_ids_per_index", matchingDocIdsPerIndex) + .field("audit_delegate_monitor_alerts", auditDelegateMonitorAlerts) + .endObject() + return builder + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/WorkflowRunResult.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/WorkflowRunResult.kt new file mode 100644 index 00000000..1b5fe3d8 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/WorkflowRunResult.kt @@ -0,0 +1,82 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.alerting.model + +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.common.io.stream.Writeable +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.XContentBuilder +import java.io.IOException +import java.lang.Exception +import java.time.Instant + +data class WorkflowRunResult( + val workflowId: String, + val workflowName: String, + val monitorRunResults: List> = mutableListOf(), + val executionStartTime: Instant, + var executionEndTime: Instant? = null, + val executionId: String, + val error: Exception? = null, + val triggerResults: Map = mapOf() +) : Writeable, ToXContent { + + @Throws(IOException::class) + @Suppress("UNCHECKED_CAST") + constructor(sin: StreamInput) : this( + workflowId = sin.readString(), + workflowName = sin.readString(), + monitorRunResults = sin.readList> { s: StreamInput -> MonitorRunResult.readFrom(s) }, + executionStartTime = sin.readInstant(), + executionEndTime = sin.readOptionalInstant(), + executionId = sin.readString(), + error = sin.readException(), + triggerResults = suppressWarning(sin.readMap()) as Map + ) + + override fun writeTo(out: StreamOutput) { + out.writeString(workflowId) + out.writeString(workflowName) + out.writeList(monitorRunResults) + out.writeInstant(executionStartTime) + out.writeOptionalInstant(executionEndTime) + out.writeString(executionId) + out.writeException(error) + out.writeMap(triggerResults) + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject() + builder.field("execution_id", executionId) + builder.field("workflow_name", workflowName) + builder.field("workflow_id", workflowId) + builder.field("trigger_results", triggerResults) + builder.startArray("monitor_run_results") + for (monitorResult in monitorRunResults) { + monitorResult.toXContent(builder, ToXContent.EMPTY_PARAMS) + } + builder.endArray() + .field("execution_start_time", executionStartTime) + .field("execution_end_time", executionEndTime) + .field("error", error?.message) + .endObject() + return builder + } + + companion object { + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): WorkflowRunResult { + return WorkflowRunResult(sin) + } + + @Suppress("UNCHECKED_CAST") + fun suppressWarning(map: MutableMap?): Map { + return map as Map + } + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/remote/monitors/RemoteDocLevelMonitorInput.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/remote/monitors/RemoteDocLevelMonitorInput.kt new file mode 100644 index 00000000..4d1911df --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/remote/monitors/RemoteDocLevelMonitorInput.kt @@ -0,0 +1,81 @@ +package org.opensearch.commons.alerting.model.remote.monitors + +import org.opensearch.commons.alerting.model.DocLevelMonitorInput +import org.opensearch.commons.alerting.model.DocLevelMonitorInput.Companion.DOC_LEVEL_INPUT_FIELD +import org.opensearch.commons.alerting.model.Input +import org.opensearch.core.common.bytes.BytesReference +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.core.xcontent.XContentParser +import org.opensearch.core.xcontent.XContentParserUtils +import java.io.IOException +import java.nio.ByteBuffer + +data class RemoteDocLevelMonitorInput(val input: BytesReference, val docLevelMonitorInput: DocLevelMonitorInput) : Input { + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + sin.readBytesReference(), + DocLevelMonitorInput.readFrom(sin) + ) + + fun asTemplateArg(): Map { + val bytes = input.toBytesRef().bytes + return mapOf( + RemoteDocLevelMonitorInput.INPUT_SIZE to bytes.size, + RemoteDocLevelMonitorInput.INPUT_FIELD to bytes, + DOC_LEVEL_INPUT_FIELD to docLevelMonitorInput + ) + } + + override fun name(): String { + return REMOTE_DOC_LEVEL_MONITOR_INPUT_FIELD + } + + override fun writeTo(out: StreamOutput) { + out.writeBytesReference(input) + docLevelMonitorInput.writeTo(out) + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + val bytes = input.toBytesRef().bytes + return builder.startObject() + .startObject(REMOTE_DOC_LEVEL_MONITOR_INPUT_FIELD) + .field(RemoteMonitorInput.INPUT_SIZE, bytes.size) + .field(RemoteMonitorInput.INPUT_FIELD, bytes) + .field(DOC_LEVEL_INPUT_FIELD, docLevelMonitorInput) + .endObject() + .endObject() + } + + companion object { + const val INPUT_FIELD = "input" + const val INPUT_SIZE = "size" + const val REMOTE_DOC_LEVEL_MONITOR_INPUT_FIELD = "remote_doc_level_monitor_input" + + fun parse(xcp: XContentParser): RemoteDocLevelMonitorInput { + var bytes: ByteArray? = null + var size: Int = 0 + var docLevelMonitorInput: DocLevelMonitorInput? = null + + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + RemoteMonitorInput.INPUT_FIELD -> bytes = xcp.binaryValue() + RemoteMonitorInput.INPUT_SIZE -> size = xcp.intValue() + Input.Type.DOCUMENT_LEVEL_INPUT.value -> { + docLevelMonitorInput = DocLevelMonitorInput.parse(xcp) + XContentParserUtils.ensureExpectedToken(XContentParser.Token.END_OBJECT, xcp.nextToken(), xcp) + } + } + } + val input = BytesReference.fromByteBuffer(ByteBuffer.wrap(bytes, 0, size)) + return RemoteDocLevelMonitorInput(input, docLevelMonitorInput!!) + } + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/remote/monitors/RemoteMonitorInput.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/remote/monitors/RemoteMonitorInput.kt new file mode 100644 index 00000000..c2d3867b --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/remote/monitors/RemoteMonitorInput.kt @@ -0,0 +1,70 @@ +package org.opensearch.commons.alerting.model.remote.monitors + +import org.opensearch.commons.alerting.model.Input +import org.opensearch.core.common.bytes.BytesReference +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.core.xcontent.XContentParser +import org.opensearch.core.xcontent.XContentParserUtils +import java.io.IOException +import java.nio.ByteBuffer + +data class RemoteMonitorInput(val input: BytesReference) : Input { + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + sin.readBytesReference() + ) + + fun asTemplateArg(): Map { + val bytes = input.toBytesRef().bytes + return mapOf( + INPUT_SIZE to bytes.size, + INPUT_FIELD to bytes + ) + } + + override fun name(): String { + return REMOTE_MONITOR_INPUT_FIELD + } + + override fun writeTo(out: StreamOutput) { + out.writeBytesReference(input) + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + val bytes = input.toBytesRef().bytes + return builder.startObject() + .startObject(REMOTE_MONITOR_INPUT_FIELD) + .field(INPUT_SIZE, bytes.size) + .field(INPUT_FIELD, bytes) + .endObject() + .endObject() + } + + companion object { + const val INPUT_FIELD = "input" + const val INPUT_SIZE = "size" + const val REMOTE_MONITOR_INPUT_FIELD = "remote_monitor_input" + + fun parse(xcp: XContentParser): RemoteMonitorInput { + var bytes: ByteArray? = null + var size: Int = 0 + + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + INPUT_FIELD -> bytes = xcp.binaryValue() + INPUT_SIZE -> size = xcp.intValue() + } + } + val input = BytesReference.fromByteBuffer(ByteBuffer.wrap(bytes, 0, size)) + return RemoteMonitorInput(input) + } + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/remote/monitors/RemoteMonitorTrigger.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/remote/monitors/RemoteMonitorTrigger.kt new file mode 100644 index 00000000..0e89e5ba --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/remote/monitors/RemoteMonitorTrigger.kt @@ -0,0 +1,126 @@ +package org.opensearch.commons.alerting.model.remote.monitors + +import org.opensearch.common.CheckedFunction +import org.opensearch.common.UUIDs +import org.opensearch.commons.alerting.model.Trigger +import org.opensearch.commons.alerting.model.action.Action +import org.opensearch.core.ParseField +import org.opensearch.core.common.bytes.BytesReference +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.xcontent.NamedXContentRegistry +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.core.xcontent.XContentParser +import org.opensearch.core.xcontent.XContentParserUtils +import java.io.IOException +import java.nio.ByteBuffer + +data class RemoteMonitorTrigger( + override val id: String, + override val name: String, + override val severity: String, + override val actions: List, + val trigger: BytesReference +) : Trigger { + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + sin.readString(), + sin.readString(), + sin.readString(), + sin.readList(::Action), + sin.readBytesReference() + ) + + fun asTemplateArg(): Map { + val bytes = trigger.toBytesRef().bytes + return mapOf( + Trigger.ID_FIELD to id, + Trigger.NAME_FIELD to name, + Trigger.SEVERITY_FIELD to severity, + Trigger.ACTIONS_FIELD to actions.map { it.asTemplateArg() }, + TRIGGER_SIZE to bytes.size, + TRIGGER_FIELD to bytes + ) + } + + override fun name(): String { + return REMOTE_MONITOR_TRIGGER_FIELD + } + + override fun writeTo(out: StreamOutput) { + out.writeString(id) + out.writeString(name) + out.writeString(severity) + out.writeCollection(actions) + out.writeBytesReference(trigger) + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + val bytes = trigger.toBytesRef().bytes + return builder.startObject() + .startObject(REMOTE_MONITOR_TRIGGER_FIELD) + .field(Trigger.ID_FIELD, id) + .field(Trigger.NAME_FIELD, name) + .field(Trigger.SEVERITY_FIELD, severity) + .field(Trigger.ACTIONS_FIELD, actions.toTypedArray()) + .field(TRIGGER_SIZE, bytes.size) + .field(TRIGGER_FIELD, bytes) + .endObject() + .endObject() + } + + companion object { + const val TRIGGER_FIELD = "trigger" + const val TRIGGER_SIZE = "size" + const val REMOTE_MONITOR_TRIGGER_FIELD = "remote_monitor_trigger" + + val XCONTENT_REGISTRY = NamedXContentRegistry.Entry( + Trigger::class.java, + ParseField(REMOTE_MONITOR_TRIGGER_FIELD), + CheckedFunction { parseInner(it) } + ) + + fun parseInner(xcp: XContentParser): RemoteMonitorTrigger { + var id = UUIDs.base64UUID() // assign a default triggerId if one is not specified + lateinit var name: String + lateinit var severity: String + val actions: MutableList = mutableListOf() + var bytes: ByteArray? = null + var size: Int = 0 + + if (xcp.currentToken() != XContentParser.Token.START_OBJECT && xcp.currentToken() != XContentParser.Token.FIELD_NAME) { + XContentParserUtils.throwUnknownToken(xcp.currentToken(), xcp.tokenLocation) + } + + // If the parser began on START_OBJECT, move to the next token so that the while loop enters on + // the fieldName (or END_OBJECT if it's empty). + if (xcp.currentToken() == XContentParser.Token.START_OBJECT) xcp.nextToken() + while (xcp.currentToken() != XContentParser.Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + Trigger.ID_FIELD -> id = xcp.text() + Trigger.NAME_FIELD -> name = xcp.text() + Trigger.SEVERITY_FIELD -> severity = xcp.text() + Trigger.ACTIONS_FIELD -> { + XContentParserUtils.ensureExpectedToken( + XContentParser.Token.START_ARRAY, + xcp.currentToken(), + xcp + ) + while (xcp.nextToken() != XContentParser.Token.END_ARRAY) { + actions.add(Action.parse(xcp)) + } + } + TRIGGER_FIELD -> bytes = xcp.binaryValue() + TRIGGER_SIZE -> size = xcp.intValue() + } + xcp.nextToken() + } + val trigger = BytesReference.fromByteBuffer(ByteBuffer.wrap(bytes, 0, size)) + return RemoteMonitorTrigger(id, name, severity, actions, trigger) + } + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/util/AlertingException.kt b/src/main/kotlin/org/opensearch/commons/alerting/util/AlertingException.kt new file mode 100644 index 00000000..312758f0 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/util/AlertingException.kt @@ -0,0 +1,89 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.alerting.util + +import org.apache.logging.log4j.LogManager +import org.opensearch.OpenSearchException +import org.opensearch.OpenSearchSecurityException +import org.opensearch.OpenSearchStatusException +import org.opensearch.core.common.Strings +import org.opensearch.core.rest.RestStatus +import org.opensearch.index.IndexNotFoundException +import org.opensearch.index.engine.VersionConflictEngineException +import org.opensearch.indices.InvalidIndexNameException + +private val log = LogManager.getLogger(AlertingException::class.java) + +/** + * Converts into a user friendly message. + */ +class AlertingException(message: String, val status: RestStatus, val ex: Exception) : OpenSearchException(message, ex) { + + override fun status(): RestStatus { + return status + } + + companion object { + @JvmStatic + fun wrap(ex: Exception): OpenSearchException { + log.error("Alerting error: $ex") + + var friendlyMsg = "Unknown error" + var status = RestStatus.INTERNAL_SERVER_ERROR + when (ex) { + is IndexNotFoundException -> { + status = ex.status() + friendlyMsg = "Configured indices are not found: ${ex.index}" + } + is OpenSearchSecurityException -> { + status = ex.status() + friendlyMsg = "User doesn't have permissions to execute this action. Contact administrator." + } + is OpenSearchStatusException -> { + status = ex.status() + friendlyMsg = ex.message as String + } + is IllegalArgumentException -> { + status = RestStatus.BAD_REQUEST + friendlyMsg = ex.message as String + } + is VersionConflictEngineException -> { + status = ex.status() + friendlyMsg = ex.message as String + } + is InvalidIndexNameException -> { + status = RestStatus.BAD_REQUEST + friendlyMsg = ex.message as String + } + else -> { + if (!Strings.isNullOrEmpty(ex.message)) { + friendlyMsg = ex.message as String + } + } + } + // Wrapping the origin exception as runtime to avoid it being formatted. + // Currently, alerting-kibana is using `error.root_cause.reason` as text in the toast message. + // Below logic is to set friendly message to error.root_cause.reason. + return AlertingException(friendlyMsg, status, Exception("${ex.javaClass.name}: ${ex.message}")) + } + + @JvmStatic + fun merge(vararg ex: AlertingException): AlertingException { + var friendlyMsg = "" + var unwrappedExceptionMsg = "" + ex.forEach { + if (friendlyMsg != "") { + friendlyMsg += ", ${it.message}" + unwrappedExceptionMsg += ", ${it.ex.message}" + } else { + friendlyMsg = it.message.orEmpty() + unwrappedExceptionMsg = "${it.ex.message}" + } + } + return AlertingException(friendlyMsg, ex.first().status, Exception(unwrappedExceptionMsg)) + } + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/util/IndexUtils.kt b/src/main/kotlin/org/opensearch/commons/alerting/util/IndexUtils.kt index aa45d99e..887e8430 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/util/IndexUtils.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/util/IndexUtils.kt @@ -1,5 +1,6 @@ package org.opensearch.commons.alerting.util +import org.opensearch.commons.alerting.model.AggregationResultBucket import org.opensearch.commons.alerting.model.Monitor import org.opensearch.commons.alerting.settings.SupportedClusterMetricsSettings import org.opensearch.commons.authuser.User @@ -8,6 +9,7 @@ import org.opensearch.core.xcontent.XContentBuilder import org.opensearch.core.xcontent.XContentParser import org.opensearch.core.xcontent.XContentParserUtils import java.time.Instant +import java.util.Locale class IndexUtils { companion object { @@ -46,7 +48,9 @@ class IndexUtils { } } -fun Monitor.isBucketLevelMonitor(): Boolean = this.monitorType == Monitor.MonitorType.BUCKET_LEVEL_MONITOR +fun Monitor.isBucketLevelMonitor(): Boolean = + isMonitorOfStandardType() && + Monitor.MonitorType.valueOf(this.monitorType.uppercase(Locale.ROOT)) == Monitor.MonitorType.BUCKET_LEVEL_MONITOR fun XContentBuilder.optionalUserField(name: String, user: User?): XContentBuilder { if (user == null) { @@ -85,3 +89,16 @@ fun XContentParser.instant(): Instant? { * Extension function for ES 6.3 and above that duplicates the ES 6.2 XContentBuilder.string() method. */ fun XContentBuilder.string(): String = BytesReference.bytes(this).utf8ToString() + +fun Monitor.isMonitorOfStandardType(): Boolean { + val standardMonitorTypes = Monitor.MonitorType.values().map { it.value.uppercase(Locale.ROOT) }.toSet() + return standardMonitorTypes.contains(this.monitorType.uppercase(Locale.ROOT)) +} + +fun getBucketKeysHash(bucketKeys: List): String = bucketKeys.joinToString(separator = "#") + +/** + * Since buckets can have multi-value keys, this converts the bucket key values to a string that can be used + * as the key for a HashMap to easily retrieve [AggregationResultBucket] based on the bucket key values. + */ +fun AggregationResultBucket.getBucketKeysHash(): String = getBucketKeysHash(this.bucketKeys) diff --git a/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt b/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt index 7e3310fd..7ae132ef 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt @@ -18,10 +18,12 @@ import org.opensearch.common.xcontent.XContentType import org.opensearch.commons.alerting.aggregation.bucketselectorext.BucketSelectorExtAggregationBuilder import org.opensearch.commons.alerting.aggregation.bucketselectorext.BucketSelectorExtFilter import org.opensearch.commons.alerting.model.ActionExecutionResult +import org.opensearch.commons.alerting.model.ActionRunResult import org.opensearch.commons.alerting.model.AggregationResultBucket import org.opensearch.commons.alerting.model.Alert import org.opensearch.commons.alerting.model.BaseAlert import org.opensearch.commons.alerting.model.BucketLevelTrigger +import org.opensearch.commons.alerting.model.BucketLevelTriggerRunResult import org.opensearch.commons.alerting.model.ChainedAlertTrigger import org.opensearch.commons.alerting.model.ChainedMonitorFindings import org.opensearch.commons.alerting.model.ClusterMetricsInput @@ -31,12 +33,16 @@ import org.opensearch.commons.alerting.model.Delegate import org.opensearch.commons.alerting.model.DocLevelMonitorInput import org.opensearch.commons.alerting.model.DocLevelQuery import org.opensearch.commons.alerting.model.DocumentLevelTrigger +import org.opensearch.commons.alerting.model.DocumentLevelTriggerRunResult import org.opensearch.commons.alerting.model.Finding import org.opensearch.commons.alerting.model.Input +import org.opensearch.commons.alerting.model.InputRunResults import org.opensearch.commons.alerting.model.IntervalSchedule import org.opensearch.commons.alerting.model.Monitor +import org.opensearch.commons.alerting.model.MonitorRunResult import org.opensearch.commons.alerting.model.NoOpTrigger import org.opensearch.commons.alerting.model.QueryLevelTrigger +import org.opensearch.commons.alerting.model.QueryLevelTriggerRunResult import org.opensearch.commons.alerting.model.Schedule import org.opensearch.commons.alerting.model.SearchInput import org.opensearch.commons.alerting.model.Sequence @@ -50,6 +56,8 @@ import org.opensearch.commons.alerting.model.action.AlertCategory import org.opensearch.commons.alerting.model.action.PerAlertActionScope import org.opensearch.commons.alerting.model.action.PerExecutionActionScope import org.opensearch.commons.alerting.model.action.Throttle +import org.opensearch.commons.alerting.model.remote.monitors.RemoteMonitorTrigger +import org.opensearch.commons.alerting.util.getBucketKeysHash import org.opensearch.commons.alerting.util.string import org.opensearch.commons.authuser.User import org.opensearch.core.xcontent.NamedXContentRegistry @@ -82,7 +90,7 @@ fun randomQueryLevelMonitor( withMetadata: Boolean = false ): Monitor { return Monitor( - name = name, monitorType = Monitor.MonitorType.QUERY_LEVEL_MONITOR, enabled = enabled, inputs = inputs, + name = name, monitorType = Monitor.MonitorType.QUERY_LEVEL_MONITOR.value, enabled = enabled, inputs = inputs, schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user, uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf() ) @@ -100,7 +108,7 @@ fun randomQueryLevelMonitorWithoutUser( withMetadata: Boolean = false ): Monitor { return Monitor( - name = name, monitorType = Monitor.MonitorType.QUERY_LEVEL_MONITOR, enabled = enabled, inputs = inputs, + name = name, monitorType = Monitor.MonitorType.QUERY_LEVEL_MONITOR.value, enabled = enabled, inputs = inputs, schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = null, uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf() ) @@ -124,7 +132,7 @@ fun randomBucketLevelMonitor( withMetadata: Boolean = false ): Monitor { return Monitor( - name = name, monitorType = Monitor.MonitorType.BUCKET_LEVEL_MONITOR, enabled = enabled, inputs = inputs, + name = name, monitorType = Monitor.MonitorType.BUCKET_LEVEL_MONITOR.value, enabled = enabled, inputs = inputs, schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user, uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf() ) @@ -142,7 +150,7 @@ fun randomClusterMetricsMonitor( withMetadata: Boolean = false ): Monitor { return Monitor( - name = name, monitorType = Monitor.MonitorType.CLUSTER_METRICS_MONITOR, enabled = enabled, inputs = inputs, + name = name, monitorType = Monitor.MonitorType.CLUSTER_METRICS_MONITOR.value, enabled = enabled, inputs = inputs, schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user, uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf() ) @@ -160,7 +168,7 @@ fun randomDocumentLevelMonitor( withMetadata: Boolean = false ): Monitor { return Monitor( - name = name, monitorType = Monitor.MonitorType.DOC_LEVEL_MONITOR, enabled = enabled, inputs = inputs, + name = name, monitorType = Monitor.MonitorType.DOC_LEVEL_MONITOR.value, enabled = enabled, inputs = inputs, schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user, uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf() ) @@ -507,6 +515,12 @@ fun parser(xc: String): XContentParser { return parser } +fun parser(xc: ByteArray): XContentParser { + val parser = XContentType.JSON.xContent().createParser(xContentRegistry(), LoggingDeprecationHandler.INSTANCE, xc) + parser.nextToken() + return parser +} + fun xContentRegistry(): NamedXContentRegistry { return NamedXContentRegistry( listOf( @@ -516,7 +530,8 @@ fun xContentRegistry(): NamedXContentRegistry { BucketLevelTrigger.XCONTENT_REGISTRY, DocumentLevelTrigger.XCONTENT_REGISTRY, ChainedAlertTrigger.XCONTENT_REGISTRY, - NoOpTrigger.XCONTENT_REGISTRY + NoOpTrigger.XCONTENT_REGISTRY, + RemoteMonitorTrigger.XCONTENT_REGISTRY ) + SearchModule(Settings.EMPTY, emptyList()).namedXContents ) } @@ -657,3 +672,115 @@ fun createCorrelationAlertTemplateArgs(correlationAlert: CorrelationAlert): Map< CorrelationAlert.CORRELATION_RULE_NAME to correlationAlert.correlationRuleName ) } + +fun randomInputRunResults(): InputRunResults { + return InputRunResults(listOf(), null) +} + +fun randomActionRunResult(): ActionRunResult { + val map = mutableMapOf() + map.plus(Pair("key1", "val1")) + map.plus(Pair("key2", "val2")) + return ActionRunResult( + "1234", + "test-action", + map, + false, + Instant.now(), + null + ) +} + +fun randomDocumentLevelTriggerRunResult(): DocumentLevelTriggerRunResult { + val map = mutableMapOf() + map.plus(Pair("key1", randomActionRunResult())) + map.plus(Pair("key2", randomActionRunResult())) + return DocumentLevelTriggerRunResult( + "trigger-name", + mutableListOf(UUIDs.randomBase64UUID().toString()), + null, + mutableMapOf(Pair("alertId", map)) + ) +} +fun randomDocumentLevelMonitorRunResult(): MonitorRunResult { + val triggerResults = mutableMapOf() + val triggerRunResult = randomDocumentLevelTriggerRunResult() + triggerResults.plus(Pair("test", triggerRunResult)) + + return MonitorRunResult( + "test-monitor", + Instant.now(), + Instant.now(), + null, + randomInputRunResults(), + triggerResults + ) +} + +fun randomBucketLevelTriggerRunResult(): BucketLevelTriggerRunResult { + val map = mutableMapOf() + map.plus(Pair("key1", randomActionRunResult())) + map.plus(Pair("key2", randomActionRunResult())) + + val aggBucket1 = AggregationResultBucket( + "parent_bucket_path_1", + listOf("bucket_key_1"), + mapOf("k1" to "val1", "k2" to "val2") + ) + val aggBucket2 = AggregationResultBucket( + "parent_bucket_path_2", + listOf("bucket_key_2"), + mapOf("k1" to "val1", "k2" to "val2") + ) + + val actionResultsMap: MutableMap> = mutableMapOf() + actionResultsMap[aggBucket1.getBucketKeysHash()] = map + actionResultsMap[aggBucket2.getBucketKeysHash()] = map + + return BucketLevelTriggerRunResult( + "trigger-name", + null, + mapOf( + aggBucket1.getBucketKeysHash() to aggBucket1, + aggBucket2.getBucketKeysHash() to aggBucket2 + ), + actionResultsMap + ) +} + +fun randomBucketLevelMonitorRunResult(): MonitorRunResult { + val triggerResults = mutableMapOf() + val triggerRunResult = randomBucketLevelTriggerRunResult() + triggerResults.plus(Pair("test", triggerRunResult)) + + return MonitorRunResult( + "test-monitor", + Instant.now(), + Instant.now(), + null, + randomInputRunResults(), + triggerResults + ) +} + +fun randomQueryLevelTriggerRunResult(): QueryLevelTriggerRunResult { + val map = mutableMapOf() + map.plus(Pair("key1", randomActionRunResult())) + map.plus(Pair("key2", randomActionRunResult())) + return QueryLevelTriggerRunResult("trigger-name", true, null, map) +} + +fun randomQueryLevelMonitorRunResult(): MonitorRunResult { + val triggerResults = mutableMapOf() + val triggerRunResult = randomQueryLevelTriggerRunResult() + triggerResults.plus(Pair("test", triggerRunResult)) + + return MonitorRunResult( + "test-monitor", + Instant.now(), + Instant.now(), + null, + randomInputRunResults(), + triggerResults + ) +} diff --git a/src/test/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequestTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequestTests.kt new file mode 100644 index 00000000..dda45483 --- /dev/null +++ b/src/test/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutRequestTests.kt @@ -0,0 +1,92 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.alerting.action + +import org.junit.Assert.assertEquals +import org.junit.jupiter.api.Test +import org.opensearch.common.io.stream.BytesStreamOutput +import org.opensearch.commons.alerting.model.ActionExecutionTime +import org.opensearch.commons.alerting.model.DocLevelMonitorInput +import org.opensearch.commons.alerting.model.DocLevelQuery +import org.opensearch.commons.alerting.model.IndexExecutionContext +import org.opensearch.commons.alerting.model.IntervalSchedule +import org.opensearch.commons.alerting.model.Monitor +import org.opensearch.commons.alerting.model.MonitorMetadata +import org.opensearch.commons.alerting.model.Workflow +import org.opensearch.commons.alerting.model.WorkflowRunContext +import org.opensearch.commons.alerting.randomDocumentLevelMonitor +import org.opensearch.commons.alerting.randomDocumentLevelTrigger +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.index.shard.ShardId +import org.opensearch.index.seqno.SequenceNumbers +import org.opensearch.script.Script +import java.time.Instant +import java.time.temporal.ChronoUnit +import java.util.UUID + +class DocLevelMonitorFanOutRequestTests { + + @Test + fun `test doc level monitor fan out request as stream`() { + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", fields = listOf(), name = "3") + val docLevelInput = DocLevelMonitorInput("description", listOf("test-index"), listOf(docQuery)) + + val trigger = randomDocumentLevelTrigger(condition = Script("return true")) + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + enabled = true, + schedule = IntervalSchedule(1, ChronoUnit.MINUTES) + ) + val monitorMetadata = MonitorMetadata( + "test", + SequenceNumbers.UNASSIGNED_SEQ_NO, + SequenceNumbers.UNASSIGNED_PRIMARY_TERM, + Monitor.NO_ID, + listOf(ActionExecutionTime("", Instant.now())), + mutableMapOf("index" to mutableMapOf("1" to "1")), + mutableMapOf("test-index" to ".opensearch-sap-test_windows-queries-000001") + ) + val indexExecutionContext = IndexExecutionContext( + listOf(docQuery), + mutableMapOf("index" to mutableMapOf("1" to "1")), + mutableMapOf("index" to mutableMapOf("1" to "1")), + "test-index", + "test-index", + listOf("test-index"), + listOf("test-index"), + listOf("test-field"), + listOf("1", "2") + ) + val workflowRunContext = WorkflowRunContext( + Workflow.NO_ID, + Workflow.NO_ID, + Monitor.NO_ID, + mutableMapOf("index" to listOf("1")), + true + ) + val docLevelMonitorFanOutRequest = DocLevelMonitorFanOutRequest( + monitor, + false, + monitorMetadata, + UUID.randomUUID().toString(), + indexExecutionContext, + listOf(ShardId("test-index", UUID.randomUUID().toString(), 0)), + listOf("test-index"), + workflowRunContext + ) + val out = BytesStreamOutput() + docLevelMonitorFanOutRequest.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newDocLevelMonitorFanOutRequest = DocLevelMonitorFanOutRequest(sin) + assertEquals(docLevelMonitorFanOutRequest.monitor, newDocLevelMonitorFanOutRequest.monitor) + assertEquals(docLevelMonitorFanOutRequest.executionId, newDocLevelMonitorFanOutRequest.executionId) + assertEquals(docLevelMonitorFanOutRequest.monitorMetadata, newDocLevelMonitorFanOutRequest.monitorMetadata) + assertEquals(docLevelMonitorFanOutRequest.indexExecutionContext, newDocLevelMonitorFanOutRequest.indexExecutionContext) + assertEquals(docLevelMonitorFanOutRequest.shardIds, newDocLevelMonitorFanOutRequest.shardIds) + assertEquals(docLevelMonitorFanOutRequest.workflowRunContext, newDocLevelMonitorFanOutRequest.workflowRunContext) + } +} diff --git a/src/test/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutResponseTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutResponseTests.kt new file mode 100644 index 00000000..645b7d5c --- /dev/null +++ b/src/test/kotlin/org/opensearch/commons/alerting/action/DocLevelMonitorFanOutResponseTests.kt @@ -0,0 +1,60 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.alerting.action + +import org.junit.Assert.assertEquals +import org.junit.jupiter.api.Test +import org.opensearch.common.io.stream.BytesStreamOutput +import org.opensearch.commons.alerting.model.InputRunResults +import org.opensearch.commons.alerting.randomDocumentLevelTriggerRunResult +import org.opensearch.core.common.io.stream.StreamInput + +class DocLevelMonitorFanOutResponseTests { + + @Test + fun `test doc level monitor fan out response with errors as stream`() { + val docLevelMonitorFanOutResponse = DocLevelMonitorFanOutResponse( + "nodeid", + "eid", + "monitorId", + mutableMapOf("index" to mutableMapOf("1" to "1")), + InputRunResults(error = null), + mapOf("1" to randomDocumentLevelTriggerRunResult(), "2" to randomDocumentLevelTriggerRunResult()) + ) + val out = BytesStreamOutput() + docLevelMonitorFanOutResponse.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newDocLevelMonitorFanOutResponse = DocLevelMonitorFanOutResponse(sin) + assertEquals(docLevelMonitorFanOutResponse.nodeId, newDocLevelMonitorFanOutResponse.nodeId) + assertEquals(docLevelMonitorFanOutResponse.executionId, newDocLevelMonitorFanOutResponse.executionId) + assertEquals(docLevelMonitorFanOutResponse.monitorId, newDocLevelMonitorFanOutResponse.monitorId) + assertEquals(docLevelMonitorFanOutResponse.lastRunContexts, newDocLevelMonitorFanOutResponse.lastRunContexts) + assertEquals(docLevelMonitorFanOutResponse.inputResults, newDocLevelMonitorFanOutResponse.inputResults) + assertEquals(docLevelMonitorFanOutResponse.triggerResults, newDocLevelMonitorFanOutResponse.triggerResults) + } + + @Test + fun `test doc level monitor fan out response as stream`() { + val workflow = DocLevelMonitorFanOutResponse( + "nodeid", + "eid", + "monitorId", + mapOf("index" to mapOf("1" to "1")) as MutableMap, + InputRunResults(), + mapOf("1" to randomDocumentLevelTriggerRunResult(), "2" to randomDocumentLevelTriggerRunResult()) + ) + val out = BytesStreamOutput() + workflow.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newWorkflow = DocLevelMonitorFanOutResponse(sin) + assertEquals(workflow.nodeId, newWorkflow.nodeId) + assertEquals(workflow.executionId, newWorkflow.executionId) + assertEquals(workflow.monitorId, newWorkflow.monitorId) + assertEquals(workflow.lastRunContexts, newWorkflow.lastRunContexts) + assertEquals(workflow.inputResults, newWorkflow.inputResults) + assertEquals(workflow.triggerResults, newWorkflow.triggerResults) + } +} diff --git a/src/test/kotlin/org/opensearch/commons/alerting/action/GetMonitorResponseTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/action/GetMonitorResponseTests.kt index d91c7471..eb3f08e4 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/action/GetMonitorResponseTests.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/action/GetMonitorResponseTests.kt @@ -42,7 +42,7 @@ class GetMonitorResponseTests : OpenSearchTestCase() { schedule = cronSchedule, lastUpdateTime = Instant.now(), enabledTime = Instant.now(), - monitorType = Monitor.MonitorType.QUERY_LEVEL_MONITOR, + monitorType = Monitor.MonitorType.QUERY_LEVEL_MONITOR.value, user = randomUser(), schemaVersion = 0, inputs = mutableListOf(), diff --git a/src/test/kotlin/org/opensearch/commons/alerting/action/IndexMonitorResponseTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/action/IndexMonitorResponseTests.kt index 2b5ee04d..ca3afa3e 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/action/IndexMonitorResponseTests.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/action/IndexMonitorResponseTests.kt @@ -26,7 +26,7 @@ class IndexMonitorResponseTests { schedule = cronSchedule, lastUpdateTime = Instant.now(), enabledTime = Instant.now(), - monitorType = Monitor.MonitorType.QUERY_LEVEL_MONITOR, + monitorType = Monitor.MonitorType.QUERY_LEVEL_MONITOR.value, user = randomUser(), schemaVersion = 0, inputs = mutableListOf(), diff --git a/src/test/kotlin/org/opensearch/commons/alerting/model/WriteableTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/model/WriteableTests.kt index 220806c1..170317b2 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/model/WriteableTests.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/model/WriteableTests.kt @@ -1,26 +1,43 @@ package org.opensearch.commons.alerting.model +import org.junit.Assert import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Test +import org.opensearch.common.UUIDs import org.opensearch.common.io.stream.BytesStreamOutput import org.opensearch.commons.alerting.model.action.Action import org.opensearch.commons.alerting.model.action.ActionExecutionPolicy import org.opensearch.commons.alerting.model.action.Throttle +import org.opensearch.commons.alerting.model.remote.monitors.RemoteDocLevelMonitorInput +import org.opensearch.commons.alerting.model.remote.monitors.RemoteMonitorInput +import org.opensearch.commons.alerting.model.remote.monitors.RemoteMonitorTrigger import org.opensearch.commons.alerting.randomAction import org.opensearch.commons.alerting.randomActionExecutionPolicy +import org.opensearch.commons.alerting.randomBucketLevelMonitorRunResult import org.opensearch.commons.alerting.randomBucketLevelTrigger +import org.opensearch.commons.alerting.randomBucketLevelTriggerRunResult import org.opensearch.commons.alerting.randomChainedAlertTrigger import org.opensearch.commons.alerting.randomDocLevelQuery +import org.opensearch.commons.alerting.randomDocumentLevelMonitorRunResult import org.opensearch.commons.alerting.randomDocumentLevelTrigger +import org.opensearch.commons.alerting.randomInputRunResults import org.opensearch.commons.alerting.randomQueryLevelMonitor +import org.opensearch.commons.alerting.randomQueryLevelMonitorRunResult import org.opensearch.commons.alerting.randomQueryLevelTrigger +import org.opensearch.commons.alerting.randomQueryLevelTriggerRunResult import org.opensearch.commons.alerting.randomThrottle import org.opensearch.commons.alerting.randomUser import org.opensearch.commons.alerting.randomUserEmpty +import org.opensearch.commons.alerting.util.IndexUtils import org.opensearch.commons.authuser.User import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.common.io.stream.Writeable import org.opensearch.search.builder.SearchSourceBuilder +import org.opensearch.test.OpenSearchTestCase +import java.io.IOException import java.time.Instant +import java.time.temporal.ChronoUnit import kotlin.test.assertTrue class WriteableTests { @@ -217,4 +234,255 @@ class WriteableTests { Assertions.assertEquals(createdTime, newComment.createdTime) Assertions.assertEquals(user, newComment.user) } + + @Test + fun `test actionrunresult as stream`() { + val actionRunResult = randomActionRunResult() + val out = BytesStreamOutput() + actionRunResult.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newActionRunResult = ActionRunResult(sin) + OpenSearchTestCase.assertEquals( + "Round tripping ActionRunResult doesn't work", + actionRunResult, + newActionRunResult + ) + } + + @Test + fun `test query-level triggerrunresult as stream`() { + val runResult = randomQueryLevelTriggerRunResult() + val out = BytesStreamOutput() + runResult.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newRunResult = QueryLevelTriggerRunResult(sin) + OpenSearchTestCase.assertEquals(runResult.triggerName, newRunResult.triggerName) + OpenSearchTestCase.assertEquals(runResult.triggered, newRunResult.triggered) + OpenSearchTestCase.assertEquals(runResult.error, newRunResult.error) + OpenSearchTestCase.assertEquals(runResult.actionResults, newRunResult.actionResults) + } + + @Test + fun `test bucket-level triggerrunresult as stream`() { + val runResult = randomBucketLevelTriggerRunResult() + val out = BytesStreamOutput() + runResult.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newRunResult = BucketLevelTriggerRunResult(sin) + OpenSearchTestCase.assertEquals("Round tripping ActionRunResult doesn't work", runResult, newRunResult) + } + + @Test + fun `test doc-level triggerrunresult as stream`() { + val runResult = randomDocumentLevelTriggerRunResult() + val out = BytesStreamOutput() + runResult.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newRunResult = DocumentLevelTriggerRunResult(sin) + OpenSearchTestCase.assertEquals("Round tripping ActionRunResult doesn't work", runResult, newRunResult) + } + + @Test + fun `test inputrunresult as stream`() { + val runResult = randomInputRunResults() + val out = BytesStreamOutput() + runResult.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newRunResult = InputRunResults.readFrom(sin) + OpenSearchTestCase.assertEquals("Round tripping InputRunResults doesn't work", runResult, newRunResult) + } + + @Test + fun `test query-level monitorrunresult as stream`() { + val runResult = randomQueryLevelMonitorRunResult() + val out = BytesStreamOutput() + runResult.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newRunResult = MonitorRunResult(sin) + OpenSearchTestCase.assertEquals("Round tripping MonitorRunResult doesn't work", runResult, newRunResult) + } + + @Test + fun `test bucket-level monitorrunresult as stream`() { + val runResult = randomBucketLevelMonitorRunResult() + val out = BytesStreamOutput() + runResult.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newRunResult = MonitorRunResult(sin) + OpenSearchTestCase.assertEquals("Round tripping MonitorRunResult doesn't work", runResult, newRunResult) + } + + @Test + fun `test doc-level monitorrunresult as stream`() { + val runResult = randomDocumentLevelMonitorRunResult() + val out = BytesStreamOutput() + runResult.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newRunResult = MonitorRunResult(sin) + OpenSearchTestCase.assertEquals("Round tripping MonitorRunResult doesn't work", runResult, newRunResult) + } + + @Test + fun `test DocumentLevelTriggerRunResult as stream`() { + val workflow = randomDocumentLevelTriggerRunResult() + val out = BytesStreamOutput() + workflow.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newWorkflow = DocumentLevelTriggerRunResult(sin) + Assert.assertEquals("Round tripping dltrr failed", newWorkflow, workflow) + } + + @Test + fun `test RemoteMonitorInput as stream`() { + val myMonitorInput = MyMonitorInput(1, "hello", MyMonitorInput(2, "world", null)) + val myObjOut = BytesStreamOutput() + myMonitorInput.writeTo(myObjOut) + val remoteMonitorInput = RemoteMonitorInput(myObjOut.bytes()) + + val out = BytesStreamOutput() + remoteMonitorInput.writeTo(out) + + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newRemoteMonitorInput = RemoteMonitorInput(sin) + val newMyMonitorInput = MyMonitorInput(StreamInput.wrap(newRemoteMonitorInput.input.toBytesRef().bytes)) + Assert.assertEquals("Round tripping RemoteMonitorInput failed", newMyMonitorInput, myMonitorInput) + } + + @Test + fun `test RemoteMonitorTrigger as stream`() { + val myMonitorTrigger = MyMonitorTrigger(1, "hello", MyMonitorTrigger(2, "world", null)) + val myObjOut = BytesStreamOutput() + myMonitorTrigger.writeTo(myObjOut) + val remoteMonitorTrigger = RemoteMonitorTrigger("id", "name", "1", listOf(), myObjOut.bytes()) + + val out = BytesStreamOutput() + remoteMonitorTrigger.writeTo(out) + + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newRemoteMonitorTrigger = RemoteMonitorTrigger(sin) + val newMyMonitorTrigger = MyMonitorTrigger(StreamInput.wrap(newRemoteMonitorTrigger.trigger.toBytesRef().bytes)) + Assert.assertEquals("Round tripping RemoteMonitorTrigger failed", newMyMonitorTrigger, myMonitorTrigger) + } + + @Test + fun `test RemoteDocLevelMonitorInput as stream`() { + val myMonitorInput = MyMonitorInput(1, "hello", MyMonitorInput(2, "world", null)) + val myObjOut = BytesStreamOutput() + myMonitorInput.writeTo(myObjOut) + val docLevelMonitorInput = DocLevelMonitorInput( + "test", + listOf("test"), + listOf(randomDocLevelQuery()) + ) + val remoteDocLevelMonitorInput = RemoteDocLevelMonitorInput(myObjOut.bytes(), docLevelMonitorInput) + + val out = BytesStreamOutput() + remoteDocLevelMonitorInput.writeTo(out) + + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newRemoteDocLevelMonitorInput = RemoteDocLevelMonitorInput(sin) + val newMyMonitorInput = MyMonitorInput(StreamInput.wrap(newRemoteDocLevelMonitorInput.input.toBytesRef().bytes)) + Assert.assertEquals("Round tripping RemoteMonitorInput failed", newMyMonitorInput, myMonitorInput) + val newDocLevelMonitorInput = newRemoteDocLevelMonitorInput.docLevelMonitorInput + Assert.assertEquals("Round tripping DocLevelMonitorInput failed", newDocLevelMonitorInput, docLevelMonitorInput) + } + + @Test + fun `test RemoteMonitor as stream`() { + val myMonitorInput = MyMonitorInput(1, "hello", MyMonitorInput(2, "world", null)) + var myObjOut = BytesStreamOutput() + myMonitorInput.writeTo(myObjOut) + val docLevelMonitorInput = DocLevelMonitorInput( + "test", + listOf("test"), + listOf(randomDocLevelQuery()) + ) + val remoteDocLevelMonitorInput = RemoteDocLevelMonitorInput(myObjOut.bytes(), docLevelMonitorInput) + + val myMonitorTrigger = MyMonitorTrigger(1, "hello", MyMonitorTrigger(2, "world", null)) + myObjOut = BytesStreamOutput() + myMonitorTrigger.writeTo(myObjOut) + val remoteMonitorTrigger = RemoteMonitorTrigger("id", "name", "1", listOf(), myObjOut.bytes()) + + val monitor = Monitor( + Monitor.NO_ID, + Monitor.NO_VERSION, + "hello", + true, + IntervalSchedule(1, ChronoUnit.MINUTES), + Instant.now(), + Instant.now(), + "remote_doc_level_monitor", + null, + IndexUtils.NO_SCHEMA_VERSION, + listOf(remoteDocLevelMonitorInput), + listOf(remoteMonitorTrigger), + mapOf() + ) + + val out = BytesStreamOutput() + monitor.writeTo(out) + + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newMonitor = Monitor(sin) + Assert.assertEquals("Round tripping RemoteMonitor failed", monitor, newMonitor) + } + + fun randomDocumentLevelTriggerRunResult(): DocumentLevelTriggerRunResult { + val map = mutableMapOf() + map.plus(Pair("key1", randomActionRunResult())) + map.plus(Pair("key2", randomActionRunResult())) + return DocumentLevelTriggerRunResult( + "trigger-name", + mutableListOf(UUIDs.randomBase64UUID().toString()), + null, + mutableMapOf(Pair("alertId", map)) + ) + } + + fun randomActionRunResult(): ActionRunResult { + val map = mutableMapOf() + map.plus(Pair("key1", "val1")) + map.plus(Pair("key2", "val2")) + return ActionRunResult( + "1234", + "test-action", + map, + false, + Instant.now(), + null + ) + } +} + +data class MyMonitorInput(val a: Int, val b: String, val c: MyMonitorInput?) : Writeable { + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + sin.readInt(), + sin.readString(), + sin.readOptionalWriteable { MyMonitorInput(it) } + ) + + override fun writeTo(out: StreamOutput) { + out.writeInt(a) + out.writeString(b) + out.writeOptionalWriteable(c) + } +} + +data class MyMonitorTrigger(val a: Int, val b: String, val c: MyMonitorTrigger?) : Writeable { + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + sin.readInt(), + sin.readString(), + sin.readOptionalWriteable { MyMonitorTrigger(it) } + ) + + override fun writeTo(out: StreamOutput) { + out.writeInt(a) + out.writeString(b) + out.writeOptionalWriteable(c) + } } diff --git a/src/test/kotlin/org/opensearch/commons/alerting/model/XContentTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/model/XContentTests.kt index 7c928e92..42e5ab53 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/model/XContentTests.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/model/XContentTests.kt @@ -3,11 +3,17 @@ package org.opensearch.commons.alerting.model import org.junit.Assert.assertEquals import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Test +import org.opensearch.common.io.stream.BytesStreamOutput +import org.opensearch.common.xcontent.XContentFactory +import org.opensearch.common.xcontent.json.JsonXContent import org.opensearch.commons.alerting.builder import org.opensearch.commons.alerting.model.action.Action import org.opensearch.commons.alerting.model.action.ActionExecutionPolicy import org.opensearch.commons.alerting.model.action.PerExecutionActionScope import org.opensearch.commons.alerting.model.action.Throttle +import org.opensearch.commons.alerting.model.remote.monitors.RemoteDocLevelMonitorInput +import org.opensearch.commons.alerting.model.remote.monitors.RemoteMonitorInput +import org.opensearch.commons.alerting.model.remote.monitors.RemoteMonitorTrigger import org.opensearch.commons.alerting.parser import org.opensearch.commons.alerting.randomAction import org.opensearch.commons.alerting.randomActionExecutionPolicy @@ -16,6 +22,7 @@ import org.opensearch.commons.alerting.randomActionWithPolicy import org.opensearch.commons.alerting.randomAlert import org.opensearch.commons.alerting.randomBucketLevelMonitor import org.opensearch.commons.alerting.randomBucketLevelTrigger +import org.opensearch.commons.alerting.randomDocLevelQuery import org.opensearch.commons.alerting.randomQueryLevelMonitor import org.opensearch.commons.alerting.randomQueryLevelMonitorWithoutUser import org.opensearch.commons.alerting.randomQueryLevelTrigger @@ -27,6 +34,7 @@ import org.opensearch.commons.alerting.toJsonString import org.opensearch.commons.alerting.toJsonStringWithUser import org.opensearch.commons.alerting.util.string import org.opensearch.commons.authuser.User +import org.opensearch.core.common.io.stream.StreamInput import org.opensearch.core.xcontent.ToXContent import org.opensearch.index.query.QueryBuilders import org.opensearch.search.builder.SearchSourceBuilder @@ -367,7 +375,7 @@ class XContentTests { """.trimIndent() val parsedMonitor = Monitor.parse(parser(monitorString)) Assertions.assertEquals( - Monitor.MonitorType.QUERY_LEVEL_MONITOR, + Monitor.MonitorType.QUERY_LEVEL_MONITOR.value, parsedMonitor.monitorType, "Incorrect monitor type" ) @@ -543,4 +551,64 @@ class XContentTests { val parsedComment = Comment.parse(parser(commentString), "123") Assertions.assertEquals(comment, parsedComment, "Round tripping Comment doesn't work") } + + @Test + fun `test MonitorMetadata`() { + val monitorMetadata = MonitorMetadata( + id = "monitorId-metadata", + monitorId = "monitorId", + lastActionExecutionTimes = emptyList(), + lastRunContext = emptyMap(), + sourceToQueryIndexMapping = mutableMapOf() + ) + val monitorMetadataString = monitorMetadata.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS).string() + val parsedMonitorMetadata = MonitorMetadata.parse(parser(monitorMetadataString)) + assertEquals("Round tripping MonitorMetadata doesn't work", monitorMetadata, parsedMonitorMetadata) + } + + @Test + fun `test RemoteMonitorInput`() { + val myMonitorInput = MyMonitorInput(1, "hello", MyMonitorInput(2, "world", null)) + val myObjOut = BytesStreamOutput() + myMonitorInput.writeTo(myObjOut) + val remoteMonitorInput = RemoteMonitorInput(myObjOut.bytes()) + + val xContent = remoteMonitorInput.toXContent(JsonXContent.contentBuilder(), ToXContent.EMPTY_PARAMS).string() + val parsedRemoteMonitorInput = RemoteMonitorInput.parse(parser(xContent)) + val parsedMyMonitorInput = MyMonitorInput(StreamInput.wrap(parsedRemoteMonitorInput.input.toBytesRef().bytes)) + assertEquals("Round tripping RemoteMonitorInput doesn't work", myMonitorInput, parsedMyMonitorInput) + } + + @Test + fun `test RemoteMonitorTrigger`() { + val myMonitorTrigger = MyMonitorTrigger(1, "hello", MyMonitorTrigger(2, "world", null)) + val myObjOut = BytesStreamOutput() + myMonitorTrigger.writeTo(myObjOut) + val remoteMonitorTrigger = RemoteMonitorTrigger("id", "name", "1", listOf(), myObjOut.bytes()) + + val xContent = remoteMonitorTrigger.toXContent(JsonXContent.contentBuilder(), ToXContent.EMPTY_PARAMS).string() + val parsedRemoteMonitorTrigger = Trigger.parse(parser(xContent)) as RemoteMonitorTrigger + val parsedMyMonitorTrigger = MyMonitorTrigger(StreamInput.wrap(parsedRemoteMonitorTrigger.trigger.toBytesRef().bytes)) + assertEquals("Round tripping RemoteMonitorTrigger doesn't work", myMonitorTrigger, parsedMyMonitorTrigger) + } + + @Test + fun `test RemoteDocLevelMonitorInput`() { + val myMonitorInput = MyMonitorInput(1, "hello", MyMonitorInput(2, "world", null)) + val myObjOut = BytesStreamOutput() + myMonitorInput.writeTo(myObjOut) + val docLevelMonitorInput = DocLevelMonitorInput( + "test", + listOf("test"), + listOf(randomDocLevelQuery()) + ) + val remoteDocLevelMonitorInput = RemoteDocLevelMonitorInput(myObjOut.bytes(), docLevelMonitorInput) + + val xContent = remoteDocLevelMonitorInput.toXContent(JsonXContent.contentBuilder(), ToXContent.EMPTY_PARAMS).string() + val parsedRemoteDocLevelMonitorInput = RemoteDocLevelMonitorInput.parse(parser(xContent)) + val parsedMyMonitorInput = MyMonitorInput(StreamInput.wrap(parsedRemoteDocLevelMonitorInput.input.toBytesRef().bytes)) + assertEquals("Round tripping RemoteDocLevelMonitorInput doesn't work", myMonitorInput, parsedMyMonitorInput) + val parsedDocLevelMonitorInput = parsedRemoteDocLevelMonitorInput.docLevelMonitorInput + assertEquals("Round tripping RemoteDocLevelMonitorInput doesn't work", docLevelMonitorInput, parsedDocLevelMonitorInput) + } }