From d696dbef102093ec242d2afa78b0ddc337c3af98 Mon Sep 17 00:00:00 2001 From: sagar-salvi-apptware <159135491+sagar-salvi-apptware@users.noreply.github.com> Date: Tue, 24 Sep 2024 00:58:30 +0530 Subject: [PATCH] fix(ingest/mode): add connection timeouts to avoid RemoteDisconnected errors (#11245) --- .../src/datahub/ingestion/source/mode.py | 23 +++++++++++++++---- .../tests/integration/mode/test_mode.py | 10 +++++--- 2 files changed, 26 insertions(+), 7 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/mode.py b/metadata-ingestion/src/datahub/ingestion/source/mode.py index 73427d9084dd3..56b8ce00a4d1f 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/mode.py +++ b/metadata-ingestion/src/datahub/ingestion/source/mode.py @@ -15,6 +15,8 @@ import yaml from liquid import Template, Undefined from pydantic import Field, validator +from requests.adapters import HTTPAdapter, Retry +from requests.exceptions import ConnectionError from requests.models import HTTPBasicAuth, HTTPError from sqllineage.runner import LineageRunner from tenacity import retry_if_exception_type, stop_after_attempt, wait_exponential @@ -127,6 +129,10 @@ class ModeAPIConfig(ConfigModel): max_attempts: int = Field( default=5, description="Maximum number of attempts to retry before failing" ) + timeout: int = Field( + default=40, + description="Timout setting, how long to wait for the Mode rest api to send data before giving up", + ) class ModeConfig(StatefulIngestionConfigBase, DatasetLineageProviderConfigBase): @@ -299,7 +305,15 @@ def __init__(self, ctx: PipelineContext, config: ModeConfig): self.report = ModeSourceReport() self.ctx = ctx - self.session = requests.session() + self.session = requests.Session() + # Handling retry and backoff + retries = 3 + backoff_factor = 10 + retry = Retry(total=retries, backoff_factor=backoff_factor) + adapter = HTTPAdapter(max_retries=retry) + self.session.mount("http://", adapter) + self.session.mount("https://", adapter) + self.session.auth = HTTPBasicAuth( self.config.token, self.config.password.get_secret_value(), @@ -1469,15 +1483,16 @@ def _get_request_json(self, url: str) -> Dict: multiplier=self.config.api_options.retry_backoff_multiplier, max=self.config.api_options.max_retry_interval, ), - retry=retry_if_exception_type(HTTPError429), + retry=retry_if_exception_type((HTTPError429, ConnectionError)), stop=stop_after_attempt(self.config.api_options.max_attempts), ) @r.wraps def get_request(): try: - response = self.session.get(url) - response.raise_for_status() + response = self.session.get( + url, timeout=self.config.api_options.timeout + ) return response.json() except HTTPError as http_error: error_response = http_error.response diff --git a/metadata-ingestion/tests/integration/mode/test_mode.py b/metadata-ingestion/tests/integration/mode/test_mode.py index 7ea6597460de2..ce7533d5611e4 100644 --- a/metadata-ingestion/tests/integration/mode/test_mode.py +++ b/metadata-ingestion/tests/integration/mode/test_mode.py @@ -45,8 +45,12 @@ def __init__(self, error_list, status_code): def json(self): return self.json_data - def get(self, url): + def mount(self, prefix, adaptor): + return self + + def get(self, url, timeout=40): self.url = url + self.timeout = timeout response_json_path = f"{test_resources_dir}/setup/{JSON_RESPONSE_MAP.get(url)}" with open(response_json_path) as file: data = json.loads(file.read()) @@ -74,7 +78,7 @@ def mocked_requests_failure(*args, **kwargs): @freeze_time(FROZEN_TIME) def test_mode_ingest_success(pytestconfig, tmp_path): with patch( - "datahub.ingestion.source.mode.requests.session", + "datahub.ingestion.source.mode.requests.Session", side_effect=mocked_requests_sucess, ): pipeline = Pipeline.create( @@ -111,7 +115,7 @@ def test_mode_ingest_success(pytestconfig, tmp_path): @freeze_time(FROZEN_TIME) def test_mode_ingest_failure(pytestconfig, tmp_path): with patch( - "datahub.ingestion.source.mode.requests.session", + "datahub.ingestion.source.mode.requests.Session", side_effect=mocked_requests_failure, ): global test_resources_dir