From cd746c9f23b1d9d6767226aa88275bc75199888c Mon Sep 17 00:00:00 2001 From: Phani Kumar Date: Tue, 20 Jun 2023 19:39:17 +0530 Subject: [PATCH] Add job failure handler --- airflow/providers/google/cloud/operators/bigquery.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py index a4f6fcc6b6940..057853d5d826f 100644 --- a/airflow/providers/google/cloud/operators/bigquery.py +++ b/airflow/providers/google/cloud/operators/bigquery.py @@ -379,7 +379,12 @@ def execute(self, context: Context) -> None: # type: ignore[override] ), method_name="execute_complete", ) - self.log.info("Current state of job %s is %s", job.job_id, job.state) + self._handle_job_error(job) + + @staticmethod + def _handle_job_error(job: BigQueryJob | UnknownJob) -> None: + if job.error_result: + raise AirflowException(f"BigQuery job {job.job_id} failed: {job.error_result}") def execute_complete(self, context: Context, event: dict[str, Any]) -> None: """