From 1f414a7f879f1ae54cfae8f9169b8f6e6b690c83 Mon Sep 17 00:00:00 2001 From: annatisch Date: Fri, 19 Nov 2021 13:13:27 -0800 Subject: [PATCH] [Perf] Add PerfTestBase and BatchPerfTest (#21812) * Added new base class * Updated perf test base class * Updated runner discovery + logging * Updated test runner * Fix test errors * Improved documentation and importing * Remove extra ssl config * Fix typehint * Fix init * Added executable batch sample --- doc/dev/perfstress_tests.md | 104 ++++++- tools/azure-devtools/setup.py | 3 +- .../perfstress_tests/__init__.py | 28 +- ...ndom_stream.py => _async_random_stream.py} | 2 +- .../perfstress_tests/_batch_perf_test.py | 196 ++++++++++++ .../perfstress_tests/_perf_stress_base.py | 212 +++++++++++++ .../perfstress_tests/_perf_stress_runner.py | 226 ++++++++++++++ .../perfstress_tests/_perf_stress_test.py | 23 ++ .../perfstress_tests/_policies.py | 1 - .../{random_stream.py => _random_stream.py} | 0 .../{repeated_timer.py => _repeated_timer.py} | 0 .../perfstress_tests/perf_stress_runner.py | 290 ------------------ .../perfstress_tests/perf_stress_test.py | 141 --------- .../system_perfstress/README.md | 4 + .../system_perfstress/aiohttp_get_test.py | 2 +- .../system_perfstress/httpx_get_test.py | 2 +- .../system_perfstress/requests_get_test.py | 2 +- .../system_perfstress/sample_batch_test.py | 45 +++ .../system_perfstress/sleep_test.py | 1 + .../system_perfstress/socket_http_get_test.py | 2 +- .../system_perfstress/tornado_get_test.py | 2 +- 21 files changed, 828 insertions(+), 458 deletions(-) rename tools/azure-devtools/src/azure_devtools/perfstress_tests/{async_random_stream.py => _async_random_stream.py} (96%) create mode 100644 tools/azure-devtools/src/azure_devtools/perfstress_tests/_batch_perf_test.py create mode 100644 tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_base.py create mode 100644 tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_runner.py create mode 100644 tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_test.py rename tools/azure-devtools/src/azure_devtools/perfstress_tests/{random_stream.py => _random_stream.py} (100%) rename tools/azure-devtools/src/azure_devtools/perfstress_tests/{repeated_timer.py => _repeated_timer.py} (100%) delete mode 100644 tools/azure-devtools/src/azure_devtools/perfstress_tests/perf_stress_runner.py delete mode 100644 tools/azure-devtools/src/azure_devtools/perfstress_tests/perf_stress_test.py create mode 100644 tools/azure-devtools/src/azure_devtools/perfstress_tests/system_perfstress/README.md create mode 100644 tools/azure-devtools/src/azure_devtools/perfstress_tests/system_perfstress/sample_batch_test.py diff --git a/doc/dev/perfstress_tests.md b/doc/dev/perfstress_tests.md index adb986e9320d..e8fb7d40cc08 100644 --- a/doc/dev/perfstress_tests.md +++ b/doc/dev/perfstress_tests.md @@ -3,10 +3,13 @@ - [The PerfStressTest base](#the-perfstresstest-base) - [Default command options](#default-command-options) - [Running with test proxy](#running-with-the-test-proxy) + - [The BatchPerfTest base](#the-batchperftest-base) 2. [Adding performance tests to an SDK](#adding-performance-tests-to-an-sdk) - [Writing a test](#writing-a-test) + - [Writing a batch test](#writing-a-batch-test) - [Adding legacy T1 tests](#adding-legacy-t1-tests) 3. [Running the tests](#running-the-tests) + - [Running the system tests](#running-the-system-tests) 4. [Readme](#readme) # The perfstress framework @@ -20,6 +23,7 @@ the tests. 
To start using the framework, make sure that `azure-devtools` is incl The perfstress framework offers the following: - The `perfstress` commandline tool. - The `PerfStressTest` baseclass. +- The `BatchPerfTest` baseclass. - Stream utilities for uploading/downloading without storing in memory: `RandomStream`, `AsyncRandomStream`, `WriteStream`. - A `get_random_bytes` utility for returning randomly generated data. - A series of "system tests" to test the perfstress framework along with the performance of the raw transport layers (requests, aiohttp, etc). @@ -39,14 +43,14 @@ class PerfStressTest: async def global_cleanup(self): # Can be optionally defined. Only run once, regardless of parallelism. - async def record_and_start_playback(self): - # Set up the recording on the test proxy, and configure the proxy in playback mode. - # This function is only run if a test proxy URL is provided (-x). + async def post_setup(self): + # Post-setup called once per parallel test instance. + # Used by base classes to setup state (like test-proxy) after all derived class setup is complete. # There should be no need to overwrite this function. - async def stop_playback(self): - # Configure the proxy out of playback mode and discard the recording. - # This function is only run if a test proxy URL is provided (-x). + async def pre_cleanup(self): + # Pre-cleanup called once per parallel test instance. + # Used by base classes to cleanup state (like test-proxy) before all derived class cleanup runs. # There should be no need to overwrite this function. async def setup(self): @@ -82,9 +86,9 @@ The framework has a series of common command line options built in: - `-w --warm-up=5` Number of seconds to spend warming up the connection before measuring begins. Default is 5. - `--sync` Whether to run the tests in sync or async. Default is False (async). - `--no-cleanup` Whether to keep newly created resources after test run. Default is False (resources will be deleted). -- `-x --test-proxies` Whether to run the tests against the test proxy server. Specify the URL(s) for the proxy endpoint(s) (e.g. "https://localhost:5001"). -- `--profile` Whether to run the perftest with cProfile. If enabled (default is False), the output file of the **last completed single iteration** will be written to the current working directory in the format `"cProfile---.pstats"`. - +- `--insecure` Whether to run without SSL validation. Default is False. +- `-x --test-proxies` Whether to run the tests against the test proxy server. Specify the URL(s) for the proxy endpoint(s) (e.g. "https://localhost:5001"). Multiple values should be semi-colon-separated. +- `--profile` Whether to run the perftest with cProfile. If enabled (default is False), the output file of a single iteration will be written to the current working directory in the format `"cProfile---.pstats"`. ## Running with the test proxy Follow the instructions here to install and run the test proxy server: @@ -94,6 +98,29 @@ Once running, in a separate process run the perf test in question, combined with ```cmd (env) ~/azure-storage-blob/tests> perfstress DownloadTest -x "https://localhost:5001" ``` +## The BatchPerfTest base +The `BatchPerfTest` class is the parent class of the above `PerfStressTest` class that is further abstracted to allow for more flexible testing of SDKs that don't conform to a 1:1 ratio of operations to results. +An example of this is a messaging SDK that streams multiple messages for a period of time. 
+This base class uses the same setup/cleanup/close functions described above, however instead of `run_sync` and `run_async`, it has `run_batch_sync` and `run_batch_async`: +```python +class BatchPerfTest: + + def run_batch_sync(self) -> int: + """ + Run cumultive operation(s) - i.e. an operation that results in more than a single logical result. + :returns: The number of completed results. + :rtype: int + """ + + async def run_batch_async(self) -> int: + """ + Run cumultive operation(s) - i.e. an operation that results in more than a single logical result. + :returns: The number of completed results. + :rtype: int + """ + +``` +An example test case using the `BatchPerfTest` base can be found below. # Adding performance tests to an SDK The performance tests will be in a submodule called `perfstress_tests` within the `tests` directory in an SDK project. @@ -314,6 +341,44 @@ class DownloadTest(_StorageStreamTestBase): stream = await self.async_blob_client.download_blob(max_concurrency=self.args.max_concurrency) await stream.readinto(self.download_stream) ``` +## Writing a batch test +#### Example messaging receive test +```python +from azure_devtools.perfstress_tests import BatchPerfTest + +from azure.messaging.foo import MockReceiver +from azure.messaging.foo.aio import MockReceiver as AsyncMockReceiver + +class MessageReceiveTest(BatchPerfTest): + def __init__(self, arguments): + super().__init__(arguments) + + # Setup service clients + self.receiver_client = MockReceiver() + self.async_receiver_client = AsyncMockReceiver() + + def run_batch_sync(self) -> int: + messages = self.receiver_client.receive( + max_messages=self.args.max_message_count, + min_messages=self.args.min_message_count + ) + return len(messages) + + async def run_batch_async(self) -> int: + messages = await self.async_receiver_client.receive( + max_messages=self.args.max_message_count, + min_messages=self.args.min_message_count + ) + return len(messages) + + @staticmethod + def add_arguments(parser): + super(MessageReceiveTest, MessageReceiveTest).add_arguments(parser) + parser.add_argument('--max-message-count', nargs='?', type=int, default=10) + parser.add_argument('--min-message-count', nargs='?', type=int, default=0) + +``` + ## Adding legacy T1 tests To compare performance against T1 libraries, you can add tests for a legacy SDK. To do this, add a submodule into the `perfstress_tests` module called `T1_legacy_tests` (and add an empty `__init__.py`). To configure the exact T1 SDK you wish to compare perf against, add a `t1_test_requirements.txt` file to install any package requirements. Note that this will likely be incompatible with the T2 SDK testing environment, and running the legacy tests will probably need to be from a separate virtual environment (see the [Running the tests](#running-the-tests) section below). @@ -360,15 +425,30 @@ AZURE_STORAGE_CONNECTION_STRING= When `azure-devtools` is installed, you will have access to the `perfstress` command line tool, which will scan the current module for runable perf tests. Only a specific test can be run at a time (i.e. there is no "run all" feature). ```cmd -(env) ~/azure-storage-file-share> cd tests -(env) ~/azure-storage-file-share/tests> perfstress +(env) ~/azure-storage-file-share> perfstress ``` Using the `perfstress` command alone will list the available perf tests found. Note that the available tests discovered will vary depending on whether your environment is configured for the T1 or T2 SDK. 
+If your tests are not being discovered, run the `perfstressdebug` command instead for additional logging. ### Example test run command ```cmd -(env) ~/azure-storage-file-share/tests> perfstress UploadTest --parallel=2 --size=10240 +(env) ~/azure-storage-file-share> perfstress UploadTest --parallel=2 --size=10240 ``` +## Running the system tests +The system tests are used to test the performance of the Python HTTP layers exclusive of the Azure SDK in order to set a performance benchmark. +In order to run these, you will need a Python environment with `systemperf` flavour of `azure-devtools` installed. Installing to a fresh Python environment is recommended. +```cmd +(env) ~/> pip install -e azure-sdk-for-python/tools/azure-devtools[systemperf] +``` +Once these dependencies are installed, the `systemperf` command can be run directly to list the available tests: +```cmd +(env)~/> systemperf +``` +A specific test can be run in the same manner as an SDK perf test: +```cmd +(env)~/> systemperf AioHttpGetTest --url="http://test-endpoint.com" +``` + # Readme diff --git a/tools/azure-devtools/setup.py b/tools/azure-devtools/setup.py index 7cbd7512d7dd..a59fb559653e 100644 --- a/tools/azure-devtools/setup.py +++ b/tools/azure-devtools/setup.py @@ -48,6 +48,7 @@ entry_points={ "console_scripts": [ "perfstress = azure_devtools.perfstress_tests:run_perfstress_cmd", + "perfstressdebug = azure_devtools.perfstress_tests:run_perfstress_debug_cmd", "systemperf = azure_devtools.perfstress_tests:run_system_perfstress_tests_cmd", ], }, @@ -57,7 +58,7 @@ "GitPython", "requests>=2.0", ], - "systemperf": ["aiohttp>=3.0", "requests>=2.0", "tornado==6.0.3" "pycurl==7.43.0.5" "httpx==0.11.1"], + "systemperf": ["aiohttp>=3.0", "requests>=2.0", "tornado==6.0.3", "pycurl==7.43.0.5", "httpx>=0.21", "azure-core"], }, package_dir={"": "src"}, install_requires=DEPENDENCIES, diff --git a/tools/azure-devtools/src/azure_devtools/perfstress_tests/__init__.py b/tools/azure-devtools/src/azure_devtools/perfstress_tests/__init__.py index ee28bb7bdcf4..6f8290729696 100644 --- a/tools/azure-devtools/src/azure_devtools/perfstress_tests/__init__.py +++ b/tools/azure-devtools/src/azure_devtools/perfstress_tests/__init__.py @@ -6,16 +6,30 @@ import os import asyncio -from .perf_stress_runner import PerfStressRunner -from .perf_stress_test import PerfStressTest -from .random_stream import RandomStream, WriteStream, get_random_bytes -from .async_random_stream import AsyncRandomStream +from ._perf_stress_runner import _PerfStressRunner +from ._perf_stress_test import PerfStressTest +from ._random_stream import RandomStream, WriteStream, get_random_bytes +from ._async_random_stream import AsyncRandomStream +from ._batch_perf_test import BatchPerfTest -__all__ = ["PerfStressRunner", "PerfStressTest", "RandomStream", "WriteStream", "AsyncRandomStream", "get_random_bytes"] +__all__ = [ + "PerfStressTest", + "BatchPerfTest", + "RandomStream", + "WriteStream", + "AsyncRandomStream", + "get_random_bytes" +] def run_perfstress_cmd(): - main_loop = PerfStressRunner() + main_loop = _PerfStressRunner() + loop = asyncio.get_event_loop() + loop.run_until_complete(main_loop.start()) + + +def run_perfstress_debug_cmd(): + main_loop = _PerfStressRunner(debug=True) loop = asyncio.get_event_loop() loop.run_until_complete(main_loop.start()) @@ -23,6 +37,6 @@ def run_perfstress_cmd(): def run_system_perfstress_tests_cmd(): root_dir = os.path.dirname(os.path.abspath(__file__)) sys_test_dir = os.path.join(root_dir, "system_perfstress") - main_loop = 
PerfStressRunner(test_folder_path=sys_test_dir) + main_loop = _PerfStressRunner(test_folder_path=sys_test_dir, debug=True) loop = asyncio.get_event_loop() loop.run_until_complete(main_loop.start()) diff --git a/tools/azure-devtools/src/azure_devtools/perfstress_tests/async_random_stream.py b/tools/azure-devtools/src/azure_devtools/perfstress_tests/_async_random_stream.py similarity index 96% rename from tools/azure-devtools/src/azure_devtools/perfstress_tests/async_random_stream.py rename to tools/azure-devtools/src/azure_devtools/perfstress_tests/_async_random_stream.py index 4e8e28fee403..21af66414ddd 100644 --- a/tools/azure-devtools/src/azure_devtools/perfstress_tests/async_random_stream.py +++ b/tools/azure-devtools/src/azure_devtools/perfstress_tests/_async_random_stream.py @@ -5,7 +5,7 @@ from io import BytesIO -from .random_stream import get_random_bytes, _DEFAULT_LENGTH +from ._random_stream import get_random_bytes, _DEFAULT_LENGTH class AsyncRandomStream(BytesIO): diff --git a/tools/azure-devtools/src/azure_devtools/perfstress_tests/_batch_perf_test.py b/tools/azure-devtools/src/azure_devtools/perfstress_tests/_batch_perf_test.py new file mode 100644 index 000000000000..faeef30a9427 --- /dev/null +++ b/tools/azure-devtools/src/azure_devtools/perfstress_tests/_batch_perf_test.py @@ -0,0 +1,196 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +import cProfile +import os +import threading +import aiohttp +import time +from typing import Optional, Any, Dict, List + +from urllib.parse import urljoin + +from ._perf_stress_base import _PerfTestBase +from ._policies import PerfTestProxyPolicy + + +class BatchPerfTest(_PerfTestBase): + + def __init__(self, arguments): + super().__init__(arguments) + + self._session: Optional[aiohttp.ClientSession] = None + self._test_proxy: Optional[List[str]] = None + self._test_proxy_policy: Optional[PerfTestProxyPolicy] = None + self._client_kwargs: Dict[str, Any] = {} + self._recording_id: Optional[str] = None + + if self.args.insecure: + # Disable SSL verification for SDK Client + self._client_kwargs['connection_verify'] = False + + # Disable SSL verification for test proxy session + self._session = aiohttp.ClientSession(connector=aiohttp.TCPConnector(verify_ssl=False)) + + # Suppress warnings + import warnings + from urllib3.exceptions import InsecureRequestWarning + warnings.simplefilter('ignore', InsecureRequestWarning) + else: + self._session = aiohttp.ClientSession() + + if self.args.test_proxies: + # Add policy to redirect requests to the test proxy + self._test_proxy = self.args.test_proxies[self._parallel_index % len(self.args.test_proxies)] + self._test_proxy_policy = PerfTestProxyPolicy(self._test_proxy) + self._client_kwargs['per_retry_policies'] = [self._test_proxy_policy] + + async def post_setup(self) -> None: + """ + Post-setup called once per parallel test instance. + Used by base classes to setup state (like test-proxy) after all derived class setup is complete. + """ + if self._test_proxy_policy: + # Make one call to run() before starting recording, to avoid capturing + # one-time setup like authorization requests. 
+ if self.args.sync: + self.run_batch_sync() + else: + await self.run_batch_async() + await self._start_recording() + self._test_proxy_policy.recording_id = self._recording_id + self._test_proxy_policy.mode = "record" + + # Record one call to run() + if self.args.sync: + self.run_batch_sync() + else: + await self.run_batch_async() + + await self._stop_recording() + await self._start_playback() + self._test_proxy_policy.recording_id = self._recording_id + self._test_proxy_policy.mode = "playback" + + async def pre_cleanup(self) -> None: + """ + Pre-cleanup called once per parallel test instance. + Used by base classes to cleanup state (like test-proxy) before all derived class cleanup runs. + """ + # Only stop playback if it was successfully started + if self._test_proxy_policy and self._test_proxy_policy.mode == 'playback': + headers = { + "x-recording-id": self._recording_id, + "x-purge-inmemory-recording": "true" + } + url = urljoin(self._test_proxy, "/playback/stop") + async with self._session.post(url, headers=headers) as resp: + assert resp.status == 200 + + # Stop redirecting requests to test proxy + self._test_proxy_policy.recording_id = None + self._test_proxy_policy.mode = None + + async def close(self) -> None: + """ + Close any open client resources/connections per parallel test instance. + """ + await self._session.close() + + async def _start_recording(self) -> None: + url = urljoin(self._test_proxy, "/record/start") + async with self._session.post(url) as resp: + assert resp.status == 200 + self._recording_id = resp.headers["x-recording-id"] + + async def _stop_recording(self) -> None: + headers = {"x-recording-id": self._recording_id} + url = urljoin(self._test_proxy, "/record/stop") + async with self._session.post(url, headers=headers) as resp: + assert resp.status == 200 + + async def _start_playback(self) -> None: + headers = {"x-recording-id": self._recording_id} + url = urljoin(self._test_proxy, "/playback/start") + async with self._session.post(url, headers=headers) as resp: + assert resp.status == 200 + self._recording_id = resp.headers["x-recording-id"] + + def run_batch_sync(self) -> int: + """ + Run cumultive operation(s) - i.e. an operation that results in more than a single logical result. + :returns: The number of completed results. + :rtype: int + """ + raise NotImplementedError("run_batch_sync must be implemented for {}".format(self.__class__.__name__)) + + async def run_batch_async(self) -> int: + """ + Run cumultive operation(s) - i.e. an operation that results in more than a single logical result. + :returns: The number of completed results. + :rtype: int + """ + raise NotImplementedError("run_batch_async must be implemented for {}".format(self.__class__.__name__)) + + def run_all_sync(self, duration: int) -> None: + """ + Run all sync tests, including both warmup and duration. + """ + self._completed_operations = 0 + self._last_completion_time = 0.0 + starttime = time.time() + if self.args.profile: + # If the profiler is used, ignore the duration and run once. 
+ import cProfile + profile = cProfile.Profile() + try: + profile.enable() + self._completed_operations += self.run_batch_sync() + finally: + profile.disable() + self._last_completion_time = time.time() - starttime + self._save_profile(profile, "sync") + else: + while self._last_completion_time < duration: + self._completed_operations += self.run_batch_sync() + self._last_completion_time = time.time() - starttime + + async def run_all_async(self, duration: int) -> None: + """ + Run all async tests, including both warmup and duration. + """ + self._completed_operations = 0 + self._last_completion_time = 0.0 + starttime = time.time() + if self.args.profile: + # If the profiler is used, ignore the duration and run once. + import cProfile + profile = cProfile.Profile() + try: + profile.enable() + self._completed_operations += await self.run_batch_async() + finally: + profile.disable() + self._last_completion_time = time.time() - starttime + self._save_profile(profile, "async") + else: + while self._last_completion_time < duration: + self._completed_operations += await self.run_batch_async() + self._last_completion_time = time.time() - starttime + + def _save_profile(self, profile: cProfile.Profile, sync: str) -> None: + """ + Dump the profiler data to file in the current working directory. + """ + if profile: + profile_name = "{}/cProfile-{}-{}-{}.pstats".format( + os.getcwd(), + self.__class__.__name__, + self._parallel_index, + sync) + print("Dumping profile data to {}".format(profile_name)) + profile.dump_stats(profile_name) + else: + print("No profile generated.") diff --git a/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_base.py b/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_base.py new file mode 100644 index 000000000000..c484f83793ee --- /dev/null +++ b/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_base.py @@ -0,0 +1,212 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +import os +import abc +import threading +import argparse + + +class _PerfTestABC(abc.ABC): + + @property + @abc.abstractmethod + def completed_operations(self) -> int: + """ + Total number of operations completed by run_all(). + Reset after warmup. + """ + + @property + @abc.abstractmethod + def last_completion_time(self) -> float: + """ + Elapsed time between start of warmup/run and last completed operation. + Reset after warmup. + """ + + @abc.abstractmethod + async def global_setup(self) -> None: + """ + Setup called once across all parallel test instances. + Used to setup state that can be used by all test instances. + """ + + @abc.abstractmethod + async def global_cleanup(self) -> None: + """ + Cleanup called once across all parallel test instances. + Used to cleanup state that can be used by all test instances. + """ + + @abc.abstractmethod + async def setup(self) -> None: + """ + Setup called once per parallel test instance. + Used to setup state specific to this test instance. + """ + + @abc.abstractmethod + async def cleanup(self): + """ + Cleanup called once per parallel test instance. + Used to cleanup state specific to this test instance. 
+ """ + + @abc.abstractmethod + async def post_setup(self) -> None: + """ + Post-setup called once per parallel test instance. + Used by base classes to setup state (like test-proxy) after all derived class setup is complete. + """ + + @abc.abstractmethod + async def pre_cleanup(self) -> None: + """ + Pre-cleanup called once per parallel test instance. + Used by base classes to cleanup state (like test-proxy) before all derived class cleanup runs. + """ + + @abc.abstractmethod + async def close(self) -> None: + """ + Close any open client resources/connections per parallel test instance. + """ + + @abc.abstractmethod + def run_all_sync(self, duration: int) -> None: + """ + Run all sync tests, including both warmup and duration. + """ + + @abc.abstractmethod + async def run_all_async(self, duration: int) -> None: + """ + Run all async tests, including both warmup and duration. + """ + + @staticmethod + @abc.abstractmethod + def add_arguments(parser: argparse.ArgumentParser) -> None: + """ + Add test class specific command line arguments. + """ + + @staticmethod + @abc.abstractmethod + def get_from_env(variable: str) -> str: + """ + Get the value of an env var. If empty or not found, a ValueError will be raised. + """ + + +class _PerfTestBase(_PerfTestABC): + """Base class for implementing a python perf test.""" + + args = {} + _global_parallel_index_lock = threading.Lock() + _global_parallel_index = 0 + + def __init__(self, arguments): + self.args = arguments + self._completed_operations = 0 + self._last_completion_time = 0.0 + + with _PerfTestBase._global_parallel_index_lock: + self._parallel_index = _PerfTestBase._global_parallel_index + _PerfTestBase._global_parallel_index += 1 + + @property + def completed_operations(self) -> int: + """ + Total number of operations completed by run_all(). + Reset after warmup. + """ + return self._completed_operations + + @property + def last_completion_time(self) -> float: + """ + Elapsed time between start of warmup/run and last completed operation. + Reset after warmup. + """ + return self._last_completion_time + + async def global_setup(self) -> None: + """ + Setup called once across all parallel test instances. + Used to setup state that can be used by all test instances. + """ + return + + async def global_cleanup(self) -> None: + """ + Cleanup called once across all parallel test instances. + Used to cleanup state that can be used by all test instances. + """ + return + + async def setup(self) -> None: + """ + Setup called once per parallel test instance. + Used to setup state specific to this test instance. + """ + return + + async def cleanup(self): + """ + Cleanup called once per parallel test instance. + Used to cleanup state specific to this test instance. + """ + return + + async def post_setup(self) -> None: + """ + Post-setup called once per parallel test instance. + Used by base classes to setup state (like test-proxy) after all derived class setup is complete. + """ + return + + async def pre_cleanup(self) -> None: + """ + Pre-cleanup called once per parallel test instance. + Used by base classes to cleanup state (like test-proxy) before all derived class cleanup runs. + """ + return + + async def close(self): + """ + Close any open client resources/connections per parallel test instance. + """ + return + + def run_all_sync(self, duration: int) -> None: + """ + Run all sync tests, including both warmup and duration. 
+ """ + raise NotImplementedError("run_all_sync must be implemented for {}".format(self.__class__.__name__)) + + async def run_all_async(self, duration: int) -> None: + """ + Run all async tests, including both warmup and duration. + """ + raise NotImplementedError("run_all_async must be implemented for {}".format(self.__class__.__name__)) + + @staticmethod + def add_arguments(parser: argparse.ArgumentParser) -> None: + """ + Add test class specific command line arguments. + """ + return + + @staticmethod + def get_from_env(variable: str) -> str: + """ + Get the value of an env var. If empty or not found, a ValueError will be raised. + """ + value = os.environ.get(variable) + if not value: + raise ValueError("Undefined environment variable {}".format(variable)) + return value diff --git a/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_runner.py b/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_runner.py new file mode 100644 index 000000000000..8acae2b8b1af --- /dev/null +++ b/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_runner.py @@ -0,0 +1,226 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +import argparse +import asyncio +import time +import inspect +import logging +import os +import pkgutil +import sys +from typing import List, Optional +from concurrent.futures import ThreadPoolExecutor, as_completed + +from ._perf_stress_base import _PerfTestABC +from ._batch_perf_test import BatchPerfTest +from ._perf_stress_test import PerfStressTest +from ._repeated_timer import RepeatedTimer + + +class _PerfStressRunner: + def __init__(self, test_folder_path: Optional[str] = None, debug: bool = False): + self._tests: List[_PerfTestABC] = [] + self._operation_status_tracker: int = -1 + + self.logger = logging.getLogger(__name__) + handler = logging.StreamHandler() + if debug: + self.logger.setLevel(level=logging.DEBUG) + handler.setLevel(level=logging.DEBUG) + else: + self.logger.setLevel(level=logging.INFO) + handler.setLevel(level=logging.INFO) + self.logger.addHandler(handler) + + # NOTE: If you need to support registering multiple test locations, move this into Initialize, call lazily on Run, expose RegisterTestLocation function. + self._discover_tests(test_folder_path or os.getcwd()) + self._parse_args() + + def _get_completed_operations(self) -> int: + return sum([t.completed_operations for t in self._tests]) + + def _get_operations_per_second(self) -> float: + test_results = [(t.completed_operations, t.last_completion_time) for t in self._tests] + return sum(map(lambda x: x[0] / x[1] if x[1] else 0, test_results)) + + def _parse_args(self): + # First, detect which test we're running. + arg_parser = argparse.ArgumentParser( + description="Python Perf Test Runner", usage="{} []".format(__file__) + ) + + # NOTE: remove this and add another help string to query for available tests + # if/when # of classes become enough that this isn't practical. + arg_parser.add_argument( + "test", help="Which test to run. 
Supported tests: {}".format(" ".join(sorted(self._test_classes.keys()))) + ) + + args = arg_parser.parse_args(sys.argv[1:2]) + try: + self._test_class_to_run = self._test_classes[args.test] + except KeyError as e: + self.logger.error( + "Invalid test: {}\n Test must be one of: {}\n".format( + args.test, " ".join(sorted(self._test_classes.keys())) + ) + ) + raise + + # Next, parse args for that test. We also do global args here too so as not to confuse the initial test parse. + per_test_arg_parser = argparse.ArgumentParser( + description=self._test_class_to_run.__doc__ or args.test, usage="{} {} []".format(__file__, args.test) + ) + + # Global args + per_test_arg_parser.add_argument( + "-p", "--parallel", nargs="?", type=int, help="Degree of parallelism to run with. Default is 1.", default=1 + ) + per_test_arg_parser.add_argument( + "-d", "--duration", nargs="?", type=int, help="Duration of the test in seconds. Default is 10.", default=10 + ) + per_test_arg_parser.add_argument( + "-i", + "--iterations", + nargs="?", + type=int, + help="Number of iterations in the main test loop. Default is 1.", + default=1, + ) + per_test_arg_parser.add_argument( + "-w", "--warmup", nargs="?", type=int, help="Duration of warmup in seconds. Default is 5.", default=5 + ) + per_test_arg_parser.add_argument( + "--no-cleanup", action="store_true", help="Do not run cleanup logic. Default is false.", default=False + ) + per_test_arg_parser.add_argument( + "--sync", action="store_true", help="Run tests in sync mode. Default is False.", default=False + ) + per_test_arg_parser.add_argument( + "--profile", action="store_true", help="Run tests with profiler. Default is False.", default=False + ) + per_test_arg_parser.add_argument( + "-x", "--test-proxies", help="URIs of TestProxy Servers (separated by ';')", + type=lambda s: s.split(';') + ) + per_test_arg_parser.add_argument( + "--insecure", action="store_true", help="Disable SSL validation. 
Default is False.", default=False + ) + + # Per-test args + self._test_class_to_run.add_arguments(per_test_arg_parser) + self.per_test_args = per_test_arg_parser.parse_args(sys.argv[2:]) + + self.logger.info("") + self.logger.info("=== Options ===") + self.logger.info(args) + self.logger.info(self.per_test_args) + self.logger.info("") + + def _discover_tests(self, test_folder_path): + self._test_classes = {} + if os.path.isdir(os.path.join(test_folder_path, 'tests')): + test_folder_path = os.path.join(test_folder_path, 'tests') + self.logger.debug("Searching for tests in {}".format(test_folder_path)) + + # Dynamically enumerate all python modules under the tests path for classes that implement PerfStressTest + for loader, name, _ in pkgutil.walk_packages([test_folder_path]): + try: + module = loader.find_module(name).load_module(name) + except Exception as e: + self.logger.debug("Unable to load module {}: {}".format(name, e)) + continue + for name, value in inspect.getmembers(module): + if name.startswith("_"): + continue + if inspect.isclass(value): + if issubclass(value, _PerfTestABC) and value not in [PerfStressTest, BatchPerfTest]: + self.logger.info("Loaded test class: {}".format(name)) + self._test_classes[name] = value + + async def start(self): + self.logger.info("=== Setup ===") + self._tests = [self._test_class_to_run(self.per_test_args) for _ in range(self.per_test_args.parallel)] + + try: + try: + await self._tests[0].global_setup() + try: + await asyncio.gather(*[test.setup() for test in self._tests]) + self.logger.info("") + self.logger.info("=== Post Setup ===") + await asyncio.gather(*[test.post_setup() for test in self._tests]) + self.logger.info("") + + if self.per_test_args.warmup and not self.per_test_args.profile: + await self._run_tests("Warmup", self.per_test_args.warmup) + + for i in range(self.per_test_args.iterations): + title = "Test" if self.per_test_args.iterations == 1 else "Test {}".format(i + 1) + await self._run_tests(title, self.per_test_args.duration) + except Exception as e: + self.logger.warn("Exception: " + str(e)) + finally: + self.logger.info("=== Pre Cleanup ===") + await asyncio.gather(*[test.pre_cleanup() for test in self._tests]) + self.logger.info("") + + if not self.per_test_args.no_cleanup: + self.logger.info("=== Cleanup ===") + await asyncio.gather(*[test.cleanup() for test in self._tests]) + except Exception as e: + self.logger.warn("Exception: " + str(e)) + finally: + if not self.per_test_args.no_cleanup: + await self._tests[0].global_cleanup() + except Exception as e: + self.logger.warn("Exception: " + str(e)) + finally: + await asyncio.gather(*[test.close() for test in self._tests]) + + async def _run_tests(self, title: str, duration: int) -> None: + self._operation_status_tracker = -1 + status_thread = RepeatedTimer(1, self._print_status, title) + try: + if self.per_test_args.sync: + with ThreadPoolExecutor(max_workers=self.per_test_args.parallel) as ex: + futures = [ex.submit(test.run_all_sync, duration) for test in self._tests] + for future in as_completed(futures): + future.result() + + else: + tasks = [test.run_all_async(duration) for test in self._tests] + await asyncio.gather(*tasks) + finally: + status_thread.stop() + + self.logger.info("") + self.logger.info("=== Results ===") + + total_operations = self._get_completed_operations() + operations_per_second = self._get_operations_per_second() + if operations_per_second: + seconds_per_operation = 1 / operations_per_second + weighted_average_seconds = total_operations / 
operations_per_second + self.logger.info( + "Completed {:,} operations in a weighted-average of {:,.2f}s ({:,.2f} ops/s, {:,.3f} s/op)".format( + total_operations, weighted_average_seconds, operations_per_second, seconds_per_operation + ) + ) + else: + self.logger.info("Completed without generating operation statistics.") + self.logger.info("") + + def _print_status(self, title): + if self._operation_status_tracker == -1: + self._operation_status_tracker = 0 + self.logger.info("=== {} ===\nCurrent\t\tTotal\t\tAverage".format(title)) + + total_operations = self._get_completed_operations() + current_operations = total_operations - self._operation_status_tracker + average_operations = self._get_operations_per_second() + + self._operation_status_tracker = total_operations + self.logger.info("{}\t\t{}\t\t{:.2f}".format(current_operations, total_operations, average_operations)) diff --git a/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_test.py b/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_test.py new file mode 100644 index 000000000000..01f6f95febc8 --- /dev/null +++ b/tools/azure-devtools/src/azure_devtools/perfstress_tests/_perf_stress_test.py @@ -0,0 +1,23 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +from ._batch_perf_test import BatchPerfTest + + +class PerfStressTest(BatchPerfTest): + + def run_batch_sync(self) -> int: + self.run_sync() + return 1 + + async def run_batch_async(self) -> int: + await self.run_async() + return 1 + + def run_sync(self) -> None: + raise NotImplementedError("run_sync must be implemented for {}".format(self.__class__.__name__)) + + async def run_async(self) -> None: + raise NotImplementedError("run_async must be implemented for {}".format(self.__class__.__name__)) diff --git a/tools/azure-devtools/src/azure_devtools/perfstress_tests/_policies.py b/tools/azure-devtools/src/azure_devtools/perfstress_tests/_policies.py index 5be416c1175a..7b1f4f25f977 100644 --- a/tools/azure-devtools/src/azure_devtools/perfstress_tests/_policies.py +++ b/tools/azure-devtools/src/azure_devtools/perfstress_tests/_policies.py @@ -17,7 +17,6 @@ def __init__(self, url): def redirect_to_test_proxy(self, request): if self.recording_id and self.mode: - request.context.options['connection_verify'] = False live_endpoint = urlparse(request.http_request.url) redirected = live_endpoint._replace( scheme=self._proxy_url.scheme, diff --git a/tools/azure-devtools/src/azure_devtools/perfstress_tests/random_stream.py b/tools/azure-devtools/src/azure_devtools/perfstress_tests/_random_stream.py similarity index 100% rename from tools/azure-devtools/src/azure_devtools/perfstress_tests/random_stream.py rename to tools/azure-devtools/src/azure_devtools/perfstress_tests/_random_stream.py diff --git a/tools/azure-devtools/src/azure_devtools/perfstress_tests/repeated_timer.py b/tools/azure-devtools/src/azure_devtools/perfstress_tests/_repeated_timer.py similarity index 100% rename from tools/azure-devtools/src/azure_devtools/perfstress_tests/repeated_timer.py rename to tools/azure-devtools/src/azure_devtools/perfstress_tests/_repeated_timer.py diff --git a/tools/azure-devtools/src/azure_devtools/perfstress_tests/perf_stress_runner.py 
b/tools/azure-devtools/src/azure_devtools/perfstress_tests/perf_stress_runner.py deleted file mode 100644 index 3d6de2dc5e68..000000000000 --- a/tools/azure-devtools/src/azure_devtools/perfstress_tests/perf_stress_runner.py +++ /dev/null @@ -1,290 +0,0 @@ -# -------------------------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for license information. -# -------------------------------------------------------------------------------------------- - -import argparse -import asyncio -import time -import inspect -import logging -import os -import pkgutil -import sys -import threading - -from .perf_stress_test import PerfStressTest -from .repeated_timer import RepeatedTimer - - -class PerfStressRunner: - def __init__(self, test_folder_path=None): - if test_folder_path is None: - # Use current working directory - test_folder_path = os.getcwd() - - self.logger = logging.getLogger(__name__) - self.logger.setLevel(level=logging.INFO) - handler = logging.StreamHandler() - handler.setLevel(level=logging.INFO) - self.logger.addHandler(handler) - - # NOTE: If you need to support registering multiple test locations, move this into Initialize, call lazily on Run, expose RegisterTestLocation function. - self._discover_tests(test_folder_path) - self._parse_args() - - def _get_completed_operations(self): - return sum(self._completed_operations) - - def _get_operations_per_second(self): - return sum( - map(lambda x: x[0] / x[1] if x[1] else 0, zip(self._completed_operations, self._last_completion_times)) - ) - - def _parse_args(self): - # First, detect which test we're running. - arg_parser = argparse.ArgumentParser( - description="Python Perf Test Runner", usage="{} []".format(__file__) - ) - - # NOTE: remove this and add another help string to query for available tests - # if/when # of classes become enough that this isn't practical. - arg_parser.add_argument( - "test", help="Which test to run. Supported tests: {}".format(" ".join(sorted(self._test_classes.keys()))) - ) - - args = arg_parser.parse_args(sys.argv[1:2]) - try: - self._test_class_to_run = self._test_classes[args.test] - except KeyError as e: - self.logger.error( - "Invalid test: {}\n Test must be one of: {}\n".format( - args.test, " ".join(sorted(self._test_classes.keys())) - ) - ) - raise - - # Next, parse args for that test. We also do global args here too so as not to confuse the initial test parse. - per_test_arg_parser = argparse.ArgumentParser( - description=self._test_class_to_run.__doc__ or args.test, usage="{} {} []".format(__file__, args.test) - ) - - # Global args - per_test_arg_parser.add_argument( - "-p", "--parallel", nargs="?", type=int, help="Degree of parallelism to run with. Default is 1.", default=1 - ) - per_test_arg_parser.add_argument( - "-d", "--duration", nargs="?", type=int, help="Duration of the test in seconds. Default is 10.", default=10 - ) - per_test_arg_parser.add_argument( - "-i", - "--iterations", - nargs="?", - type=int, - help="Number of iterations in the main test loop. Default is 1.", - default=1, - ) - per_test_arg_parser.add_argument( - "-w", "--warmup", nargs="?", type=int, help="Duration of warmup in seconds. Default is 5.", default=5 - ) - per_test_arg_parser.add_argument( - "--no-cleanup", action="store_true", help="Do not run cleanup logic. 
Default is false.", default=False - ) - per_test_arg_parser.add_argument( - "--sync", action="store_true", help="Run tests in sync mode. Default is False.", default=False - ) - per_test_arg_parser.add_argument( - "--profile", action="store_true", help="Run tests with profiler. Default is False.", default=False - ) - per_test_arg_parser.add_argument( - "-x", "--test-proxies", help="URIs of TestProxy Servers (separated by ';')", - type=lambda s: s.split(';') - ) - - # Per-test args - self._test_class_to_run.add_arguments(per_test_arg_parser) - self.per_test_args = per_test_arg_parser.parse_args(sys.argv[2:]) - - self.logger.info("") - self.logger.info("=== Options ===") - self.logger.info(args) - self.logger.info(self.per_test_args) - self.logger.info("") - - def _discover_tests(self, test_folder_path): - self._test_classes = {} - - # Dynamically enumerate all python modules under the tests path for classes that implement PerfStressTest - for loader, name, _ in pkgutil.walk_packages([test_folder_path]): - try: - module = loader.find_module(name).load_module(name) - except Exception as e: - self.logger.warn("Unable to load module {}: {}".format(name, e)) - continue - for name, value in inspect.getmembers(module): - - if name.startswith("_"): - continue - if inspect.isclass(value) and issubclass(value, PerfStressTest) and value != PerfStressTest: - self.logger.info("Loaded test class: {}".format(name)) - self._test_classes[name] = value - - async def start(self): - self.logger.info("=== Setup ===") - - tests = [] - for _ in range(0, self.per_test_args.parallel): - tests.append(self._test_class_to_run(self.per_test_args)) - - try: - try: - await tests[0].global_setup() - try: - await asyncio.gather(*[test.setup() for test in tests]) - self.logger.info("") - - if self.per_test_args.test_proxies: - self.logger.info("=== Record and Start Playback ===") - await asyncio.gather(*[test.record_and_start_playback() for test in tests]) - self.logger.info("") - - if self.per_test_args.warmup > 0: - await self._run_tests(tests, self.per_test_args.warmup, "Warmup") - - for i in range(0, self.per_test_args.iterations): - title = "Test" - if self.per_test_args.iterations > 1: - title += " " + (i + 1) - await self._run_tests( - tests, - self.per_test_args.duration, - title, - with_profiler=self.per_test_args.profile) - except Exception as e: - print("Exception: " + str(e)) - finally: - if self.per_test_args.test_proxies: - self.logger.info("=== Stop Playback ===") - await asyncio.gather(*[test.stop_playback() for test in tests]) - self.logger.info("") - - if not self.per_test_args.no_cleanup: - self.logger.info("=== Cleanup ===") - await asyncio.gather(*[test.cleanup() for test in tests]) - except Exception as e: - print("Exception: " + str(e)) - finally: - if not self.per_test_args.no_cleanup: - await tests[0].global_cleanup() - except Exception as e: - print("Exception: " + str(e)) - finally: - await asyncio.gather(*[test.close() for test in tests]) - - async def _run_tests(self, tests, duration, title, with_profiler=False): - self._completed_operations = [0] * len(tests) - self._last_completion_times = [0] * len(tests) - self._last_total_operations = -1 - - status_thread = RepeatedTimer(1, self._print_status, title) - - if self.per_test_args.sync: - threads = [] - for id, test in enumerate(tests): - thread = threading.Thread( - target=lambda: self._run_sync_loop(test, duration, id, with_profiler) - ) - threads.append(thread) - thread.start() - for thread in threads: - thread.join() - else: - tasks = 
[self._run_async_loop(test, duration, id, with_profiler) for id, test in enumerate(tests)] - await asyncio.gather(*tasks) - - status_thread.stop() - - self.logger.info("") - self.logger.info("=== Results ===") - - total_operations = self._get_completed_operations() - operations_per_second = self._get_operations_per_second() - seconds_per_operation = 1 / operations_per_second - weighted_average_seconds = total_operations / operations_per_second - - self.logger.info( - "Completed {:,} operations in a weighted-average of {:,.2f}s ({:,.2f} ops/s, {:,.3f} s/op)".format( - total_operations, weighted_average_seconds, operations_per_second, seconds_per_operation - ) - ) - self.logger.info("") - - def _run_sync_loop(self, test, duration, id, with_profiler): - start = time.time() - runtime = 0 - if with_profiler: - import cProfile - profile = None - while runtime < duration: - profile = cProfile.Profile() - profile.enable() - test.run_sync() - profile.disable() - runtime = time.time() - start - self._completed_operations[id] += 1 - self._last_completion_times[id] = runtime - - if profile: - # Store only profile for final iteration - profile_name = "{}/cProfile-{}-{}-sync.pstats".format(os.getcwd(), test.__class__.__name__, id) - print("Dumping profile data to {}".format(profile_name)) - profile.dump_stats(profile_name) - else: - print("No profile generated.") - else: - while runtime < duration: - test.run_sync() - runtime = time.time() - start - self._completed_operations[id] += 1 - self._last_completion_times[id] = runtime - - async def _run_async_loop(self, test, duration, id, with_profiler): - start = time.time() - runtime = 0 - if with_profiler: - import cProfile - profile = None - while runtime < duration: - profile = cProfile.Profile() - profile.enable() - await test.run_async() - profile.disable() - runtime = time.time() - start - self._completed_operations[id] += 1 - self._last_completion_times[id] = runtime - - if profile: - # Store only profile for final iteration - profile_name = "{}/cProfile-{}-{}-async.pstats".format(os.getcwd(), test.__class__.__name__, id) - print("Dumping profile data to {}".format(profile_name)) - profile.dump_stats(profile_name) - else: - print("No profile generated.") - else: - while runtime < duration: - await test.run_async() - runtime = time.time() - start - self._completed_operations[id] += 1 - self._last_completion_times[id] = runtime - - def _print_status(self, title): - if self._last_total_operations == -1: - self._last_total_operations = 0 - self.logger.info("=== {} ===\nCurrent\t\tTotal\t\tAverage".format(title)) - - total_operations = self._get_completed_operations() - current_operations = total_operations - self._last_total_operations - average_operations = self._get_operations_per_second() - - self._last_total_operations = total_operations - self.logger.info("{}\t\t{}\t\t{:.2f}".format(current_operations, total_operations, average_operations)) diff --git a/tools/azure-devtools/src/azure_devtools/perfstress_tests/perf_stress_test.py b/tools/azure-devtools/src/azure_devtools/perfstress_tests/perf_stress_test.py deleted file mode 100644 index a31da53ffa11..000000000000 --- a/tools/azure-devtools/src/azure_devtools/perfstress_tests/perf_stress_test.py +++ /dev/null @@ -1,141 +0,0 @@ -# -------------------------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for license information. 
-# -------------------------------------------------------------------------------------------- - -import os -import threading -import aiohttp - -from urllib.parse import urljoin -from ._policies import PerfTestProxyPolicy - - -class PerfStressTest: - """Base class for implementing a python perf test. - - - run_sync and run_async must be implemented. - - global_setup and global_cleanup are optional and run once, ever, regardless of parallelism. - - setup and cleanup are run once per test instance (where each instance runs in its own thread/process), regardless of #iterations. - - close is run once per test instance, after cleanup and global_cleanup. - - run_sync/run_async are run once per iteration. - """ - - args = {} - _global_parallel_index_lock = threading.Lock() - _global_parallel_index = 0 - - def __init__(self, arguments): - self.args = arguments - self._session = None - self._test_proxy = None - self._test_proxy_policy = None - self._client_kwargs = {} - self._recording_id = None - - with PerfStressTest._global_parallel_index_lock: - self._parallel_index = PerfStressTest._global_parallel_index - PerfStressTest._global_parallel_index += 1 - - if self.args.test_proxies: - self._session = aiohttp.ClientSession(connector=aiohttp.TCPConnector(verify_ssl=False)) - - # SSL will be disabled for the test proxy requests, so suppress warnings - import warnings - from urllib3.exceptions import InsecureRequestWarning - warnings.simplefilter('ignore', InsecureRequestWarning) - - # Add policy to redirect requests to the test proxy - self._test_proxy = self.args.test_proxies[self._parallel_index % len(self.args.test_proxies)] - self._test_proxy_policy = PerfTestProxyPolicy(self._test_proxy) - self._client_kwargs['per_retry_policies'] = [self._test_proxy_policy] - - async def global_setup(self): - return - - async def global_cleanup(self): - return - - async def record_and_start_playback(self): - # Make one call to Run() before starting recording, to avoid capturing one-time setup like authorization requests - if self.args.sync: - self.run_sync() - else: - await self.run_async() - - await self._start_recording() - self._test_proxy_policy.recording_id = self._recording_id - self._test_proxy_policy.mode = "record" - - # Record one call to run() - if self.args.sync: - self.run_sync() - else: - await self.run_async() - - await self._stop_recording() - await self._start_playback() - self._test_proxy_policy.recording_id = self._recording_id - self._test_proxy_policy.mode = "playback" - - async def stop_playback(self): - headers = { - "x-recording-id": self._recording_id, - "x-purge-inmemory-recording": "true" - } - url = urljoin(self._test_proxy, "/playback/stop") - async with self._session.post(url, headers=headers) as resp: - assert resp.status == 200 - - self._test_proxy_policy.recording_id = None - self._test_proxy_policy.mode = None - - async def setup(self): - return - - async def cleanup(self): - return - - async def close(self): - if self._session: - await self._session.close() - - def run_sync(self): - raise Exception("run_sync must be implemented for {}".format(self.__class__.__name__)) - - async def run_async(self): - raise Exception("run_async must be implemented for {}".format(self.__class__.__name__)) - - async def _start_recording(self): - url = urljoin(self._test_proxy, "/record/start") - async with self._session.post(url) as resp: - assert resp.status == 200 - self._recording_id = resp.headers["x-recording-id"] - - async def _stop_recording(self): - headers = {"x-recording-id": 
self._recording_id} - url = urljoin(self._test_proxy, "/record/stop") - async with self._session.post(url, headers=headers) as resp: - assert resp.status == 200 - - async def _start_playback(self): - headers = {"x-recording-id": self._recording_id} - url = urljoin(self._test_proxy, "/playback/start") - async with self._session.post(url, headers=headers) as resp: - assert resp.status == 200 - self._recording_id = resp.headers["x-recording-id"] - - @staticmethod - def add_arguments(parser): - """ - Override this method to add test-specific argparser args to the class. - These are accessible in __init__() and the self.args property. - """ - return - - @staticmethod - def get_from_env(variable): - value = os.environ.get(variable) - if not value: - raise Exception("Undefined environment variable {}".format(variable)) - return value diff --git a/tools/azure-devtools/src/azure_devtools/perfstress_tests/system_perfstress/README.md b/tools/azure-devtools/src/azure_devtools/perfstress_tests/system_perfstress/README.md new file mode 100644 index 000000000000..9d5c57353f93 --- /dev/null +++ b/tools/azure-devtools/src/azure_devtools/perfstress_tests/system_perfstress/README.md @@ -0,0 +1,4 @@ +# Python system performance tests + +These tests are intend as a benchmark for the raw performance of the various Python HTTP layers. +For further details on how to configure and run these tests, [please see the perf test documentation](https://github.com/Azure/azure-sdk-for-python/blob/main/doc/dev/perfstress_tests.md#running-the-system-tests). \ No newline at end of file diff --git a/tools/azure-devtools/src/azure_devtools/perfstress_tests/system_perfstress/aiohttp_get_test.py b/tools/azure-devtools/src/azure_devtools/perfstress_tests/system_perfstress/aiohttp_get_test.py index b49eb29ec979..c112173f49bb 100644 --- a/tools/azure-devtools/src/azure_devtools/perfstress_tests/system_perfstress/aiohttp_get_test.py +++ b/tools/azure-devtools/src/azure_devtools/perfstress_tests/system_perfstress/aiohttp_get_test.py @@ -16,7 +16,7 @@ async def global_cleanup(self): await type(self).session.close() async def run_async(self): - async with type(self).session.get(self.Arguments.url) as response: + async with type(self).session.get(self.args.url) as response: await response.text() @staticmethod diff --git a/tools/azure-devtools/src/azure_devtools/perfstress_tests/system_perfstress/httpx_get_test.py b/tools/azure-devtools/src/azure_devtools/perfstress_tests/system_perfstress/httpx_get_test.py index 193fad43a2fe..cd3820cb6f2b 100644 --- a/tools/azure-devtools/src/azure_devtools/perfstress_tests/system_perfstress/httpx_get_test.py +++ b/tools/azure-devtools/src/azure_devtools/perfstress_tests/system_perfstress/httpx_get_test.py @@ -16,7 +16,7 @@ async def global_cleanup(self): await type(self).client.aclose() async def run_async(self): - response = await type(self).client.get(self.Arguments.url) + response = await type(self).client.get(self.args.url) _ = response.text @staticmethod diff --git a/tools/azure-devtools/src/azure_devtools/perfstress_tests/system_perfstress/requests_get_test.py b/tools/azure-devtools/src/azure_devtools/perfstress_tests/system_perfstress/requests_get_test.py index b6de6db04e5a..2acdf8fbf56b 100644 --- a/tools/azure-devtools/src/azure_devtools/perfstress_tests/system_perfstress/requests_get_test.py +++ b/tools/azure-devtools/src/azure_devtools/perfstress_tests/system_perfstress/requests_get_test.py @@ -13,7 +13,7 @@ async def global_setup(self): type(self).session = requests.Session() def 
run_sync(self): - type(self).session.get(self.Arguments.url).text + type(self).session.get(self.args.url).text @staticmethod def add_arguments(parser): diff --git a/tools/azure-devtools/src/azure_devtools/perfstress_tests/system_perfstress/sample_batch_test.py b/tools/azure-devtools/src/azure_devtools/perfstress_tests/system_perfstress/sample_batch_test.py new file mode 100644 index 000000000000..2f0c8c050b28 --- /dev/null +++ b/tools/azure-devtools/src/azure_devtools/perfstress_tests/system_perfstress/sample_batch_test.py @@ -0,0 +1,45 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +from azure_devtools.perfstress_tests import BatchPerfTest + +class MockReceiver(): + def receive(self, min_messages=1, max_messages=5): + for i in range(min_messages, max_messages): + yield 1 + +class AsyncMockReceiver(): + async def receive(self, min_messages=1, max_messages=5): + for i in range(min_messages, max_messages): + yield i + + +class SampleBatchTest(BatchPerfTest): + def __init__(self, arguments): + super().__init__(arguments) + + # Setup service clients + self.receiver_client = MockReceiver() + self.async_receiver_client = AsyncMockReceiver() + + def run_batch_sync(self) -> int: + messages = self.receiver_client.receive( + max_messages=self.args.max_message_count, + min_messages=self.args.min_message_count + ) + return len(list(messages)) + + async def run_batch_async(self) -> int: + messages = self.async_receiver_client.receive( + max_messages=self.args.max_message_count, + min_messages=self.args.min_message_count + ) + return len([m async for m in messages]) + + @staticmethod + def add_arguments(parser): + super(SampleBatchTest, SampleBatchTest).add_arguments(parser) + parser.add_argument('--max-message-count', nargs='?', type=int, default=10) + parser.add_argument('--min-message-count', nargs='?', type=int, default=0) diff --git a/tools/azure-devtools/src/azure_devtools/perfstress_tests/system_perfstress/sleep_test.py b/tools/azure-devtools/src/azure_devtools/perfstress_tests/system_perfstress/sleep_test.py index c4f37867f79e..84a171c47e93 100644 --- a/tools/azure-devtools/src/azure_devtools/perfstress_tests/system_perfstress/sleep_test.py +++ b/tools/azure-devtools/src/azure_devtools/perfstress_tests/system_perfstress/sleep_test.py @@ -15,6 +15,7 @@ class SleepTest(PerfStressTest): instance_count = 0 def __init__(self, arguments): + super().__init__(arguments) type(self).instance_count += 1 self.seconds_per_operation = math.pow(2, type(self).instance_count) diff --git a/tools/azure-devtools/src/azure_devtools/perfstress_tests/system_perfstress/socket_http_get_test.py b/tools/azure-devtools/src/azure_devtools/perfstress_tests/system_perfstress/socket_http_get_test.py index fb0d10b22ec7..2de7fc446923 100644 --- a/tools/azure-devtools/src/azure_devtools/perfstress_tests/system_perfstress/socket_http_get_test.py +++ b/tools/azure-devtools/src/azure_devtools/perfstress_tests/system_perfstress/socket_http_get_test.py @@ -11,7 +11,7 @@ class SocketHttpGetTest(PerfStressTest): async def setup(self): - parsed_url = urlparse(self.Arguments.url) + parsed_url = urlparse(self.args.url) hostname = parsed_url.hostname port = parsed_url.port path = parsed_url.path diff --git 
a/tools/azure-devtools/src/azure_devtools/perfstress_tests/system_perfstress/tornado_get_test.py b/tools/azure-devtools/src/azure_devtools/perfstress_tests/system_perfstress/tornado_get_test.py index cafcecbc8922..880fc91f8728 100644 --- a/tools/azure-devtools/src/azure_devtools/perfstress_tests/system_perfstress/tornado_get_test.py +++ b/tools/azure-devtools/src/azure_devtools/perfstress_tests/system_perfstress/tornado_get_test.py @@ -14,7 +14,7 @@ async def global_setup(self): type(self).client = httpclient.AsyncHTTPClient() async def run_async(self): - await type(self).client.fetch(self.Arguments.url) + await type(self).client.fetch(self.args.url) @staticmethod def add_arguments(parser):
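---

Supplementary note (illustrative, not part of the patch above): the updated `perfstress_tests.md` lists the `get_random_bytes` utility and the `add_arguments`/`run_sync`/`run_async` lifecycle, but does not show them together in a single minimal test. The sketch below is a hypothetical, self-contained `PerfStressTest` subclass written against the public API exported by this patch's `__init__.py`; the payload-hashing body stands in for a real service call and the `--size` option is an assumption chosen to mirror the storage examples in the documentation.

```python
# Illustrative sketch only -- not shipped in this patch.
# Assumes azure-devtools is installed so azure_devtools.perfstress_tests is importable.
import hashlib

from azure_devtools.perfstress_tests import PerfStressTest, get_random_bytes


class RandomBytesTest(PerfStressTest):
    def __init__(self, arguments):
        super().__init__(arguments)
        # Generate the payload once per parallel test instance, outside the timed loop.
        self.payload = get_random_bytes(self.args.size)

    def run_sync(self) -> None:
        # Stand-in for a real sync operation; hashing keeps the example self-contained.
        hashlib.sha256(self.payload).hexdigest()

    async def run_async(self) -> None:
        # Stand-in for a real async operation.
        hashlib.sha256(self.payload).hexdigest()

    @staticmethod
    def add_arguments(parser):
        super(RandomBytesTest, RandomBytesTest).add_arguments(parser)
        parser.add_argument(
            "--size", nargs="?", type=int, default=10240,
            help="Size in bytes of the generated payload. Default is 10240."
        )
```

Under these assumptions, placing the file in a discoverable `perfstress_tests` package would let it be run as `perfstress RandomBytesTest --size=1048576`, with the runner handling warmup, duration, parallelism, and the profiling and test-proxy options added in this change.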