Skip to content

Commit

Permalink
Merge branch 'master' into feature/use-personal-branch
Browse files Browse the repository at this point in the history
  • Loading branch information
DylanBaker authored Apr 18, 2024
2 parents 7ca33fa + 743f64f commit 23f5805
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 5 deletions.
15 changes: 14 additions & 1 deletion spectacles/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from spectacles.logger import set_file_handler
from spectacles.runner import Runner
from spectacles.utils import log_duration
from spectacles.validators.data_test import DATA_TEST_CONCURRENCY

__version__ = importlib.metadata.version("spectacles")

Expand Down Expand Up @@ -343,6 +344,7 @@ def main() -> None:
remote_reset=args.remote_reset,
pin_imports=pin_imports,
use_personal_branch=args.use_personal_branch,
concurrency=args.concurrency,
)
)
elif args.command == "content":
Expand Down Expand Up @@ -719,6 +721,16 @@ def _build_assert_subparser(
_build_validator_subparser(subparser_action, subparser)
_build_select_subparser(subparser_action, subparser)

subparser.add_argument(
"--concurrency",
type=int,
default=DATA_TEST_CONCURRENCY,
help=(
"Specify the number of concurrent queries you want to have running "
f"against your data warehouse. The default is {DATA_TEST_CONCURRENCY}."
),
)


def _build_content_subparser(
subparser_action: argparse._SubParsersAction, # type: ignore[type-arg]
Expand Down Expand Up @@ -909,6 +921,7 @@ async def run_assert(
remote_reset: bool,
pin_imports: Dict[str, str],
use_personal_branch: bool,
concurrency: int,
) -> None:
# Don't trust env to ignore .netrc credentials
async_client = httpx.AsyncClient(trust_env=False)
Expand All @@ -918,7 +931,7 @@ async def run_assert(
)
runner = Runner(client, project, remote_reset, pin_imports, use_personal_branch)

results = await runner.validate_data_tests(ref, filters)
results = await runner.validate_data_tests(ref, filters, concurrency)
finally:
await async_client.aclose()

Expand Down
4 changes: 3 additions & 1 deletion spectacles/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
LookMLValidator,
SqlValidator,
)
from spectacles.validators.data_test import DATA_TEST_CONCURRENCY
from spectacles.validators.sql import (
DEFAULT_CHUNK_SIZE,
DEFAULT_QUERY_CONCURRENCY,
Expand Down Expand Up @@ -490,6 +491,7 @@ async def validate_data_tests(
self,
ref: Optional[str] = None,
filters: Optional[List[str]] = None,
concurrency: int = DATA_TEST_CONCURRENCY,
) -> JsonDict:
if filters is None:
filters = ["*/*"]
Expand All @@ -508,7 +510,7 @@ async def validate_data_tests(
f"{'explore' if explore_count == 1 else 'explores'}"
)
tests = await validator.get_tests(project)
await validator.validate(tests)
await validator.validate(tests, concurrency)

results = project.get_results(validator="data_test")
return results
Expand Down
10 changes: 7 additions & 3 deletions spectacles/validators/data_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
from spectacles.exceptions import DataTestError, SpectaclesException
from spectacles.lookml import Explore, Project

QUERY_SLOT_LIMIT = 15 # This is the per-user query limit in Looker for most instances
DATA_TEST_CONCURRENCY = (
15 # This is the per-user query limit in Looker for most instances
)


@dataclass
Expand Down Expand Up @@ -94,9 +96,11 @@ async def get_tests(self, project: Project) -> List[DataTest]:

return selected_tests

async def validate(self, tests: List[DataTest]) -> List[DataTestError]:
async def validate(
self, tests: List[DataTest], concurrency: int = DATA_TEST_CONCURRENCY
) -> List[DataTestError]:
data_test_errors: List[DataTestError] = []
query_slot = asyncio.Semaphore(QUERY_SLOT_LIMIT)
query_slot = asyncio.Semaphore(concurrency)

async def run_test(test: DataTest, query_slot: asyncio.Semaphore) -> None:
async with query_slot:
Expand Down

0 comments on commit 23f5805

Please sign in to comment.