Skip to content

Commit

Permalink
Improve importing the module in Airflow decorators package (#33804)
Browse files Browse the repository at this point in the history
  • Loading branch information
hussein-awala committed Aug 27, 2023
1 parent 58d8577 commit a23f318
Show file tree
Hide file tree
Showing 9 changed files with 57 additions and 24 deletions.
26 changes: 18 additions & 8 deletions airflow/decorators/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from functools import cached_property
from textwrap import dedent
from typing import (
TYPE_CHECKING,
Any,
Callable,
ClassVar,
Expand All @@ -39,7 +40,6 @@
import attr
import re2
import typing_extensions
from sqlalchemy.orm import Session

from airflow import Dataset
from airflow.exceptions import AirflowException
Expand All @@ -51,28 +51,38 @@
get_merged_defaults,
parse_retries,
)
from airflow.models.dag import DAG, DagContext
from airflow.models.dag import DagContext
from airflow.models.expandinput import (
EXPAND_INPUT_EMPTY,
DictOfListsExpandInput,
ExpandInput,
ListOfDictsExpandInput,
OperatorExpandArgument,
OperatorExpandKwargsArgument,
is_mappable,
)
from airflow.models.mappedoperator import MappedOperator, ValidationSource, ensure_xcomarg_return_value
from airflow.models.mappedoperator import MappedOperator, ensure_xcomarg_return_value
from airflow.models.pool import Pool
from airflow.models.xcom_arg import XComArg
from airflow.typing_compat import ParamSpec, Protocol
from airflow.utils import timezone
from airflow.utils.context import KNOWN_CONTEXT_KEYS, Context
from airflow.utils.context import KNOWN_CONTEXT_KEYS
from airflow.utils.decorators import remove_task_decorator
from airflow.utils.helpers import prevent_duplicates
from airflow.utils.task_group import TaskGroup, TaskGroupContext
from airflow.utils.task_group import TaskGroupContext
from airflow.utils.trigger_rule import TriggerRule
from airflow.utils.types import NOTSET

if TYPE_CHECKING:
from sqlalchemy.orm import Session

from airflow.models.dag import DAG
from airflow.models.expandinput import (
ExpandInput,
OperatorExpandArgument,
OperatorExpandKwargsArgument,
)
from airflow.models.mappedoperator import ValidationSource
from airflow.utils.context import Context
from airflow.utils.task_group import TaskGroup


class ExpandableFactory(Protocol):
"""Protocol providing inspection against wrapped function.
Expand Down
7 changes: 5 additions & 2 deletions airflow/decorators/branch_python.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@
# under the License.
from __future__ import annotations

from typing import Callable
from typing import TYPE_CHECKING, Callable

from airflow.decorators.base import TaskDecorator, task_decorator_factory
from airflow.decorators.base import task_decorator_factory
from airflow.decorators.python import _PythonDecoratedOperator
from airflow.operators.python import BranchPythonOperator

if TYPE_CHECKING:
from airflow.decorators.base import TaskDecorator


class _BranchPythonDecoratedOperator(_PythonDecoratedOperator, BranchPythonOperator):
"""Wraps a Python callable and captures args/kwargs when called for execution."""
Expand Down
7 changes: 5 additions & 2 deletions airflow/decorators/external_python.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@
# under the License.
from __future__ import annotations

from typing import Callable
from typing import TYPE_CHECKING, Callable

from airflow.decorators.base import TaskDecorator, task_decorator_factory
from airflow.decorators.base import task_decorator_factory
from airflow.decorators.python import _PythonDecoratedOperator
from airflow.operators.python import ExternalPythonOperator

if TYPE_CHECKING:
from airflow.decorators.base import TaskDecorator


class _PythonExternalDecoratedOperator(_PythonDecoratedOperator, ExternalPythonOperator):
"""Wraps a Python callable and captures args/kwargs when called for execution."""
Expand Down
7 changes: 5 additions & 2 deletions airflow/decorators/python.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@
# under the License.
from __future__ import annotations

from typing import Callable, Sequence
from typing import TYPE_CHECKING, Callable, Sequence

from airflow.decorators.base import DecoratedOperator, TaskDecorator, task_decorator_factory
from airflow.decorators.base import DecoratedOperator, task_decorator_factory
from airflow.operators.python import PythonOperator

if TYPE_CHECKING:
from airflow.decorators.base import TaskDecorator


class _PythonDecoratedOperator(DecoratedOperator, PythonOperator):
"""
Expand Down
7 changes: 5 additions & 2 deletions airflow/decorators/python_virtualenv.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@
# under the License.
from __future__ import annotations

from typing import Callable
from typing import TYPE_CHECKING, Callable

from airflow.decorators.base import TaskDecorator, task_decorator_factory
from airflow.decorators.base import task_decorator_factory
from airflow.decorators.python import _PythonDecoratedOperator
from airflow.operators.python import PythonVirtualenvOperator

if TYPE_CHECKING:
from airflow.decorators.base import TaskDecorator


class _PythonVirtualenvDecoratedOperator(_PythonDecoratedOperator, PythonVirtualenvOperator):
"""Wraps a Python callable and captures args/kwargs when called for execution."""
Expand Down
7 changes: 5 additions & 2 deletions airflow/decorators/sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@

from __future__ import annotations

from typing import Callable, Sequence
from typing import TYPE_CHECKING, Callable, Sequence

from airflow.decorators.base import TaskDecorator, get_unique_task_id, task_decorator_factory
from airflow.decorators.base import get_unique_task_id, task_decorator_factory
from airflow.sensors.python import PythonSensor

if TYPE_CHECKING:
from airflow.decorators.base import TaskDecorator


class DecoratedSensorOperator(PythonSensor):
"""
Expand Down
7 changes: 5 additions & 2 deletions airflow/decorators/setup_teardown.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,17 @@
from __future__ import annotations

import types
from typing import Callable
from typing import TYPE_CHECKING, Callable

from airflow import AirflowException, XComArg
from airflow import AirflowException
from airflow.decorators import python_task
from airflow.decorators.task_group import _TaskGroupFactory
from airflow.models import BaseOperator
from airflow.utils.setup_teardown import SetupTeardownContext

if TYPE_CHECKING:
from airflow import XComArg


def setup_task(func: Callable) -> Callable:
# Using FunctionType here since _TaskDecorator is also a callable
Expand Down
7 changes: 5 additions & 2 deletions airflow/decorators/short_circuit.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@
# under the License.
from __future__ import annotations

from typing import Callable
from typing import TYPE_CHECKING, Callable

from airflow.decorators.base import TaskDecorator, task_decorator_factory
from airflow.decorators.base import task_decorator_factory
from airflow.decorators.python import _PythonDecoratedOperator
from airflow.operators.python import ShortCircuitOperator

if TYPE_CHECKING:
from airflow.decorators.base import TaskDecorator


class _ShortCircuitDecoratedOperator(_PythonDecoratedOperator, ShortCircuitOperator):
"""Wraps a Python callable and captures args/kwargs when called for execution."""
Expand Down
6 changes: 4 additions & 2 deletions airflow/decorators/task_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@
DictOfListsExpandInput,
ListOfDictsExpandInput,
MappedArgument,
OperatorExpandArgument,
OperatorExpandKwargsArgument,
)
from airflow.models.taskmixin import DAGNode
from airflow.models.xcom_arg import XComArg
Expand All @@ -47,6 +45,10 @@

if TYPE_CHECKING:
from airflow.models.dag import DAG
from airflow.models.expandinput import (
OperatorExpandArgument,
OperatorExpandKwargsArgument,
)

FParams = ParamSpec("FParams")
FReturn = TypeVar("FReturn", None, DAGNode)
Expand Down

0 comments on commit a23f318

Please sign in to comment.