Skip to content

Commit

Permalink
post debug changes to tests and new run env in tox
Browse files Browse the repository at this point in the history
  • Loading branch information
danyilq committed Aug 6, 2023
1 parent e7f514d commit 7b2491c
Show file tree
Hide file tree
Showing 22 changed files with 424 additions and 358 deletions.
12 changes: 7 additions & 5 deletions src/marqo/cloud_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
from marqo.marqo_logging import mq_logger


def cloud_wait_for_index_status(req, index_name, status):
creation = req.get(f"indexes/{index_name}/status")
while creation['index_status'] != status:
def cloud_wait_for_index_status(req , index_name: str, status):
""" Wait for index to be created on Marqo Cloud by checking
it's status every 10 seconds until it becomes expected value"""
current_status = req.get(f"indexes/{index_name}/status")
while current_status['index_status'] != status:
time.sleep(10)
creation = req.get(f"indexes/{index_name}/status")
mq_logger.info(f"Index creation status: {creation['index_status']}")
current_status = req.get(f"indexes/{index_name}/status")
mq_logger.info(f"Index creation status: {current_status['index_status']}")
mq_logger.info("Index created successfully")
return True
10 changes: 5 additions & 5 deletions src/marqo/index.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import functools
import json
import logging
import pprint
import time

