ref(alerts): Update Snuba queries to match events-stats more closely (#77755)

When a user creates an anomaly detection alert, we need to query Snuba for 28 days' worth of historical data to send to Seer to calculate the anomalies. Originally (#74614) I tried to pull out the relevant parts of the `events-stats` endpoint to mimic the data we see populated in metric alert preview charts (but over a larger time period, and since it runs after the rule is saved I can't use any of the `request` object machinery). I missed some things, though, so this PR aims to make the two results match.

Closes https://getsentry.atlassian.net/browse/ALRT-288 (hopefully)
ceorourke authored Sep 24, 2024
1 parent 827b22e commit 583a084
Showing 7 changed files with 214 additions and 94 deletions.
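
At a glance: the Seer path previously sent `snuba_query.aggregate` and `snuba_query.query` to Snuba as-is, while `events-stats` resolves legacy column aliases and appends an `event.type` clause. A before/after sketch of the query inputs (illustrative values, not lines from this diff):

```python
# Before this PR: the rule's raw values went straight into the Snuba query.
selected_columns = ["count()"]   # snuba_query.aggregate, unmodified
query = "is:unresolved"          # snuba_query.query, unmodified

# After this PR: columns pass through get_query_columns() and the query
# string gains an event-type clause via get_snuba_query_string(), matching
# what OrganizationEventsStatsEndpoint builds.
selected_columns = ["count()"]                    # unchanged for plain aggregates
query = "(is:unresolved) AND (event.type:error)"  # rule query plus event types
```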
40 changes: 22 additions & 18 deletions src/sentry/api/bases/organization_events.py
@@ -42,6 +42,27 @@
 from sentry.utils.snuba import MAX_FIELDS, SnubaTSResult
 
 
+def get_query_columns(columns, rollup):
+    """
+    Backwards compatibility for incidents which uses the old
+    column aliases as it straddles both versions of events/discover.
+    We will need these aliases until discover2 flags are enabled for all users.
+    We need these rollup columns to generate correct events-stats results
+    """
+    column_map = {
+        "user_count": "count_unique(user)",
+        "event_count": "count()",
+        "epm()": "epm(%d)" % rollup,
+        "eps()": "eps(%d)" % rollup,
+        "tpm()": "tpm(%d)" % rollup,
+        "tps()": "tps(%d)" % rollup,
+        "sps()": "sps(%d)" % rollup,
+        "spm()": "spm(%d)" % rollup,
+    }
+
+    return [column_map.get(column, column) for column in columns]
+
+
 def resolve_axis_column(column: str, index: int = 0) -> str:
     return get_function_alias(column) if not is_equation(column) else f"equation[{index}]"
 
@@ -438,30 +459,13 @@ def get_event_stats_data(
             date_range = snuba_params.date_range
             stats_period = parse_stats_period(get_interval_from_range(date_range, False))
             rollup = int(stats_period.total_seconds()) if stats_period is not None else 3600
 
         if comparison_delta is not None:
             retention = quotas.get_event_retention(organization=organization)
             comparison_start = snuba_params.start_date - comparison_delta
             if retention and comparison_start < timezone.now() - timedelta(days=retention):
                 raise ValidationError("Comparison period is outside your retention window")
 
-        # Backwards compatibility for incidents which uses the old
-        # column aliases as it straddles both versions of events/discover.
-        # We will need these aliases until discover2 flags are enabled for all
-        # users.
-        # We need these rollup columns to generate correct events-stats results
-        column_map = {
-            "user_count": "count_unique(user)",
-            "event_count": "count()",
-            "epm()": "epm(%d)" % rollup,
-            "eps()": "eps(%d)" % rollup,
-            "tpm()": "tpm(%d)" % rollup,
-            "tps()": "tps(%d)" % rollup,
-            "sps()": "sps(%d)" % rollup,
-            "spm()": "spm(%d)" % rollup,
-        }
-
-        query_columns = [column_map.get(column, column) for column in columns]
+        query_columns = get_query_columns(columns, rollup)
         with sentry_sdk.start_span(op="discover.endpoint", description="base.stats_query"):
             result = get_event_stats(
                 query_columns, query, snuba_params, rollup, zerofill_results, comparison_delta
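
Since `get_query_columns` is a pure mapping, its behavior is easy to pin down. A few illustrative calls based on the `column_map` above:

```python
# Incident-era aliases are rewritten, the rollup (in seconds) is baked into
# rate functions, and unrecognized columns pass through unchanged.
assert get_query_columns(["user_count", "event_count"], 3600) == [
    "count_unique(user)",
    "count()",
]
assert get_query_columns(["epm()", "eps()"], 900) == ["epm(900)", "eps(900)"]
assert get_query_columns(["p95(transaction.duration)"], 900) == [
    "p95(transaction.duration)"
]
```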
@@ -66,7 +66,9 @@ def get(self, request: Request, organization: Organization, alert_rule: AlertRule)
                 status=400,
             )
 
-        anomalies = get_historical_anomaly_data_from_seer(alert_rule, project, start, end)
+        anomalies = get_historical_anomaly_data_from_seer(
+            alert_rule=alert_rule, project=project, start_string=start, end_string=end
+        )
         # NOTE: returns None if there's a problem with the Seer response
         if anomalies is None:
             return Response("Unable to get historical anomaly data", status=400)
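
The switch to keyword arguments also makes it explicit that this endpoint passes `start`/`end` through as strings; `get_historical_anomaly_data_from_seer` parses them with `datetime.fromisoformat` (next file), along the lines of:

```python
from datetime import datetime

# Assumed ISO-8601 inputs; the exact format the endpoint receives is not
# shown in this diff.
start = datetime.fromisoformat("2024-08-27T00:00:00+00:00")
end = datetime.fromisoformat("2024-09-24T00:00:00+00:00")
```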
16 changes: 14 additions & 2 deletions src/sentry/seer/anomaly_detection/get_historical_anomalies.py
@@ -4,6 +4,7 @@
 from django.conf import settings
 from urllib3.exceptions import MaxRetryError, TimeoutError
 
+from sentry.api.bases.organization_events import get_query_columns
 from sentry.conf.server import SEER_ANOMALY_DETECTION_ENDPOINT_URL
 from sentry.incidents.models.alert_rule import AlertRule, AlertRuleStatus
 from sentry.models.project import Project
@@ -172,8 +173,14 @@ def get_historical_anomaly_data_from_seer(
     window_min = int(snuba_query.time_window / 60)
     start = datetime.fromisoformat(start_string)
     end = datetime.fromisoformat(end_string)
+    query_columns = get_query_columns([snuba_query.aggregate], snuba_query.time_window)
     historical_data = fetch_historical_data(
-        alert_rule=alert_rule, snuba_query=snuba_query, project=project, start=start, end=end
+        alert_rule=alert_rule,
+        snuba_query=snuba_query,
+        query_columns=query_columns,
+        project=project,
+        start=start,
+        end=end,
     )
 
     if not historical_data:
@@ -188,7 +195,12 @@
             },
         )
         return None
-    formatted_data = format_historical_data(historical_data, dataset)
+    formatted_data = format_historical_data(
+        data=historical_data,
+        query_columns=query_columns,
+        dataset=dataset,
+        organization=project.organization,
+    )
     if (
         not alert_rule.sensitivity
         or not alert_rule.seasonality
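
Taken together, the hunks above change this function's data path to roughly the following (a condensed sketch; validation, logging, and the Seer request itself are omitted):

```python
# dataset is resolved earlier in the function via get_dataset(snuba_query.dataset).
query_columns = get_query_columns([snuba_query.aggregate], snuba_query.time_window)
historical_data = fetch_historical_data(
    alert_rule=alert_rule,
    snuba_query=snuba_query,
    query_columns=query_columns,
    project=project,
    start=datetime.fromisoformat(start_string),
    end=datetime.fromisoformat(end_string),
)
formatted_data = format_historical_data(
    data=historical_data,
    query_columns=query_columns,
    dataset=dataset,
    organization=project.organization,
)
```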
13 changes: 11 additions & 2 deletions src/sentry/seer/anomaly_detection/store_data.py
@@ -6,6 +6,7 @@
 from parsimonious.exceptions import ParseError
 from urllib3.exceptions import MaxRetryError, TimeoutError
 
+from sentry.api.bases.organization_events import get_query_columns
 from sentry.conf.server import SEER_ANOMALY_DETECTION_STORE_DATA_URL
 from sentry.incidents.models.alert_rule import AlertRule, AlertRuleStatus
 from sentry.models.project import Project
@@ -63,12 +64,20 @@ def send_historical_data_to_seer(alert_rule: AlertRule, project: Project) -> AlertRule:
     snuba_query = SnubaQuery.objects.get(id=alert_rule.snuba_query_id)
     window_min = int(snuba_query.time_window / 60)
     dataset = get_dataset(snuba_query.dataset)
-    historical_data = fetch_historical_data(alert_rule, snuba_query, project)
+    query_columns = get_query_columns([snuba_query.aggregate], snuba_query.time_window)
+    historical_data = fetch_historical_data(
+        alert_rule=alert_rule, snuba_query=snuba_query, query_columns=query_columns, project=project
+    )
 
     if not historical_data:
         raise ValidationError("No historical data available.")
 
-    formatted_data = format_historical_data(historical_data, dataset)
+    formatted_data = format_historical_data(
+        data=historical_data,
+        query_columns=query_columns,
+        dataset=dataset,
+        organization=project.organization,
+    )
     if not formatted_data:
         raise ValidationError("Unable to get historical data for this alert.")
 
154 changes: 103 additions & 51 deletions src/sentry/seer/anomaly_detection/utils.py
@@ -5,18 +5,23 @@
 from django.utils.datastructures import MultiValueDict
 
 from sentry import release_health
+from sentry.api.bases.organization_events import resolve_axis_column
+from sentry.api.serializers.snuba import SnubaTSResultSerializer
 from sentry.incidents.models.alert_rule import AlertRule, AlertRuleThresholdType
+from sentry.models.organization import Organization
 from sentry.models.project import Project
 from sentry.search.events.types import SnubaParams
 from sentry.seer.anomaly_detection.types import TimeSeriesPoint
 from sentry.snuba import metrics_performance
-from sentry.snuba.models import SnubaQuery
+from sentry.snuba.metrics.extraction import MetricSpecType
+from sentry.snuba.models import SnubaQuery, SnubaQueryEventType
 from sentry.snuba.referrer import Referrer
 from sentry.snuba.sessions_v2 import QueryDefinition
 from sentry.snuba.utils import get_dataset
 from sentry.utils.snuba import SnubaTSResult
 
+NUM_DAYS = 28
+
 
 def translate_direction(direction: int) -> str:
     """
@@ -30,7 +35,34 @@ def translate_direction(direction: int) -> str:
     return direction_map[AlertRuleThresholdType(direction)]
 
 
-NUM_DAYS = 28
+def get_snuba_query_string(snuba_query: SnubaQuery) -> str:
+    """
+    Generate a query string that matches what the OrganizationEventsStatsEndpoint does
+    """
+    SNUBA_QUERY_EVENT_TYPE_TO_STRING = {
+        SnubaQueryEventType.EventType.ERROR: "error",
+        SnubaQueryEventType.EventType.DEFAULT: "default",
+        SnubaQueryEventType.EventType.TRANSACTION: "transaction",
+    }
+
+    if len(snuba_query.event_types) > 1:
+        # e.g. (is:unresolved) AND (event.type:[error, default])
+        event_types_list = [
+            SNUBA_QUERY_EVENT_TYPE_TO_STRING[event_type] for event_type in snuba_query.event_types
+        ]
+        event_types_string = "(event.type:["
+        event_types_string += ", ".join(event_types_list)
+        event_types_string += "])"
+    else:
+        # e.g. (is:unresolved) AND (event.type:error)
+        snuba_query_event_type_string = SNUBA_QUERY_EVENT_TYPE_TO_STRING[snuba_query.event_types[0]]
+        event_types_string = f"(event.type:{snuba_query_event_type_string})"
+    if snuba_query.query:
+        snuba_query_string = f"({snuba_query.query}) AND {event_types_string}"
+    else:
+        snuba_query_string = event_types_string
+
+    return snuba_query_string
 
 
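Given the construction above, the resulting strings look like this. A standalone sketch that mirrors the logic with plain strings standing in for the `SnubaQueryEventType` members:

```python
def build_query_string(query: str, event_types: list[str]) -> str:
    # Illustration-only mirror of get_snuba_query_string.
    if len(event_types) > 1:
        event_types_string = "(event.type:[" + ", ".join(event_types) + "])"
    else:
        event_types_string = f"(event.type:{event_types[0]})"
    return f"({query}) AND {event_types_string}" if query else event_types_string


assert build_query_string("is:unresolved", ["error", "default"]) == (
    "(is:unresolved) AND (event.type:[error, default])"
)
assert build_query_string("", ["transaction"]) == "(event.type:transaction)"
```
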
@@ -75,55 +107,63 @@ def get_crash_free_historical_data(
     )
 
 
-def format_historical_data(data: SnubaTSResult, dataset: Any) -> list[TimeSeriesPoint]:
-    """
-    Format Snuba data into the format the Seer API expects.
-    For errors/transactions data:
-        If there are no results, it's just the timestamp
-        {'time': 1719012000}, {'time': 1719018000}, {'time': 1719024000}
-
-        If there are results, the aggregate is added
-        {'time': 1721300400, 'count': 2}
-
-    For metrics_performance dataset/sessions data:
-        The count is stored separately from the timestamps, if there is no data the count is 0
-    """
-    formatted_data: list[TimeSeriesPoint] = []
-    nested_data = data.data.get("data", [])
-
-    if dataset == metrics_performance:
-        groups = nested_data.get("groups")
-        if not len(groups):
-            return formatted_data
-        series = groups[0].get("series")
-
-        for time, count in zip(nested_data.get("intervals"), series.get("sum(session)", 0)):
-            date = datetime.strptime(time, "%Y-%m-%dT%H:%M:%SZ")
-            ts_point = TimeSeriesPoint(timestamp=date.timestamp(), value=count)
-            formatted_data.append(ts_point)
-    else:
-        # we don't know what the aggregation key of the query is
-        # so we should see it when we see a data point that has a value
-        agg_key = ""
-        for datum in nested_data:
-            if len(datum) == 1:
-                # this data point has no value
-                ts_point = TimeSeriesPoint(timestamp=datum.get("time"), value=0)
-            else:
-                # if we don't know the aggregation key yet, we should set it
-                if not agg_key:
-                    for key in datum:  # only two keys in this dict
-                        if key != "time":
-                            agg_key = key
-                            break
-                ts_point = TimeSeriesPoint(timestamp=datum.get("time"), value=datum.get(agg_key, 0))
-            formatted_data.append(ts_point)
-    return formatted_data
+def format_crash_free_data(data: SnubaTSResult) -> list[TimeSeriesPoint]:
+    formatted_data: list[TimeSeriesPoint] = []
+    nested_data = data.data.get("data", [])
+    groups = nested_data.get("groups")
+    if not len(groups):
+        return formatted_data
+    series = groups[0].get("series")
+
+    for time, count in zip(nested_data.get("intervals"), series.get("sum(session)", 0)):
+        date = datetime.strptime(time, "%Y-%m-%dT%H:%M:%SZ")
+        ts_point = TimeSeriesPoint(timestamp=date.timestamp(), value=count)
+        formatted_data.append(ts_point)
+    return formatted_data
+
+
+def format_snuba_ts_data(
+    data: SnubaTSResult, query_columns: list[str], organization: Organization
+) -> list[TimeSeriesPoint]:
+    formatted_data: list[TimeSeriesPoint] = []
+    serializer = SnubaTSResultSerializer(organization=organization, lookup=None, user=None)
+    serialized_result = serializer.serialize(
+        data,
+        resolve_axis_column(query_columns[0]),
+        allow_partial_buckets=False,
+        zerofill_results=False,
+        extra_columns=None,
+    )
+
+    for data in serialized_result.get("data"):
+        if len(data) > 1:
+            count_data = data[1]
+            count = 0
+            if len(count_data):
+                count = count_data[0].get("count", 0)
+            ts_point = TimeSeriesPoint(timestamp=data[0], value=count)
+            formatted_data.append(ts_point)
+    return formatted_data
+
+
+def format_historical_data(
+    data: SnubaTSResult, query_columns: list[str], dataset: Any, organization: Organization
+) -> list[TimeSeriesPoint]:
+    """
+    Format Snuba data into the format the Seer API expects.
+    """
+    if dataset == metrics_performance:
+        return format_crash_free_data(data)
+
+    return format_snuba_ts_data(data, query_columns, organization)
 
 
 def fetch_historical_data(
     alert_rule: AlertRule,
     snuba_query: SnubaQuery,
+    query_columns: list[str],
     project: Project,
     start: datetime | None = None,
     end: datetime | None = None,
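
The two formatting paths consume differently shaped results: `format_crash_free_data` walks the sessions payload (per-group series parallel to `intervals`), while `format_snuba_ts_data` relies on `SnubaTSResultSerializer`, whose output pairs each timestamp with a list of count dicts. Roughly (assumed shapes, trimmed to the keys the code reads):

```python
# Sessions / crash-free path: counts live in a per-group series keyed by
# "sum(session)", parallel to the "intervals" timestamps.
crash_free_payload = {
    "intervals": ["2024-09-23T00:00:00Z", "2024-09-23T01:00:00Z"],
    "groups": [{"series": {"sum(session)": [120, 98]}}],
}

# Serialized Snuba timeseries path: [timestamp, [{"count": n}]] rows, where
# an empty inner list means the bucket had no data.
serialized_rows = [
    [1719012000, [{"count": 5}]],
    [1719015600, []],
]
for row in serialized_rows:
    timestamp, buckets = row
    count = buckets[0].get("count", 0) if buckets else 0
    # -> TimeSeriesPoint(timestamp=timestamp, value=count)
```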
Expand All @@ -143,37 +183,49 @@ def fetch_historical_data(
granularity = snuba_query.time_window

dataset_label = snuba_query.dataset

if dataset_label == "events":
# DATASET_OPTIONS expects the name 'errors'
dataset_label = "errors"
elif dataset_label == "generic_metrics":
dataset_label = "transactions"
elif dataset_label in ["generic_metrics", "transactions"]:
# XXX: performance alerts dataset differs locally vs in prod
dataset_label = "discover"
dataset = get_dataset(dataset_label)

if not project or not dataset or not alert_rule.organization:
return None

environments = []
if snuba_query.environment:
environments = [snuba_query.environment]

snuba_params = SnubaParams(
organization=alert_rule.organization,
projects=[project],
start=start,
end=end,
stats_period=None,
environments=environments,
)

if dataset == metrics_performance:
return get_crash_free_historical_data(
start, end, project, alert_rule.organization, granularity
)

else:
snuba_query_string = get_snuba_query_string(snuba_query)
historical_data = dataset.timeseries_query(
selected_columns=[snuba_query.aggregate],
query=snuba_query.query,
snuba_params=SnubaParams(
organization=alert_rule.organization,
projects=[project],
start=start,
end=end,
),
selected_columns=query_columns,
query=snuba_query_string,
snuba_params=snuba_params,
rollup=granularity,
referrer=(
Referrer.ANOMALY_DETECTION_HISTORICAL_DATA_QUERY.value
if is_store_data_request
else Referrer.ANOMALY_DETECTION_RETURN_HISTORICAL_ANOMALIES.value
),
zerofill_results=True,
allow_metric_aggregates=True,
on_demand_metrics_type=MetricSpecType.SIMPLE_QUERY,
)
return historical_data
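
One subtlety worth calling out: the dataset label stored on the rule is remapped before `get_dataset` resolves a module, and per the `XXX` comment performance alerts may be stored under either label. The mapping implemented by the branch above is equivalent to:

```python
# "events" alerts resolve to the errors dataset module; performance alerts
# ("generic_metrics" locally, "transactions" in prod per the XXX comment)
# both resolve to discover.
label_map = {
    "events": "errors",
    "generic_metrics": "discover",
    "transactions": "discover",
}
dataset_label = label_map.get(snuba_query.dataset, snuba_query.dataset)
dataset = get_dataset(dataset_label)
```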