Skip to content

Commit

Permalink
Respect soft_fail parameter in GlacierJobOperationSensor (#34557)
Browse files Browse the repository at this point in the history
  • Loading branch information
utkarsharma2 committed Sep 22, 2023
1 parent 4c0459d commit ed3df3d
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 3 deletions.
8 changes: 6 additions & 2 deletions airflow/providers/amazon/aws/sensors/glacier.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from functools import cached_property
from typing import TYPE_CHECKING, Any, Sequence

from airflow.exceptions import AirflowException
from airflow.exceptions import AirflowException, AirflowSkipException
from airflow.providers.amazon.aws.hooks.glacier import GlacierHook
from airflow.sensors.base import BaseSensorOperator

Expand Down Expand Up @@ -98,6 +98,10 @@ def poke(self, context: Context) -> bool:
self.log.warning("Code status: %s", response["StatusCode"])
return False
else:
raise AirflowException(
# TODO: remove this if block when min_airflow_version is set to higher than 2.7.1
message = (
f'Sensor failed. Job status: {response["Action"]}, code status: {response["StatusCode"]}'
)
if self.soft_fail:
raise AirflowSkipException(message)
raise AirflowException(message)
14 changes: 13 additions & 1 deletion tests/providers/amazon/aws/sensors/test_glacier.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import pytest

from airflow.exceptions import AirflowException
from airflow.exceptions import AirflowException, AirflowSkipException
from airflow.providers.amazon.aws.sensors.glacier import GlacierJobOperationSensor, JobStatus

SUCCEEDED = "Succeeded"
Expand Down Expand Up @@ -61,6 +61,18 @@ def test_poke_fail(self, _):
self.op.poke(None)
assert "Sensor failed" in str(ctx.value)

@pytest.mark.parametrize(
"soft_fail, expected_exception", ((False, AirflowException), (True, AirflowSkipException))
)
@mock.patch("airflow.providers.amazon.aws.hooks.glacier.GlacierHook.describe_job")
def test_fail_poke(self, describe_job, soft_fail, expected_exception):
self.op.soft_fail = soft_fail
response = {"Action": "some action", "StatusCode": "Failed"}
message = f'Sensor failed. Job status: {response["Action"]}, code status: {response["StatusCode"]}'
with pytest.raises(expected_exception, match=message):
describe_job.return_value = response
self.op.poke(context={})


class TestSensorJobDescription:
def test_job_status_success(self):
Expand Down

0 comments on commit ed3df3d

Please sign in to comment.