Skip to content

Commit

Permalink
Merge branch 'dask:main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
BitTheByte committed Sep 4, 2024
2 parents a5845c4 + 2ecfdcd commit 499e55c
Show file tree
Hide file tree
Showing 15 changed files with 473 additions and 275 deletions.
10 changes: 4 additions & 6 deletions .github/workflows/operator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,13 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: ["3.9", "3.10", "3.11", "3.12"]
kubernetes-version: ["1.29.2"]
python-version: ["3.10", "3.11", "3.12"]
kubernetes-version: ["1.30.2"]
include:
- python-version: '3.10'
kubernetes-version: 1.28.7
kubernetes-version: 1.29.4
- python-version: '3.10'
kubernetes-version: 1.27.11
- python-version: '3.10'
kubernetes-version: 1.26.14
kubernetes-version: 1.28.9

env:
KUBECONFIG: .pytest-kind/pytest-kind/kubeconfig
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ on: [push, pull_request]
jobs:
build-distribution:
runs-on: "ubuntu-latest"
permissions: write-all

steps:
- name: Checkout source
Expand Down
4 changes: 2 additions & 2 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ Dask Kubernetes
:target: https://anaconda.org/conda-forge/dask-kubernetes
:alt: Conda Forge

.. image:: https://img.shields.io/badge/python%20support-3.9%7C3.10%7C3.11%7C3.12-blue
.. image:: https://img.shields.io/badge/python%20support-3.10%7C3.11%7C3.12-blue
:target: https://kubernetes.dask.org/en/latest/installing.html#supported-versions
:alt: Python Support

.. image:: https://img.shields.io/badge/Kubernetes%20support-1.26%7C1.27%7C1.28%7C1.29-blue
.. image:: https://img.shields.io/badge/Kubernetes%20support-1.28%7C1.29%7C1.30-blue
:target: https://kubernetes.dask.org/en/latest/installing.html#supported-versions
:alt: Kubernetes Support

Expand Down
29 changes: 16 additions & 13 deletions dask_kubernetes/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@
import sys
import tempfile
import uuid
from typing import Final, Iterator

import pytest
from kopf.testing import KopfRunner
from pytest_kind.cluster import KindCluster

DIR = pathlib.Path(__file__).parent.absolute()
DIR: Final[pathlib.Path] = pathlib.Path(__file__).parent.absolute()


def check_dependency(dependency):
Expand All @@ -26,24 +27,24 @@ def check_dependency(dependency):
check_dependency("kubectl")
check_dependency("docker")

DISABLE_LOGGERS = ["httpcore", "httpx"]
DISABLE_LOGGERS: Final[list[str]] = ["httpcore", "httpx"]


def pytest_configure():
def pytest_configure() -> None:
for logger_name in DISABLE_LOGGERS:
logger = logging.getLogger(logger_name)
logger.disabled = True


@pytest.fixture()
def kopf_runner(k8s_cluster, ns):
def kopf_runner(k8s_cluster: KindCluster, namespace: str) -> KopfRunner:
yield KopfRunner(
["run", "-m", "dask_kubernetes.operator", "--verbose", "--namespace", ns]
["run", "-m", "dask_kubernetes.operator", "--verbose", "--namespace", namespace]
)


