prototype(worker-tasks): Create pending task store abstraction #77658

Merged · 3 commits · Sep 18, 2024
Changes from 1 commit
91 changes: 91 additions & 0 deletions src/sentry/taskworker/pending_task_store.py
@@ -0,0 +1,91 @@
import logging
from collections.abc import Sequence

from django.db.models import Max
from django.utils import timezone

from sentry.taskworker.pending_tasks import PendingTask

logger = logging.getLogger("sentry.taskworker.consumer")


class PendingTaskStore:
    def store(self, batch: Sequence[PendingTask]):
        # Takes in a batch of pending tasks and stores them in some datastore
        from sentry.taskworker.models import PendingTasks

        PendingTasks.objects.bulk_create(
            [
                PendingTasks.from_message(task.proto_task, task.topic, task.partition, task.offset)
                for task in batch
            ]
        )

    def handle_retry_state_tasks(self) -> None:
        from sentry.taskworker.config import taskregistry
        from sentry.taskworker.models import PendingTasks

        retry_qs = PendingTasks.objects.filter(state=PendingTasks.States.RETRY)
        for item in retry_qs:
            task_ns = taskregistry.get(item.task_namespace)
            task_ns.retry_task(item)
        # With retries scheduled, the tasks are complete now.
        retry_qs.update(state=PendingTasks.States.COMPLETE)

    def handle_deadletter_at(self) -> None:
        from sentry.taskworker.models import PendingTasks

        # Require a completed task with a higher offset to be present
        max_completed_id = (
            PendingTasks.objects.filter(state=PendingTasks.States.COMPLETE).aggregate(
                max_offset=Max("offset")
            )["max_offset"]
            or 0
        )
        expired_qs = PendingTasks.objects.filter(
            deadletter_at__lt=timezone.now(),
            offset__lt=max_completed_id,
            state=PendingTasks.States.PENDING,
        )
        # Messages that are still pending and exceeded their deadletter_at are failures
        updated = expired_qs.update(state=PendingTasks.States.FAILURE)
        logger.info("task.deadletter_at", extra={"count": updated})

    def handle_processing_deadlines(self) -> None:
        from sentry.taskworker.models import PendingTasks

        past_deadline = PendingTasks.objects.filter(
            processing_deadline__lt=timezone.now(),
        ).exclude(state=PendingTasks.States.COMPLETE)
        to_update = []
        for item in past_deadline:
            if item.has_retries_remaining():
                to_update.append(item.id)

        # Move processing deadline tasks back to pending
        PendingTasks.objects.filter(id__in=to_update).update(
            state=PendingTasks.States.PENDING,
            processing_deadline=None,
        )
        logger.info("task.processingdeadline", extra={"count": len(to_update)})

    def handle_failed_tasks(self) -> None:
        from sentry.taskworker.models import PendingTasks

        failed = PendingTasks.objects.filter(state=PendingTasks.States.FAILURE)
        to_discard = []
        to_deadletter = []
        for item in failed:
            if item.discard_after_attempt is not None:
                to_discard.append(item.id)
            if item.deadletter_after_attempt is not None:
                to_deadletter.append(item.id)

        # Discard messages are simply acked and never processed again
        PendingTasks.objects.filter(id__in=to_discard).update(state=PendingTasks.States.COMPLETE)
        logger.info("task.failed.discarded", extra={"count": len(to_discard)})

        # TODO do deadletter delivery
        PendingTasks.objects.filter(id__in=to_deadletter).update(state=PendingTasks.States.COMPLETE)
        logger.info("task.failed.deadletter", extra={"count": len(to_deadletter)})
        logger.info("task.failed.deadletter", extra={"count": len(to_deadletter)})
23 changes: 23 additions & 0 deletions src/sentry/taskworker/pending_tasks.py
@@ -0,0 +1,23 @@
from dataclasses import dataclass
from enum import Enum

from sentry_protos.hackweek_team_no_celery_pls.v1alpha.pending_task_pb2 import (
    PendingTask as PendingTaskProto,
)

Member commented on this import:
We'll have to come up with a better name for this 😄


class PendingTaskState(Enum):
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETE = "complete"
    FAILURE = "failure"
    RETRY = "retry"


@dataclass
class PendingTask:
    proto_task: PendingTaskProto
    state: PendingTaskState
    topic: str
    partition: int
    offset: int

Member commented on proto_task:
👍 Decorating with a 'domain object' is a good path to take. Copying all the values out of the protobuf will be expensive in aggregate.

We might want to add methods/properties to this object that proxy to the proto_task and have the proto_task be a private property.
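For illustration, a minimal sketch of that suggestion — proto_task becomes private and callers read through proxy properties. The taskname field on the proto message is an assumption made for the example, not something this PR defines:

from dataclasses import dataclass

from sentry_protos.hackweek_team_no_celery_pls.v1alpha.pending_task_pb2 import (
    PendingTask as PendingTaskProto,
)


@dataclass
class PendingTask:
    # The protobuf stays private; callers go through properties instead of
    # copying every value out of the message up front.
    _proto_task: PendingTaskProto
    state: PendingTaskState
    topic: str
    partition: int
    offset: int

    @property
    def task_name(self) -> str:
        # 'taskname' is an assumed proto field, used only to show the
        # proxying pattern; the real message may name it differently.
        return self._proto_task.taskname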
102 changes: 16 additions & 86 deletions src/sentry/taskworker/processors/strategy_factory.py
@@ -2,7 +2,7 @@

 import logging
 from collections.abc import Mapping, MutableSequence, Sequence
-from typing import TYPE_CHECKING, Any
+from typing import Any

 from arroyo.backends.kafka.consumer import KafkaPayload
 from arroyo.processing.strategies import (
@@ -17,16 +17,13 @@
 from arroyo.processing.strategies.run_task_with_multiprocessing import MultiprocessingPool
 from arroyo.types import BaseValue, Commit, Message, Partition
 from django.conf import settings
-from django.db.models import Max
-from django.utils import timezone
 from sentry_protos.hackweek_team_no_celery_pls.v1alpha.pending_task_pb2 import (
     PendingTask as PendingTaskProto,
 )

 from sentry.conf.types.kafka_definition import Topic
-
-if TYPE_CHECKING:
-    from sentry.taskworker.models import PendingTasks
+from sentry.taskworker.pending_task_store import PendingTaskStore
+from sentry.taskworker.pending_tasks import PendingTask, PendingTaskState

 logger = logging.getLogger("sentry.taskworker.consumer")

@@ -54,6 +51,7 @@ def __init__(
         # after this time tasks should be deadlettered if they are followed
         # by completed records. Should come from CLI/options
         self.max_pending_timeout = 8 * 60
+        self.pending_task_store = PendingTaskStore()

         self.do_imports()

@@ -64,6 +62,7 @@ def do_imports(self) -> None:
     def transform_msg_batch(
         self, message: Message[PendingTaskProto | None]
     ) -> MutableSequence[Mapping[str, Any]]:
+        # TODO: clean up with create_pending_tasks_batch
         transformed_msg_batch: MutableSequence = []
         for msg in message.payload:
             task = PendingTaskProto()
@@ -84,90 +83,20 @@

     def create_pending_tasks_batch(
         self, message: Message[MutableSequence[Mapping[str, Any]]]
-    ) -> Sequence[PendingTasks]:
-        from sentry.taskworker.models import PendingTasks
-
+    ) -> Sequence[PendingTask]:
         transformed_msg_batch = self.transform_msg_batch(message)
         pending_tasks = []
-        for (m, partition, offset) in transformed_msg_batch:
-            pending_task = PendingTasks.from_message(m, self.topic.value, partition, offset)
+        for m, partition, offset in transformed_msg_batch:
+            pending_task = PendingTask(
+                m, PendingTaskState.PENDING, self.topic.value, partition, offset
+            )
             pending_tasks.append(pending_task)

         return pending_tasks

-    def handle_retry_state_tasks(self) -> None:
-        from sentry.taskworker.config import taskregistry
-        from sentry.taskworker.models import PendingTasks
-
-        retry_qs = PendingTasks.objects.filter(state=PendingTasks.States.RETRY)
-        for item in retry_qs:
-            task_ns = taskregistry.get(item.task_namespace)
-            task_ns.retry_task(item)
-        # With retries scheduled, the tasks are complete now.
-        retry_qs.update(state=PendingTasks.States.COMPLETE)
-
-    def handle_deadletter_at(self) -> None:
-        from sentry.taskworker.models import PendingTasks
-
-        # Require a completed task with a higher offset to be present
-        max_completed_id = (
-            PendingTasks.objects.filter(state=PendingTasks.States.COMPLETE).aggregate(
-                max_offset=Max("offset")
-            )["max_offset"]
-            or 0
-        )
-        expired_qs = PendingTasks.objects.filter(
-            deadletter_at__lt=timezone.now(),
-            offset__lt=max_completed_id,
-            state=PendingTasks.States.PENDING,
-        )
-        # Messages that are still pending and exceeded their deadletter_at are failures
-        updated = expired_qs.update(state=PendingTasks.States.FAILURE)
-        logger.info("task.deadletter_at", extra={"count": updated})
-
-    def handle_processing_deadlines(self) -> None:
-        from sentry.taskworker.models import PendingTasks
-
-        past_deadline = PendingTasks.objects.filter(
-            processing_deadline__lt=timezone.now(),
-        ).exclude(state=PendingTasks.States.COMPLETE)
-        to_update = []
-        for item in past_deadline:
-            if item.has_retries_remaining():
-                to_update.append(item.id)
-
-        # Move processing deadline tasks back to pending
-        PendingTasks.objects.filter(id__in=to_update).update(
-            state=PendingTasks.States.PENDING,
-            processing_deadline=None,
-        )
-        logger.info("task.processingdeadline", extra={"count": len(to_update)})
-
-    def handle_failed_tasks(self) -> None:
-        from sentry.taskworker.models import PendingTasks
-
-        failed = PendingTasks.objects.filter(state=PendingTasks.States.FAILURE)
-        to_discard = []
-        to_deadletter = []
-        for item in failed:
-            if item.discard_after_attempt is not None:
-                to_discard.append(item.id)
-            if item.deadletter_after_attempt is not None:
-                to_deadletter.append(item.id)
-
-        # Discard messages are simply acked and never processed again
-        PendingTasks.objects.filter(id__in=to_discard).update(state=PendingTasks.States.COMPLETE)
-        logger.info("task.failed.discarded", extra={"count": len(to_discard)})
-
-        # TODO do deadletter delivery
-        PendingTasks.objects.filter(id__in=to_deadletter).update(state=PendingTasks.States.COMPLETE)
-        logger.info("task.failed.deadletter", extra={"count": len(to_deadletter)})
-
     def create_with_partitions(
         self, commit: Commit, partitions: Mapping[Partition, int]
     ) -> ProcessingStrategy[KafkaPayload]:
-        from sentry.taskworker.models import PendingTasks
-
         def accumulator(
             batched_results: MutableSequence[Mapping[str, Any]],
             message: BaseValue[Mapping[str, Any]],
@@ -180,16 +109,16 @@ def flush_batch(
         ) -> Message[MutableSequence[Mapping[str, Any]]]:
             logger.info("Flushing batch. Messages: %r...", len(message.payload))
             batch = self.create_pending_tasks_batch(message)
-            PendingTasks.objects.bulk_create(batch)
+            self.pending_task_store.store(batch)
             return message

         def do_upkeep(
             message: Message[MutableSequence[Mapping[str, Any]]]
         ) -> Message[MutableSequence[Mapping[str, Any]]]:
-            self.handle_processing_deadlines()
-            self.handle_retry_state_tasks()
-            self.handle_deadletter_at()
-            self.handle_failed_tasks()
+            self.pending_task_store.handle_processing_deadlines()
+            self.pending_task_store.handle_retry_state_tasks()
+            self.pending_task_store.handle_deadletter_at()
+            self.pending_task_store.handle_failed_tasks()

             return message

@@ -224,3 +153,4 @@ def do_upkeep(

     def shutdown(self):
         self.pool.close()
+        self.pool.close()
Member commented on the duplicated self.pool.close():
Why twice?

Member replied:
This is a mistake, it's fixed now