Skip to content

Commit

Permalink
Introduce mypy section in pyproject.toml, ignore untyped imports and untyped external calls
Browse files Browse the repository at this point in the history
  • Loading branch information
Jonas Dedden committed May 30, 2024
1 parent 82c1d93 commit 85c9a0b
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 84 deletions.
137 changes: 60 additions & 77 deletions dask_kubernetes/operator/controller/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,17 @@
from collections import defaultdict
from contextlib import suppress
from datetime import datetime
from typing import TYPE_CHECKING, Any, Awaitable, Final, cast
from typing import TYPE_CHECKING, Any, Final
from uuid import uuid4

import aiohttp
import dask.config
import kopf
import kr8s # type: ignore[import-untyped]
import kr8s
import pkg_resources
from distributed.core import clean_exception, rpc
from distributed.protocol.pickle import dumps
from kr8s.asyncio.objects import Deployment, Pod, Service # type: ignore[import-untyped]
from kr8s.asyncio.objects import Deployment, Pod, Service

from dask_kubernetes.constants import SCHEDULER_NAME_TEMPLATE
from dask_kubernetes.exceptions import ValidationError
Expand All @@ -40,9 +40,9 @@

KUBERNETES_DATETIME_FORMAT: Final[str] = "%Y-%m-%dT%H:%M:%SZ"

DASK_AUTOSCALER_COOLDOWN_UNTIL_ANNOTATION: Final[
str
] = "kubernetes.dask.org/cooldown-until"
DASK_AUTOSCALER_COOLDOWN_UNTIL_ANNOTATION: Final[str] = (
"kubernetes.dask.org/cooldown-until"
)

