Skip to content

Commit

Permalink
Review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
jscheffl committed Sep 24, 2024
1 parent c87913d commit f2a3593
Showing 1 changed file with 8 additions and 7 deletions.
15 changes: 8 additions & 7 deletions airflow/providers/edge/cli/edge_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from airflow.providers.edge.models.edge_logs import EdgeLogs
from airflow.providers.edge.models.edge_worker import EdgeWorker, EdgeWorkerState
from airflow.utils import cli as cli_utils
from airflow.utils.platform import IS_WINDOWS
from airflow.utils.providers_configuration_loader import providers_configuration_loaded
from airflow.utils.state import TaskInstanceState

Expand All @@ -61,11 +62,11 @@
@providers_configuration_loaded
def force_use_internal_api_on_edge_worker():
"""
Force that environment is made for internal API w/o need to declare outside.
Ensure that the environment is configured for the internal API without needing to declare it outside.
This is only needed for Edge Worker and need to be made before click CLI wrapper
is started as the CLI wrapper attempts to make a DB connection which will fail before
function call can influence. On Edge Worker we need to "patch" the env before start.
This is only required for an Edge worker and must to be done before the Click CLI wrapper is initiated.
That is because the CLI wrapper will attempt to establish a DB connection, which will fail before the
function call can take effect. In an Edge worker, we need to "patch" the environment before starting.
"""
if "airflow" in sys.argv[0] and sys.argv[1:3] == ["edge", "worker"]:
api_url = conf.get("edge", "api_url")
Expand All @@ -76,14 +77,15 @@ def force_use_internal_api_on_edge_worker():
os.environ["AIRFLOW_ENABLE_AIP_44"] = "True"
os.environ["AIRFLOW__CORE__INTERNAL_API_URL"] = api_url
InternalApiConfig.set_use_internal_api("edge-worker")
# Disable mini-scheduler post task execution and leave next task schedule to core scheduler
os.environ["AIRFLOW__SCHEDULER__SCHEDULE_AFTER_TASK_EXECUTION"] = "False"


force_use_internal_api_on_edge_worker()


def _hostname() -> str:
if platform.system() == "Windows":
if IS_WINDOWS:
return platform.uname().node
else:
return os.uname()[1]
Expand All @@ -98,8 +100,7 @@ def _get_sysinfo() -> dict:


def _pid_file_path(pid_file: str | None) -> str:
pid_file_path, _, _, _ = cli_utils.setup_locations(process=EDGE_WORKER_PROCESS_NAME, pid=pid_file)
return pid_file_path
return cli_utils.setup_locations(process=EDGE_WORKER_PROCESS_NAME, pid=pid_file)[0]


@dataclass
Expand Down

0 comments on commit f2a3593

Please sign in to comment.