Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

task-aware celery worker autoscaling (+ pod-deletion-cost) #339

Open
thesuperzapper opened this issue Jul 30, 2021 · 6 comments
Open

task-aware celery worker autoscaling (+ pod-deletion-cost) #339

thesuperzapper opened this issue Jul 30, 2021 · 6 comments
Assignees
Labels
kind/enhancement kind - new features or changes

Comments

@thesuperzapper
Copy link
Member

thesuperzapper commented Jul 30, 2021

The chart currently supports primitive autoscaling for celery workers, using HorizontalPodAutoscalers with memory metrics. But this is very flawed, as there is not necessarily a link between RAM usage, and the number of pending tasks, meaning you could have a situation where your workers don't scale up despite having pending tasks.

We can make a task-aware autoscaler that will scale up the number of celery workers when there are not enough task slots, and scale down when there are too many.

In past, scale down was dangerous to use with airflow workers, as Kubernetes had no way to influence which Pods were removed, meaning Kubernetes often removes a busy worker where there are workers that are doing nothing.

As of Kubernetes 1.22, there is a beta annotation for Pods managed by ReplicaSets called controller.kubernetes.io/pod-deletion-cost, which tells Kubernetes how "expensive" killing a particular Pod is when decreasing the replicas count.

NOTE: Previously we considered using KEDA (#103) to manage autoscaling, but this will not work with controller.kubernetes.io/pod-deletion-cost, as the HorizontalPodAutoscaler created by KEDA can not patch the required annotations BEFORE scaling down.


Our Celery Worker Autoscaler can perform the following loop:

  1. Cleanup from any past loops:
    1. Remove any controller.kubernetes.io/pod-deletion-cost annotations
      • NOTE: there will only be dangling annotations if Kubernetes did not remove our "chosen" Pods, or if the autoscaler crashed halfway through a loop
      • NOTE: we need to attempt to prevent multiple instances of our autoscaler running at a time
    2. Send each worker Pod that we removed an annotation from an app.control.add_consumer() command, so it resumes picking up new airflow tasks
  2. Calculate the ideal number of worker replicas for the current task load:
    • if the load factor of workers is above A for B time --> increase replicas to meet the target load factor
    • if the load factor of workers is below X for Y time --> decrease replicas to meet the target load factor
      • NOTE: the load factor is the number of available task slots which are consumed
      • NOTE: we should put some limit on the number of scaling decisions per A seconds (to prevent a yo-yo effect), (perhaps have separate limits for down and up to allow faster upscaling)
      • NOTE: we should have a "scaling algorithm" config, even if we only start with 1
      • NOTE: we should have minium and maximum replicas configs
      • NOTE: if using CeleryKubernetesExecutor, we must exclude tasks that are in the AIRFLOW__CELERY_KUBERNETES_EXECUTOR__KUBERNETES_QUEUE
  3. If replicas are going to be decreased by N:
    1. Sort the worker pods by their pod-deletion-cost in ascending order
      • NOTE: the pod-deletion-cost is the number of running tasks, weighted by the total running time of each task (so long-running tasks are not needlessly evicted), specifically we want smaller numbers of long-running tasks to be weighted higher than larger numbers of short-running tasks
      • NOTE: add a DAG/Task label which will prevent any worker running it from being killed (or allow a "weighting" per Task)
    2. Annotate the N worker Pods with the lowest cost Pods with the controller.kubernetes.io/pod-deletion-cost annotation
      • NOTE: if there are pods in a Pending/Unready state, we can reduce N by this number, as Kubernetes will remove these pods first
    3. Send each worker Pod that was annotated an app.control.cancel_consumer(...) command, so does not pick up new airflow tasks after being "marked" for deletion
    4. Patch the replicas down by N

Important changes to make this work:

  • We will need to use a Deployment for the workers(rather than a StatefulSet), as controller.kubernetes.io/pod-deletion-cost is only for Pods in ReplicaSets
  • Because controller.kubernetes.io/pod-deletion-cost is alpha in 1.21 and beta in 1.22, for older Kubernetes versions we can let users use the CloneSet from the CNCF project called OpenKruise (instead of Deployment), as they have back-ported the controller.kubernetes.io/pod-deletion-cost annotation.
@thesuperzapper thesuperzapper added the kind/enhancement kind - new features or changes label Jul 30, 2021
@thesuperzapper thesuperzapper changed the title safer down-scaling of workers with controller.kubernetes.io/pod-deletion-cost create celery worker autoscaler controller (using controller.kubernetes.io/pod-deletion-cost) Sep 22, 2021
@stale stale bot added lifecycle/stale lifecycle - this is stale and removed lifecycle/stale lifecycle - this is stale labels Nov 22, 2021
@thesuperzapper thesuperzapper added lifecycle/frozen lifecycle - this can't become stale status/help-wanted status - this needs contributions labels Dec 14, 2021
@thesuperzapper thesuperzapper removed the lifecycle/frozen lifecycle - this can't become stale label Mar 22, 2022
@airflow-helm airflow-helm deleted a comment from stale bot Mar 22, 2022
@thesuperzapper thesuperzapper added this to the airflow-8.7.0 milestone Apr 13, 2022
@thesuperzapper thesuperzapper removed the status/help-wanted status - this needs contributions label Apr 13, 2022
@thesuperzapper thesuperzapper changed the title create celery worker autoscaler controller (using controller.kubernetes.io/pod-deletion-cost) create celery worker autoscaler (make use of pod-deletion-cost) Apr 13, 2022
@thesuperzapper thesuperzapper changed the title create celery worker autoscaler (make use of pod-deletion-cost) task-aware celery worker autoscaling (make use of pod-deletion-cost) Apr 16, 2022
@thesuperzapper thesuperzapper changed the title task-aware celery worker autoscaling (make use of pod-deletion-cost) task-aware celery worker autoscaling (+ pod-deletion-cost) Apr 16, 2022
@thesuperzapper thesuperzapper self-assigned this Apr 21, 2022
@potiuk
Copy link

potiuk commented Sep 24, 2022

Interesting approach 👀

@NitinKeshavB
Copy link

Cant wait to see this in action. please let us know once this is available.

@thesuperzapper
Copy link
Member Author

Cant wait to see this in action. please let us know once this is available.

@NitinKeshavB I agree, I am sorry it's taken so long!

I actually have a mostly working prototype, but I have paused work on it until I can get the first release of deployKF (a new open-source ML Platform for Kubernetes, which will include Airflow) out the door.

After that, it is top of my list!

@brtkwr
Copy link

brtkwr commented Dec 14, 2023

Ping :D would this support scale to 0 by any chance?

@lexey-e-shelf
Copy link

Hi @thesuperzapper, I'm very interested in this feature as well, and I see that you recently added a new Kubernetes proposal related to controller.kubernetes.io/pod-deletion-cost. I don't fully grasp the details, but will that change this approach as well? Perhaps more pertinently, will the implementation of this approach depend on the implementation of that Kubernetes proposal?

@Joffreybvn
Copy link

Joffreybvn commented Aug 14, 2024

I prototyped a "Keda Airflow autoscaler", and it doesn't work as good as I expected.

The autoscaler:

  • Query Airflow's database (to determine replicas)
  • Send "TERM"
  • Annotate the pod-deletion-costs
  • Returns the replicas to Keda

The autoscaler is deployed as a new endpoint in Airflow"s API, called every 10 seconds by Keda (metrics-api).

Problem: Sometimes, a task get picked up by an empty worker right after the database is queried (before getting a 'TERM' signal). Which leads to task eviction. This is especially true with short running dynamic tasks.


Some thoughts/ learnings:

  • I like the add_consumer / cancel_consumer strategy - I should have found this issue earlier :)
  • Keda and the HPA add unessecary complexity.
    • Simpler to edit the replicas directly via kubeapi
    • Creating a standalone Pod that reacts and make the necessary API calls / signals when needed (instead of being triggered by Keda)
  • Scale down logic can be simple (to begin with): Keep only the Pods with running tasks, remove all idle Pods.
    • Determining if a task will run during short or long time is not easy (edge cases: first run, task that suddenly run way longer / shorter, ...)
    • Users bother more than a few CPU / memory being wasted
    • Alternative solution for long running tasks occupying alone a 16 slots worker: Creating multiple queues, each associated with a pool / Deployment of Celery workers. A 'default' queue gets 16 slots and 16GB of memory per worker. A 'slim' queue gets 1 slot and 1GB of memory per worker. Task policy uses prior run to determine if it's a long one, and run it on the slim queue.
    • The previous point means one autoscaler per queue / Deployment.
  • With a simple scale down logic, the end of the flow could be safer: determine replicas, send cancel consumer -> determine replicas again (prevent race condition)-> annotate -> patch the replicas

That said, I'm going to give a try to this flow, with the 'simple'/'safe' downscale logic

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
kind/enhancement kind - new features or changes
Projects
None yet
Development

No branches or pull requests

6 participants