@pytest.fixture(scope="session")
def docker_image():
def docker_image() -> str:
image_name = "dask-kubernetes:dev"
python_version = f"{sys.version_info.major}.{sys.version_info.minor}"
subprocess.run(
Expand All @@ -62,7 +63,9 @@ def docker_image():


@pytest.fixture(scope="session")
def k8s_cluster(request, docker_image):
def k8s_cluster(
request: pytest.FixtureRequest, docker_image: str
) -> Iterator[KindCluster]:
image = None
if version := os.environ.get("KUBERNETES_VERSION"):
image = f"kindest/node:v{version}"
Expand All @@ -81,7 +84,7 @@ def k8s_cluster(request, docker_image):


@pytest.fixture(scope="session", autouse=True)
def install_istio(k8s_cluster):
def install_istio(k8s_cluster: KindCluster) -> None:
if bool(os.environ.get("TEST_ISTIO", False)):
check_dependency("istioctl")
subprocess.run(
Expand All @@ -93,15 +96,15 @@ def install_istio(k8s_cluster):


@pytest.fixture(autouse=True)
def ns(k8s_cluster):
def namespace(k8s_cluster: KindCluster) -> Iterator[str]:
ns = "dask-k8s-pytest-" + uuid.uuid4().hex[:10]
k8s_cluster.kubectl("create", "ns", ns)
yield ns
k8s_cluster.kubectl("delete", "ns", ns, "--wait=false", "--ignore-not-found=true")


@pytest.fixture(scope="session", autouse=True)
def install_gateway(k8s_cluster):
def install_gateway(k8s_cluster: KindCluster) -> Iterator[None]:
if bool(os.environ.get("TEST_DASK_GATEWAY", False)):
check_dependency("helm")
# To ensure the operator can coexist with Gateway
Expand Down Expand Up @@ -137,11 +140,11 @@ def install_gateway(k8s_cluster):


@pytest.fixture(scope="session", autouse=True)
def customresources(k8s_cluster):
def customresources(k8s_cluster: KindCluster) -> Iterator[None]:
temp_dir = tempfile.TemporaryDirectory()
crd_path = os.path.join(DIR, "operator", "customresources")

def run_generate(crd_path, patch_path, temp_path):
def run_generate(crd_path: str, patch_path: str, temp_path: str) -> None:
subprocess.run(
["k8s-crd-resolver", "-r", "-j", patch_path, crd_path, temp_path],
check=True,
Expand All @@ -162,5 +165,5 @@ def run_generate(crd_path, patch_path, temp_path):


@pytest.fixture
def anyio_backend():
def anyio_backend() -> str:
return "asyncio"
8 changes: 8 additions & 0 deletions dask_kubernetes/operator/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,11 @@
make_worker_spec,
discover,
)

__all__ = [
"KubeCluster",
"make_cluster_spec",
"make_scheduler_spec",
"make_worker_spec",
"discover",
]
57 changes: 12 additions & 45 deletions dask_kubernetes/operator/_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,23 @@

from typing import List

from kr8s.asyncio.objects import APIObject, Deployment, Pod, Service
from kr8s.asyncio.objects import Deployment, Pod, Service, new_class


class DaskCluster(APIObject):
version = "kubernetes.dask.org/v1"
endpoint = "daskclusters"
kind = "DaskCluster"
plural = "daskclusters"
singular = "daskcluster"
namespaced = True
class DaskCluster(new_class("DaskCluster", "kubernetes.dask.org/v1")):
scalable = True
scalable_spec = "worker.replicas"

async def worker_groups(self) -> List[DaskWorkerGroup]:
return await self.api.get(
DaskWorkerGroup.endpoint,
return await DaskWorkerGroup.list(
label_selector=f"dask.org/cluster-name={self.name}",
namespace=self.namespace,
)

async def scheduler_pod(self) -> Pod:
pods = []
while not pods:
pods = await self.api.get(
Pod.endpoint,
pods = await Pod.list(
label_selector=",".join(
[
f"dask.org/cluster-name={self.name}",
Expand All @@ -41,8 +33,7 @@ async def scheduler_pod(self) -> Pod:
async def scheduler_deployment(self) -> Deployment:
deployments = []
while not deployments:
deployments = await self.api.get(
Deployment.endpoint,
deployments = await Deployment.list(
label_selector=",".join(
[
f"dask.org/cluster-name={self.name}",
Expand All @@ -57,8 +48,7 @@ async def scheduler_deployment(self) -> Deployment:
async def scheduler_service(self) -> Service:
services = []
while not services:
services = await self.api.get(
Service.endpoint,
services = await Service.list(
label_selector=",".join(
[
f"dask.org/cluster-name={self.name}",
Expand All @@ -79,19 +69,12 @@ async def ready(self) -> bool:
)


class DaskWorkerGroup(APIObject):
version = "kubernetes.dask.org/v1"
endpoint = "daskworkergroups"
kind = "DaskWorkerGroup"
plural = "daskworkergroups"
singular = "daskworkergroup"
namespaced = True
class DaskWorkerGroup(new_class("DaskWorkerGroup", "kubernetes.dask.org/v1")):
scalable = True
scalable_spec = "worker.replicas"

async def pods(self) -> List[Pod]:
return await self.api.get(
Pod.endpoint,
return await Pod.list(
label_selector=",".join(
[
f"dask.org/cluster-name={self.spec.cluster}",
Expand All @@ -103,8 +86,7 @@ async def pods(self) -> List[Pod]:
)

async def deployments(self) -> List[Deployment]:
return await self.api.get(
Deployment.endpoint,
return await Deployment.list(
label_selector=",".join(
[
f"dask.org/cluster-name={self.spec.cluster}",
Expand All @@ -119,34 +101,19 @@ async def cluster(self) -> DaskCluster:
return await DaskCluster.get(self.spec.cluster, namespace=self.namespace)


class DaskAutoscaler(APIObject):
version = "kubernetes.dask.org/v1"
endpoint = "daskautoscalers"
kind = "DaskAutoscaler"
plural = "daskautoscalers"
singular = "daskautoscaler"
namespaced = True

class DaskAutoscaler(new_class("DaskAutoscaler", "kubernetes.dask.org/v1")):
async def cluster(self) -> DaskCluster:
return await DaskCluster.get(self.spec.cluster, namespace=self.namespace)


class DaskJob(APIObject):
version = "kubernetes.dask.org/v1"
endpoint = "daskjobs"
kind = "DaskJob"
plural = "daskjobs"
singular = "daskjob"
namespaced = True

class DaskJob(new_class("DaskJob", "kubernetes.dask.org/v1")):
async def cluster(self) -> DaskCluster:
return await DaskCluster.get(self.name, namespace=self.namespace)

async def pod(self) -> Pod:
pods = []
while not pods:
pods = await self.api.get(
Pod.endpoint,
pods = await Pod.list(
label_selector=",".join(
[
f"dask.org/cluster-name={self.name}",
Expand Down
Loading

0 comments on commit 499e55c

Please sign in to comment.