Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(hybridcloud) Move outbox tasks to sentry.hybridcloud #75739

Merged
merged 3 commits into from
Aug 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/sentry/conf/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -737,6 +737,8 @@ def SOCIAL_AUTH_DEFAULT_USERNAME() -> str:
"sentry.data_export.tasks",
"sentry.discover.tasks",
"sentry.hybridcloud.tasks.deliver_webhooks",
"sentry.hybridcloud.tasks.backfill_outboxes",
"sentry.hybridcloud.tasks.deliver_from_outbox",
Comment on lines +740 to +741
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The tasks didn't have their names changed, so updating CELERY_IMPORTS should be all that is necessary for tasks to continue executing without interruption.

"sentry.incidents.tasks",
"sentry.integrations.github.tasks",
"sentry.integrations.github.tasks.pr_comment",
Expand All @@ -752,7 +754,6 @@ def SOCIAL_AUTH_DEFAULT_USERNAME() -> str:
"sentry.tasks.auth",
"sentry.tasks.auto_remove_inbox",
"sentry.tasks.auto_resolve_issues",
"sentry.tasks.backfill_outboxes",
"sentry.tasks.embeddings_grouping.backfill_seer_grouping_records_for_project",
"sentry.tasks.beacon",
"sentry.tasks.check_auth",
Expand All @@ -768,7 +769,6 @@ def SOCIAL_AUTH_DEFAULT_USERNAME() -> str:
"sentry.tasks.deletion.scheduled",
"sentry.tasks.deletion.groups",
"sentry.tasks.deletion.hybrid_cloud",
"sentry.tasks.deliver_from_outbox",
"sentry.tasks.digests",
"sentry.tasks.email",
"sentry.tasks.files",
Expand Down
2 changes: 1 addition & 1 deletion src/sentry/hybridcloud/outbox/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,8 +437,8 @@ def handle_async_replication(self, region_name: str, shard_identifier: int) -> N
def run_outbox_replications_for_self_hosted(*args: Any, **kwds: Any) -> None:
from django.conf import settings

from sentry.hybridcloud.tasks.backfill_outboxes import backfill_outboxes_for
from sentry.models.outbox import OutboxBase
from sentry.tasks.backfill_outboxes import backfill_outboxes_for

if not settings.SENTRY_SELF_HOSTED:
return
Expand Down
2 changes: 2 additions & 0 deletions src/sentry/hybridcloud/tasks/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
from .backfill_outboxes import * # noqa
from .deliver_from_outbox import * # noqa
from .deliver_webhooks import * # noqa
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def set_processing_state(table_name: str, value: int, version: int) -> None:

