Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

openlineage: adjust log levels #34801

Merged
merged 1 commit into from
Oct 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions airflow/providers/openlineage/extractors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def extract(self) -> OperatorLineage | None:
self.operator.__class__.__module__ + "." + self.operator.__class__.__name__
)
if fully_qualified_class_name in self.disabled_operators:
self.log.warning(
self.log.debug(
f"Skipping extraction for operator {self.operator.task_type} "
"due to its presence in [openlineage] openlineage_disabled_for_operators."
)
Expand Down Expand Up @@ -116,7 +116,7 @@ def _execute_extraction(self) -> OperatorLineage | None:
)
return None
except AttributeError:
self.log.warning(
self.log.debug(
f"Operator {self.operator.task_type} does not have the "
"get_openlineage_facets_on_start method."
)
Expand Down Expand Up @@ -149,5 +149,5 @@ def _get_openlineage_facets(self, get_facets_method, *args) -> OperatorLineage |
"This should not happen."
)
except Exception:
self.log.exception("OpenLineage provider method failed to extract data from provider. ")
self.log.warning("OpenLineage provider method failed to extract data from provider. ")
return None
7 changes: 4 additions & 3 deletions airflow/providers/openlineage/extractors/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ def extract_metadata(self, dagrun, task, complete: bool = False, task_instance=N
return task_metadata

except Exception as e:
self.log.exception(
self.log.warning(
"Failed to extract metadata using found extractor %s - %s %s", extractor, e, task_info
)
else:
Expand Down Expand Up @@ -157,7 +157,8 @@ def extract_inlets_and_outlets(
inlets: list,
outlets: list,
):
self.log.debug("Manually extracting lineage metadata from inlets and outlets")
if inlets or outlets:
self.log.debug("Manually extracting lineage metadata from inlets and outlets")
for i in inlets:
d = self.convert_to_ol_dataset(i)
if d:
Expand Down Expand Up @@ -193,5 +194,5 @@ def validate_task_metadata(self, task_metadata) -> OperatorLineage | None:
job_facets=task_metadata.job_facets,
)
except AttributeError:
self.log.error("Extractor returns non-valid metadata: %s", task_metadata)
self.log.warning("Extractor returns non-valid metadata: %s", task_metadata)
return None
2 changes: 1 addition & 1 deletion airflow/providers/openlineage/extractors/python.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def get_source_code(self, callable: Callable) -> str | None:
# Trying to extract source code of builtin_function_or_method
return str(callable)
except OSError:
self.log.exception(
self.log.warning(
"Can't get source code facet of PythonOperator %s",
self.operator.task_id,
)
Expand Down
6 changes: 3 additions & 3 deletions airflow/providers/openlineage/plugins/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import uuid
from typing import TYPE_CHECKING

import requests.exceptions
import yaml
from openlineage.client import OpenLineageClient, set_producer
from openlineage.client.facet import (
Expand Down Expand Up @@ -115,8 +114,9 @@ def emit(self, event: RunEvent):
redacted_event: RunEvent = self._redacter.redact(event, max_depth=20) # type: ignore[assignment]
try:
return self._client.emit(redacted_event)
except requests.exceptions.RequestException:
self.log.exception(f"Failed to emit OpenLineage event of id {event.run.runId}")
except Exception as e:
self.log.warning("Failed to emit OpenLineage event of id %s", event.run.runId)
self.log.debug("OpenLineage emission failure: %s", e)

def start_task(
self,
Expand Down
12 changes: 6 additions & 6 deletions airflow/providers/openlineage/plugins/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
get_airflow_run_facet,
get_custom_facets,
get_job_name,
print_exception,
print_warning,
)
from airflow.utils.timeout import timeout

Expand Down Expand Up @@ -65,7 +65,7 @@ def on_task_instance_running(
task = task_instance.task
dag = task.dag

@print_exception
@print_warning(self.log)
def on_running():
# that's a workaround to detect task running from deferred state
# we return here because Airflow 2.3 needs task from deferred state
Expand Down Expand Up @@ -117,7 +117,7 @@ def on_task_instance_success(self, previous_state, task_instance: TaskInstance,
task.task_id, task_instance.execution_date, task_instance.try_number - 1
)

@print_exception
@print_warning(self.log)
def on_success():
task_metadata = self.extractor_manager.extract_metadata(
dagrun, task, complete=True, task_instance=task_instance
Expand Down Expand Up @@ -145,7 +145,7 @@ def on_task_instance_failed(self, previous_state, task_instance: TaskInstance, s
task.task_id, task_instance.execution_date, task_instance.try_number - 1
)

@print_exception
@print_warning(self.log)
def on_failure():
task_metadata = self.extractor_manager.extract_metadata(
dagrun, task, complete=True, task_instance=task_instance
Expand Down Expand Up @@ -194,14 +194,14 @@ def on_dag_run_running(self, dag_run: DagRun, msg: str):
@hookimpl
def on_dag_run_success(self, dag_run: DagRun, msg: str):
if not self.executor:
self.log.error("Executor have not started before `on_dag_run_success`")
self.log.debug("Executor have not started before `on_dag_run_success`")
return
self.executor.submit(self.adapter.dag_success, dag_run=dag_run, msg=msg)

@hookimpl
def on_dag_run_failed(self, dag_run: DagRun, msg: str):
if not self.executor:
self.log.error("Executor have not started before `on_dag_run_failed`")
self.log.debug("Executor have not started before `on_dag_run_failed`")
return
self.executor.submit(self.adapter.dag_failed, dag_run=dag_run, msg=msg)

Expand Down
4 changes: 0 additions & 4 deletions airflow/providers/openlineage/utils/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
# under the License.
from __future__ import annotations

import logging
from collections import defaultdict
from contextlib import closing
from enum import IntEnum
Expand All @@ -34,9 +33,6 @@
from airflow.hooks.base import BaseHook


logger = logging.getLogger(__name__)


class ColumnIndex(IntEnum):
"""Enumerates the indices of columns in information schema view."""

Expand Down
21 changes: 12 additions & 9 deletions airflow/providers/openlineage/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,15 +397,18 @@ def _is_name_redactable(name, redacted):
return name not in redacted.skip_redact


def print_exception(f):
    """Wrap ``f`` so that exceptions it raises are logged instead of propagated.

    :param f: the callable to wrap.
    :return: a wrapper that forwards all positional and keyword arguments to
        ``f`` and returns its result. If ``f`` raises an ``Exception``, the
        exception is passed to ``log.exception`` (``log`` is resolved from the
        enclosing module scope, outside this block) and the wrapper returns
        ``None``.
    """

    @wraps(f)
    def wrapper(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except Exception as e:
            # Deliberate best-effort: record the failure and fall through,
            # so the caller receives None rather than an exception.
            log.exception(e)

    return wrapper
def print_warning(log):
    """Build a decorator that logs exceptions as warnings on ``log``.

    :param log: object exposing a ``warning`` method; it receives the caught
        exception instance as its only argument.
    :return: a decorator. The decorated callable forwards all positional and
        keyword arguments to the original function and returns its result.
        If the function raises an ``Exception``, the exception is passed to
        ``log.warning`` and the call returns ``None`` instead of raising.
    """

    def decorator(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            try:
                return f(*args, **kwargs)
            except Exception as e:
                # Deliberate best-effort: the failure is reported at warning
                # level and swallowed, so the caller receives None.
                log.warning(e)

        return wrapper

    return decorator


@cache
Expand Down