# Load operator plugins from other packages
PLUGINS: list[Any] = []
Expand Down Expand Up @@ -315,8 +315,8 @@ async def daskcluster_create(
This allows us to track that the operator is running.
"""
if name is None or namespace is None:
raise kopf.PermanentError("Name or Namespace not given.")
assert name
assert namespace
logger.info(f"DaskCluster {name} created in {namespace}.")
try:
validate_cluster_name(name)
Expand All @@ -338,8 +338,8 @@ async def daskcluster_create_components(
**__: Any,
) -> None:
"""When the DaskCluster status.phase goes into Created create the cluster components."""
if name is None or namespace is None:
raise kopf.PermanentError("Name or Namespace not given.")
assert name
assert namespace
logger.info("Creating Dask cluster components.")

# Create scheduler deployment
Expand Down Expand Up @@ -398,8 +398,7 @@ async def handle_scheduler_service_status(
namespace: str | None,
**__: Any,
) -> None:
if namespace is None:
raise kopf.PermanentError("Namespace not given.")
assert namespace
# If the Service is a LoadBalancer with no ingress endpoints mark the cluster as Pending
if spec["type"] == "LoadBalancer" and not len(
status.get("load_balancer", {}).get("ingress", [])
Expand All @@ -418,23 +417,19 @@ async def handle_scheduler_service_status(
async def daskworkergroup_create(
body: kopf.Body, namespace: str | None, logger: kopf.Logger, **kwargs: Any
) -> None:
if namespace is None:
raise kopf.PermanentError("Namespace not given.")
assert namespace
wg = await DaskWorkerGroup(body, namespace=namespace)
cluster = await wg.cluster()
await cluster.adopt(wg)
logger.info(f"Successfully adopted by {cluster.name}")

del kwargs["new"]
await cast(
Awaitable[None],
daskworkergroup_replica_update(
body=body,
logger=logger,
new=wg.replicas,
namespace=namespace,
**kwargs,
),
await daskworkergroup_replica_update( # type: ignore[misc]
body=body,
logger=logger,
new=wg.replicas,
namespace=namespace,
**kwargs,
)


Expand All @@ -445,10 +440,9 @@ async def retire_workers(
namespace: str | None,
logger: kopf.Logger,
) -> list[str]:
if namespace is None:
raise kopf.PermanentError("Namespace not given.")
assert namespace
# Try gracefully retiring via the HTTP API
dashboard_address = await get_scheduler_address( # type: ignore[no-untyped-call]
dashboard_address = await get_scheduler_address(
scheduler_service_name,
namespace,
port_name="http-dashboard",
Expand All @@ -474,18 +468,19 @@ async def retire_workers(
)
# Dask version mismatches between the operator and scheduler may cause this to fail in any number of unexpected ways
with suppress(Exception):
comm_address = await get_scheduler_address( # type: ignore[no-untyped-call]
comm_address = await get_scheduler_address(
scheduler_service_name,
namespace,
allow_external=False,
)
async with rpc(comm_address) as scheduler_comm: # type: ignore[no-untyped-call]
async with rpc(comm_address) as scheduler_comm:
workers_to_close = await scheduler_comm.workers_to_close(
n=n_workers,
attribute="name",
)
await scheduler_comm.retire_workers(names=workers_to_close)
return cast(list[str], workers_to_close)
assert isinstance(workers_to_close, list)
return workers_to_close

# Finally fall back to last-in-first-out scaling
logger.warning(
Expand All @@ -502,11 +497,10 @@ async def retire_workers(

async def check_scheduler_idle(
scheduler_service_name: str, namespace: str | None, logger: kopf.Logger
) -> str:
if namespace is None:
raise kopf.PermanentError("Namespace not given.")
) -> float:
assert namespace
# Try getting idle time via HTTP API
dashboard_address = await get_scheduler_address( # type: ignore[no-untyped-call]
dashboard_address = await get_scheduler_address(
scheduler_service_name,
namespace,
port_name="http-dashboard",
Expand All @@ -519,7 +513,7 @@ async def check_scheduler_idle(
idle_since = (await resp.json())["idle_since"]
if idle_since:
logger.debug("Scheduler idle since: %s", idle_since)
return cast(str, idle_since)
return float(idle_since)
logger.debug(
"Received %d response from scheduler API with body %s",
resp.status,
Expand All @@ -532,61 +526,55 @@ async def check_scheduler_idle(
)
# Dask version mismatches between the operator and scheduler may cause this to fail in any number of unexpected ways
with suppress(Exception):
comm_address = await get_scheduler_address( # type: ignore[no-untyped-call]
comm_address = await get_scheduler_address(
scheduler_service_name,
namespace,
allow_external=False,
)
async with rpc(comm_address) as scheduler_comm: # type: ignore[no-untyped-call]
async with rpc(comm_address) as scheduler_comm:
idle_since = await scheduler_comm.check_idle()
if idle_since:
logger.debug("Scheduler idle since: %s", idle_since)
return cast(str, idle_since)
return float(idle_since)

# Finally fall back to code injection via the Dask RPC for distributed<=2023.3.1
logger.debug(
f"Checking {scheduler_service_name} idleness failed via the Dask RPC, falling back to run_on_scheduler"
)

def idle_since_func(
dask_scheduler: Scheduler
| None = None, # TODO: why is None allowed here? It will crash immediately.
) -> float:
if not dask_scheduler:
raise TypeError("Dask Scheduler is None.")
def idle_since_func(dask_scheduler: Scheduler) -> float:
if not dask_scheduler.idle_timeout:
dask_scheduler.idle_timeout = 300
dask_scheduler.check_idle()
return cast(float, dask_scheduler.idle_since)
assert dask_scheduler.idle_since
return dask_scheduler.idle_since

comm_address = await get_scheduler_address( # type: ignore[no-untyped-call]
comm_address = await get_scheduler_address(
scheduler_service_name,
namespace,
allow_external=False,
)
async with rpc(comm_address) as scheduler_comm: # type: ignore[no-untyped-call]
async with rpc(comm_address) as scheduler_comm:
response = await scheduler_comm.run_function(
function=dumps(idle_since_func), # type: ignore[no-untyped-call]
function=dumps(idle_since_func),
)
if response["status"] == "error":
typ, exc, tb = clean_exception(**response)
if exc is None:
raise TypeError("Exception was None.")
assert exc
raise exc.with_traceback(tb)
else:
idle_since = response["result"]
if idle_since:
logger.debug("Scheduler idle since: %s", idle_since)
return cast(str, idle_since)
return float(idle_since)


async def get_desired_workers(
scheduler_service_name: str, namespace: str | None
) -> Any:
if namespace is None:
raise kopf.PermanentError("Namespace not given.")
assert namespace
# Try gracefully retiring via the HTTP API
dashboard_address = await get_scheduler_address( # type: ignore[no-untyped-call]
dashboard_address = await get_scheduler_address(
scheduler_service_name,
namespace,
port_name="http-dashboard",
Expand All @@ -602,12 +590,12 @@ async def get_desired_workers(
# Otherwise try gracefully retiring via the RPC
# Dask version mismatches between the operator and scheduler may cause this to fail in any number of unexpected ways
try:
comm_address = await get_scheduler_address( # type: ignore[no-untyped-call]
comm_address = await get_scheduler_address(
scheduler_service_name,
namespace,
allow_external=False,
)
async with rpc(comm_address) as scheduler_comm: # type: ignore[no-untyped-call]
async with rpc(comm_address) as scheduler_comm:
return await scheduler_comm.adaptive_target()
except Exception as e:
raise SchedulerCommError(
Expand All @@ -626,12 +614,11 @@ async def daskcluster_default_worker_group_replica_update(
new: Any | None,
**__: Any,
) -> None:
if name is None or namespace is None:
raise kopf.PermanentError("Name or Namespace not given.")
assert name
assert namespace
if old is not None:
if not all(isinstance(num, int) for num in (old, new)):
raise kopf.PermanentError("Old or new replicas are not int.")
wg = await DaskWorkerGroup.get(f"{name}-default", namespace=namespace)
assert isinstance(new, int)
await wg.scale(new)


Expand All @@ -646,8 +633,8 @@ async def daskworkergroup_replica_update(
logger: kopf.Logger,
**__: Any,
) -> None:
if name is None or namespace is None:
raise kopf.PermanentError("Name or Namespace not given.")
assert name
assert namespace
cluster_name = spec["cluster"]
wg = await DaskWorkerGroup(body, namespace=namespace)
try:
Expand All @@ -666,8 +653,7 @@ async def daskworkergroup_replica_update(
label_selector={"dask.org/workergroup-name": name},
)
)
if not isinstance(new, int):
raise kopf.PermanentError("New is not an int.")
assert isinstance(new, int)
desired_workers = new
workers_needed = desired_workers - current_workers
labels = _get_labels(meta)
Expand Down Expand Up @@ -731,8 +717,8 @@ async def daskworkergroup_replica_update(
async def daskworkergroup_remove(
name: str | None, namespace: str | None, **__: Any
) -> None:
if name is None or namespace is None:
raise kopf.PermanentError("Name or Namespace not given.")
assert name
assert namespace
lock_key = f"{name}/{namespace}"
if lock_key in worker_group_scale_locks:
del worker_group_scale_locks[lock_key]
Expand All @@ -746,8 +732,8 @@ async def daskjob_create(
patch: kopf.Patch,
**__: Any,
) -> None:
if name is None or namespace is None:
raise kopf.PermanentError("Name or Namespace not given.")
assert name
assert namespace
logger.info(f"A DaskJob has been created called {name} in {namespace}.")
patch.status["jobStatus"] = "JobCreated"

Expand All @@ -764,8 +750,8 @@ async def daskjob_create_components(
meta: kopf.Meta,
**__: Any,
) -> None:
if name is None or namespace is None:
raise kopf.PermanentError("Name or Namespace not given.")
assert name
assert namespace
logger.info("Creating Dask job components.")
cluster_name = f"{name}"
labels = _get_labels(meta)
Expand Down Expand Up @@ -823,8 +809,7 @@ async def daskjob_create_components(
async def handle_runner_status_change_running(
meta: kopf.Meta, namespace: str | None, logger: kopf.Logger, **__: Any
) -> None:
if namespace is None:
raise kopf.PermanentError("Namespace not given.")
assert namespace
logger.info("Job now in running")
name = meta["labels"]["dask.org/cluster-name"]
job = await DaskJob.get(name, namespace=namespace)
Expand All @@ -848,8 +833,7 @@ async def handle_runner_status_change_running(
async def handle_runner_status_change_succeeded(
meta: kopf.Meta, namespace: str | None, logger: kopf.Logger, **__: Any
) -> None:
if namespace is None:
raise kopf.PermanentError("Namespace not given.")
assert namespace
logger.info("Job succeeded, deleting Dask cluster.")
name = meta["labels"]["dask.org/cluster-name"]
cluster = await DaskCluster.get(name, namespace=namespace)
Expand All @@ -875,8 +859,7 @@ async def handle_runner_status_change_succeeded(
async def handle_runner_status_change_failed(
meta: kopf.Meta, namespace: str | None, logger: kopf.Logger, **__: Any
) -> None:
if namespace is None:
raise kopf.PermanentError("Namespace not given.")
assert namespace
logger.info("Job failed, deleting Dask cluster.")
name = meta["labels"]["dask.org/cluster-name"]
cluster = await DaskCluster.get(name, namespace=namespace)
Expand Down Expand Up @@ -912,8 +895,8 @@ async def daskautoscaler_adapt(
logger: kopf.Logger,
**__: Any,
) -> None:
if name is None or namespace is None:
raise kopf.PermanentError("Name or Namespace not given.")
assert name
assert namespace
try:
scheduler = await Pod.get(
label_selector={
Expand Down
4 changes: 3 additions & 1 deletion dask_kubernetes/operator/controller/plugins/noop/noop.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from typing import Any

import kopf


@kopf.on.startup()
async def noop_startup(logger, **kwargs):
async def noop_startup(logger: kopf.Logger, **__: Any) -> None:
logger.info(
"Plugin 'noop' running. This does nothing. "
"See https://kubernetes.dask.org/en/latest/operator_extending.html "
Expand Down
Loading

0 comments on commit 85c9a0b

Please sign in to comment.