def find_replication_version(
model: type[ControlOutboxProducingModel] | type[RegionOutboxProducingModel] | type[User],
force_synchronous=False,
force_synchronous: bool = False,
) -> int:
"""
:param model: Model for finding the current replication version
Expand All @@ -91,7 +91,7 @@ def _chunk_processing_batch(
model: type[ControlOutboxProducingModel] | type[RegionOutboxProducingModel] | type[User],
*,
batch_size: int,
force_synchronous=False,
force_synchronous: bool = False,
) -> BackfillBatch | None:
lower, version = get_processing_state(model._meta.db_table)
target_version = find_replication_version(model, force_synchronous=force_synchronous)
Expand All @@ -112,7 +112,7 @@ def _chunk_processing_batch(


def process_outbox_backfill_batch(
model: type[Model], batch_size: int, force_synchronous=False
model: type[Model], batch_size: int, force_synchronous: bool = False
) -> BackfillBatch | None:
if (
not issubclass(model, RegionOutboxProducingModel)
Expand Down Expand Up @@ -152,7 +152,7 @@ def backfill_outboxes_for(
silo_mode: SiloMode,
scheduled_count: int = 0,
max_batch_rate: int = OUTBOX_BACKFILLS_PER_MINUTE,
force_synchronous=False,
force_synchronous: bool = False,
) -> bool:
# Maintain a steady state of outbox processing by subtracting any regularly scheduled rows
# from an expected rate.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
from django.conf import settings
from django.db.models import Max, Min

from sentry.hybridcloud.tasks.backfill_outboxes import backfill_outboxes_for
from sentry.models.outbox import ControlOutboxBase, OutboxBase, OutboxFlushError, RegionOutboxBase
from sentry.silo.base import SiloMode
from sentry.tasks.backfill_outboxes import backfill_outboxes_for
from sentry.tasks.base import instrumented_task
from sentry.utils import metrics
from sentry.utils.env import in_test_environment
Expand Down Expand Up @@ -59,7 +59,7 @@ def schedule_batch(
silo_mode: SiloMode,
drain_task: Task,
concurrency: int | None = None,
process_outbox_backfills=True,
process_outbox_backfills: bool = True,
) -> None:
scheduled_count = 0

Expand Down
5 changes: 4 additions & 1 deletion src/sentry/testutils/outbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@
from django.core.handlers.wsgi import WSGIRequest

from sentry.hybridcloud.models.webhookpayload import THE_PAST, WebhookPayload
from sentry.hybridcloud.tasks.deliver_from_outbox import (
enqueue_outbox_jobs,
enqueue_outbox_jobs_control,
)
from sentry.models.outbox import OutboxBase
from sentry.silo.base import SiloMode
from sentry.tasks.deliver_from_outbox import enqueue_outbox_jobs, enqueue_outbox_jobs_control
from sentry.testutils.silo import assume_test_silo_mode


Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
from collections.abc import Callable
from typing import Any
from unittest.mock import patch

from django.apps import apps
from django.test.utils import override_settings

from sentry.db.models import BaseModel
from sentry.hybridcloud.outbox.base import run_outbox_replications_for_self_hosted
from sentry.hybridcloud.tasks.backfill_outboxes import (
backfill_outboxes_for,
get_backfill_key,
get_processing_state,
process_outbox_backfill_batch,
)
from sentry.models.authidentity import AuthIdentity
from sentry.models.authidentityreplica import AuthIdentityReplica
from sentry.models.authprovider import AuthProvider
Expand All @@ -13,12 +21,6 @@
from sentry.models.organizationmapping import OrganizationMapping
from sentry.models.outbox import ControlOutbox, RegionOutbox, outbox_context
from sentry.silo.base import SiloMode
from sentry.tasks.backfill_outboxes import (
backfill_outboxes_for,
get_backfill_key,
get_processing_state,
process_outbox_backfill_batch,
)
from sentry.testutils.factories import Factories
from sentry.testutils.helpers import override_options
from sentry.testutils.outbox import outbox_runner
Expand All @@ -27,7 +29,7 @@
from sentry.utils import redis


def reset_processing_state():
def reset_processing_state() -> None:
with redis.clusters.get("default").get_local_client_for_key("backfill_outboxes") as client:
for app_models in apps.all_models.values():
for model in app_models.values():
Expand All @@ -36,7 +38,7 @@ def reset_processing_state():

@django_db_all
@no_silo_test
def test_processing_awaits_options():
def test_processing_awaits_options() -> None:
reset_processing_state()
org = Factories.create_organization()
with outbox_context(flush=False):
Expand All @@ -60,7 +62,7 @@ def test_processing_awaits_options():


@django_db_all
def test_region_processing(task_runner):
def test_region_processing(task_runner: Callable[..., Any]) -> None:
with outbox_context(flush=False):
for i in range(5):
Factories.create_organization()
Expand All @@ -78,7 +80,7 @@ def test_region_processing(task_runner):

@django_db_all
@control_silo_test
def test_control_processing(task_runner):
def test_control_processing(task_runner: Callable[..., Any]) -> None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have any type exemptions on these files that we need to clear now that we're explicitly typing things?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In their new location they get opted into our standard mypy rules which led to additional types being added.

reset_processing_state()

org = Factories.create_organization()
Expand All @@ -96,7 +98,7 @@ def test_control_processing(task_runner):
assert not AuthProviderReplica.objects.filter(auth_provider_id=ap.id).exists()
assert not AuthIdentityReplica.objects.filter(auth_provider_id=ap.id).exists()

def run_for_model(model: type[BaseModel]):
def run_for_model(model: type[BaseModel]) -> None:
while True:
if process_outbox_backfill_batch(model, 1, force_synchronous=True) is None:
break
Expand Down Expand Up @@ -159,7 +161,7 @@ def run_for_model(model: type[BaseModel]):

@django_db_all
@no_silo_test
def test_run_outbox_replications_for_self_hosted():
def test_run_outbox_replications_for_self_hosted() -> None:
reset_processing_state()

with outbox_context(flush=False):
Expand Down
2 changes: 1 addition & 1 deletion tests/sentry/models/test_outbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@
from pytest import raises

from sentry.hybridcloud.outbox.category import OutboxCategory, OutboxScope
from sentry.hybridcloud.tasks.deliver_from_outbox import enqueue_outbox_jobs
from sentry.models.organization import Organization
from sentry.models.organizationmember import OrganizationMember
from sentry.models.organizationmemberteam import OrganizationMemberTeam
from sentry.models.organizationmemberteamreplica import OrganizationMemberTeamReplica
from sentry.models.outbox import ControlOutbox, OutboxFlushError, RegionOutbox, outbox_context
from sentry.models.user import User
from sentry.silo.base import SiloMode
from sentry.tasks.deliver_from_outbox import enqueue_outbox_jobs
from sentry.testutils.cases import TestCase, TransactionTestCase
from sentry.testutils.factories import Factories
from sentry.testutils.helpers.datetime import freeze_time
Expand Down
Loading