Skip to content

Commit

Permalink
Update cloud test logic (#245)
Browse files Browse the repository at this point in the history
We update the cloud test logic with the following points:
- We add a retry mechanism to avoid transient network error
- We exit the whole pipeline if there is an error in the index creation process
- Fix a bug in index settings for no_model index
  • Loading branch information
wanliAlex authored Jul 18, 2024
1 parent 32db554 commit 7562b24
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 18 deletions.
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ numpy
pytest
dataclasses
pydantic>=2.0.0
requests_mock
requests_mock
tenacity
2 changes: 1 addition & 1 deletion tests/cloud_test_logic/cloud_test_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ class CloudTestIndex(str, Enum):
"model": "no_model",
"modelProperties": {
"type": "no_model",
"dimensions": "512"
"dimensions": 512
},
}
}
38 changes: 32 additions & 6 deletions tests/cloud_test_logic/delete_all_cloud_test_indexes.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,30 @@
import os

import requests
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

import marqo
from marqo.enums import IndexStatus
from marqo.errors import MarqoWebError


@retry(stop=stop_after_attempt(5), # Stop after 5 attempts
wait=wait_exponential(multiplier=1, min=4, max=10), # Wait exponentially between retries
retry=retry_if_exception_type(requests.exceptions.RequestException)) # Retry on network-related exceptions
def fetch_marqo_indexes(client: marqo.Client):
"""A function to fetch all Marqo indexes with retries to handle transient network errors and Marqo API errors"""
response = requests.get(f"{client.config.instance_mapping.get_control_base_url()}/v2/indexes",
headers={"x-api-key": client.config.api_key})
response.raise_for_status() # Raise an exception for HTTP errors
return response


@retry(stop=stop_after_attempt(5),
wait=wait_exponential(multiplier=1, min=4, max=10),
retry=retry_if_exception_type((requests.exceptions.RequestException, MarqoWebError)))
def fetch_marqo_index(client: marqo.Client, index_name: str):
"""A function to fetch a Marqo index by name with retries to handle transient network errors and Marqo API errors"""
return client.index(index_name)


def delete_all_test_indices(wait_for_readiness=False):
Expand Down Expand Up @@ -37,19 +61,21 @@ def delete_all_test_indices(wait_for_readiness=False):
print("Indices to delete: ", indices_to_delete)
print("Marqo Cloud deletion responses:")
for index_name in indices_to_delete:
index = client.index(index_name)
if index.get_status()["indexStatus"] == marqo.enums.IndexStatus.READY:
index = fetch_marqo_index(client, index_name)
if index.get_status()["indexStatus"] == IndexStatus.READY:
print(index_name, index.delete(wait_for_readiness=False))
elif index.get_status()["indexStatus"] == 'DELETING':
print(f"Index {index_name} is already being deleted")
elif index.get_status()["indexStatus"] == IndexStatus.DELETED:
print(f"Index {index_name} is already deleted")
elif index.get_status()["indexStatus"] == IndexStatus.FAILED:
print(f"Index {index_name} has failed status, deleting anyway")
index.delete(wait_for_readiness=False)
else:
print(f"Index {index_name} is not ready for deletion, status: {index.get_status()['indexStatus']}")
if wait_for_readiness:
max_retries = 100
attempt = 0
while indices_to_delete:
resp = requests.get(f"{client.config.instance_mapping.get_control_base_url()}/v2/indexes",
headers={"x-api-key": client.config.api_key})
resp = fetch_marqo_indexes(client)
resp_json = resp.json()
all_index_names = [index["indexName"] for index in resp_json['results']]
for index_for_deletion_name in indices_to_delete:
Expand Down
4 changes: 4 additions & 0 deletions tests/cloud_test_logic/populate_indices_for_cloud_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@


def populate_indices():
"""Create indices in Marqo Cloud Account with the settings specified in cloud_test_index.py.
Raise MarqoWebError if any of the index creation fails."""
populate_indices_start_time = time.time()
test_uniqueness_id = os.environ.get("MQ_TEST_RUN_IDENTIFIER", "")

Expand Down Expand Up @@ -40,6 +43,7 @@ def populate_indices():
)
except MarqoWebError as e:
print(f"Attempting to create index {index_name} resulting in error {e}")
raise e


# Around 30 min:
Expand Down
12 changes: 9 additions & 3 deletions tests/cloud_test_logic/run_cloud_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@
import signal
import sys

import pytest

from create_and_set_cloud_unique_run_identifier import set_unique_run_identifier
from delete_all_cloud_test_indexes import delete_all_test_indices
from marqo.errors import MarqoWebError
from populate_indices_for_cloud_tests import populate_indices

tests_specific_kwargs = {
Expand Down Expand Up @@ -48,17 +51,20 @@ def convert_string_to_boolean(string_value):
os.environ['MQ_TEST_RUN_IDENTIFIER'] = 'cinteg'
print(f"Using unique identifier: {os.environ['MQ_TEST_RUN_IDENTIFIER']}")
if tests_specific_kwargs['create-indexes']:
populate_indices()
try:
populate_indices()
except MarqoWebError as e:
print("Detected an error while creating indices, deleting all indices and exiting the workflow.")
delete_all_test_indices(wait_for_readiness=True)
sys.exit(1)
print(f"All indices has been created, proceeding to run tests with pytest. Arguments: {sys.argv[1:]}")

import pytest
pytest_args = ['tests/', '-m', 'not ignore_during_cloud_tests'] + sys.argv[1:]
print("running integration tests with args:", pytest_args)
pytest_exit_code = pytest.main(pytest_args)
if pytest_exit_code != 0:
raise RuntimeError(f"Pytest failed with exit code: {pytest_exit_code}")
print("All tests has been executed successfully")

if tests_specific_kwargs['delete-indexes']:
delete_all_test_indices(wait_for_readiness=True)
except Exception as e:
Expand Down
14 changes: 7 additions & 7 deletions tests/v2_tests/test_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,14 +363,14 @@ def test_no_marqo1_recommendation_if_major_is_not_1(self):
def test_warning_not_printed_for_ready_index(self):
if not self.client.config.is_marqo_cloud:
self.skipTest("Test only applicable for Marqo Cloud")
with mock.patch("marqo.index.mq_logger.warning") as mock_warning:
for cloud_test_index_to_use, _ in self.test_cases:
test_index_name = self.get_test_index_name(
cloud_test_index_to_use=cloud_test_index_to_use,
open_source_test_index_name=None
)
for cloud_test_index_to_use, _ in self.test_cases:
test_index_name = self.get_test_index_name(
cloud_test_index_to_use=cloud_test_index_to_use,
open_source_test_index_name=None
)
with mock.patch("marqo.index.mq_logger.warning") as mock_warning:
self.client.index(test_index_name)
mock_warning.assert_not_called()
mock_warning.assert_not_called()

def test_warning_not_printed_for_not_ready_index(self):
if not self.client.config.is_marqo_cloud:
Expand Down

0 comments on commit 7562b24

Please sign in to comment.