Expand Down Expand Up @@ -55,14 +54,16 @@ def __init__(
if config.is_marqo_cloud:
try:
if self.get_status()["index_status"] != IndexStatus.CREATED:
logging.warning(f"Index {index_name} is not ready. Status: {self.get_status()}, operations may fail.")
mq_logger.warning(f"Index {index_name} is not ready. Status: {self.get_status()}. Common operations, "
f"such as search and add_documents, may fail until the index is ready. "
f"Please check `mq.index('{index_name}').get_status()` for the index's status. "
f"Skipping version check.")
skip_version_check = True
except Exception as e:
skip_version_check = True
mq_logger.warning(f"Failed to get index status for index {index_name}. Skipping version check. Error: {e}")
if not skip_version_check:
self._marqo_minimum_supported_version_check()
else:
logging.warning("Version check is skipped because index is not ready yet.")

def delete(self) -> Dict[str, Any]:
"""Delete the index.
Expand Down Expand Up @@ -435,7 +436,6 @@ def _add_docs_organiser(
f"docs (server unbatched), for an average of {(res['processingTimeMs'] / (1000 * num_docs)):.3f}s per doc.")
if 'errors' in res and res['errors']:
mq_logger.info(error_detected_message)

if errors_detected:
mq_logger.info(error_detected_message)
total_add_docs_time = timer() - t0
Expand Down
96 changes: 51 additions & 45 deletions tests/marqo_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,16 @@ def wrapper(self, *args, **kwargs):
return decorator


def create_settings_hash(settings_dict, kwargs):
combined_dict = {**settings_dict, **kwargs}
combined_str = ''.join(f"{key}{value}" for key, value in combined_dict.items())
crc32_hash = zlib.crc32(combined_str.encode())
short_hash = hex(crc32_hash & 0xffffffff)[2:][
:10] # Take the first 10 characters of the hexadecimal representation
print(f"Created index with settings hash: {short_hash} for settings: {combined_dict}")
return short_hash


class MarqoTestCase(TestCase):

@classmethod
Expand All @@ -115,10 +125,7 @@ def setUpClass(cls) -> None:
api_key = os.environ.get("MARQO_API_KEY", None)
if (api_key):
local_marqo_settings["api_key"] = api_key
cls.index_suffix = os.environ.get("MARQO_INDEX_SUFFIX", None)
if not cls.index_suffix:
os.environ["MARQO_INDEX_SUFFIX"] = str(uuid.uuid4())[:8]
cls.index_suffix = os.environ["MARQO_INDEX_SUFFIX"]
cls.index_suffix = os.environ.get("MARQO_INDEX_SUFFIX", "")
cls.client_settings = local_marqo_settings
cls.authorized_url = cls.client_settings["url"]
cls.generic_test_index_name = 'test-index'
Expand All @@ -133,32 +140,35 @@ def tearDownClass(cls) -> None:
"""
client = marqo.Client(**cls.client_settings)
for index in client.get_indexes()['results']:
if not client.config.is_marqo_cloud:
try:
index.delete()
except marqo.errors.MarqoApiError as e:
logging.debug(f'received error `{e}` from index deletion request.')
if index.index_name.startswith(cls.generic_test_index_name):
if not client.config.is_marqo_cloud:
try:
index.delete()
except marqo.errors.MarqoApiError as e:
logging.debug(f'received error `{e}` from index deletion request.')

def setUp(self) -> None:
self.client = Client(**self.client_settings)
for index in self.client.get_indexes()['results']:
if not self.client.config.is_marqo_cloud:
try:
index.delete()
except marqo.errors.MarqoApiError as e:
logging.debug(f'received error `{e}` from index deletion request.')
else:
self.cleanup_documents_from_all_indices()
if self.client.config.is_marqo_cloud:
self.cleanup_documents_from_all_indices()
else:
for index in self.client.get_indexes()['results']:
if index.index_name.startswith(self.generic_test_index_name):
try:
index.delete()
except marqo.errors.MarqoApiError as e:
logging.debug(f'received error `{e}` from index deletion request.')

def tearDown(self) -> None:
for index in self.client.get_indexes()['results']:
if not self.client.config.is_marqo_cloud:
try:
index.delete()
except marqo.errors.MarqoApiError as e:
logging.debug(f'received error `{e}` from index deletion request.')
else:
self.cleanup_documents_from_all_indices()
if self.client.config.is_marqo_cloud:
self.cleanup_documents_from_all_indices()
else:
for index in self.client.get_indexes()['results']:
if index.index_name.startswith(self.generic_test_index_name):
try:
index.delete()
except marqo.errors.MarqoApiError as e:
logging.debug(f'received error `{e}` from index deletion request.')

def warm_request(self, func, *args, **kwargs):
'''
Expand All @@ -170,33 +180,28 @@ def warm_request(self, func, *args, **kwargs):
func(*args, **kwargs)

def create_cloud_index(self, index_name, settings_dict=None, **kwargs):
def create_settings_hash():
combined_dict = {**settings_dict, **kwargs}
combined_str = ''.join(f"{key}{value}" for key, value in combined_dict.items())
crc32_hash = zlib.crc32(combined_str.encode())
short_hash = hex(crc32_hash & 0xffffffff)[2:][
:10] # Take the first 10 characters of the hexadecimal representation
print(f"Created index with settings hash: {short_hash} for settings: {combined_dict}")
return short_hash

client = marqo.Client(**self.client_settings)
settings_dict = settings_dict if settings_dict else {}
index_name = f"{index_name}-{self.index_suffix}"
if settings_dict or kwargs:
index_name = f"{index_name}-{create_settings_hash()}"
index_name = f"{index_name}-{create_settings_hash(settings_dict, kwargs)}"
settings_dict.update({
"inference_type": "marqo.CPU", "storage_class": "marqo.basic", "model": "hf/all_datasets_v4_MiniLM-L6"
})
while True:
try:
if client.http.get(f"/indexes/{index_name}/status")["index_status"] == "READY":
break
except Exception as e:
pass
try:
status = client.http.get(f"/indexes/{index_name}/status")["index_status"]
if status == "CREATING":
while status == "CREATING":
time.sleep(10)
status = client.http.get(f"/indexes/{index_name}/status")["index_status"]
if status != "READY":
self.client.create_index(index_name, settings_dict=settings_dict, **kwargs)
except Exception as e:
self.client.create_index(index_name, settings_dict=settings_dict, **kwargs)
return index_name

def create_test_index(self, index_name, settings_dict=None, **kwargs):
def create_test_index(self, index_name: str, settings_dict: dict = None, **kwargs):
"""Create a test index with the given name and settings and triggers specific logic if index is cloud index"""
client = marqo.Client(**self.client_settings)
if client.config.is_marqo_cloud:
index_name = self.create_cloud_index(index_name, settings_dict, **kwargs)
Expand All @@ -208,9 +213,10 @@ def cleanup_documents_from_all_indices(self):
client = marqo.Client(**self.client_settings)
indexes = client.get_indexes()
for index in indexes['results']:
if self.index_suffix in index.index_name.split('-'):
if index.index_name.startswith(self.generic_test_index_name) and \
self.index_suffix in index.index_name.split('-'):
if client.http.get(f"/indexes/{index.index_name}/status")["index_status"] == "READY":
docs_to_delete = [i['_id'] for i in index.search("")['hits']]
docs_to_delete = [i['_id'] for i in index.search("", limit=100)['hits']]
while docs_to_delete:
index.delete_documents(docs_to_delete, auto_refresh=True)
docs_to_delete = [i['_id'] for i in index.search("")['hits']]
docs_to_delete = [i['_id'] for i in index.search("", limit=100)['hits']]
Empty file added tests/scripts/__init__.py
Empty file.
8 changes: 8 additions & 0 deletions tests/scripts/create_test_suffix.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import os
import uuid


def set_index_suffix():
index_suffix = os.environ.get("MARQO_INDEX_SUFFIX", None)
if not index_suffix:
os.environ["MARQO_INDEX_SUFFIX"] = str(uuid.uuid4())[:8]
14 changes: 7 additions & 7 deletions tests/scripts/delete_all_indexes.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,22 @@
import marqo


def cleanup_documents_from_all_indices():
def delete_all_test_indices():
local_marqo_settings = {
"url": os.environ.get("MARQO_URL", 'http://localhost:8882'),
}
suffix = os.environ.get("MARQO_INDEX_SUFFIX", None)
api_key = os.environ.get("MARQO_API_KEY", None)
if api_key:
local_marqo_settings["api_key"] = api_key
client = marqo.Client(**local_marqo_settings)
indexes = client.get_indexes()
for index in indexes['results']:
if client.config.is_marqo_cloud:
if index.get_status()["index_status"] == marqo.enums.IndexStatus.CREATED:
index.delete()
else:
index.delete()
if index.index_name.startswith('test-index'):
if suffix is not None and suffix in index.index_name.split('-'):
if index.get_status()["index_status"] == marqo.enums.IndexStatus.CREATED:
index.delete()


if __name__ == '__main__':
cleanup_documents_from_all_indices()
delete_all_test_indices()
18 changes: 18 additions & 0 deletions tests/scripts/run_cloud_tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import sys
from create_test_suffix import set_index_suffix
from delete_all_indexes import delete_all_test_indices

if __name__ == '__main__':
# Generate the random suffix
set_index_suffix()

# Run the first command to generate the suffix (already done)
# generate_index_suffix.py will set the TEST_INDEX_SUFFIX environment variable

# Run the second command with the generated suffix and pass posargs to pytest
import pytest
pytest_args = ['tests/', '-m', 'not ignore_cloud_tests'] + sys.argv[1:]
pytest.main(pytest_args)

# Run the third command that uses the suffix
delete_all_test_indices()
Loading

0 comments on commit 7b2491c

Please sign in to comment.