Move the try outside the loop when this is possible in kubernetes provider #33977

Merged
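The refactor applies one pattern throughout: instead of catching queue.Empty on every iteration and breaking, the whole drain loop is wrapped once in contextlib.suppress(Empty). A minimal standalone sketch of the two equivalent shapes, using a plain queue.Queue stand-in rather than the executor's real queues:

```python
import contextlib
from queue import Empty, Queue

q = Queue()
q.put("task-1")

# Before: try/except inside the loop, break on Empty.
while True:
    try:
        item = q.get_nowait()
        print("processing", item)
    except Empty:
        break

# After: the exception handling sits outside the loop; Empty simply
# propagates out of the loop and is swallowed once.
q.put("task-2")
with contextlib.suppress(Empty):
    while True:
        item = q.get_nowait()
        print("processing", item)
```

Both drain the queue and fall through once it is empty; the second states the exit condition once instead of on every iteration.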
@@ -23,6 +23,7 @@
"""
from __future__ import annotations

+ import contextlib
import json
import logging
import multiprocessing
@@ -354,8 +355,8 @@ def sync(self) -> None:
self.kube_scheduler.sync()

last_resource_version: dict[str, str] = defaultdict(lambda: "0")
- while True:
- try:
+ with contextlib.suppress(Empty):
@Lee-W (Member) commented on Sep 1, 2023:

Will it break when Empty is raised, as it used to, or just suppress it?

Reply from another project member:

Empty should only be raised by queue.get_nowait(), so previously it was caught by the except block, which broke out of the loop. Now Empty propagates out of the while loop and is suppressed by this context manager, so the end result should be the same.
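
To make that answer concrete, a small runnable check of the control flow (standard library only, nothing from the provider):

```python
import contextlib
from queue import Empty, Queue

q = Queue()  # deliberately left empty

with contextlib.suppress(Empty):
    while True:
        q.get_nowait()          # raises Empty on the first pass
        print("never reached")  # skipped: the raise exits the loop
print("continues here: Empty ended the loop and was suppressed")
```

The raise still terminates the while loop exactly as the old break did; suppress only stops Empty from propagating past the with block.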

+ while True:
results = self.result_queue.get_nowait()
try:
key, state, pod_name, namespace, resource_version = results
@@ -373,8 +374,6 @@ def sync(self) -> None:
self.result_queue.put(results)
finally:
self.result_queue.task_done()
- except Empty:
- break

from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils import ResourceVersion

@@ -386,8 +385,8 @@ def sync(self) -> None:

from kubernetes.client.rest import ApiException

- for _ in range(self.kube_config.worker_pods_creation_batch_size):
- try:
+ with contextlib.suppress(Empty):
+ for _ in range(self.kube_config.worker_pods_creation_batch_size):
task = self.task_queue.get_nowait()

try:
@@ -423,8 +422,6 @@ def sync(self) -> None:
self.fail(key, e)
finally:
self.task_queue.task_done()
- except Empty:
- break

# Run any pending timed events
next_event = self.event_scheduler.run(blocking=False)
@@ -666,22 +663,20 @@ def _flush_task_queue(self) -> None:
assert self.task_queue

self.log.debug("Executor shutting down, task_queue approximate size=%d", self.task_queue.qsize())
- while True:
- try:
+ with contextlib.suppress(Empty):
+ while True:
task = self.task_queue.get_nowait()
# This is a new task to run thus ok to ignore.
self.log.warning("Executor shutting down, will NOT run task=%s", task)
self.task_queue.task_done()
- except Empty:
- break

def _flush_result_queue(self) -> None:
if TYPE_CHECKING:
assert self.result_queue

self.log.debug("Executor shutting down, result_queue approximate size=%d", self.result_queue.qsize())
- while True:
- try:
+ with contextlib.suppress(Empty):
+ while True:
results = self.result_queue.get_nowait()
self.log.warning("Executor shutting down, flushing results=%s", results)
try:
@@ -700,8 +695,6 @@ def _flush_result_queue(self) -> None:
)
finally:
self.result_queue.task_done()
- except Empty:
- break

def end(self) -> None:
"""Called when the executor shuts down."""
@@ -16,6 +16,7 @@
# under the License.
from __future__ import annotations

+ import contextlib
import json
import multiprocessing
import time
@@ -440,16 +441,14 @@ def sync(self) -> None:
"""
self.log.debug("Syncing KubernetesExecutor")
self._health_check_kube_watchers()
- while True:
- try:
+ with contextlib.suppress(Empty):
+ while True:
task = self.watcher_queue.get_nowait()
try:
self.log.debug("Processing task %s", task)
self.process_watcher_task(task)
finally:
self.watcher_queue.task_done()
- except Empty:
- break

def process_watcher_task(self, task: KubernetesWatchType) -> None:
"""Process the task by watcher."""
@@ -467,14 +466,12 @@ def process_watcher_task(self, task: KubernetesWatchType) -> None:

def _flush_watcher_queue(self) -> None:
self.log.debug("Executor shutting down, watcher_queue approx. size=%d", self.watcher_queue.qsize())
- while True:
- try:
+ with contextlib.suppress(Empty):
+ while True:
task = self.watcher_queue.get_nowait()
# Ignoring it since it can only have either FAILED or SUCCEEDED pods
self.log.warning("Executor shutting down, IGNORING watcher task=%s", task)
self.watcher_queue.task_done()
- except Empty:
- break

def terminate(self) -> None:
"""Terminates the watcher."""
66 changes: 32 additions & 34 deletions airflow/providers/cncf/kubernetes/triggers/pod.py
@@ -140,8 +140,8 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
"""Gets current pod status and yields a TriggerEvent."""
hook = self._get_async_hook()
self.log.info("Checking pod %r in namespace %r.", self.pod_name, self.pod_namespace)
- while True:
- try:
+ try:
+ while True:
pod = await hook.get_pod(
name=self.pod_name,
namespace=self.pod_namespace,
@@ -195,40 +195,38 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
}
)
return
except CancelledError:
# That means that task was marked as failed
if self.get_logs:
self.log.info("Outputting container logs...")
await self._get_async_hook().read_logs(
name=self.pod_name,
namespace=self.pod_namespace,
)
if self.on_finish_action == OnFinishAction.DELETE_POD:
self.log.info("Deleting pod...")
await self._get_async_hook().delete_pod(
name=self.pod_name,
namespace=self.pod_namespace,
)
yield TriggerEvent(
{
"name": self.pod_name,
"namespace": self.pod_namespace,
"status": "cancelled",
"message": "Pod execution was cancelled",
}
except CancelledError:
# That means that task was marked as failed
if self.get_logs:
self.log.info("Outputting container logs...")
await self._get_async_hook().read_logs(
name=self.pod_name,
namespace=self.pod_namespace,
)
return
except Exception as e:
self.log.exception("Exception occurred while checking pod phase:")
yield TriggerEvent(
{
"name": self.pod_name,
"namespace": self.pod_namespace,
"status": "error",
"message": str(e),
}
if self.on_finish_action == OnFinishAction.DELETE_POD:
self.log.info("Deleting pod...")
await self._get_async_hook().delete_pod(
name=self.pod_name,
namespace=self.pod_namespace,
)
return
yield TriggerEvent(
{
"name": self.pod_name,
"namespace": self.pod_namespace,
"status": "cancelled",
"message": "Pod execution was cancelled",
}
)
except Exception as e:
self.log.exception("Exception occurred while checking pod phase:")
yield TriggerEvent(
{
"name": self.pod_name,
"namespace": self.pod_namespace,
"status": "error",
"message": str(e),
}
)

def _get_async_hook(self) -> AsyncKubernetesHook:
if self._hook is None:
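The trigger change in pod.py follows the same idea for async code: one try wrapped around the polling loop, with CancelledError and generic exceptions each handled once, outside the loop. A simplified, hypothetical sketch of that shape — poll_pod and fake_check_status are illustrative stand-ins, not the provider's API:

```python
import asyncio
from asyncio import CancelledError
from typing import AsyncIterator


async def fake_check_status() -> str:
    """Hypothetical stand-in for `await hook.get_pod(...)` and the phase checks."""
    await asyncio.sleep(0)
    return "success"


async def poll_pod(poll_interval: float = 1.0) -> AsyncIterator[dict]:
    """Toy trigger loop: the try wraps the whole loop, so each handler appears once."""
    try:
        while True:
            status = await fake_check_status()
            if status in ("success", "failed"):
                yield {"status": status}
                return
            await asyncio.sleep(poll_interval)
    except CancelledError:
        # The awaiting task was cancelled: emit a terminal event instead of re-raising.
        yield {"status": "cancelled", "message": "Pod execution was cancelled"}
    except Exception as e:
        yield {"status": "error", "message": str(e)}


async def main() -> None:
    async for event in poll_pod():
        print(event)


if __name__ == "__main__":
    asyncio.run(main())
```

In the provider's real trigger the events also carry the pod name and namespace, and the cancellation branch optionally dumps container logs and deletes the pod, as the diff above shows.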