Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow to disable openlineage at operator level #33685

Merged
merged 11 commits into from
Aug 29, 2023

Conversation

RNHTTR
Copy link
Contributor

@RNHTTR RNHTTR commented Aug 24, 2023

It might be valuable to disable OpenLineage extraction at the Operator level instead of at the Airflow level. This PR adds an environment variable to check for Operators for which OpenLineage extraction has been disabled.


^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

@uranusjr
Copy link
Member

How is this different from setting this in config?

@RNHTTR
Copy link
Contributor Author

RNHTTR commented Aug 24, 2023

The current config/env var openlineage_disabled is a boolean that disables OpenLineage for an entire Airflow instance. This allows users to disable OpenLineage for just a subset of Operators.

@RNHTTR
Copy link
Contributor Author

RNHTTR commented Aug 24, 2023

My local shows the precommit for mypy for providers passed...

image

🤔

@potiuk
Copy link
Member

potiuk commented Aug 24, 2023

My local shows the precommit for mypy for providers passed...

image 🤔

Happens. Mypy uses heuristics that "guesses" type of the variables. And sometimes it gets it wrong when it does not run on "all files". Local version of pre-commit runs only on the files that are affected by commit and MyPy often will pull the other files and import them to feed it's heuristcs. But not always. So sometimes mypy is not able to guess correctly the type if it is not given explicitly all files.

Also sometimes the result depend if you use the same breeze image or whether you have an old one - because you might have older version of dependencies.

Also sometimes it depends on the cache you have locally built. Sometimes that cache is just not invalidated properly.

And to add up, all this is succintly explained JUST below the error you see in CI. Including precise instructions on what you should do to reproduce the failure you see locally.

image

I wonder (maybe you have an idea @RNHTTR ) how to better communicate it than the yellow message above :) ? Any ideas?

@Taragolis
Copy link
Contributor

Taragolis commented Aug 24, 2023

This allows users to disable OpenLineage for just a subset of Operators.

Is it duplicate of standard configurations Airflow Env Var?
AIRFLOW__OPENLINEAGE__OPENLINEAGE_DISABLED_FOR_OPERATORS
upd
AIRFLOW__OPENLINEAGE__DISABLED_FOR_OPERATORS

@RNHTTR
Copy link
Contributor Author

RNHTTR commented Aug 24, 2023

This allows users to disable OpenLineage for just a subset of Operators.

Is it duplicate of standard configurations Airflow Env Var? AIRFLOW__OPENLINEAGE__OPENLINEAGE_DISABLED_FOR_OPERATORS upd AIRFLOW__OPENLINEAGE__DISABLED_FOR_OPERATORS

I don't think I understand; AIRFLOW__OPENLINEAGE__DISABLED_FOR_OPERATORS doesn't exist unless there's some wild cosmic coincidence and I named it the same :)

@potiuk
Copy link
Member

potiuk commented Aug 24, 2023

This allows users to disable OpenLineage for just a subset of Operators.

Is it duplicate of standard configurations Airflow Env Var? AIRFLOW__OPENLINEAGE__OPENLINEAGE_DISABLED_FOR_OPERATORS upd AIRFLOW__OPENLINEAGE__DISABLED_FOR_OPERATORS

I don't think I understand; AIRFLOW__OPENLINEAGE__DISABLED_FOR_OPERATORS doesn't exist unless there's some wild cosmic coincidence and I named it the same :)

@RHATTR: When yo do this (note I removed the "openlineage_" from your code to avoid double-openlineage in variable):

conf.get("openlineage", "disabled_for_operators", fallback=""),

Then conf.get will automatically read stuff from AIRFLOW__OPENLINEAGE__DISABLED_FOR_OPERATORS - like for all the other config values.

What @Taragolis is saying is that:

   _openlineage_disabled_for_operators = os.getenv(
        "OPENLINEAGE_DISABLED_FOR_OPERATORS",
        conf.get("openlineage", "openlineage_disabled_for_operators", fallback=""),
    )

can be replaced with:

   _openlineage_disabled_for_operators = conf.get("openlineage", "disabled_for_operators")

And it will:

  • automatically use the defaults defined in provider.yaml (so no need for a fallback)
  • automaticallly read configuration from AIRFLOW__OPENLINEAGE__DISABLED_FOR_OPERATORS env var

No need to do all the os.getenv stuff nor define a new variable for it (nor even set fallback if you configure default value for it in provider.yaml).

@RNHTTR RNHTTR force-pushed the disable-openlineage-for-operator branch 3 times, most recently from 59aaef6 to 6aa2c52 Compare August 26, 2023 13:31
@eladkal eladkal changed the title Disable openlineage for operator Allow to disable openlineage at operator level Aug 28, 2023
@RNHTTR RNHTTR force-pushed the disable-openlineage-for-operator branch from 6aa2c52 to 6f3f9ac Compare August 28, 2023 13:20
@potiuk potiuk force-pushed the disable-openlineage-for-operator branch from 6f3f9ac to 26298b7 Compare August 28, 2023 20:01
@potiuk
Copy link
Member

potiuk commented Aug 28, 2023

This is an interesting/subtle issue you see here in the tests....

I believe the problem is that at the time of openlineage import in tests, ProvidersManager have not yet extracted the configuration for openlineage provider. (it is intialized by pytest automated fixture when the test is run).

That's why - I think - the default value is missing and you are getting [openlineage/disabyled_for_operators] not found in config"

The solution to that is to extract this one:

    _openlineage_disabled_for_operators = conf.get("openlineage", "disabled_for_operators")
    openlineage_disabled_for_operators = set(
        operator.strip() for operator in _openlineage_disabled_for_operators.split(";")
    )

to become a cached_property returning the list of disabled operators:

@cached_property
def disabled_operators() -> set[str]:
    return set(
        operator.strip() for operator in conf.get("openlineage", "disabled_for_operators").split(";")
    )

then change:

if fully_qualified_class_name in self.openlineage_disabled_for_operators:

into:

if fully_qualified_class_name in self.disabled_operators

This will change the list to be retrieved only after provider manager already loaded defaults for provider configuration.

That should solve the test failure.

@RNHTTR RNHTTR force-pushed the disable-openlineage-for-operator branch from 26298b7 to 0990ab2 Compare August 28, 2023 21:44
@potiuk potiuk merged commit 0d49d1f into apache:main Aug 29, 2023
42 checks passed
@RNHTTR RNHTTR deleted the disable-openlineage-for-operator branch August 29, 2023 13:55
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants