-
-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
prototype(worker-tasks): Create pending task store abstraction #77658
Merged
+139
−129
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
import logging | ||
from collections.abc import Sequence | ||
|
||
from django.db.models import Max | ||
from django.utils import timezone | ||
|
||
from sentry.taskworker.pending_tasks import PendingTask | ||
|
||
logger = logging.getLogger("sentry.taskworker.consumer") | ||
|
||
|
||
class PendingTaskStore: | ||
def store(self, batch: Sequence[PendingTask]): | ||
# Takes in a batch of pending tasks and stores them in some datastore | ||
from sentry.taskworker.models import PendingTasks | ||
|
||
PendingTasks.objects.bulk_create( | ||
[ | ||
PendingTasks.from_message(task.proto_task, task.topic, task.partition, task.offset) | ||
for task in batch | ||
] | ||
) | ||
|
||
def handle_retry_state_tasks(self) -> None: | ||
from sentry.taskworker.config import taskregistry | ||
from sentry.taskworker.models import PendingTasks | ||
|
||
retry_qs = PendingTasks.objects.filter(state=PendingTasks.States.RETRY) | ||
for item in retry_qs: | ||
task_ns = taskregistry.get(item.task_namespace) | ||
task_ns.retry_task(item) | ||
# With retries scheduled, the tasks are complete now. | ||
retry_qs.update(state=PendingTasks.States.COMPLETE) | ||
|
||
def handle_deadletter_at(self) -> None: | ||
from sentry.taskworker.models import PendingTasks | ||
|
||
# Require a completed task with a higher offset to be present | ||
max_completed_id = ( | ||
PendingTasks.objects.filter(state=PendingTasks.States.COMPLETE).aggregate( | ||
max_offset=Max("offset") | ||
)["max_offset"] | ||
or 0 | ||
) | ||
expired_qs = PendingTasks.objects.filter( | ||
deadletter_at__lt=timezone.now(), | ||
offset__lt=max_completed_id, | ||
state=PendingTasks.States.PENDING, | ||
) | ||
# Messages that are still pending and exceeded their deadletter_at are failures | ||
updated = expired_qs.update(state=PendingTasks.States.FAILURE) | ||
logger.info("task.deadletter_at", extra={"count": updated}) | ||
|
||
def handle_processing_deadlines(self) -> None: | ||
from sentry.taskworker.models import PendingTasks | ||
|
||
past_deadline = PendingTasks.objects.filter( | ||
processing_deadline__lt=timezone.now(), | ||
).exclude(state=PendingTasks.States.COMPLETE) | ||
to_update = [] | ||
for item in past_deadline: | ||
if item.has_retries_remaining(): | ||
to_update.append(item.id) | ||
|
||
# Move processing deadline tasks back to pending | ||
PendingTasks.objects.filter(id__in=to_update).update( | ||
state=PendingTasks.States.PENDING, | ||
processing_deadline=None, | ||
) | ||
logger.info("task.processingdeadline", extra={"count": len(to_update)}) | ||
|
||
def handle_failed_tasks(self) -> None: | ||
from sentry.taskworker.models import PendingTasks | ||
|
||
failed = PendingTasks.objects.filter(state=PendingTasks.States.FAILURE) | ||
to_discard = [] | ||
to_deadletter = [] | ||
for item in failed: | ||
if item.discard_after_attempt is not None: | ||
to_discard.append(item.id) | ||
if item.deadletter_after_attempt is not None: | ||
to_deadletter.append(item.id) | ||
|
||
# Discard messages are simply acked and never processed again | ||
PendingTasks.objects.filter(id__in=to_discard).update(state=PendingTasks.States.COMPLETE) | ||
logger.info("task.failed.discarded", extra={"count": len(to_discard)}) | ||
|
||
# TODO do deadletter delivery | ||
PendingTasks.objects.filter(id__in=to_deadletter).update(state=PendingTasks.States.COMPLETE) | ||
logger.info("task.failed.deadletter", extra={"count": len(to_deadletter)}) | ||
logger.info("task.failed.deadletter", extra={"count": len(to_deadletter)}) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
from dataclasses import dataclass | ||
from enum import Enum | ||
|
||
from sentry_protos.hackweek_team_no_celery_pls.v1alpha.pending_task_pb2 import ( | ||
PendingTask as PendingTaskProto, | ||
) | ||
|
||
|
||
class PendingTaskState(Enum): | ||
PENDING = "pending" | ||
PROCESSING = "processing" | ||
COMPLETE = "complete" | ||
FAILURE = "failure" | ||
RETRY = "retry" | ||
|
||
|
||
@dataclass | ||
class PendingTask: | ||
proto_task: PendingTaskProto | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 Decorating with a 'domain object' is a good path to take. Copying all the values out of the protobuf will be expensive in aggregate. We might want to add methods/properties to this object that proxy to the |
||
state: PendingTaskState | ||
topic: str | ||
partition: int | ||
offset: int |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We'll have to come up with a better name for this 😄