Skip to content

Commit

Permalink
chore(hybridcloud) Move outbox tasks to sentry.hybridcloud (#75739)
Browse files Browse the repository at this point in the history
Consolidate more outbox subsystem logic into sentry.hybridcloud.
  • Loading branch information
markstory authored Aug 9, 2024
1 parent 3630a24 commit 67339b2
Show file tree
Hide file tree
Showing 8 changed files with 30 additions and 23 deletions.
4 changes: 2 additions & 2 deletions src/sentry/conf/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -737,6 +737,8 @@ def SOCIAL_AUTH_DEFAULT_USERNAME() -> str:
"sentry.data_export.tasks",
"sentry.discover.tasks",
"sentry.hybridcloud.tasks.deliver_webhooks",
"sentry.hybridcloud.tasks.backfill_outboxes",
"sentry.hybridcloud.tasks.deliver_from_outbox",
"sentry.incidents.tasks",
"sentry.integrations.github.tasks",
"sentry.integrations.github.tasks.pr_comment",
Expand All @@ -752,7 +754,6 @@ def SOCIAL_AUTH_DEFAULT_USERNAME() -> str:
"sentry.tasks.auth",
"sentry.tasks.auto_remove_inbox",
"sentry.tasks.auto_resolve_issues",
"sentry.tasks.backfill_outboxes",
"sentry.tasks.embeddings_grouping.backfill_seer_grouping_records_for_project",
"sentry.tasks.beacon",
"sentry.tasks.check_auth",
Expand All @@ -768,7 +769,6 @@ def SOCIAL_AUTH_DEFAULT_USERNAME() -> str:
"sentry.tasks.deletion.scheduled",
"sentry.tasks.deletion.groups",
"sentry.tasks.deletion.hybrid_cloud",
"sentry.tasks.deliver_from_outbox",
"sentry.tasks.digests",
"sentry.tasks.email",
"sentry.tasks.files",
Expand Down
2 changes: 1 addition & 1 deletion src/sentry/hybridcloud/outbox/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,8 +437,8 @@ def handle_async_replication(self, region_name: str, shard_identifier: int) -> N
def run_outbox_replications_for_self_hosted(*args: Any, **kwds: Any) -> None:
from django.conf import settings

from sentry.hybridcloud.tasks.backfill_outboxes import backfill_outboxes_for
from sentry.models.outbox import OutboxBase
from sentry.tasks.backfill_outboxes import backfill_outboxes_for

if not settings.SENTRY_SELF_HOSTED:
return
Expand Down
2 changes: 2 additions & 0 deletions src/sentry/hybridcloud/tasks/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
from .backfill_outboxes import * # noqa
from .deliver_from_outbox import * # noqa
from .deliver_webhooks import * # noqa
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def set_processing_state(table_name: str, value: int, version: int) -> None:

def find_replication_version(
model: type[ControlOutboxProducingModel] | type[RegionOutboxProducingModel] | type[User],
force_synchronous=False,
force_synchronous: bool = False,
) -> int:
"""
:param model: Model for finding the current replication version
Expand All @@ -91,7 +91,7 @@ def _chunk_processing_batch(
model: type[ControlOutboxProducingModel] | type[RegionOutboxProducingModel] | type[User],
*,
batch_size: int,
force_synchronous=False,
force_synchronous: bool = False,
) -> BackfillBatch | None:
lower, version = get_processing_state(model._meta.db_table)
target_version = find_replication_version(model, force_synchronous=force_synchronous)
Expand All @@ -112,7 +112,7 @@ def _chunk_processing_batch(


def process_outbox_backfill_batch(
model: type[Model], batch_size: int, force_synchronous=False
model: type[Model], batch_size: int, force_synchronous: bool = False
) -> BackfillBatch | None:
if (
not issubclass(model, RegionOutboxProducingModel)
Expand Down Expand Up @@ -152,7 +152,7 @@ def backfill_outboxes_for(
silo_mode: SiloMode,
scheduled_count: int = 0,
max_batch_rate: int = OUTBOX_BACKFILLS_PER_MINUTE,
force_synchronous=False,
force_synchronous: bool = False,
) -> bool:
# Maintain a steady state of outbox processing by subtracting any regularly scheduled rows
# from an expected rate.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
from django.conf import settings
from django.db.models import Max, Min

from sentry.hybridcloud.tasks.backfill_outboxes import backfill_outboxes_for
from sentry.models.outbox import ControlOutboxBase, OutboxBase, OutboxFlushError, RegionOutboxBase
from sentry.silo.base import SiloMode
from sentry.tasks.backfill_outboxes import backfill_outboxes_for
from sentry.tasks.base import instrumented_task
from sentry.utils import metrics
from sentry.utils.env import in_test_environment
Expand Down Expand Up @@ -59,7 +59,7 @@ def schedule_batch(
silo_mode: SiloMode,
drain_task: Task,
concurrency: int | None = None,
process_outbox_backfills=True,
process_outbox_backfills: bool = True,
) -> None:
scheduled_count = 0

Expand Down
5 changes: 4 additions & 1 deletion src/sentry/testutils/outbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@
from django.core.handlers.wsgi import WSGIRequest

from sentry.hybridcloud.models.webhookpayload import THE_PAST, WebhookPayload
from sentry.hybridcloud.tasks.deliver_from_outbox import (
enqueue_outbox_jobs,
enqueue_outbox_jobs_control,
)
from sentry.models.outbox import OutboxBase
from sentry.silo.base import SiloMode
from sentry.tasks.deliver_from_outbox import enqueue_outbox_jobs, enqueue_outbox_jobs_control
from sentry.testutils.silo import assume_test_silo_mode


Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
from collections.abc import Callable
from typing import Any
from unittest.mock import patch

from django.apps import apps
from django.test.utils import override_settings

from sentry.db.models import BaseModel
from sentry.hybridcloud.outbox.base import run_outbox_replications_for_self_hosted
from sentry.hybridcloud.tasks.backfill_outboxes import (
backfill_outboxes_for,
get_backfill_key,
get_processing_state,
process_outbox_backfill_batch,
)
from sentry.models.authidentity import AuthIdentity
from sentry.models.authidentityreplica import AuthIdentityReplica
from sentry.models.authprovider import AuthProvider
Expand All @@ -13,12 +21,6 @@
from sentry.models.organizationmapping import OrganizationMapping
from sentry.models.outbox import ControlOutbox, RegionOutbox, outbox_context
from sentry.silo.base import SiloMode
from sentry.tasks.backfill_outboxes import (
backfill_outboxes_for,
get_backfill_key,
get_processing_state,
process_outbox_backfill_batch,
)
from sentry.testutils.factories import Factories
from sentry.testutils.helpers import override_options
from sentry.testutils.outbox import outbox_runner
Expand All @@ -27,7 +29,7 @@
from sentry.utils import redis


def reset_processing_state():
def reset_processing_state() -> None:
with redis.clusters.get("default").get_local_client_for_key("backfill_outboxes") as client:
for app_models in apps.all_models.values():
for model in app_models.values():
Expand All @@ -36,7 +38,7 @@ def reset_processing_state():

@django_db_all
@no_silo_test
def test_processing_awaits_options():
def test_processing_awaits_options() -> None:
reset_processing_state()
org = Factories.create_organization()
with outbox_context(flush=False):
Expand All @@ -60,7 +62,7 @@ def test_processing_awaits_options():


@django_db_all
def test_region_processing(task_runner):
def test_region_processing(task_runner: Callable[..., Any]) -> None:
with outbox_context(flush=False):
for i in range(5):
Factories.create_organization()
Expand All @@ -78,7 +80,7 @@ def test_region_processing(task_runner):

@django_db_all
@control_silo_test
def test_control_processing(task_runner):
def test_control_processing(task_runner: Callable[..., Any]) -> None:
reset_processing_state()

org = Factories.create_organization()
Expand All @@ -96,7 +98,7 @@ def test_control_processing(task_runner):
assert not AuthProviderReplica.objects.filter(auth_provider_id=ap.id).exists()
assert not AuthIdentityReplica.objects.filter(auth_provider_id=ap.id).exists()

def run_for_model(model: type[BaseModel]):
def run_for_model(model: type[BaseModel]) -> None:
while True:
if process_outbox_backfill_batch(model, 1, force_synchronous=True) is None:
break
Expand Down Expand Up @@ -159,7 +161,7 @@ def run_for_model(model: type[BaseModel]):

@django_db_all
@no_silo_test
def test_run_outbox_replications_for_self_hosted():
def test_run_outbox_replications_for_self_hosted() -> None:
reset_processing_state()

with outbox_context(flush=False):
Expand Down
2 changes: 1 addition & 1 deletion tests/sentry/models/test_outbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@
from pytest import raises

from sentry.hybridcloud.outbox.category import OutboxCategory, OutboxScope
from sentry.hybridcloud.tasks.deliver_from_outbox import enqueue_outbox_jobs
from sentry.models.organization import Organization
from sentry.models.organizationmember import OrganizationMember
from sentry.models.organizationmemberteam import OrganizationMemberTeam
from sentry.models.organizationmemberteamreplica import OrganizationMemberTeamReplica
from sentry.models.outbox import ControlOutbox, OutboxFlushError, RegionOutbox, outbox_context
from sentry.models.user import User
from sentry.silo.base import SiloMode
from sentry.tasks.deliver_from_outbox import enqueue_outbox_jobs
from sentry.testutils.cases import TestCase, TransactionTestCase
from sentry.testutils.factories import Factories
from sentry.testutils.helpers.datetime import freeze_time
Expand Down

0 comments on commit 67339b2

Please sign in to comment.