Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better handling for providers in concurrent actions #319

Merged
merged 2 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 28 additions & 67 deletions broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@
It defines the `Host` class, which represents a cloud resource, and the `Broker` class,
which provides methods for managing hosts.

The `Broker` class is decorated with `mp_decorator`, which enables multiprocessing for
certain methods. The `Host` class is defined in the `broker.hosts` module,
The `Host` class is defined in the `broker.hosts` module,
and the provider classes are defined in the `broker.providers` module.

Exceptions are defined in the `broker.exceptions` module,
Expand Down Expand Up @@ -40,44 +39,6 @@ def _try_teardown(host_obj):
return exceptions.HostError(host_obj, f"error during teardown:\n{err}")


class mp_decorator:
"""Decorator wrapping Broker methods to enable multiprocessing.

The decorated method is expected to return an itearable.
"""

MAX_WORKERS = None
""" If set to integer, the count of workers will be limited to that amount.
If set to None, the max workers count of the EXECUTOR will match the count of items."""

EXECUTOR = ThreadPoolExecutor

def __init__(self, func=None):
self.func = func

def __get__(self, instance, owner):
"""Support instance methods."""
if not instance:
return self.func

def mp_split(*args, **kwargs):
count = instance._kwargs.get("_count", None)
if count is None:
return self.func(instance, *args, **kwargs)

results = []
max_workers_count = self.MAX_WORKERS or count
with self.EXECUTOR(max_workers=max_workers_count) as workers:
completed_futures = as_completed(
workers.submit(self.func, instance, *args, **kwargs) for _ in range(count)
)
for f in completed_futures:
results.extend(f.result())
return results

return mp_split


class Broker:
"""Main Broker class to be used as the primary interface for the Broker API."""

Expand Down Expand Up @@ -112,6 +73,7 @@ def __init__(self, **kwargs):

def _act(self, provider, method, checkout=False):
"""Perform a general action against a provider's method."""
count = self._kwargs.get("_count", 1)
logger.debug(f"Resolving action {method} on provider {provider}.")
provider_inst = provider(**self._kwargs)
helpers.emit(
Expand All @@ -123,54 +85,53 @@ def _act(self, provider, method, checkout=False):
)
method_obj = getattr(provider_inst, method)
logger.debug(f"On {provider_inst=} executing {method_obj=} with params {self._kwargs=}.")
result = method_obj(**self._kwargs)
logger.debug(f"Action {result=}")
# Overkill for a single action, cleaner than splitting the logic
with ThreadPoolExecutor() as workers:
tasks = [workers.submit(method_obj, **self._kwargs) for _ in range(count)]
result = []
for task in as_completed(tasks):
try:
result.append(task.result())
except exceptions.ProviderError as err:
result.append(err)
logger.debug(f"Result:\n{result}")
if result and checkout:
return provider_inst.construct_host(
provider_params=result, host_classes=self.host_classes, **self._kwargs
)
return [
provider_inst.construct_host(
provider_params=res, host_classes=self.host_classes, **self._kwargs
)
for res in result
if not isinstance(res, exceptions.ProviderError)
]
else:
return result
return result if len(result) > 1 else result[0]

def _update_provider_actions(self, kwargs):
if not self._provider_actions:
for key, action in PROVIDER_ACTIONS.items():
if key in kwargs:
self._provider_actions[key] = action

@mp_decorator
def _checkout(self):
"""Checkout one or more VMs.
def checkout(self):
"""Checkout one or more hosts.

:return: List of Host objects
:return: Host obj or list of Host objects
"""
hosts = []
logger.debug(f"Doing _checkout(): {self._provider_actions=}")
logger.debug(f"Starting checkout(s): {self._provider_actions=}")
if not self._provider_actions:
raise self.BrokerError("Could not determine an appropriate provider")
for action in self._provider_actions:
provider, method = PROVIDER_ACTIONS[action]
logger.info(f"Using provider {provider.__name__} to checkout")
try:
host = self._act(provider, method, checkout=True)
except exceptions.ProviderError as err:
host = err
hosts.append(host)
if host and not isinstance(host, exceptions.ProviderError):
logger.info(f"{host.__class__.__name__}: {host.hostname}")
return hosts

def checkout(self):
"""Checkout one or more VMs.

:return: Host obj or list of Host objects
"""
hosts = self._checkout()
# perform the checkout, concurrently if _count is set
hosts = self._act(provider, method, checkout=True)
err = None
for host in hosts[:]:
if isinstance(host, exceptions.ProviderError):
err = host
hosts.remove(host)
else:
logger.info(f"{host.__class__.__name__}: {host.hostname}")
helpers.emit(hosts=[host.to_dict() for host in hosts])
self._hosts.extend(hosts)
helpers.update_inventory([host.to_dict() for host in hosts])
Expand Down
17 changes: 9 additions & 8 deletions broker/providers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,18 +65,22 @@ def __new__(cls, name, bases, attrs):
for attr, obj in attrs.items():
if attr == "provider_help":
# register the help options based on the function arguments
for name, param in inspect.signature(obj).parameters.items():
if name not in ("self", "kwargs"):
# {name: (cls, is_flag)}
PROVIDER_HELP[name] = (
for _name, param in inspect.signature(obj).parameters.items():
if _name not in ("self", "kwargs"):
# {_name: (cls, is_flag)}
PROVIDER_HELP[_name] = (
new_cls,
isinstance(param.default, bool),
)
logger.debug(f"Registered help option {name} for provider {name}")
logger.debug(f"Registered help option {_name} for provider {_name}")
elif hasattr(obj, "_as_action"):
for action in obj._as_action:
PROVIDER_ACTIONS[action] = (new_cls, attr)
logger.debug(f"Registered action {action} for provider {name}")
# register provider settings validators
if validators := attrs.get("_validators"):
logger.debug(f"Adding {len(validators)} validators for {name}")
settings.validators.extend(validators)
return new_cls


Expand Down Expand Up @@ -143,9 +147,6 @@ def _validate_settings(self, instance_name=None):
if not inst_vals.get("override_envars"):
# if a provider instance doesn't want to override envars, load them
settings.execute_loaders(loaders=[dynaconf.loaders.env_loader])
new_validators = [v for v in self._validators if v not in settings.validators]
logger.debug(f"Adding new validators: {[v.names[0] for v in new_validators]}")
settings.validators.extend(new_validators)
# use selective validation to only validate the instance settings
try:
settings.validators.validate(only=section_name)
Expand Down
19 changes: 0 additions & 19 deletions tests/test_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,22 +130,3 @@ def test_multi_manager():

class SomeException(Exception):
pass


class MyBroker:
@broker.mp_decorator
def workload(self):
return []

@broker.mp_decorator
def failing_workload(self):
raise SomeException()


def test_mp_decorator():
tested_broker = MyBroker()
tested_broker._kwargs = dict(_count=2)

tested_broker.workload()
with pytest.raises(SomeException):
tested_broker.failing_workload()