From 15e58519175b1eda8b23d8644842342fa21c2cee Mon Sep 17 00:00:00 2001 From: Jacob Callahan Date: Thu, 19 Sep 2024 15:17:44 -0400 Subject: [PATCH] Improve concurrency This change rewrites the way broker actions handle concurrency. Previously, concurrency was offloaded to a decorator that re-ran the entire decorated method, including its pre-action setup, once per requested `_count`, which was wasteful. Now, we don't go concurrent until we're ready to perform the action we want. --- broker/broker.py | 95 +++++++++++++-------------------------------- tests/test_broker.py | 19 --------- 2 files changed, 28 insertions(+), 86 deletions(-) diff --git a/broker/broker.py b/broker/broker.py index af9598e3..e94e280e 100644 --- a/broker/broker.py +++ b/broker/broker.py @@ -6,8 +6,7 @@ It defines the `Host` class, which represents a cloud resource, and the `Broker` class, which provides methods for managing hosts. -The `Broker` class is decorated with `mp_decorator`, which enables multiprocessing for -certain methods. The `Host` class is defined in the `broker.hosts` module, +The `Host` class is defined in the `broker.hosts` module, and the provider classes are defined in the `broker.providers` module. Exceptions are defined in the `broker.exceptions` module, @@ -40,44 +39,6 @@ def _try_teardown(host_obj): return exceptions.HostError(host_obj, f"error during teardown:\n{err}") -class mp_decorator: - """Decorator wrapping Broker methods to enable multiprocessing. - - The decorated method is expected to return an itearable. - """ - - MAX_WORKERS = None - """ If set to integer, the count of workers will be limited to that amount. 
- If set to None, the max workers count of the EXECUTOR will match the count of items.""" - - EXECUTOR = ThreadPoolExecutor - - def __init__(self, func=None): - self.func = func - - def __get__(self, instance, owner): - """Support instance methods.""" - if not instance: - return self.func - - def mp_split(*args, **kwargs): - count = instance._kwargs.get("_count", None) - if count is None: - return self.func(instance, *args, **kwargs) - - results = [] - max_workers_count = self.MAX_WORKERS or count - with self.EXECUTOR(max_workers=max_workers_count) as workers: - completed_futures = as_completed( - workers.submit(self.func, instance, *args, **kwargs) for _ in range(count) - ) - for f in completed_futures: - results.extend(f.result()) - return results - - return mp_split - - class Broker: """Main Broker class to be used as the primary interface for the Broker API.""" @@ -112,6 +73,7 @@ def __init__(self, **kwargs): def _act(self, provider, method, checkout=False): """Perform a general action against a provider's method.""" + count = self._kwargs.get("_count", 1) logger.debug(f"Resolving action {method} on provider {provider}.") provider_inst = provider(**self._kwargs) helpers.emit( @@ -123,14 +85,26 @@ def _act(self, provider, method, checkout=False): ) method_obj = getattr(provider_inst, method) logger.debug(f"On {provider_inst=} executing {method_obj=} with params {self._kwargs=}.") - result = method_obj(**self._kwargs) - logger.debug(f"Action {result=}") + # Overkill for a single action, cleaner than splitting the logic + with ThreadPoolExecutor() as workers: + tasks = [workers.submit(method_obj, **self._kwargs) for _ in range(count)] + result = [] + for task in as_completed(tasks): + try: + result.append(task.result()) + except exceptions.ProviderError as err: + result.append(err) + logger.debug(f"Result:\n{result}") if result and checkout: - return provider_inst.construct_host( - provider_params=result, host_classes=self.host_classes, **self._kwargs - ) + return 
[ + provider_inst.construct_host( + provider_params=res, host_classes=self.host_classes, **self._kwargs + ) + for res in result + if not isinstance(res, exceptions.ProviderError) + ] else: - return result + return result if len(result) > 1 else result[0] def _update_provider_actions(self, kwargs): if not self._provider_actions: @@ -138,39 +112,26 @@ def _update_provider_actions(self, kwargs): if key in kwargs: self._provider_actions[key] = action - @mp_decorator - def _checkout(self): - """Checkout one or more VMs. + def checkout(self): + """Checkout one or more hosts. - :return: List of Host objects + :return: Host obj or list of Host objects """ - hosts = [] - logger.debug(f"Doing _checkout(): {self._provider_actions=}") + logger.debug(f"Starting checkout(s): {self._provider_actions=}") if not self._provider_actions: raise self.BrokerError("Could not determine an appropriate provider") for action in self._provider_actions: provider, method = PROVIDER_ACTIONS[action] logger.info(f"Using provider {provider.__name__} to checkout") - try: - host = self._act(provider, method, checkout=True) - except exceptions.ProviderError as err: - host = err - hosts.append(host) - if host and not isinstance(host, exceptions.ProviderError): - logger.info(f"{host.__class__.__name__}: {host.hostname}") - return hosts - - def checkout(self): - """Checkout one or more VMs. 
- - :return: Host obj or list of Host objects - """ - hosts = self._checkout() + # perform the checkout, concurrently if _count is set + hosts = self._act(provider, method, checkout=True) err = None for host in hosts[:]: if isinstance(host, exceptions.ProviderError): err = host hosts.remove(host) + else: + logger.info(f"{host.__class__.__name__}: {host.hostname}") helpers.emit(hosts=[host.to_dict() for host in hosts]) self._hosts.extend(hosts) helpers.update_inventory([host.to_dict() for host in hosts]) diff --git a/tests/test_broker.py b/tests/test_broker.py index ab2e109d..7b1a3736 100644 --- a/tests/test_broker.py +++ b/tests/test_broker.py @@ -130,22 +130,3 @@ def test_multi_manager(): class SomeException(Exception): pass - - -class MyBroker: - @broker.mp_decorator - def workload(self): - return [] - - @broker.mp_decorator - def failing_workload(self): - raise SomeException() - - -def test_mp_decorator(): - tested_broker = MyBroker() - tested_broker._kwargs = dict(_count=2) - - tested_broker.workload() - with pytest.raises(SomeException): - tested_broker.failing_workload()