BigQueryCheckOperator skips value and error check when in deferrable mode but not deferred #37885

Closed
2 tasks done
kacpermuda opened this issue Mar 4, 2024 · 1 comment · Fixed by #38408
Labels: area:providers, good first issue, kind:bug, provider:google

kacpermuda (Contributor) commented Mar 4, 2024

Apache Airflow Provider(s)

google

Versions of Apache Airflow Providers

10.2.0 and above

Apache Airflow version

main branch

Operating System

MacOS

Deployment

Other

Deployment details

BREEZE

What happened

This PR optimized the deferrable mode in BigQueryCheckOperator and introduced this bug. When the operator runs in deferrable mode but the job finishes quickly enough that it is never deferred (execution does not enter the if job.running() branch), no error is raised when the job fails and no value check is performed.

A similar optimization was made here and later fixed here.
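The failure mode can be sketched without BigQuery. The snippet below is a simplified, dependency-free model of the deferrable branch after the optimization, not the operator's real code: TaskDeferred is a local stand-in for airflow.exceptions.TaskDeferred, and FakeJob is a hypothetical job object rather than the real client class. It relies on the fact that BaseOperator.defer() raises TaskDeferred, so anything after the if job.running() block only runs for a job that had already finished; in the optimized code nothing there inspects the job.

```python
class TaskDeferred(Exception):
    """Local stand-in for airflow.exceptions.TaskDeferred (assumption: no Airflow install)."""


class FakeJob:
    """Hypothetical stand-in for a BigQuery job; not the real google-cloud-bigquery class."""

    def __init__(self, job_id, running, error_result=None, rows=(1,)):
        self.job_id = job_id
        self._running = running
        self.error_result = error_result
        self._rows = rows

    def running(self):
        return self._running

    def result(self):
        # The real job.result() returns a row iterator; one row is enough here.
        return iter([self._rows])


def defer():
    # Like BaseOperator.defer(), this never returns normally.
    raise TaskDeferred()


def buggy_execute(job):
    """Simplified model of the deferrable branch after the optimization."""
    if job.running():
        defer()
    # A job that already finished falls through to here. Nothing inspects
    # job.error_result or the returned row, so the task simply succeeds.
    return None
```

For example, buggy_execute(FakeJob("job_1", running=False, error_result={"reason": "invalidQuery"})) returns None instead of raising, and a finished job whose row is (0,) also passes, which matches the behavior described above.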

What you think should happen instead

The operator should raise an error when the job fails and check the value returned by the job, even when it is not deferred.

I think something like the following, similar to what was done in BigQueryValueCheckOperator, should be enough:

            if job.running():
                self.defer(
                    timeout=self.execution_timeout,
                    trigger=BigQueryCheckTrigger(
                        conn_id=self.gcp_conn_id,
                        job_id=job.job_id,
                        project_id=hook.project_id,
                        location=self.location or hook.location,
                        poll_interval=self.poll_interval,
                        impersonation_chain=self.impersonation_chain,
                    ),
                    method_name="execute_complete",
                )
            self._handle_job_error(job)
            # job.result() returns a RowIterator. Mypy expects an instance of SupportsNext[Any] for
            # the next() call, which RowIterator does not satisfy. Hence, ignore the arg-type error.
            records = next(job.result())  # type: ignore[arg-type]
            self._validate_records(records)  # type: ignore[attr-defined]
            self.log.info("Current state of job %s is %s", job.job_id, job.state)

    @staticmethod
    def _handle_job_error(job: BigQueryJob | UnknownJob) -> None:
        if job.error_result:
            raise AirflowException(f"BigQuery job {job.job_id} failed: {job.error_result}")

    def _validate_records(self, records) -> None:
        if not records:
            raise AirflowException("The query returned empty results")
        elif not all(records):
            self._raise_exception(  # type: ignore[attr-defined]
                f"Test failed.\nQuery:\n{self.sql}\nResults:\n{records!s}"
            )
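To see why this ordering covers both paths, here is a dependency-free sketch of the proposed flow. It assumes local stand-ins (AirflowException, TaskDeferred, and a hypothetical FakeJob) instead of the real Airflow and google-cloud-bigquery types, and it simplifies self._raise_exception() to a plain AirflowException. Because defer() raises, a running job never reaches the checks (they happen later in execute_complete()), while a job that already finished goes straight to the error check and then the value check.

```python
class AirflowException(Exception):
    """Local stand-in for airflow.exceptions.AirflowException."""


class TaskDeferred(Exception):
    """Local stand-in for airflow.exceptions.TaskDeferred, raised by BaseOperator.defer()."""


class FakeJob:
    """Hypothetical stand-in for a BigQuery job; not the real client class."""

    def __init__(self, job_id, running, error_result=None, rows=(1,)):
        self.job_id = job_id
        self._running = running
        self.error_result = error_result
        self._rows = rows

    def running(self):
        return self._running

    def result(self):
        # The real job.result() returns a row iterator; one row is enough here.
        return iter([self._rows])


def handle_job_error(job):
    if job.error_result:
        raise AirflowException(f"BigQuery job {job.job_id} failed: {job.error_result}")


def validate_records(records, sql):
    if not records:
        raise AirflowException("The query returned empty results")
    if not all(records):
        # Simplification: the operator calls self._raise_exception() here.
        raise AirflowException(f"Test failed.\nQuery:\n{sql}\nResults:\n{records!s}")


def fixed_execute(job, sql):
    """Mirrors the proposed deferrable branch of BigQueryCheckOperator.execute()."""
    if job.running():
        # defer() never returns, so a running job skips everything below.
        raise TaskDeferred()
    # Reached only when the job already finished: check errors, then values.
    handle_job_error(job)
    records = next(job.result())
    validate_records(records, sql)
```

Note that all(records) treats any falsy column (0, "", None, False) as a failed check, so a row like (1, 0) fails just as an empty row does.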

How to reproduce

Run a very quick query in deferrable mode. If the job fails, no error is raised; if it succeeds, the returned values are not checked.

Anything else

I'll make a PR in my free time, but maybe somebody will pick it up before then.

Also, there are no tests checking that the operators (both BigQueryCheckOperator and BigQueryValueCheckOperator) raise an error when in deferrable mode but not deferred, so something like this should be added:

    @pytest.mark.db_test
    @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
    def test_bigquery_value_check_operator_async_finish_with_error_before_deferred(
        self, mock_hook, create_task_instance_of_operator
    ):
        job_id = "123456"
        hash_ = "hash"
        real_job_id = f"{job_id}_{hash_}"

        mock_hook.return_value.insert_job.return_value = MagicMock(job_id=real_job_id, error_result=True)
        mock_hook.return_value.insert_job.return_value.running.return_value = False

        ti = create_task_instance_of_operator(
            BigQueryValueCheckOperator,
            dag_id="dag_id",
            task_id="check_value",
            sql="SELECT COUNT(*) FROM Any",
            pass_value=2,
            use_legacy_sql=True,
            deferrable=True,
        )

        with pytest.raises(AirflowException) as exc:
            ti.task.execute(MagicMock())

        assert str(exc.value) == f"BigQuery job {real_job_id} failed: True"
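The same mock setup (a job with running() returning False) can also cover the value check and the deferral path. The sketch below is a hypothetical, stdlib-only unittest version: check_not_deferred() is an illustrative helper that mirrors the proposed flow and is not an Airflow function, and AirflowException/TaskDeferred are local stand-ins, so it exercises the logic rather than the real operator. A real test would keep the create_task_instance_of_operator fixture and the BigQueryHook patch shown above, and could apply the same three cases to BigQueryCheckOperator.

```python
import unittest
from unittest.mock import MagicMock


class AirflowException(Exception):
    """Local stand-in for airflow.exceptions.AirflowException (assumption: no Airflow install)."""


class TaskDeferred(Exception):
    """Local stand-in for airflow.exceptions.TaskDeferred."""


def check_not_deferred(job, sql="SELECT COUNT(*) FROM Any"):
    """Hypothetical helper mirroring the proposed execute() flow."""
    if job.running():
        raise TaskDeferred()
    if job.error_result:
        raise AirflowException(f"BigQuery job {job.job_id} failed: {job.error_result}")
    records = next(job.result())
    if not records:
        raise AirflowException("The query returned empty results")
    if not all(records):
        raise AirflowException(f"Test failed.\nQuery:\n{sql}\nResults:\n{records!s}")


class NotDeferredCheckTest(unittest.TestCase):
    def _finished_job(self, **attrs):
        # Same mock shape as the issue's test: a job that is no longer running.
        job = MagicMock(**attrs)
        job.running.return_value = False
        return job

    def test_error_raised_before_deferral(self):
        job = self._finished_job(job_id="123456_hash", error_result=True)
        with self.assertRaises(AirflowException) as ctx:
            check_not_deferred(job)
        self.assertEqual(str(ctx.exception), "BigQuery job 123456_hash failed: True")

    def test_falsy_value_fails_check(self):
        job = self._finished_job(job_id="123456_hash", error_result=None)
        job.result.return_value = iter([(1, 0)])
        with self.assertRaises(AirflowException) as ctx:
            check_not_deferred(job)
        self.assertTrue(str(ctx.exception).startswith("Test failed."))

    def test_running_job_defers(self):
        job = MagicMock(job_id="123456_hash")
        job.running.return_value = True
        with self.assertRaises(TaskDeferred):
            check_not_deferred(job)
```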

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

kacpermuda added the area:providers, kind:bug, and needs-triage labels Mar 4, 2024

boring-cyborg bot commented Mar 4, 2024

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.
