prototype(worker-tasks): Create pending task store abstraction #77658
Changes from 1 commit
@@ -0,0 +1,91 @@
import logging
from collections.abc import Sequence

from django.db.models import Max
from django.utils import timezone

from sentry.taskworker.pending_tasks import PendingTask

logger = logging.getLogger("sentry.taskworker.consumer")


class PendingTaskStore:
    def store(self, batch: Sequence[PendingTask]):
        # Takes in a batch of pending tasks and stores them in some datastore
        from sentry.taskworker.models import PendingTasks

        PendingTasks.objects.bulk_create(
            [
                PendingTasks.from_message(task.proto_task, task.topic, task.partition, task.offset)
                for task in batch
            ]
        )

    def handle_retry_state_tasks(self) -> None:
        from sentry.taskworker.config import taskregistry
        from sentry.taskworker.models import PendingTasks

        retry_qs = PendingTasks.objects.filter(state=PendingTasks.States.RETRY)
        for item in retry_qs:
            task_ns = taskregistry.get(item.task_namespace)
            task_ns.retry_task(item)
        # With retries scheduled, the tasks are complete now.
        retry_qs.update(state=PendingTasks.States.COMPLETE)

    def handle_deadletter_at(self) -> None:
        from sentry.taskworker.models import PendingTasks

        # Require a completed task with a higher offset to be present
        max_completed_id = (
            PendingTasks.objects.filter(state=PendingTasks.States.COMPLETE).aggregate(
                max_offset=Max("offset")
            )["max_offset"]
            or 0
        )
        expired_qs = PendingTasks.objects.filter(
            deadletter_at__lt=timezone.now(),
            offset__lt=max_completed_id,
            state=PendingTasks.States.PENDING,
        )
        # Messages that are still pending and exceeded their deadletter_at are failures
        updated = expired_qs.update(state=PendingTasks.States.FAILURE)
        logger.info("task.deadletter_at", extra={"count": updated})

    def handle_processing_deadlines(self) -> None:
        from sentry.taskworker.models import PendingTasks

        past_deadline = PendingTasks.objects.filter(
            processing_deadline__lt=timezone.now(),
        ).exclude(state=PendingTasks.States.COMPLETE)
        to_update = []
        for item in past_deadline:
            if item.has_retries_remaining():
                to_update.append(item.id)

        # Move processing deadline tasks back to pending
        PendingTasks.objects.filter(id__in=to_update).update(
            state=PendingTasks.States.PENDING,
            processing_deadline=None,
        )
        logger.info("task.processingdeadline", extra={"count": len(to_update)})

    def handle_failed_tasks(self) -> None:
        from sentry.taskworker.models import PendingTasks

        failed = PendingTasks.objects.filter(state=PendingTasks.States.FAILURE)
        to_discard = []
        to_deadletter = []
        for item in failed:
            if item.discard_after_attempt is not None:
                to_discard.append(item.id)
            if item.deadletter_after_attempt is not None:
                to_deadletter.append(item.id)

        # Discard messages are simply acked and never processed again
        PendingTasks.objects.filter(id__in=to_discard).update(state=PendingTasks.States.COMPLETE)
        logger.info("task.failed.discarded", extra={"count": len(to_discard)})

        # TODO do deadletter delivery
        PendingTasks.objects.filter(id__in=to_deadletter).update(state=PendingTasks.States.COMPLETE)
        logger.info("task.failed.deadletter", extra={"count": len(to_deadletter)})
        logger.info("task.failed.deadletter", extra={"count": len(to_deadletter)})
@@ -0,0 +1,23 @@
from dataclasses import dataclass
from enum import Enum

from sentry_protos.hackweek_team_no_celery_pls.v1alpha.pending_task_pb2 import (
    PendingTask as PendingTaskProto,
)


class PendingTaskState(Enum):
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETE = "complete"
    FAILURE = "failure"
    RETRY = "retry"


@dataclass
class PendingTask:
    proto_task: PendingTaskProto
    state: PendingTaskState
    topic: str
    partition: int
    offset: int

Review comment on the `proto_task: PendingTaskProto` field: 👍 Decorating with a 'domain object' is a good path to take. Copying all the values out of the protobuf will be expensive in aggregate. We might want to add methods/properties to this object that proxy to the protobuf.
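A minimal sketch of what such a proxying property could look like, in the spirit of that suggestion. This is not part of the PR, and the `task_namespace` field on `PendingTaskProto` is assumed for illustration only.

```python
from dataclasses import dataclass

from sentry_protos.hackweek_team_no_celery_pls.v1alpha.pending_task_pb2 import (
    PendingTask as PendingTaskProto,
)

from sentry.taskworker.pending_tasks import PendingTaskState


@dataclass
class PendingTask:
    proto_task: PendingTaskProto
    state: PendingTaskState
    topic: str
    partition: int
    offset: int

    @property
    def task_namespace(self) -> str:
        # Read through to the protobuf on access instead of copying the
        # value into the dataclass up front. Assumes the proto has this field.
        return self.proto_task.task_namespace
```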
@@ -2,7 +2,7 @@

 import logging
 from collections.abc import Mapping, MutableSequence, Sequence
-from typing import TYPE_CHECKING, Any
+from typing import Any

 from arroyo.backends.kafka.consumer import KafkaPayload
 from arroyo.processing.strategies import (
@@ -17,16 +17,13 @@
 from arroyo.processing.strategies.run_task_with_multiprocessing import MultiprocessingPool
 from arroyo.types import BaseValue, Commit, Message, Partition
 from django.conf import settings
-from django.db.models import Max
-from django.utils import timezone
 from sentry_protos.hackweek_team_no_celery_pls.v1alpha.pending_task_pb2 import (
     PendingTask as PendingTaskProto,
 )

 from sentry.conf.types.kafka_definition import Topic
-
-if TYPE_CHECKING:
-    from sentry.taskworker.models import PendingTasks
+from sentry.taskworker.pending_task_store import PendingTaskStore
+from sentry.taskworker.pending_tasks import PendingTask, PendingTaskState

 logger = logging.getLogger("sentry.taskworker.consumer")

@@ -54,6 +51,7 @@ def __init__(
         # after this time tasks should be deadlettered if they are followed
         # by completed records. Should come from CLI/options
         self.max_pending_timeout = 8 * 60
+        self.pending_task_store = PendingTaskStore()

         self.do_imports()

@@ -64,6 +62,7 @@ def do_imports(self) -> None:
     def transform_msg_batch(
         self, message: Message[PendingTaskProto | None]
     ) -> MutableSequence[Mapping[str, Any]]:
+        # TODO: clean up with create_pending_tasks_batch
         transformed_msg_batch: MutableSequence = []
         for msg in message.payload:
             task = PendingTaskProto()
@@ -84,90 +83,20 @@ def transform_msg_batch(

     def create_pending_tasks_batch(
         self, message: Message[MutableSequence[Mapping[str, Any]]]
-    ) -> Sequence[PendingTasks]:
-        from sentry.taskworker.models import PendingTasks
-
+    ) -> Sequence[PendingTask]:
         transformed_msg_batch = self.transform_msg_batch(message)
         pending_tasks = []
-        for (m, partition, offset) in transformed_msg_batch:
-            pending_task = PendingTasks.from_message(m, self.topic.value, partition, offset)
+        for m, partition, offset in transformed_msg_batch:
+            pending_task = PendingTask(
+                m, PendingTaskState.PENDING, self.topic.value, partition, offset
+            )
             pending_tasks.append(pending_task)

         return pending_tasks

-    def handle_retry_state_tasks(self) -> None:
-        from sentry.taskworker.config import taskregistry
-        from sentry.taskworker.models import PendingTasks
-
-        retry_qs = PendingTasks.objects.filter(state=PendingTasks.States.RETRY)
-        for item in retry_qs:
-            task_ns = taskregistry.get(item.task_namespace)
-            task_ns.retry_task(item)
-        # With retries scheduled, the tasks are complete now.
-        retry_qs.update(state=PendingTasks.States.COMPLETE)
-
-    def handle_deadletter_at(self) -> None:
-        from sentry.taskworker.models import PendingTasks
-
-        # Require a completed task with a higher offset to be present
-        max_completed_id = (
-            PendingTasks.objects.filter(state=PendingTasks.States.COMPLETE).aggregate(
-                max_offset=Max("offset")
-            )["max_offset"]
-            or 0
-        )
-        expired_qs = PendingTasks.objects.filter(
-            deadletter_at__lt=timezone.now(),
-            offset__lt=max_completed_id,
-            state=PendingTasks.States.PENDING,
-        )
-        # Messages that are still pending and exceeded their deadletter_at are failures
-        updated = expired_qs.update(state=PendingTasks.States.FAILURE)
-        logger.info("task.deadletter_at", extra={"count": updated})
-
-    def handle_processing_deadlines(self) -> None:
-        from sentry.taskworker.models import PendingTasks
-
-        past_deadline = PendingTasks.objects.filter(
-            processing_deadline__lt=timezone.now(),
-        ).exclude(state=PendingTasks.States.COMPLETE)
-        to_update = []
-        for item in past_deadline:
-            if item.has_retries_remaining():
-                to_update.append(item.id)
-
-        # Move processing deadline tasks back to pending
-        PendingTasks.objects.filter(id__in=to_update).update(
-            state=PendingTasks.States.PENDING,
-            processing_deadline=None,
-        )
-        logger.info("task.processingdeadline", extra={"count": len(to_update)})
-
-    def handle_failed_tasks(self) -> None:
-        from sentry.taskworker.models import PendingTasks
-
-        failed = PendingTasks.objects.filter(state=PendingTasks.States.FAILURE)
-        to_discard = []
-        to_deadletter = []
-        for item in failed:
-            if item.discard_after_attempt is not None:
-                to_discard.append(item.id)
-            if item.deadletter_after_attempt is not None:
-                to_deadletter.append(item.id)
-
-        # Discard messages are simply acked and never processed again
-        PendingTasks.objects.filter(id__in=to_discard).update(state=PendingTasks.States.COMPLETE)
-        logger.info("task.failed.discarded", extra={"count": len(to_discard)})
-
-        # TODO do deadletter delivery
-        PendingTasks.objects.filter(id__in=to_deadletter).update(state=PendingTasks.States.COMPLETE)
-        logger.info("task.failed.deadletter", extra={"count": len(to_deadletter)})
-
     def create_with_partitions(
         self, commit: Commit, partitions: Mapping[Partition, int]
     ) -> ProcessingStrategy[KafkaPayload]:
-        from sentry.taskworker.models import PendingTasks
-
         def accumulator(
             batched_results: MutableSequence[Mapping[str, Any]],
             message: BaseValue[Mapping[str, Any]],
@@ -180,16 +109,16 @@ def flush_batch(
         ) -> Message[MutableSequence[Mapping[str, Any]]]:
             logger.info("Flushing batch. Messages: %r...", len(message.payload))
             batch = self.create_pending_tasks_batch(message)
-            PendingTasks.objects.bulk_create(batch)
+            self.pending_task_store.store(batch)
             return message

         def do_upkeep(
             message: Message[MutableSequence[Mapping[str, Any]]]
         ) -> Message[MutableSequence[Mapping[str, Any]]]:
-            self.handle_processing_deadlines()
-            self.handle_retry_state_tasks()
-            self.handle_deadletter_at()
-            self.handle_failed_tasks()
+            self.pending_task_store.handle_processing_deadlines()
+            self.pending_task_store.handle_retry_state_tasks()
+            self.pending_task_store.handle_deadletter_at()
+            self.pending_task_store.handle_failed_tasks()

             return message

@@ -224,3 +153,4 @@ def do_upkeep(

     def shutdown(self):
         self.pool.close()
+        self.pool.close()
Review comment (on the duplicated `self.pool.close()`): Why twice?

Reply: This is a mistake, it's fixed now.

Review comment: We'll have to come up with a better name for this 😄