
Fix BigQueryValueCheckOperator deferrable mode optimisation #34018

Merged

Conversation

pankajkoti (Member)

PR #31872 tried to optimise the deferrable mode in
BigQueryValueCheckOperator. However, when deciding
whether to defer, it only checked the job status; it did not
actually verify the passed value to check against, and so it
returned success prematurely. This PR adds the missing logic
to the optimisation: checking and comparing the pass value
and tolerance.

closes: #34010
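
For context, a minimal sketch of the kind of pass-value comparison the operator is meant to perform (the helper name and exact semantics here are assumed from the common-sql value-check behaviour, not copied from this PR):

def value_check(records, pass_value, tolerance=None):
    # Illustrative only: compare query results against pass_value,
    # optionally allowing a relative tolerance band.
    if not records:
        raise ValueError("The query returned no records")
    for record in records:
        try:
            numeric = float(record)
        except (TypeError, ValueError):
            # Non-numeric values must match pass_value exactly.
            if str(record) != str(pass_value):
                raise ValueError(f"{record!r} does not match {pass_value!r}")
            continue
        expected = float(pass_value)
        if tolerance is not None:
            # Accept values within pass_value * (1 +/- tolerance).
            if not expected * (1 - tolerance) <= numeric <= expected * (1 + tolerance):
                raise ValueError(f"{numeric} is outside tolerance {tolerance} of {expected}")
        elif numeric != expected:
            raise ValueError(f"{numeric} does not equal {expected}")

# Example: value_check([42.0], pass_value=40, tolerance=0.1) passes,
# while tolerance=0.01 would raise, since 42.0 > 40 * 1.01.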


^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

@pankajkoti force-pushed the fix-big-query-value-check-deferrable branch from 231c9e0 to f340b32 on September 1, 2023 17:03
# job.result() returns a RowIterator. Mypy expects an instance of SupportsNext[Any] for
# the next() call, which the RowIterator does not conform to. Hence, ignore the arg-type error.
records = next(job.result())  # type: ignore[arg-type]
self.check_value(records)
Contributor

If we're checking the values here, can we remove the check_value call in the BigQueryValueCheckTrigger?

hook.value_check(self.sql, self.pass_value, records, self.tolerance)

I'm not really sure why the check_value there wasn't picking up the failed status 🤔 I'd assume that if the check failed it would raise an exception which would then be caught and returned as a failed trigger status:

except Exception as e:
    self.log.exception("Exception occurred while checking for query completion")
    yield TriggerEvent({"status": "error", "message": str(e)})
    return

@pankajkoti (Member, Author) commented Sep 1, 2023

I gave the DAG a local run and saw that the task was not getting deferred at all, i.e. it does not defer even after setting deferrable=True.

This happens because the running state of the job evaluates to False, so it does not defer. Had it deferred, the trigger would check the value and pass execution on to execute_complete in the operator.

But in scenarios where it does not defer, I have added this check later to actually achieve what the operator is meant to do :)

Sorry I missed explaining this earlier, hope it makes sense now.

And yes, we cannot remove the check from the Trigger code: when the task defers, it goes to the Triggerer, which runs the check and then returns to execute_complete.
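
For illustration, the control flow being described is roughly the following (a simplified sketch of the operator's execute path; the trigger argument names are assumptions, not copied from the PR):

job = hook.insert_job(configuration=configuration, project_id=project_id)
if job.running():
    # Job still in flight: hand polling over to the triggerer.
    self.defer(
        trigger=BigQueryValueCheckTrigger(
            conn_id=self.gcp_conn_id,  # argument names illustrative
            job_id=job.job_id,
            sql=self.sql,
            pass_value=self.pass_value,
            tolerance=self.tolerance,
        ),
        method_name="execute_complete",
    )
else:
    # Job already finished, so the task never defers; the pass_value /
    # tolerance comparison must therefore run here, inline.
    records = next(job.result())  # type: ignore[arg-type]
    self.check_value(records)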

Contributor

Oh interesting, nice catch!

@potiuk merged commit d757f6a into apache:main on Sep 3, 2023
42 checks passed
pankajkoti added a commit to astronomer/airflow that referenced this pull request Sep 10, 2023
With PR apache#34018, the google provider depends on the common-sql
provider changes that are getting released in the 1.7.2 version
of the common-sql provider. Hence, bump the minimum common-sql
provider version to 1.7.2 in the Google provider dependencies.
potiuk pushed a commit that referenced this pull request Sep 10, 2023 (same commit message as above).
@nathadfield (Collaborator)

@pankajkoti I just tried out the new provider with the example code I provided on the original issue.

However, the result doesn't seem to be quite right, as the task fails with an AttributeError on check_value.

[2023-09-12, 08:41:22 UTC] {taskinstance.py:1935} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/airflow/providers/google/cloud/operators/bigquery.py", line 449, in execute
    self.check_value(records)
AttributeError: 'BigQueryValueCheckOperator' object has no attribute 'check_value'

@pankajkoti (Member, Author)

Hi @nathadfield, you would also need to upgrade apache-airflow-providers-common-sql to the latest release, 1.7.2.

The check_value function was added in the same PR and is included in that latest release of the common-sql provider.

@nathadfield (Collaborator)

@pankajkoti Is that not a requirement of Google provider 10.8?

@pankajkoti (Member, Author)

@nathadfield I am not exactly sure if that should be a requirement or not.

I tried to add it as a requirement while testing the RC in PR #34257

Could you please check the discussion on the PR and see if it sounds alright?

cc: @eladkal

@nathadfield (Collaborator)

nathadfield commented Sep 12, 2023

Instinctively, it feels like it should be a requirement; otherwise, how would anyone know that they also need to install the common-sql provider?

In fact, for us, we don't even specify the common-sql provider and just accept what gets delivered as part of the Astro runtime image.

@nathadfield (Collaborator)

Maybe I should add [common.sql] to ensure that the correct version is picked up then? As per https://airflow.apache.org/docs/apache-airflow-providers-google/stable/index.html#cross-provider-package-dependencies
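
If so, installing the provider with that extra would look roughly like this (an illustrative command; the extra name follows the linked cross-provider dependency docs):

pip install 'apache-airflow-providers-google[common.sql]'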

@potiuk (Member)

potiuk commented Sep 12, 2023

You have three options:

  1. You can have a requirement and specify the minimum version as a requirement in provider.yaml. This is already done in a number of providers - see here:
    https://github.com/apache/airflow/blob/main/airflow/providers/apache/hive/provider.yaml#L62

    This is for when the dependency is "crucial" for the package to work.

  2. The cross-provider deps are added automatically (we detect which other provider packages are imported from each provider package). You don't have to do anything, but then the extra will be added without a minimum version.

  3. For providers where the dependency is really "optional" (i.e. most functionality works, but only some of it needs the dependency) and you would like to specify a minimum version, we always have the option to specify additional-extras:

    https://github.com/apache/airflow/blob/main/airflow/providers/amazon/provider.yaml#L702

    In this case the amazon provider has an apache-airflow-providers-amazon[cncf.kubernetes] extra, and when you install the provider with that extra it will install apache-airflow-providers-cncf-kubernetes>=7.2.0.

The last one is pretty "soft" - i.e. it only works when you use the extra directly - but it is a way to store and present the dependency on a minimum version, and even to automate it when you use the extra. And this is the best we can do. pip has no way to express the condition "if you install this package, the minimum required version of that one is this"; there is simply no such feature. You can either have a strict requirement that both installs the package and forces the minimum version, or the extra which - if used - applies the soft limit during installation only.

Maybe some day it will be added, but currently there is no way to force a minimum version for an optional dependency.
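
For illustration, an additional-extras entry of the kind described in option 3 might look like this (structure modeled on the linked amazon provider.yaml; the common.sql values here are hypothetical, not taken from this PR):

additional-extras:
  # Hypothetical extra pinning the optional common-sql dependency.
  - name: common.sql
    dependencies:
      - apache-airflow-providers-common-sql>=1.7.2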

@potiuk (Member)

potiuk commented Sep 12, 2023

BTW, if someone would like to take what I just described above, find the right place for it in our contributing documentation, and describe it in better words - absolutely do not hesitate @pankajkoti @nathadfield. It seems we need some kind of description of this, as the examples we have in current providers are not enough; it's just a matter of finding the right place and wording it in a way that will be easy for others to follow :).

Feel free :)

@nathadfield (Collaborator)

Actually, it looks like the dependencies in provider.yaml were changed in the last few days but this is not part of v10.8.

https://github.com/apache/airflow/blob/providers-google/10.8.0/airflow/providers/google/provider.yaml#L78
https://github.com/apache/airflow/blob/main/airflow/providers/google/provider.yaml#L78

@pankajkoti (Member, Author)

Thank you @potiuk.

Last week and this week have been really tight due to a team offsite and personal and office work. I have added it to my backlog and will soon try to enhance the docs around this.

Labels: area:providers, provider:common-sql, provider:google
Successfully merging this pull request may close these issues.

BigQueryValueCheckOperator doesn't respect pass_value in deferrable mode
4 participants