Skip to content

Commit

Permalink
fix(ingest/mode): add connection timeouts to avoid RemoteDisconnected…
Browse files Browse the repository at this point in the history
… errors (#11245)
  • Loading branch information
sagar-salvi-apptware committed Sep 23, 2024
1 parent 3c1dcf9 commit d696dbe
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 7 deletions.
23 changes: 19 additions & 4 deletions metadata-ingestion/src/datahub/ingestion/source/mode.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import yaml
from liquid import Template, Undefined
from pydantic import Field, validator
from requests.adapters import HTTPAdapter, Retry
from requests.exceptions import ConnectionError
from requests.models import HTTPBasicAuth, HTTPError
from sqllineage.runner import LineageRunner
from tenacity import retry_if_exception_type, stop_after_attempt, wait_exponential
Expand Down Expand Up @@ -127,6 +129,10 @@ class ModeAPIConfig(ConfigModel):
max_attempts: int = Field(
default=5, description="Maximum number of attempts to retry before failing"
)
timeout: int = Field(
default=40,
description="Timout setting, how long to wait for the Mode rest api to send data before giving up",
)


class ModeConfig(StatefulIngestionConfigBase, DatasetLineageProviderConfigBase):
Expand Down Expand Up @@ -299,7 +305,15 @@ def __init__(self, ctx: PipelineContext, config: ModeConfig):
self.report = ModeSourceReport()
self.ctx = ctx

self.session = requests.session()
self.session = requests.Session()
# Handling retry and backoff
retries = 3
backoff_factor = 10
retry = Retry(total=retries, backoff_factor=backoff_factor)
adapter = HTTPAdapter(max_retries=retry)
self.session.mount("http://", adapter)
self.session.mount("https://", adapter)

self.session.auth = HTTPBasicAuth(
self.config.token,
self.config.password.get_secret_value(),
Expand Down Expand Up @@ -1469,15 +1483,16 @@ def _get_request_json(self, url: str) -> Dict:
multiplier=self.config.api_options.retry_backoff_multiplier,
max=self.config.api_options.max_retry_interval,
),
retry=retry_if_exception_type(HTTPError429),
retry=retry_if_exception_type((HTTPError429, ConnectionError)),
stop=stop_after_attempt(self.config.api_options.max_attempts),
)

@r.wraps
def get_request():
try:
response = self.session.get(url)
response.raise_for_status()
response = self.session.get(
url, timeout=self.config.api_options.timeout
)
return response.json()
except HTTPError as http_error:
error_response = http_error.response
Expand Down
10 changes: 7 additions & 3 deletions metadata-ingestion/tests/integration/mode/test_mode.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,12 @@ def __init__(self, error_list, status_code):
def json(self):
return self.json_data

def get(self, url):
def mount(self, prefix, adaptor):
return self

def get(self, url, timeout=40):
self.url = url
self.timeout = timeout
response_json_path = f"{test_resources_dir}/setup/{JSON_RESPONSE_MAP.get(url)}"
with open(response_json_path) as file:
data = json.loads(file.read())
Expand Down Expand Up @@ -74,7 +78,7 @@ def mocked_requests_failure(*args, **kwargs):
@freeze_time(FROZEN_TIME)
def test_mode_ingest_success(pytestconfig, tmp_path):
with patch(
"datahub.ingestion.source.mode.requests.session",
"datahub.ingestion.source.mode.requests.Session",
side_effect=mocked_requests_sucess,
):
pipeline = Pipeline.create(
Expand Down Expand Up @@ -111,7 +115,7 @@ def test_mode_ingest_success(pytestconfig, tmp_path):
@freeze_time(FROZEN_TIME)
def test_mode_ingest_failure(pytestconfig, tmp_path):
with patch(
"datahub.ingestion.source.mode.requests.session",
"datahub.ingestion.source.mode.requests.Session",
side_effect=mocked_requests_failure,
):
global test_resources_dir
Expand Down

0 comments on commit d696dbe

Please sign in to comment.