diff --git a/.editorconfig b/.editorconfig index 52895ec8..e93c4ff4 100644 --- a/.editorconfig +++ b/.editorconfig @@ -4,10 +4,7 @@ indent_size = 4 insert_final_newline = true trim_trailing_whitespace = true -[*.json] -indent_size = 2 - -[*.yml] +[*.{json,yml,yaml}] indent_size = 2 [*.xml] diff --git a/.vscode/launch.json b/.vscode/launch.json index 442bbeed..6baa77f5 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -9,10 +9,10 @@ "type": "python", "request": "launch", "stopOnEntry": false, - "internalConsoleOptions": "openOnSessionStart", "pythonPath": "${config:python.pythonPath}", "program": "${workspaceFolder}/cli/entrypoint.py", "cwd": "${workspaceFolder}", + "console": "integratedTerminal", "args": [ "spark", "cluster", "list" ], @@ -27,9 +27,9 @@ "type": "python", "request": "launch", "stopOnEntry": false, - "internalConsoleOptions": "openOnSessionStart", "pythonPath": "${config:python.pythonPath}", "program": "${workspaceFolder}/cli/entrypoint.py", + "console": "integratedTerminal", "cwd": "${workspaceFolder}", "args": [ "spark", "cluster", "create", "--id", "spark-debug" diff --git a/aztk/client.py b/aztk/client.py index f36e565f..112d0033 100644 --- a/aztk/client.py +++ b/aztk/client.py @@ -1,6 +1,7 @@ import asyncio import concurrent.futures import sys +import yaml from concurrent.futures import ThreadPoolExecutor from datetime import datetime, timedelta, timezone @@ -25,6 +26,7 @@ def __init__(self, secrets_config: models.SecretsConfiguration): self.batch_client = azure_api.make_batch_client(secrets_config) self.blob_client = azure_api.make_blob_client(secrets_config) + ''' General Batch Operations ''' @@ -54,7 +56,7 @@ def __delete_pool_and_job(self, pool_id: str): return job_exists or pool_exists - def __create_pool_and_job(self, cluster_conf, software_metadata_key: str, start_task, VmImageModel): + def __create_pool_and_job(self, cluster_conf: models.ClusterConfiguration, software_metadata_key: str, start_task, VmImageModel): """ Create a pool and job :param cluster_conf: the configuration object used to create the cluster @@ -64,6 +66,7 @@ def __create_pool_and_job(self, cluster_conf, software_metadata_key: str, start_ :param VmImageModel: the type of image to provision for the cluster :param wait: wait until the cluster is ready """ + helpers.save_cluster_config(cluster_conf, self.blob_client) # reuse pool_id as job_id pool_id = cluster_conf.cluster_id job_id = cluster_conf.cluster_id diff --git a/aztk/error.py b/aztk/error.py index 85dbb170..7fc2cfdc 100644 --- a/aztk/error.py +++ b/aztk/error.py @@ -1,15 +1,31 @@ +""" +Contains all errors used in Aztk. +All error should inherit from `AztkError` +""" class AztkError(Exception): def __init__(self, message: str = None): - super().__init__() - self.message = message + super().__init__(message) class ClusterNotReadyError(AztkError): pass class AzureApiInitError(AztkError): - def __init__(self, message: str = None): - super().__init__() - self.message = message + pass + +class InvalidPluginConfigurationError(AztkError): + pass + +class InvalidModelError(AztkError): + pass + +class MissingRequiredAttributeError(InvalidModelError): + pass + +class InvalidCustomScriptError(InvalidModelError): + pass + +class InvalidPluginReferenceError(InvalidModelError): + pass diff --git a/aztk/internal/__init__.py b/aztk/internal/__init__.py new file mode 100644 index 00000000..74745aec --- /dev/null +++ b/aztk/internal/__init__.py @@ -0,0 +1,5 @@ +""" +Module containing classes used in the library but without any use for SDK user +""" + +from .configuration_base import * diff --git a/aztk/internal/configuration_base.py b/aztk/internal/configuration_base.py new file mode 100644 index 00000000..1fdd6ab6 --- /dev/null +++ b/aztk/internal/configuration_base.py @@ -0,0 +1,42 @@ +import yaml +from aztk.error import AztkError + +class ConfigurationBase: + """ + Base class for any configuration. + Include methods to help with validation + """ + + @classmethod + def from_dict(cls, args: dict): + """ + Create a new model from a dict values + The dict is cleaned from null values and passed expanded to the constructor + """ + try: + clean = dict((k, v) for k, v in args.items() if v) + return cls(**clean) + except TypeError as e: + pretty_args = yaml.dump(args, default_flow_style=False) + raise AztkError("{0} {1}\n{2}".format(cls.__name__, str(e), pretty_args)) + + def validate(self): + raise NotImplementedError("Validate not implemented") + + def valid(self): + try: + self.validate() + return True + except AztkError: + return False + + def _validate_required(self, attrs): + for attr in attrs: + if not getattr(self, attr): + raise AztkError("{0} missing {1}.".format(self.__class__.__name__, attr)) + + def _merge_attributes(self, other, attrs): + for attr in attrs: + val = getattr(other, attr) + if val is not None: + setattr(self, attr, val) diff --git a/aztk/models/__init__.py b/aztk/models/__init__.py new file mode 100644 index 00000000..aed4fa32 --- /dev/null +++ b/aztk/models/__init__.py @@ -0,0 +1 @@ +from .models import * diff --git a/aztk/models.py b/aztk/models/models.py similarity index 80% rename from aztk/models.py rename to aztk/models/models.py index 8002ac87..30d12064 100644 --- a/aztk/models.py +++ b/aztk/models/models.py @@ -1,37 +1,14 @@ import io from typing import List -import azure.batch.models as batch_models -import aztk.utils.constants as constants from aztk import error +from aztk.utils import constants +import azure.batch.models as batch_models +from aztk.models.plugins import PluginConfiguration +from aztk.internal import ConfigurationBase +import yaml +import logging -class ConfigurationBase: - """ - Base class for any configuration. - Include methods to help with validation - """ - def validate(self): - raise NotImplementedError("Validate not implemented") - - def valid(self): - try: - self.validate() - return True - except error.AztkError: - return False - - def _validate_required(self, attrs): - for attr in attrs: - if not getattr(self, attr): - raise error.AztkError( - "{0} missing {1}.".format(self.__class__.__name__, attr)) - - def _merge_attributes(self, other, attrs): - for attr in attrs: - val = getattr(other, attr) - if val is not None: - setattr(self, attr, val) - class FileShare: def __init__(self, storage_account_name: str = None, @@ -57,7 +34,10 @@ def __init__(self, name: str = None, script = None, run_on=None): class UserConfiguration(ConfigurationBase): - def __init__(self, username: str, ssh_key: str = None, password: str = None): + def __init__(self, + username: str, + ssh_key: str = None, + password: str = None): self.username = username self.ssh_key = ssh_key self.password = password @@ -69,21 +49,24 @@ def merge(self, other): "password", ]) + + class ClusterConfiguration(ConfigurationBase): """ Cluster configuration model """ - def __init__( - self, - custom_scripts: List[CustomScript] = None, - file_shares: List[FileShare] = None, - cluster_id: str = None, - vm_count=0, - vm_low_pri_count=0, - vm_size=None, - subnet_id=None, - docker_repo: str=None, - user_configuration: UserConfiguration=None): + + def __init__(self, + custom_scripts: List[CustomScript] = None, + file_shares: List[FileShare] = None, + cluster_id: str = None, + vm_count=0, + vm_low_pri_count=0, + vm_size=None, + subnet_id=None, + docker_repo: str = None, + plugins: List[PluginConfiguration] = None, + user_configuration: UserConfiguration = None): super().__init__() self.custom_scripts = custom_scripts self.file_shares = file_shares @@ -94,6 +77,7 @@ def __init__( self.subnet_id = subnet_id self.docker_repo = docker_repo self.user_configuration = user_configuration + self.plugins = plugins def merge(self, other): """ @@ -110,6 +94,7 @@ def merge(self, other): "docker_repo", "vm_count", "vm_low_pri_count", + "plugins", ]) if other.user_configuration: @@ -118,6 +103,10 @@ def merge(self, other): else: self.user_configuration = other.user_configuration + if self.plugins: + for plugin in self.plugins: + plugin.validate() + def mixed_mode(self) -> bool: return self.vm_count > 0 and self.vm_low_pri_count > 0 @@ -133,15 +122,18 @@ def validate(self) -> bool: if self.vm_count == 0 and self.vm_low_pri_count == 0: raise error.AztkError( - "Please supply a valid (greater than 0) size or size_low_pri value either in the cluster.yaml configuration file or with a parameter (--size or --size-low-pri)") + "Please supply a valid (greater than 0) size or size_low_pri value either in the cluster.yaml configuration file or with a parameter (--size or --size-low-pri)" + ) if self.vm_size is None: raise error.AztkError( - "Please supply a vm_size in either the cluster.yaml configuration file or with a parameter (--vm-size)") + "Please supply a vm_size in either the cluster.yaml configuration file or with a parameter (--vm-size)" + ) if self.mixed_mode() and not self.subnet_id: raise error.AztkError( - "You must configure a VNET to use AZTK in mixed mode (dedicated and low priority nodes). Set the VNET's subnet_id in your cluster.yaml.") + "You must configure a VNET to use AZTK in mixed mode (dedicated and low priority nodes). Set the VNET's subnet_id in your cluster.yaml." + ) class RemoteLogin: @@ -217,11 +209,7 @@ def validate(self) -> bool: class DockerConfiguration(ConfigurationBase): - def __init__( - self, - endpoint=None, - username=None, - password=None): + def __init__(self, endpoint=None, username=None, password=None): self.endpoint = endpoint self.username = username @@ -232,13 +220,12 @@ def validate(self): class SecretsConfiguration(ConfigurationBase): - def __init__( - self, - service_principal=None, - shared_key=None, - docker=None, - ssh_pub_key=None, - ssh_priv_key=None): + def __init__(self, + service_principal=None, + shared_key=None, + docker=None, + ssh_pub_key=None, + ssh_priv_key=None): self.service_principal = service_principal self.shared_key = shared_key self.docker = docker @@ -249,14 +236,16 @@ def __init__( def validate(self): if self.service_principal and self.shared_key: raise error.AztkError( - "Both service_principal and shared_key auth are configured, must use only one") + "Both service_principal and shared_key auth are configured, must use only one" + ) elif self.service_principal: self.service_principal.validate() elif self.shared_key: self.shared_key.validate() else: raise error.AztkError( - "Neither service_principal and shared_key auth are configured, must use only one") + "Neither service_principal and shared_key auth are configured, must use only one" + ) def is_aad(self): return self.service_principal is not None @@ -270,7 +259,9 @@ def __init__(self, publisher, offer, sku): class Cluster: - def __init__(self, pool: batch_models.CloudPool, nodes: batch_models.ComputeNodePaged = None): + def __init__(self, + pool: batch_models.CloudPool, + nodes: batch_models.ComputeNodePaged = None): self.id = pool.id self.pool = pool self.nodes = nodes @@ -288,11 +279,13 @@ def __init__(self, pool: batch_models.CloudPool, nodes: batch_models.ComputeNode self.target_dedicated_nodes = pool.target_dedicated_nodes self.target_low_pri_nodes = pool.target_low_priority_nodes + class SSHLog(): def __init__(self, output, node_id): self.output = output self.node_id = node_id + class Software: """ Enum with list of available softwares diff --git a/aztk/models/plugins/__init__.py b/aztk/models/plugins/__init__.py new file mode 100644 index 00000000..5d9241b7 --- /dev/null +++ b/aztk/models/plugins/__init__.py @@ -0,0 +1,2 @@ +from .plugin_file import * +from .plugin_configuration import * diff --git a/aztk/models/plugins/internal/__init__.py b/aztk/models/plugins/internal/__init__.py new file mode 100644 index 00000000..af3f8e14 --- /dev/null +++ b/aztk/models/plugins/internal/__init__.py @@ -0,0 +1,2 @@ +from .plugin_manager import * +from .plugin_reference import * diff --git a/aztk/models/plugins/internal/plugin_manager.py b/aztk/models/plugins/internal/plugin_manager.py new file mode 100644 index 00000000..090f80e4 --- /dev/null +++ b/aztk/models/plugins/internal/plugin_manager.py @@ -0,0 +1,73 @@ +import os +import inspect +import importlib.util +from aztk.utils import constants +from aztk.error import InvalidPluginReferenceError +from aztk.spark.models import plugins + + +class PluginArgument: + def __init__(self, name: str, required: bool, default=None): + self.name = name + self.required = required + self.default = default + + +class PluginManager: + # Indexing of all the predefined plugins + plugins = dict( + jupyter=plugins.JupyterPlugin, + rstudio_server=plugins.RStudioServerPlugin, + hdfs=plugins.HDFSPlugin, + ) + + def __init__(self): + self.loaded = False + + def has_plugin(self, name: str): + return name in self.plugins + + def get_plugin(self, name: str, args: dict = None): + args = args or dict() + if not self.has_plugin(name): + raise InvalidPluginReferenceError("Cannot find a plugin with name '{0}'".format(name)) + plugin_cls = self.plugins[name] + self._validate_args(plugin_cls, args) + + return plugin_cls(**args) + + def get_args_for(self, cls): + signature = inspect.signature(cls) + args = dict() + + for k, v in signature.parameters.items(): + args[k] = PluginArgument(k, default=v.default, required=v.default is inspect.Parameter.empty) + + return args + + def _validate_args(self, plugin_cls, args: dict): + """ + Validate the given args are valid for the plugin + """ + plugin_args = self.get_args_for(plugin_cls) + + self._validate_no_extra_args(plugin_cls, plugin_args, args) + + for arg in plugin_args.values(): + if args.get(arg.name) is None: + if arg.required: + message = "Missing a required argument {0} for plugin {1}".format( + arg.name, plugin_cls.__name__) + raise InvalidPluginReferenceError(message) + args[arg.name] = arg.default + + + def _validate_no_extra_args(self, plugin_cls, plugin_args: dict, args: dict): + for name in args: + if not name in plugin_args: + message = "Plugin {0} doesn't have an argument called '{1}'".format( + plugin_cls.__name__, name) + raise InvalidPluginReferenceError(message) + + +plugin_manager = PluginManager() diff --git a/aztk/models/plugins/internal/plugin_reference.py b/aztk/models/plugins/internal/plugin_reference.py new file mode 100644 index 00000000..92930bc9 --- /dev/null +++ b/aztk/models/plugins/internal/plugin_reference.py @@ -0,0 +1,21 @@ +from aztk.error import InvalidPluginConfigurationError, InvalidModelError +from aztk.internal import ConfigurationBase +from aztk.models import PluginConfiguration +from .plugin_manager import plugin_manager + +class PluginReference(ConfigurationBase): + """ + Contains the configuration to use a plugin + """ + def __init__(self, name, args: dict = None): + super().__init__() + self.name = name + self.args = args or dict() + + def get_plugin(self) -> PluginConfiguration: + return plugin_manager.get_plugin(self.name, self.args) + + def validate(self) -> bool: + if not self.name: + raise InvalidModelError("Plugin is missing a name") + diff --git a/aztk/models/plugins/plugin_configuration.py b/aztk/models/plugins/plugin_configuration.py new file mode 100644 index 00000000..c32dc3a8 --- /dev/null +++ b/aztk/models/plugins/plugin_configuration.py @@ -0,0 +1,74 @@ +import inspect +from typing import List, Union +from enum import Enum +from .plugin_file import PluginFile +from aztk.internal import ConfigurationBase + +class PluginPort: + """ + Definition for a port that should be opened on node + :param internal: Port on the node + :param public: [Optional] Port available to the user. If none won't open any port to the user + :param name: [Optional] name to differentiate ports if you have multiple + """ + + def __init__(self, internal: int, public: Union[int, bool] = False, name=None): + + self.internal = internal + self.expose_publicly = bool(public) + self.public_port = None + if self.expose_publicly: + if public is True: + self.public_port = internal + else: + self.public_port = public + + self.name = name + + +class PluginRunTarget(Enum): + Master = "master" + Worker = "worker" + All = "all-nodes" + + + +class PluginConfiguration(ConfigurationBase): + """ + Plugin manifest that should be returned in the main.py of your plugin + :param name: Name of the plugin. Used to reference the plugin + :param runOn: Where the plugin should run + :param files: List of files to upload + :param args: + :param env: + """ + + def __init__(self, + name: str, + ports: List[PluginPort] = None, + files: List[PluginFile] = None, + execute: str = None, + args=None, + env=None, + run_on: PluginRunTarget = PluginRunTarget.Master): + self.name = name + # self.docker_image = docker_image + self.run_on = run_on + self.ports = ports or [] + self.files = files or [] + self.args = args or [] + self.env = env or dict() + self.execute = execute + + def has_arg(self, name: str): + for x in self.args: + if x.name == name: + return True + else: + return False + + def validate(self): + self._validate_required([ + "name", + "execute", + ]) diff --git a/aztk/models/plugins/plugin_file.py b/aztk/models/plugins/plugin_file.py new file mode 100644 index 00000000..93ab80cd --- /dev/null +++ b/aztk/models/plugins/plugin_file.py @@ -0,0 +1,28 @@ +import io +from typing import Union + +class PluginFile: + """ + Reference to a file for a plugin. + """ + def __init__(self, target: str, local_path: str): + self.target = target + self.local_path = local_path + + + # TODO handle folders? + + def content(self): + with open(self.local_path, "r") as f: + return f.read() + + +class TextPluginFile: + def __init__(self, target: str, content: Union[str,io.StringIO]): + if isinstance(content, str): + self._content = content + else: + self._content = content.getValue() + + def content(self): + return self._content diff --git a/aztk/spark/__init__.py b/aztk/spark/__init__.py index ada095b6..f7ccbfd4 100644 --- a/aztk/spark/__init__.py +++ b/aztk/spark/__init__.py @@ -1,2 +1,2 @@ -from . import models -from .client import Client +from .models import * +from .client import Client diff --git a/aztk/spark/client.py b/aztk/spark/client.py index 03ae07f1..bfde5fd0 100644 --- a/aztk/spark/client.py +++ b/aztk/spark/client.py @@ -28,13 +28,15 @@ def create_cluster(self, cluster_conf: models.ClusterConfiguration, wait: bool = cluster_conf.cluster_id, cluster_conf.custom_scripts, cluster_conf.spark_configuration, - cluster_conf.user_configuration) + cluster_conf.user_configuration, + cluster_conf.plugins) start_task = create_cluster_helper.generate_cluster_start_task(self, zip_resource_files, cluster_conf.gpu_enabled(), cluster_conf.docker_repo, cluster_conf.file_shares, + cluster_conf.plugins, cluster_conf.mixed_mode(), cluster_conf.worker_on_master) diff --git a/aztk/spark/helpers/create_cluster.py b/aztk/spark/helpers/create_cluster.py index e05b0c32..8e99e586 100644 --- a/aztk/spark/helpers/create_cluster.py +++ b/aztk/spark/helpers/create_cluster.py @@ -18,6 +18,7 @@ def __docker_run_cmd(docker_repo: str = None, gpu_enabled: bool = False, worker_on_master: bool = True, file_mounts = None, + plugins = None, mixed_mode = False) -> str: """ Build the docker run command by setting up the environment variables @@ -60,23 +61,18 @@ def __docker_run_cmd(docker_repo: str = None, cmd.add_option('-e', 'SPARK_WORKER_UI_PORT=$SPARK_WORKER_UI_PORT') cmd.add_option('-e', 'SPARK_CONTAINER_NAME=$SPARK_CONTAINER_NAME') cmd.add_option('-e', 'SPARK_SUBMIT_LOGS_FILE=$SPARK_SUBMIT_LOGS_FILE') - cmd.add_option('-e', 'SPARK_JUPYTER_PORT=$SPARK_JUPYTER_PORT') cmd.add_option('-e', 'SPARK_JOB_UI_PORT=$SPARK_JOB_UI_PORT') cmd.add_option('-p', '8080:8080') # Spark Master UI cmd.add_option('-p', '7077:7077') # Spark Master cmd.add_option('-p', '7337:7337') # Spark Shuffle Service cmd.add_option('-p', '4040:4040') # Job UI - cmd.add_option('-p', '8888:8888') # Jupyter UI - cmd.add_option('-p', '8787:8787') # Rstudio Server cmd.add_option('-p', '18080:18080') # Spark History Server UI cmd.add_option('-p', '3022:3022') # Docker SSH - cmd.add_option('-p', '8020:8020') # Namenode IPC: ClientProtocol - cmd.add_option('-p', '9000:9000') # Namenode IPC: ClientProtocol - cmd.add_option('-p', '50010:50010') # Datanode http data transfer - cmd.add_option('-p', '50020:50020') # Datanode IPC metaata operations - cmd.add_option('-p', '50070:50070') # Namenode WebUI - cmd.add_option('-p', '50075:50075') # DataNode WebUI - cmd.add_option('-p', '50090:50090') # Secondary NameNode http address + if plugins: + for plugin in plugins: + for port in plugin.ports: + cmd.add_option('-p', '{0}:{1}'.format(port.internal, port.internal)) # Jupyter UI + cmd.add_option('-d', docker_repo) cmd.add_argument('/bin/bash /mnt/batch/tasks/startup/wd/docker_main.sh') @@ -133,6 +129,7 @@ def __get_secrets_env(spark_client): def __cluster_install_cmd(zip_resource_file: batch_models.ResourceFile, gpu_enabled: bool, docker_repo: str = None, + plugins = None, worker_on_master: bool = True, file_mounts = None, mixed_mode: bool = False): @@ -170,7 +167,7 @@ def __cluster_install_cmd(zip_resource_file: batch_models.ResourceFile, constants.DOCKER_SPARK_CONTAINER_NAME, gpu_enabled, docker_repo, - __docker_run_cmd(docker_repo, gpu_enabled, worker_on_master, file_mounts, mixed_mode)), + __docker_run_cmd(docker_repo, gpu_enabled, worker_on_master, file_mounts, plugins, mixed_mode)), ] commands = shares + setup @@ -182,6 +179,7 @@ def generate_cluster_start_task( gpu_enabled: bool, docker_repo: str = None, file_shares: List[aztk_models.FileShare] = None, + plugins: List[aztk_models.PluginConfiguration] = None, mixed_mode: bool = False, worker_on_master: bool = True): """ @@ -193,9 +191,7 @@ def generate_cluster_start_task( resource_files = [zip_resource_file] spark_web_ui_port = constants.DOCKER_SPARK_WEB_UI_PORT spark_worker_ui_port = constants.DOCKER_SPARK_WORKER_UI_PORT - spark_jupyter_port = constants.DOCKER_SPARK_JUPYTER_PORT spark_job_ui_port = constants.DOCKER_SPARK_JOB_UI_PORT - spark_rstudio_server_port = constants.DOCKER_SPARK_RSTUDIO_SERVER_PORT spark_container_name = constants.DOCKER_SPARK_CONTAINER_NAME spark_submit_logs_file = constants.SPARK_SUBMIT_LOGS_FILE @@ -206,16 +202,12 @@ def generate_cluster_start_task( name="SPARK_WEB_UI_PORT", value=spark_web_ui_port), batch_models.EnvironmentSetting( name="SPARK_WORKER_UI_PORT", value=spark_worker_ui_port), - batch_models.EnvironmentSetting( - name="SPARK_JUPYTER_PORT", value=spark_jupyter_port), batch_models.EnvironmentSetting( name="SPARK_JOB_UI_PORT", value=spark_job_ui_port), batch_models.EnvironmentSetting( name="SPARK_CONTAINER_NAME", value=spark_container_name), batch_models.EnvironmentSetting( name="SPARK_SUBMIT_LOGS_FILE", value=spark_submit_logs_file), - batch_models.EnvironmentSetting( - name="SPARK_RSTUDIO_SERVER_PORT", value=spark_rstudio_server_port), ] + __get_docker_credentials(spark_client) # start task command diff --git a/aztk/spark/models/__init__.py b/aztk/spark/models/__init__.py new file mode 100644 index 00000000..cf4f59d6 --- /dev/null +++ b/aztk/spark/models/__init__.py @@ -0,0 +1 @@ +from .models import * \ No newline at end of file diff --git a/aztk/spark/models.py b/aztk/spark/models/models.py similarity index 99% rename from aztk/spark/models.py rename to aztk/spark/models/models.py index d69c2572..446db6f4 100644 --- a/aztk/spark/models.py +++ b/aztk/spark/models/models.py @@ -82,6 +82,9 @@ class SharedKeyConfiguration(aztk.models.SharedKeyConfiguration): class DockerConfiguration(aztk.models.DockerConfiguration): pass +class PluginConfiguration(aztk.models.PluginConfiguration): + pass + class ClusterConfiguration(aztk.models.ClusterConfiguration): def __init__( diff --git a/aztk/spark/models/plugins/__init__.py b/aztk/spark/models/plugins/__init__.py new file mode 100644 index 00000000..6fe8fb03 --- /dev/null +++ b/aztk/spark/models/plugins/__init__.py @@ -0,0 +1,3 @@ +from .hdfs import * +from .jupyter import * +from .rstudio_server import * diff --git a/aztk/spark/models/plugins/hdfs/__init__.py b/aztk/spark/models/plugins/hdfs/__init__.py new file mode 100644 index 00000000..2ec26f31 --- /dev/null +++ b/aztk/spark/models/plugins/hdfs/__init__.py @@ -0,0 +1 @@ +from .configuration import * diff --git a/aztk/spark/models/plugins/hdfs/configuration.py b/aztk/spark/models/plugins/hdfs/configuration.py new file mode 100644 index 00000000..ddc9d190 --- /dev/null +++ b/aztk/spark/models/plugins/hdfs/configuration.py @@ -0,0 +1,46 @@ +import os +from aztk.models.plugins.plugin_configuration import PluginConfiguration, PluginPort, PluginRunTarget +from aztk.models.plugins.plugin_file import PluginFile +from aztk.utils import constants + +dir_path = os.path.dirname(os.path.realpath(__file__)) + + +class HDFSPlugin(PluginConfiguration): + def __init__(self): + super().__init__( + name="hdfs", + ports=[ + PluginPort( + name="File system metadata operations", + internal=8020, + ), + PluginPort( + name="File system metadata operations(Backup)", + internal=9000, + ), + PluginPort( + name="Datanode data transfer", + internal=50010, + ), + PluginPort( + name="Datanode IPC metadata operations", + internal=50020, + ), + PluginPort( + name="Namenode", + internal=50070, + public=True, + ), + PluginPort( + name="Datanodes", + internal=50075, + public=True, + ), + ], + run_on=PluginRunTarget.All, + execute="hdfs.sh", + files=[ + PluginFile("hdfs.sh", os.path.join(dir_path, "hdfs.sh")), + ], + ) diff --git a/aztk/spark/models/plugins/hdfs/hdfs.sh b/aztk/spark/models/plugins/hdfs/hdfs.sh new file mode 100644 index 00000000..b2e94f70 --- /dev/null +++ b/aztk/spark/models/plugins/hdfs/hdfs.sh @@ -0,0 +1,70 @@ +#! /bin/bash + +# install dependencies +apt update +apt install -y ssh rsync + +# configure and run ssh +cat /root/.ssh/id_rsa.pub >> /root/.ssh/authorized_keys +chmod 600 /root/.ssh/id_rsa +chmod 700 /root/.ssh/authorized_keys +mkdir /var/run/sshd +echo 'root:screencast' | chpasswd +sed -i 's/PermitRootLogin prohibit-password/PermitRootLogin yes/' /etc/ssh/sshd_config +sed -i 's/Port 22/Port 3022/' /etc/ssh/sshd_config +sed -i 's/# Port 22/ Port 3022/' /etc/ssh/ssh_config +sed -i 's/# StrictHostKeyChecking ask/StrictHostKeyChecking no/' /etc/ssh/ssh_config + +# SSH login fix. Otherwise user is kicked off after login +sed 's@session\s*required\s*pam_loginuid.so@session optional pam_loginuid.so@g' -i /etc/pam.d/sshd + +/usr/sbin/sshd + +# install and configure hadoop +mkdir /home/hadoop-2.8.2 +curl http://apache.claz.org/hadoop/common/hadoop-2.8.2/hadoop-2.8.2.tar.gz | tar -xz -C /home + +export HADOOP_HOME=/home/hadoop-2.8.2 +echo 'export HADOOP_HOME=/home/hadoop-2.8.2' >> ~/.bashrc + +export HADOOP_CONF_DIR=/home/hadoop-2.8.2/etc/hadoop +echo 'export HADOOP_CONF_DIR=/home/hadoop-2.8.2/etc/hadoop' >> ~/.bashrc + +export PATH=$PATH:$HADOOP_HOME/bin +echo 'export PATH=$PATH:$HADOOP_HOME/bin' >> ~/.bashrc + +# Create a directory for the hadoop file system +mkdir -p /batch/hadoop + +echo ' + + + + fs.defaultFS + hdfs://'$MASTER_IP':8020 + +' > $HADOOP_HOME/etc/hadoop/core-site.xml + +echo ' + + + + dfs.namenode.datanode.registration.ip-hostname-check + false + + + dfs.datanode.data.dir + file:///batch/hadoop + + ' > $HADOOP_HOME/etc/hadoop/hdfs-site.xml + +# run HDFS +if [ $IS_MASTER -eq "1" ]; then + echo 'starting namenode and datanode' + hdfs namenode -format + $HADOOP_HOME/sbin/hadoop-daemon.sh --config $HADOOP_CONF_DIR --script hdfs start namenode + $HADOOP_HOME/sbin/hadoop-daemon.sh --config $HADOOP_CONF_DIR --script hdfs start datanode +else + echo 'starting datanode - namenode at ' $MASTER_IP ':8020' + $HADOOP_HOME/sbin/hadoop-daemon.sh --config $HADOOP_CONF_DIR --script hdfs start datanode +fi diff --git a/aztk/spark/models/plugins/jupyter/__init__.py b/aztk/spark/models/plugins/jupyter/__init__.py new file mode 100644 index 00000000..2ec26f31 --- /dev/null +++ b/aztk/spark/models/plugins/jupyter/__init__.py @@ -0,0 +1 @@ +from .configuration import * diff --git a/aztk/spark/models/plugins/jupyter/configuration.py b/aztk/spark/models/plugins/jupyter/configuration.py new file mode 100644 index 00000000..01f4289d --- /dev/null +++ b/aztk/spark/models/plugins/jupyter/configuration.py @@ -0,0 +1,23 @@ +import os +from aztk.models.plugins.plugin_configuration import PluginConfiguration, PluginPort, PluginRunTarget +from aztk.models.plugins.plugin_file import PluginFile +from aztk.utils import constants + +dir_path = os.path.dirname(os.path.realpath(__file__)) + +class JupyterPlugin(PluginConfiguration): + def __init__(self): + super().__init__( + name="jupyter", + ports=[ + PluginPort( + internal=8888, + public=True, + ), + ], + run_on=PluginRunTarget.All, + execute="jupyter.sh", + files=[ + PluginFile("jupyter.sh", os.path.join(dir_path, "jupyter.sh")), + ], + ) diff --git a/aztk/spark/models/plugins/jupyter/jupyter.sh b/aztk/spark/models/plugins/jupyter/jupyter.sh new file mode 100644 index 00000000..ab71b2fa --- /dev/null +++ b/aztk/spark/models/plugins/jupyter/jupyter.sh @@ -0,0 +1,58 @@ +#!/bin/bash + +# This custom script only works on images where jupyter is pre-installed on the Docker image +# +# This custom script has been tested to work on the following docker images: +# - aztk/python:spark2.2.0-python3.6.2-base +# - aztk/python:spark2.2.0-python3.6.2-gpu +# - aztk/python:spark2.1.0-python3.6.2-base +# - aztk/python:spark2.1.0-python3.6.2-gpu + +if [ "$IS_MASTER" = "1" ]; then + pip install jupyter --upgrade + pip install notebook --upgrade + + PYSPARK_DRIVER_PYTHON="/.pyenv/versions/${USER_PYTHON_VERSION}/bin/jupyter" + JUPYTER_KERNELS="/.pyenv/versions/${USER_PYTHON_VERSION}/share/jupyter/kernels" + + # disable password/token on jupyter notebook + jupyter notebook --generate-config --allow-root + JUPYTER_CONFIG='/.jupyter/jupyter_notebook_config.py' + echo >> $JUPYTER_CONFIG + echo -e 'c.NotebookApp.token=""' >> $JUPYTER_CONFIG + echo -e 'c.NotebookApp.password=""' >> $JUPYTER_CONFIG + + # get master ip + MASTER_IP=$(hostname -i) + + # remove existing kernels + rm -rf $JUPYTER_KERNELS/* + + # set up jupyter to use pyspark + mkdir $JUPYTER_KERNELS/pyspark + touch $JUPYTER_KERNELS/pyspark/kernel.json + cat << EOF > $JUPYTER_KERNELS/pyspark/kernel.json +{ + "display_name": "PySpark", + "language": "python", + "argv": [ + "python", + "-m", + "ipykernel", + "-f", + "{connection_file}" + ], + "env": { + "SPARK_HOME": "$SPARK_HOME", + "PYSPARK_PYTHON": "python", + "PYSPARK_SUBMIT_ARGS": "--master spark://$MASTER_IP:7077 pyspark-shell" + } +} +EOF + + # start jupyter notebook from /mnt - this is where we recommend you put your azure files mount point as well + cd /mnt + (PYSPARK_DRIVER_PYTHON=$PYSPARK_DRIVER_PYTHON PYSPARK_DRIVER_PYTHON_OPTS="notebook --no-browser --port=8888 --allow-root" pyspark &) +fi + + diff --git a/aztk/spark/models/plugins/rstudio_server/__init__.py b/aztk/spark/models/plugins/rstudio_server/__init__.py new file mode 100644 index 00000000..2ec26f31 --- /dev/null +++ b/aztk/spark/models/plugins/rstudio_server/__init__.py @@ -0,0 +1 @@ +from .configuration import * diff --git a/aztk/spark/models/plugins/rstudio_server/configuration.py b/aztk/spark/models/plugins/rstudio_server/configuration.py new file mode 100644 index 00000000..02081e0c --- /dev/null +++ b/aztk/spark/models/plugins/rstudio_server/configuration.py @@ -0,0 +1,24 @@ +import os +from aztk.models.plugins.plugin_configuration import PluginConfiguration, PluginPort, PluginRunTarget +from aztk.models.plugins.plugin_file import PluginFile +from aztk.utils import constants +dir_path = os.path.dirname(os.path.realpath(__file__)) + + +class RStudioServerPlugin(PluginConfiguration): + def __init__(self, version="1.1.383"): + super().__init__( + name="rstudio_server", + ports=[ + PluginPort( + internal=8787, + public=True, + ), + ], + run_on=PluginRunTarget.Master, + execute="rstudio_server.sh", + files=[ + PluginFile("rstudio_server.sh", os.path.join(dir_path, "rstudio_server.sh")), + ], + env=dict(RSTUDIO_SERVER_VERSION=version), + ) diff --git a/aztk/spark/models/plugins/rstudio_server/rstudio_server.sh b/aztk/spark/models/plugins/rstudio_server/rstudio_server.sh new file mode 100644 index 00000000..cd9c3538 --- /dev/null +++ b/aztk/spark/models/plugins/rstudio_server/rstudio_server.sh @@ -0,0 +1,25 @@ +#!/bin/bash + +# This custom script only works on images where rstudio server is pre-installed on the Docker image +# +# This custom script has been tested to work on the following docker images: +# - jiata/aztk-r:0.1.0-spark2.2.0-r3.4.1 +# - jiata/aztk-r:0.1.0-spark2.1.0-r3.4.1 +# - jiata/aztk-r:0.1.0-spark1.6.3-r3.4.1 + +if [ "$IS_MASTER" = "1" ]; then + + ## Download and install Rstudio Server + wget https://download2.rstudio.org/rstudio-server-$RSTUDIO_SERVER_VERSION-amd64.deb + gdebi rstudio-server-$RSTUDIO_SERVER_VERSION-amd64.deb --non-interactive + echo "server-app-armor-enabled=0" | tee -a /etc/rstudio/rserver.conf + rm rstudio-server-$RSTUDIO_SERVER_VERSION-amd64.deb + + ## Preparing default user for Rstudio Server + set -e + useradd -m -d /home/rstudio rstudio -g staff + echo rstudio:rstudio | chpasswd + + rstudio-server start + +fi diff --git a/aztk/spark/utils/upload_node_scripts.py b/aztk/spark/utils/upload_node_scripts.py index 761bf5f6..88b66199 100644 --- a/aztk/spark/utils/upload_node_scripts.py +++ b/aztk/spark/utils/upload_node_scripts.py @@ -5,13 +5,17 @@ import logging import zipfile import yaml +import json from pathlib import Path from aztk.utils import constants from aztk.utils import helpers +from aztk.error import InvalidCustomScriptError import aztk.spark.models from Crypto.PublicKey import RSA from Crypto.Random import get_random_bytes from Crypto.Cipher import AES, PKCS1_OAEP +from aztk.spark.models import ClusterConfiguration, PluginConfiguration +from typing import List root = constants.ROOT_PATH @@ -88,13 +92,17 @@ def __add_custom_scripts(zipf, custom_scripts): for index, custom_script in enumerate(custom_scripts): if isinstance(custom_script.script, (str, bytes)): new_file_name = str(index) + '_' + os.path.basename(custom_script.script) - with io.open(custom_script.script, 'r') as f: - zipf.writestr(os.path.join('custom-scripts', new_file_name), f.read().replace('\r\n', '\n')) + try: + with io.open(custom_script.script, 'r') as f: + zipf.writestr(os.path.join('custom-scripts', new_file_name), f.read().replace('\r\n', '\n')) + except FileNotFoundError: + raise InvalidCustomScriptError("Custom script '{0}' doesn't exists.".format(custom_script.script)) elif isinstance(custom_script.script, aztk.spark.models.File): new_file_name = str(index) + '_' + custom_script.script.name zipf.writestr(os.path.join('custom-scripts', new_file_name), custom_script.script.payload.getvalue()) data.append(dict(script=new_file_name, runOn=str(custom_script.run_on))) + zipf.writestr(os.path.join('custom-scripts', 'custom-scripts.yaml'), yaml.dump(data, default_flow_style=False)) return zipf @@ -106,13 +114,15 @@ def __add_file_to_zip(zipf, file_path, zip_file_path, binary): zipf = zip_file_to_dir(file_path, zip_file_path, zipf, binary) return zipf + def __add_str_to_zip(zipf, payload, zipf_file_path=None): if not payload: return zipf zipf.writestr(zipf_file_path, payload) return zipf -def zip_scripts(blob_client, container_id, custom_scripts, spark_configuration, user_conf=None): + +def zip_scripts(blob_client, container_id, custom_scripts, spark_configuration, user_conf=None, plugins=None): zipf = __create_zip() if custom_scripts: zipf = __add_custom_scripts(zipf, custom_scripts) @@ -128,6 +138,9 @@ def zip_scripts(blob_client, container_id, custom_scripts, spark_configuration, for jar in spark_configuration.jars: zipf = __add_file_to_zip(zipf, jar, 'jars', binary=True) + if plugins: + zipf = __add_plugins(zipf, plugins) + if user_conf: encrypted_aes_session_key, cipher_aes_nonce, tag, ciphertext = encrypt_password(spark_configuration.ssh_key_pair['pub_key'], user_conf.password) user_conf = yaml.dump({'username': user_conf.username, @@ -161,3 +174,20 @@ def encrypt_password(ssh_pub_key, password): cipher_aes = AES.new(session_key, AES.MODE_EAX) ciphertext, tag = cipher_aes.encrypt_and_digest(password.encode()) return [encrypted_aes_session_key, cipher_aes.nonce, tag, ciphertext] + +def __add_plugins(zipf, plugins: List[PluginConfiguration]): + data = [] + for plugin in plugins: + for file in plugin.files: + zipf = __add_str_to_zip(zipf, file.content(), 'plugins/{0}/{1}'.format(plugin.name, file.target)) + if plugin.execute: + data.append(dict( + name=plugin.name, + execute='{0}/{1}'.format(plugin.name, plugin.execute), + args=plugin.args, + env=plugin.env, + runOn=plugin.run_on.value, + )) + + zipf.writestr(os.path.join('plugins', 'plugins-manifest.yaml'), yaml.dump(data)) + return zipf diff --git a/aztk/utils/constants.py b/aztk/utils/constants.py index 15405f6b..02760e17 100644 --- a/aztk/utils/constants.py +++ b/aztk/utils/constants.py @@ -1,5 +1,4 @@ import os - """ DOCKER """ @@ -12,30 +11,21 @@ # DOCKER SPARK DOCKER_SPARK_WEB_UI_PORT = 8080 DOCKER_SPARK_WORKER_UI_PORT = 8081 -DOCKER_SPARK_RSTUDIO_SERVER_PORT = 8787 -DOCKER_SPARK_JUPYTER_PORT = 8888 DOCKER_SPARK_JOB_UI_PORT = 4040 DOCKER_SPARK_JOB_UI_HISTORY_PORT = 18080 DOCKER_SPARK_HOME = "/home/spark-current" - -# DOCKER HDFS -DOCKER_SPARK_NAMENODE_UI_PORT = 50070 - """ Root path of this repository """ ROOT_PATH = os.path.normpath(os.path.join(os.path.dirname(__file__), '..', '..')) - """ User home directory path """ HOME_DIRECTORY_PATH = os.path.expanduser('~') - """ Path to the secrets file """ DEFAULT_SECRETS_PATH = os.path.join(os.getcwd(), '.aztk/secrets.yaml') - """ Paths to the cluster configuration files """ @@ -50,27 +40,25 @@ GLOBAL_SPARK_JOB_CONFIG = os.path.join(HOME_DIRECTORY_PATH, '.aztk', 'job.yaml') CUSTOM_SCRIPTS_DEST = os.path.join(ROOT_PATH, 'node_scripts', 'custom-scripts') - """ Source and destination paths for spark init """ INIT_DIRECTORY_SOURCE = os.path.join(ROOT_PATH, 'config') LOCAL_INIT_DIRECTORY_DEST = os.path.join(os.getcwd(), '.aztk') GLOBAL_INIT_DIRECTORY_DEST = os.path.join(HOME_DIRECTORY_PATH, '.aztk') - """ Key of the metadata entry for the pool that is used to store the master node id """ MASTER_NODE_METADATA_KEY = "_spark_master_node" - """ Timeout in seconds to wait for the master to be ready Value: 20 minutes """ WAIT_FOR_MASTER_TIMEOUT = 60 * 20 - AZTK_SOFTWARE_METADATA_KEY = "_aztk_software" +AZTK_CLUSTER_CONFIG_METADATA_KEY = "_aztk_cluster_config" + TASK_WORKING_DIR = "wd" SPARK_SUBMIT_LOGS_FILE = "output.log" diff --git a/aztk/utils/helpers.py b/aztk/utils/helpers.py index 8aa94f37..c0a90274 100644 --- a/aztk/utils/helpers.py +++ b/aztk/utils/helpers.py @@ -4,6 +4,7 @@ import os import time import re +import azure.common import azure.batch.batch_service_client as batch import azure.batch.batch_auth as batch_auth import azure.batch.models as batch_models @@ -12,6 +13,8 @@ from aztk.utils import constants from aztk import error import aztk.models +import yaml +import logging _STANDARD_OUT_FILE_NAME = 'stdout.txt' _STANDARD_ERROR_FILE_NAME = 'stderr.txt' @@ -38,8 +41,10 @@ def wait_for_tasks_to_complete(job_id, batch_client): while True: tasks = batch_client.task.list(job_id) - incomplete_tasks = [task for task in tasks if - task.state != batch_models.TaskState.completed] + incomplete_tasks = [ + task for task in tasks + if task.state != batch_models.TaskState.completed + ] if not incomplete_tasks: return time.sleep(5) @@ -61,9 +66,13 @@ def wait_for_task_to_complete(job_id: str, task_id: str, batch_client): return -def upload_text_to_container(container_name: str, application_name: str, content: str, file_path: str, blob_client=None) -> batch_models.ResourceFile: +def upload_text_to_container(container_name: str, + application_name: str, + content: str, + file_path: str, + blob_client=None) -> batch_models.ResourceFile: blob_name = file_path - blob_path = application_name + '/' + blob_name # + '/' + time_stamp + '/' + blob_name + blob_path = application_name + '/' + blob_name # + '/' + time_stamp + '/' + blob_name blob_client.create_container(container_name, fail_on_exist=False) blob_client.create_blob_from_text(container_name, blob_path, content) @@ -73,12 +82,10 @@ def upload_text_to_container(container_name: str, application_name: str, content permission=blob.BlobPermissions.READ, expiry=datetime.datetime.utcnow() + datetime.timedelta(days=365)) - sas_url = blob_client.make_blob_url(container_name, - blob_path, - sas_token=sas_token) + sas_url = blob_client.make_blob_url( + container_name, blob_path, sas_token=sas_token) - return batch_models.ResourceFile(file_path=blob_name, - blob_source=sas_url) + return batch_models.ResourceFile(file_path=blob_name, blob_source=sas_url) def upload_file_to_container(container_name, @@ -109,12 +116,9 @@ def upload_file_to_container(container_name, if not node_path: node_path = blob_name - blob_client.create_container(container_name, - fail_on_exist=False) + blob_client.create_container(container_name, fail_on_exist=False) - blob_client.create_blob_from_path(container_name, - blob_path, - file_path) + blob_client.create_blob_from_path(container_name, blob_path, file_path) sas_token = blob_client.generate_blob_shared_access_signature( container_name, @@ -122,12 +126,10 @@ def upload_file_to_container(container_name, permission=blob.BlobPermissions.READ, expiry=datetime.datetime.utcnow() + datetime.timedelta(days=7)) - sas_url = blob_client.make_blob_url(container_name, - blob_path, - sas_token=sas_token) + sas_url = blob_client.make_blob_url( + container_name, blob_path, sas_token=sas_token) - return batch_models.ResourceFile(file_path=node_path, - blob_source=sas_url) + return batch_models.ResourceFile(file_path=node_path, blob_source=sas_url) def create_pool_if_not_exist(pool, batch_client): @@ -142,7 +144,9 @@ def create_pool_if_not_exist(pool, batch_client): batch_client.pool.add(pool) except batch_models.BatchErrorException as e: if e.error.code == "PoolExists": - raise error.AztkError("A cluster with the same id already exists. Use a different id or delete the existing cluster") + raise error.AztkError( + "A cluster with the same id already exists. Use a different id or delete the existing cluster" + ) else: raise return True @@ -169,8 +173,8 @@ def wait_for_all_nodes_state(pool, node_state, batch_client): nodes = list(batch_client.compute_node.list(pool.id)) totalNodes = pool.target_dedicated_nodes + pool.target_low_priority_nodes - if (len(nodes) >= totalNodes and - all(node.state in node_state for node in nodes)): + if (len(nodes) >= totalNodes + and all(node.state in node_state for node in nodes)): return nodes time.sleep(1) @@ -195,9 +199,9 @@ def select_latest_verified_vm_image_with_node_agent_sku( skus_to_use = [ (sku, image_ref) for sku in node_agent_skus for image_ref in sorted( sku.verified_image_references, key=lambda item: item.sku) - if image_ref.publisher.lower() == publisher.lower() and - image_ref.offer.lower() == offer.lower() and - image_ref.sku.startswith(sku_starts_with) + if image_ref.publisher.lower() == publisher.lower() + and image_ref.offer.lower() == offer.lower() + and image_ref.sku.startswith(sku_starts_with) ] # skus are listed in reverse order, pick first for latest @@ -205,9 +209,12 @@ def select_latest_verified_vm_image_with_node_agent_sku( return (sku_to_use.id, image_ref_to_use) -def create_sas_token( - container_name, blob_name, permission, blob_client, expiry=None, - timeout=None): +def create_sas_token(container_name, + blob_name, + permission, + blob_client, + expiry=None, + timeout=None): """ Create a blob sas token :param blob_client: The storage block blob client to use. @@ -230,9 +237,12 @@ def create_sas_token( container_name, blob_name, permission=permission, expiry=expiry) -def upload_blob_and_create_sas( - container_name, blob_name, file_name, expiry, blob_client, - timeout=None): +def upload_blob_and_create_sas(container_name, + blob_name, + file_name, + expiry, + blob_client, + timeout=None): """ Uploads a file from local disk to Azure Storage and creates a SAS for it. :param blob_client: The storage block blob client to use. @@ -247,14 +257,9 @@ def upload_blob_and_create_sas( :return: A SAS URL to the blob with the specified expiry time. :rtype: str """ - blob_client.create_container( - container_name, - fail_on_exist=False) + blob_client.create_container(container_name, fail_on_exist=False) - blob_client.create_blob_from_path( - container_name, - blob_name, - file_name) + blob_client.create_blob_from_path(container_name, blob_name, file_name) sas_token = create_sas_token( container_name, @@ -265,9 +270,7 @@ def upload_blob_and_create_sas( timeout=timeout) sas_url = blob_client.make_blob_url( - container_name, - blob_name, - sas_token=sas_token) + container_name, blob_name, sas_token=sas_token) return sas_url @@ -292,8 +295,7 @@ def get_connection_info(pool_id, node_id, batch_client): :param str pool_id: The pool id to look up :param str node_id: The node id to look up """ - rls = batch_client.compute_node.get_remote_login_settings( - pool_id, node_id) + rls = batch_client.compute_node.get_remote_login_settings(pool_id, node_id) remote_ip = rls.remote_login_ip_address ssh_port = str(rls.remote_login_port) return (remote_ip, ssh_port) @@ -313,7 +315,7 @@ def get_cluster_total_current_nodes(pool): return pool.current_dedicated_nodes + pool.current_low_priority_nodes -def normalize_path(path: str)-> str: +def normalize_path(path: str) -> str: """ Convert a path in a path that will work well with blob storage and unix It will replace \ with / and remove relative . @@ -326,7 +328,8 @@ def normalize_path(path: str)-> str: return path -def get_file_properties(job_id: str, task_id: str, file_path: str, batch_client): +def get_file_properties(job_id: str, task_id: str, file_path: str, + batch_client): raw = batch_client.file.get_properties_from_task( job_id, task_id, file_path, raw=True) @@ -374,3 +377,26 @@ def format_batch_exception(batch_exception): l.append("-------------------------------------------") return '\n'.join(l) + + +def save_cluster_config(cluster_config, blob_client): + blob_path = "config.yaml" + content = yaml.dump(cluster_config) + container_name = cluster_config.cluster_id + blob_client.create_container(container_name, fail_on_exist=False) + blob_client.create_blob_from_text(container_name, blob_path, content) + + +def read_cluster_config(cluster_id: str, blob_client: blob.BlockBlobService): + blob_path = "config.yaml" + try: + result = blob_client.get_blob_to_text(cluster_id, blob_path) + return yaml.load(result.content) + except azure.common.AzureMissingResourceHttpError: + logging.warn( + "Cluster %s doesn't have cluster configuration in storage", + cluster_id) + except yaml.YAMLError: + logging.warn( + "Cluster %s contains invalid cluster configuration in blob", + cluster_id) diff --git a/cli/config.py b/cli/config.py index 4e00d2d9..e8a42e44 100644 --- a/cli/config.py +++ b/cli/config.py @@ -1,10 +1,17 @@ import os import yaml -import typing from cli import log import aztk.spark -from aztk.spark.models import SecretsConfiguration, ServicePrincipalConfiguration, SharedKeyConfiguration, DockerConfiguration, ClusterConfiguration, UserConfiguration - +from aztk.spark.models import ( + SecretsConfiguration, + ServicePrincipalConfiguration, + SharedKeyConfiguration, + DockerConfiguration, + ClusterConfiguration, + UserConfiguration, + PluginConfiguration, +) +from aztk.models.plugins.internal import PluginReference def load_aztk_screts() -> SecretsConfiguration: """ @@ -12,8 +19,9 @@ def load_aztk_screts() -> SecretsConfiguration: """ secrets = SecretsConfiguration() # read global ~/secrets.yaml - global_config = _load_secrets_config(os.path.join( - aztk.utils.constants.HOME_DIRECTORY_PATH, '.aztk', 'secrets.yaml')) + global_config = _load_secrets_config( + os.path.join(aztk.utils.constants.HOME_DIRECTORY_PATH, '.aztk', + 'secrets.yaml')) # read current working directory secrets.yaml local_config = _load_secrets_config() @@ -30,7 +38,8 @@ def load_aztk_screts() -> SecretsConfiguration: return secrets -def _load_secrets_config(path: str = aztk.utils.constants.DEFAULT_SECRETS_PATH): +def _load_secrets_config( + path: str = aztk.utils.constants.DEFAULT_SECRETS_PATH): """ Loads the secrets.yaml file in the .aztk directory """ @@ -64,17 +73,16 @@ def _merge_secrets_dict(secrets: SecretsConfiguration, secrets_config): if shared_key_config and (batch or storage): raise aztk.error.AztkError( - "Shared keys must be configured either under 'sharedKey:' or under 'batch:' and 'storage:', not both.") + "Shared keys must be configured either under 'sharedKey:' or under 'batch:' and 'storage:', not both." + ) if shared_key_config: secrets.shared_key = SharedKeyConfiguration( batch_account_name=shared_key_config.get('batch_account_name'), batch_account_key=shared_key_config.get('batch_account_key'), batch_service_url=shared_key_config.get('batch_service_url'), - storage_account_name=shared_key_config.get( - 'storage_account_name'), - storage_account_key=shared_key_config.get( - 'storage_account_key'), + storage_account_name=shared_key_config.get('storage_account_name'), + storage_account_key=shared_key_config.get('storage_account_key'), storage_account_suffix=shared_key_config.get( 'storage_account_suffix'), ) @@ -82,13 +90,12 @@ def _merge_secrets_dict(secrets: SecretsConfiguration, secrets_config): secrets.shared_key = SharedKeyConfiguration() if batch: log.warning( - "Your secrets.yaml format is deprecated. To use shared key authentication use the shared_key key. See config/secrets.yaml.template") + "Your secrets.yaml format is deprecated. To use shared key authentication use the shared_key key. See config/secrets.yaml.template" + ) secrets.shared_key.batch_account_name = batch.get( 'batchaccountname') - secrets.shared_key.batch_account_key = batch.get( - 'batchaccountkey') - secrets.shared_key.batch_service_url = batch.get( - 'batchserviceurl') + secrets.shared_key.batch_account_key = batch.get('batchaccountkey') + secrets.shared_key.batch_service_url = batch.get('batchserviceurl') if storage: secrets.shared_key.storage_account_name = storage.get( @@ -113,7 +120,9 @@ def _merge_secrets_dict(secrets: SecretsConfiguration, secrets_config): secrets.ssh_pub_key = default_config.get('ssh_pub_key') -def read_cluster_config(path: str = aztk.utils.constants.DEFAULT_CLUSTER_CONFIG_PATH) -> ClusterConfiguration: +def read_cluster_config( + path: str = aztk.utils.constants.DEFAULT_CLUSTER_CONFIG_PATH +) -> ClusterConfiguration: """ Reads the config file in the .aztk/ directory (.aztk/cluster.yaml) """ @@ -155,8 +164,7 @@ def cluster_config_from_dict(config: dict): if config.get('username') is not None: output.user_configuration = UserConfiguration( - username=config['username'] - ) + username=config['username']) if config.get('password') is not None: output.user_configuration.password = config['password'] @@ -167,9 +175,7 @@ def cluster_config_from_dict(config: dict): output.custom_scripts.append( aztk.spark.models.CustomScript( script=custom_script['script'], - run_on=custom_script['runOn'] - ) - ) + run_on=custom_script['runOn'])) if config.get('azure_files') not in [[None], None]: output.file_shares = [] @@ -180,12 +186,17 @@ def cluster_config_from_dict(config: dict): storage_account_key=file_share['storage_account_key'], file_share_path=file_share['file_share_path'], mount_path=file_share['mount_path'], - ) - ) + )) if config.get('docker_repo') is not None: output.docker_repo = config['docker_repo'] + if config.get('plugins') not in [[None], None]: + output.plugins = [] + for plugin in config['plugins']: + ref = PluginReference.from_dict(plugin) + output.plugins.append(ref.get_plugin()) + if config.get('worker_on_master') is not None: output.worker_on_master = config['worker_on_master'] @@ -196,7 +207,6 @@ def cluster_config_from_dict(config: dict): class SshConfig: - def __init__(self): self.username = None self.cluster_id = None @@ -207,11 +217,9 @@ def __init__(self): self.job_ui_port = '4040' self.job_history_ui_port = '18080' self.web_ui_port = '8080' - self.jupyter_port = '8888' - self.name_node_ui_port = '50070' - self.rstudio_server_port = '8787' - def _read_config_file(self, path: str = aztk.utils.constants.DEFAULT_SSH_CONFIG_PATH): + def _read_config_file( + self, path: str = aztk.utils.constants.DEFAULT_SSH_CONFIG_PATH): """ Reads the config file in the .aztk/ directory (.aztk/ssh.yaml) """ @@ -261,11 +269,15 @@ def _merge_dict(self, config): if config.get('connect') is not None: self.connect = config['connect'] - def merge(self, cluster_id, username, job_ui_port, job_history_ui_port, web_ui_port, jupyter_port, name_node_ui_port, rstudio_server_port, host, connect): + def merge(self, cluster_id, username, job_ui_port, job_history_ui_port, + web_ui_port, jupyter_port, name_node_ui_port, + rstudio_server_port, host, connect): """ Merges fields with args object """ - self._read_config_file(os.path.join(aztk.utils.constants.HOME_DIRECTORY_PATH, '.aztk', 'ssh.yaml')) + self._read_config_file( + os.path.join(aztk.utils.constants.HOME_DIRECTORY_PATH, '.aztk', + 'ssh.yaml')) self._read_config_file() self._merge_dict( dict( @@ -278,17 +290,17 @@ def merge(self, cluster_id, username, job_ui_port, job_history_ui_port, web_ui_p name_node_ui_port=name_node_ui_port, rstudio_server_port=rstudio_server_port, host=host, - connect=connect - ) - ) + connect=connect)) if self.cluster_id is None: raise aztk.error.AztkError( - "Please supply an id for the cluster either in the ssh.yaml configuration file or with a parameter (--id)") + "Please supply an id for the cluster either in the ssh.yaml configuration file or with a parameter (--id)" + ) if self.username is None: raise aztk.error.AztkError( - "Please supply a username either in the ssh.yaml configuration file or with a parameter (--username)") + "Please supply a username either in the ssh.yaml configuration file or with a parameter (--username)" + ) class JobConfig(): @@ -336,10 +348,13 @@ def __convert_to_path(self, str_path): if str_path: abs_path = os.path.abspath(os.path.expanduser(str_path)) if not os.path.exists(abs_path): - raise aztk.error.AztkError("Could not find file: {0}\nCheck your configuration file".format(str_path)) + raise aztk.error.AztkError( + "Could not find file: {0}\nCheck your configuration file". + format(str_path)) return abs_path - def _read_config_file(self, path: str = aztk.utils.constants.DEFAULT_SPARK_JOB_CONFIG): + def _read_config_file( + self, path: str = aztk.utils.constants.DEFAULT_SPARK_JOB_CONFIG): """ Reads the Job config file in the .aztk/ directory (.aztk/job.yaml) """ @@ -368,15 +383,19 @@ def merge(self, id, job_config_yaml=None): for entry in self.applications: if entry['name'] is None: raise aztk.error.AztkError( - "Application specified with no name. Please verify your configuration in job.yaml") + "Application specified with no name. Please verify your configuration in job.yaml" + ) if entry['application'] is None: raise aztk.error.AztkError( - "No path to application specified for {} in job.yaml".format(entry['name'])) + "No path to application specified for {} in job.yaml". + format(entry['name'])) def get_file_if_exists(file): - local_conf_file = os.path.join(aztk.utils.constants.DEFAULT_SPARK_CONF_SOURCE, file) - global_conf_file = os.path.join(aztk.utils.constants.GLOBAL_CONFIG_PATH, file) + local_conf_file = os.path.join( + aztk.utils.constants.DEFAULT_SPARK_CONF_SOURCE, file) + global_conf_file = os.path.join(aztk.utils.constants.GLOBAL_CONFIG_PATH, + file) if os.path.exists(local_conf_file): return local_conf_file @@ -399,16 +418,16 @@ def load_jars(): # try load global try: - jars_src = os.path.join( - aztk.utils.constants.GLOBAL_CONFIG_PATH, 'jars') + jars_src = os.path.join(aztk.utils.constants.GLOBAL_CONFIG_PATH, + 'jars') jars = [os.path.join(jars_src, jar) for jar in os.listdir(jars_src)] except FileNotFoundError: pass # try load local, overwrite if found try: - jars_src = os.path.join( - aztk.utils.constants.DEFAULT_SPARK_CONF_SOURCE, 'jars') + jars_src = os.path.join(aztk.utils.constants.DEFAULT_SPARK_CONF_SOURCE, + 'jars') jars = [os.path.join(jars_src, jar) for jar in os.listdir(jars_src)] except FileNotFoundError: pass diff --git a/cli/entrypoint.py b/cli/entrypoint.py index 665321f3..aefaaa3f 100644 --- a/cli/entrypoint.py +++ b/cli/entrypoint.py @@ -10,7 +10,7 @@ import aztk from cli import logger, log, utils, constants from cli.spark.endpoints import spark - +from . import plugins def main(): parser = argparse.ArgumentParser(prog=constants.CLI_EXE) @@ -22,8 +22,11 @@ def main(): subparsers.required = True spark_parser = subparsers.add_parser( "spark", help="Commands to run spark jobs") + plugins_parser = subparsers.add_parser( + "plugins", help="Commands to list and view plugins") spark.setup_parser(spark_parser) + plugins.setup_parser(plugins_parser) args = parser.parse_args() parse_common_args(args) @@ -33,7 +36,7 @@ def main(): except batch_error.BatchErrorException as e: utils.print_batch_exception(e) except aztk.error.AztkError as e: - log.error(e.message) + log.error(str(e)) def setup_common_args(parser: argparse.ArgumentParser): @@ -54,6 +57,7 @@ def parse_common_args(args: NamedTuple): def run_software(args: NamedTuple): softwares = {} softwares[aztk.models.Software.spark] = spark.execute + softwares["plugins"] = plugins.execute func = softwares[args.software] func(args) diff --git a/cli/plugins.py b/cli/plugins.py new file mode 100644 index 00000000..b2d780d0 --- /dev/null +++ b/cli/plugins.py @@ -0,0 +1,30 @@ +import argparse +import typing +from cli import log +from aztk.models.plugins.internal import plugin_manager + + +def setup_parser(parser: argparse.ArgumentParser): + pass + + +def execute(args: typing.NamedTuple): + plugins = plugin_manager.plugins + log.info("------------------------------------------------------") + log.info(" Plugins (%i available)",len(plugins)) + log.info("------------------------------------------------------") + for name, plugin in plugins.items(): + log.info("- %s", name) + args = plugin_manager.get_args_for(plugin) + if args: + log.info(" Arguments:") + for arg in args.values(): + log.info(" - %s", arg_str(arg)) + else: + log.info(" Arguments: None") + log.info("") + + +def arg_str(arg): + required = "Required" if arg.required else "Optional(Default: {0})".format(arg.default) + return "{0}: {1}".format(arg.name, required) diff --git a/cli/spark/endpoints/cluster/cluster_create.py b/cli/spark/endpoints/cluster/cluster_create.py index 1279e7f9..3d15f96a 100644 --- a/cli/spark/endpoints/cluster/cluster_create.py +++ b/cli/spark/endpoints/cluster/cluster_create.py @@ -69,7 +69,7 @@ def execute(args: typing.NamedTuple): else: cluster_conf.user_configuration = None - print_cluster_conf(cluster_conf, wait) + utils.print_cluster_conf(cluster_conf, wait) spinner = utils.Spinner() spinner.start() @@ -86,23 +86,3 @@ def execute(args: typing.NamedTuple): else: log.info("Cluster %s is being provisioned.", cluster.id) - -def print_cluster_conf(cluster_conf: ClusterConfiguration, wait: bool): - user_configuration = cluster_conf.user_configuration - - log.info("-------------------------------------------") - log.info("spark cluster id: %s", cluster_conf.cluster_id) - log.info("spark cluster size: %s", - cluster_conf.vm_count + cluster_conf.vm_low_pri_count) - log.info("> dedicated: %s", cluster_conf.vm_count) - log.info("> low priority: %s", cluster_conf.vm_low_pri_count) - log.info("spark cluster vm size: %s", cluster_conf.vm_size) - log.info("custom scripts: %s", len(cluster_conf.custom_scripts) if cluster_conf.custom_scripts else 0) - log.info("subnet ID: %s", cluster_conf.subnet_id) - log.info("file shares: %s", len(cluster_conf.file_shares) if cluster_conf.file_shares is not None else 0) - log.info("docker repo name: %s", cluster_conf.docker_repo) - log.info("wait for cluster: %s", wait) - log.info("username: %s", user_configuration.username) - if user_configuration.password: - log.info("Password: %s", '*' * len(user_configuration.password)) - log.info("-------------------------------------------") diff --git a/cli/spark/endpoints/cluster/cluster_get.py b/cli/spark/endpoints/cluster/cluster_get.py index 73b9e624..f84f52ce 100644 --- a/cli/spark/endpoints/cluster/cluster_get.py +++ b/cli/spark/endpoints/cluster/cluster_get.py @@ -1,6 +1,7 @@ import argparse import typing import aztk +from cli import log from cli import utils, config @@ -9,6 +10,10 @@ def setup_parser(parser: argparse.ArgumentParser): dest='cluster_id', required=True, help='The unique id of your spark cluster') + parser.add_argument('--show-config', + dest='show_config', + action='store_true', + help='Show the cluster configuration') def execute(args: typing.NamedTuple): @@ -16,3 +21,9 @@ def execute(args: typing.NamedTuple): cluster_id = args.cluster_id cluster = spark_client.get_cluster(cluster_id) utils.print_cluster(spark_client, cluster) + + configuration = utils.helpers.read_cluster_config(cluster_id, spark_client.blob_client) + if configuration and args.show_config: + log.info("-------------------------------------------") + log.info("Cluster configuration:") + utils.print_cluster_conf(configuration, False) diff --git a/cli/spark/endpoints/cluster/cluster_ssh.py b/cli/spark/endpoints/cluster/cluster_ssh.py index c53b690d..36358ba1 100644 --- a/cli/spark/endpoints/cluster/cluster_ssh.py +++ b/cli/spark/endpoints/cluster/cluster_ssh.py @@ -5,38 +5,36 @@ from cli.config import SshConfig import aztk import azure.batch.models.batch_error as batch_error +from aztk.models import ClusterConfiguration def setup_parser(parser: argparse.ArgumentParser): - parser.add_argument('--id', dest="cluster_id", - help='The unique id of your spark cluster') - parser.add_argument('--webui', - help='Local port to port spark\'s master UI to') - parser.add_argument('--jobui', - help='Local port to port spark\'s job UI to') - parser.add_argument('--jobhistoryui', - help='Local port to port spark\'s job history UI to') - parser.add_argument('--jupyter', - help='Local port to port jupyter to') - parser.add_argument('--namenodeui', - help='Local port to port HDFS NameNode UI to') - parser.add_argument('--rstudioserver', - help='Local port to port rstudio server to') - parser.add_argument('-u', '--username', - help='Username to spark cluster') - parser.add_argument('--host', dest="host", - action='store_true', - help='Connect to the host of the Spark container') - parser.add_argument('--no-connect', dest="connect", - action='store_false', - help='Do not create the ssh session. Only print out \ + parser.add_argument('--id', dest="cluster_id", help='The unique id of your spark cluster') + parser.add_argument('--webui', help='Local port to port spark\'s master UI to') + parser.add_argument('--jobui', help='Local port to port spark\'s job UI to') + parser.add_argument('--jobhistoryui', help='Local port to port spark\'s job history UI to') + parser.add_argument('--jupyter', help='Local port to port jupyter to') + parser.add_argument('--namenodeui', help='Local port to port HDFS NameNode UI to') + parser.add_argument('--rstudioserver', help='Local port to port rstudio server to') + parser.add_argument('-u', '--username', help='Username to spark cluster') + parser.add_argument('--host', dest="host", action='store_true', help='Connect to the host of the Spark container') + parser.add_argument( + '--no-connect', + dest="connect", + action='store_false', + help='Do not create the ssh session. Only print out \ the command to run.') parser.set_defaults(connect=True) +http_prefix = 'http://localhost:' + + def execute(args: typing.NamedTuple): spark_client = aztk.spark.Client(config.load_aztk_screts()) + cluster = spark_client.get_cluster(args.cluster_id) + cluster_config = utils.helpers.read_cluster_config(args.cluster_id, spark_client.blob_client) ssh_conf = SshConfig() ssh_conf.merge( @@ -49,20 +47,16 @@ def execute(args: typing.NamedTuple): name_node_ui_port=args.namenodeui, rstudio_server_port=args.rstudioserver, host=args.host, - connect=args.connect - ) + connect=args.connect) - http_prefix = 'http://localhost:' log.info("-------------------------------------------") - log.info("spark cluster id: %s", ssh_conf.cluster_id) - log.info("open webui: %s%s", http_prefix, ssh_conf.web_ui_port) - log.info("open jobui: %s%s", http_prefix, ssh_conf.job_ui_port) - log.info("open jobhistoryui: %s%s", http_prefix, ssh_conf.job_history_ui_port) - log.info("open jupyter: %s%s", http_prefix, ssh_conf.jupyter_port) - log.info("open namenodeui: %s%s", http_prefix, ssh_conf.name_node_ui_port) - log.info("open rstudio server: %s%s", http_prefix, ssh_conf.rstudio_server_port) - log.info("ssh username: %s", ssh_conf.username) - log.info("connect: %s", ssh_conf.connect) + utils.log_property("spark cluster id", ssh_conf.cluster_id) + utils.log_property("open webui", "{0}{1}".format(http_prefix, ssh_conf.web_ui_port)) + utils.log_property("open jobui", "{0}{1}".format(http_prefix, ssh_conf.job_ui_port)) + utils.log_property("open jobhistoryui", "{0}{1}".format(http_prefix, ssh_conf.job_history_ui_port)) + print_plugin_ports(cluster_config) + utils.log_property("ssh username", ssh_conf.username) + utils.log_property("connect", ssh_conf.connect) log.info("-------------------------------------------") # get ssh command @@ -73,9 +67,6 @@ def execute(args: typing.NamedTuple): webui=ssh_conf.web_ui_port, jobui=ssh_conf.job_ui_port, jobhistoryui=ssh_conf.job_history_ui_port, - namenodeui=ssh_conf.name_node_ui_port, - jupyter=ssh_conf.jupyter_port, - rstudioserver=ssh_conf.rstudio_server_port, username=ssh_conf.username, host=ssh_conf.host, connect=ssh_conf.connect) @@ -90,3 +81,29 @@ def execute(args: typing.NamedTuple): raise aztk.error.AztkError("The cluster you are trying to connect to does not exist.") else: raise + + +def print_plugin_ports(cluster_config: ClusterConfiguration): + + if cluster_config and cluster_config.plugins: + plugins = cluster_config.plugins + has_ports = False + for plugin in plugins: + for port in plugin.ports: + if port.expose_publicly: + has_ports = True + break + + if has_ports > 0: + log.info("plugins:") + for plugin in plugins: + for port in plugin.ports: + if port.expose_publicly: + label = " - open {}".format(plugin.name) + + if port.name: + label += " {}".format(port.name) + + url = "{0}{1}".format(http_prefix, port.public_port) + utils.log_property(label, url) + diff --git a/cli/utils.py b/cli/utils.py index 644323ec..26d97657 100644 --- a/cli/utils.py +++ b/cli/utils.py @@ -6,9 +6,10 @@ from subprocess import call from typing import List import azure.batch.models as batch_models -import aztk.spark -from aztk import error -from aztk.utils import get_ssh_key +from aztk import error, utils +from aztk.utils import get_ssh_key, helpers +from aztk.models import ClusterConfiguration +from aztk.spark import models from . import log @@ -32,7 +33,7 @@ def get_ssh_key_or_prompt(ssh_key, username, password, secrets_config): raise error.AztkError("Failed to get valid password, cannot add user to cluster. It is recommended that you provide a ssh public key in .aztk/secrets.yaml. Or provide an ssh-key or password with commnad line parameters (--ssh-key or --password). You may also run the 'aztk spark cluster add-user' command to add a user to this cluster.") return ssh_key, password -def print_cluster(client, cluster: aztk.spark.models.Cluster): +def print_cluster(client, cluster: models.Cluster): node_count = __pretty_node_count(cluster) log.info("") @@ -64,7 +65,7 @@ def print_cluster(client, cluster: aztk.spark.models.Cluster): ) log.info('') -def __pretty_node_count(cluster: aztk.spark.models.Cluster) -> str: +def __pretty_node_count(cluster: models.Cluster) -> str: if cluster.pool.allocation_state is batch_models.AllocationState.resizing: return '{} -> {}'.format( cluster.total_current_nodes, @@ -72,7 +73,7 @@ def __pretty_node_count(cluster: aztk.spark.models.Cluster) -> str: else: return '{}'.format(cluster.total_current_nodes) -def __pretty_dedicated_node_count(cluster: aztk.spark.models.Cluster)-> str: +def __pretty_dedicated_node_count(cluster: models.Cluster)-> str: if (cluster.pool.allocation_state is batch_models.AllocationState.resizing or cluster.pool.state is batch_models.PoolState.deleting)\ and cluster.current_dedicated_nodes != cluster.target_dedicated_nodes: @@ -82,7 +83,7 @@ def __pretty_dedicated_node_count(cluster: aztk.spark.models.Cluster)-> str: else: return '{}'.format(cluster.current_dedicated_nodes) -def __pretty_low_pri_node_count(cluster: aztk.spark.models.Cluster)-> str: +def __pretty_low_pri_node_count(cluster: models.Cluster)-> str: if (cluster.pool.allocation_state is batch_models.AllocationState.resizing or cluster.pool.state is batch_models.PoolState.deleting)\ and cluster.current_low_pri_nodes != cluster.target_low_pri_nodes: @@ -92,7 +93,7 @@ def __pretty_low_pri_node_count(cluster: aztk.spark.models.Cluster)-> str: else: return '{}'.format(cluster.current_low_pri_nodes) -def print_clusters(clusters: List[aztk.spark.models.Cluster]): +def print_clusters(clusters: List[models.Cluster]): print_format = '{:<34}| {:<10}| {:<20}| {:<7}' print_format_underline = '{:-<34}|{:-<11}|{:-<21}|{:-<7}' @@ -131,9 +132,6 @@ def ssh_in_master( webui: str = None, jobui: str = None, jobhistoryui: str = None, - jupyter: str = None, - namenodeui: str = None, - rstudioserver: str = None, ports=None, host: bool = False, connect: bool = True): @@ -143,33 +141,30 @@ def ssh_in_master( :param username: Username to use to ssh :param webui: Port for the spark master web ui (Local port) :param jobui: Port for the job web ui (Local port) - :param jupyter: Port for jupyter (Local port) - :param rstudioserver: Port for rstudio server (Local port) :param ports: an list of local and remote ports :type ports: [[, ]] """ # Get master node id from task (job and task are both named pool_id) cluster = client.get_cluster(cluster_id) + configuration = helpers.read_cluster_config(cluster_id, client.blob_client) + master_node_id = cluster.master_node_id if master_node_id is None: - raise aztk.error.ClusterNotReadyError("Master node has not yet been picked!") + raise error.ClusterNotReadyError("Master node has not yet been picked!") # get remote login settings for the user remote_login_settings = client.get_remote_login_settings(cluster.id, master_node_id) master_node_ip = remote_login_settings.ip_address master_node_port = remote_login_settings.port - spark_web_ui_port = aztk.utils.constants.DOCKER_SPARK_WEB_UI_PORT - spark_worker_ui_port = aztk.utils.constants.DOCKER_SPARK_WORKER_UI_PORT - spark_rstudio_server_port = aztk.utils.constants.DOCKER_SPARK_RSTUDIO_SERVER_PORT - spark_jupyter_port = aztk.utils.constants.DOCKER_SPARK_JUPYTER_PORT - spark_job_ui_port = aztk.utils.constants.DOCKER_SPARK_JOB_UI_PORT - spark_job_history_ui_port = aztk.utils.constants.DOCKER_SPARK_JOB_UI_HISTORY_PORT - spark_namenode_ui_port = aztk.utils.constants.DOCKER_SPARK_NAMENODE_UI_PORT + spark_web_ui_port = utils.constants.DOCKER_SPARK_WEB_UI_PORT + spark_worker_ui_port = utils.constants.DOCKER_SPARK_WORKER_UI_PORT + spark_job_ui_port = utils.constants.DOCKER_SPARK_JOB_UI_PORT + spark_job_history_ui_port = utils.constants.DOCKER_SPARK_JOB_UI_HISTORY_PORT - ssh_command = aztk.utils.command_builder.CommandBuilder('ssh') + ssh_command = utils.command_builder.CommandBuilder('ssh') # get ssh private key path if specified ssh_priv_key = client.secrets_config.ssh_priv_key @@ -183,17 +178,16 @@ def ssh_in_master( jobui, spark_job_ui_port), enable=bool(jobui)) ssh_command.add_option("-L", "{0}:localhost:{1}".format( jobhistoryui, spark_job_history_ui_port), enable=bool(jobui)) - ssh_command.add_option("-L", "{0}:localhost:{1}".format( - jupyter, spark_jupyter_port), enable=bool(jupyter)) - ssh_command.add_option("-L", "{0}:localhost:{1}".format( - namenodeui, spark_namenode_ui_port), enable=bool(namenodeui)) - ssh_command.add_option("-L", "{0}:localhost:{1}".format( - rstudioserver, spark_rstudio_server_port), enable=bool(rstudioserver)) if ports is not None: for port in ports: ssh_command.add_option( "-L", "{0}:localhost:{1}".format(port[0], port[1])) + if configuration and configuration.plugins: + for plugin in configuration.plugins: + for port in plugin.ports: + if port.expose_publicly: + ssh_command.add_option("-L", "{0}:localhost:{1}".format(port.public_port, port.internal)) user = username if username is not None else '' ssh_command.add_argument( @@ -230,7 +224,7 @@ def print_batch_exception(batch_exception): Job submission ''' -def print_jobs(jobs: List[aztk.spark.models.Job]): +def print_jobs(jobs: List[models.Job]): print_format = '{:<34}| {:<10}| {:<20}' print_format_underline = '{:-<34}|{:-<11}|{:-<21}' @@ -247,7 +241,7 @@ def print_jobs(jobs: List[aztk.spark.models.Job]): ) -def print_job(client, job: aztk.spark.models.Job): +def print_job(client, job: models.Job): print_format = '{:<36}| {:<15}' log.info("") @@ -274,7 +268,7 @@ def print_job(client, job: aztk.spark.models.Job): log.info("") -def node_state_count(cluster: aztk.spark.models.Cluster): +def node_state_count(cluster: models.Cluster): states = {} for state in batch_models.ComputeNodeState: states[state] = 0 @@ -283,7 +277,7 @@ def node_state_count(cluster: aztk.spark.models.Cluster): return states -def print_cluster_summary(cluster: aztk.spark.models.Cluster): +def print_cluster_summary(cluster: models.Cluster): print_format = '{:<4} {:<23} {:<15}' log.info("Cluster %s", cluster.id) @@ -351,7 +345,7 @@ def print_applications(applications): if warn_scheduling: log.warning("\nNo Spark applications will be scheduled until the master is selected.") -def print_application(application: aztk.spark.models.Application): +def print_application(application: models.Application): print_format = '{:<30}| {:<15}' log.info("") @@ -404,3 +398,35 @@ def stop(self): def utc_to_local(utc_dt): return utc_dt.replace(tzinfo=datetime.timezone.utc).astimezone(tz=None).strftime("%H:%M%p %d/%m/%y") + + +def print_cluster_conf(cluster_conf: ClusterConfiguration, wait: bool): + user_configuration = cluster_conf.user_configuration + + log.info("-------------------------------------------") + log.info("spark cluster id: %s", cluster_conf.cluster_id) + log.info("spark cluster size: %s", + cluster_conf.vm_count + cluster_conf.vm_low_pri_count) + log.info("> dedicated: %s", cluster_conf.vm_count) + log.info("> low priority: %s", cluster_conf.vm_low_pri_count) + log.info("spark cluster vm size: %s", cluster_conf.vm_size) + log.info("custom scripts: %s", len(cluster_conf.custom_scripts) if cluster_conf.custom_scripts else 0) + log.info("subnet ID: %s", cluster_conf.subnet_id) + log.info("file shares: %s", len(cluster_conf.file_shares) if cluster_conf.file_shares is not None else 0) + log.info("docker repo name: %s", cluster_conf.docker_repo) + log.info("wait for cluster: %s", wait) + log.info("username: %s", user_configuration.username) + if user_configuration.password: + log.info("Password: %s", '*' * len(user_configuration.password)) + log.info("Plugins:") + if not cluster_conf.plugins: + log.info(" None Configured") + else: + for plugin in cluster_conf.plugins: + log.info(" - %s", plugin.name) + log.info("-------------------------------------------") + + +def log_property(label: str, value: str): + label += ":" + log.info("{0:30} {1}".format(label, value)) diff --git a/config/cluster.yaml b/config/cluster.yaml index d51c12b9..ab1285f2 100644 --- a/config/cluster.yaml +++ b/config/cluster.yaml @@ -15,7 +15,7 @@ size: 2 username: spark # docker_repo: -docker_repo: +docker_repo: # # optional custom scripts to run on the Spark master, Spark worker or all nodes in the cluster # custom_scripts: @@ -27,5 +27,11 @@ docker_repo: # To add your cluster to a virtual network provide the full arm resoruce id below # subnet_id: /subscriptions/********-****-****-****-************/resourceGroups/********/providers/Microsoft.Network/virtualNetworks/*******/subnets/****** +# To define plugins +# plugins: +# - name: rstudio_server +# args: +# version: 1.2.3 + # wait: wait: false diff --git a/config/ssh.yaml b/config/ssh.yaml index 909f41c8..2d91446f 100644 --- a/config/ssh.yaml +++ b/config/ssh.yaml @@ -14,14 +14,5 @@ job_history_ui_port: 18080 # web_ui_port: web_ui_port: 8080 -# jupyter_port: -jupyter_port: 8888 - -# name_node_ui_port: -name_node_ui_port: 50070 - -# rstudio_server_port: -rstudio_server_port: 8787 - # connect: connect: true diff --git a/node_scripts/core/config.py b/node_scripts/core/config.py index d26fa9f1..349aa1db 100644 --- a/node_scripts/core/config.py +++ b/node_scripts/core/config.py @@ -31,7 +31,6 @@ spark_web_ui_port = os.environ["SPARK_WEB_UI_PORT"] spark_worker_ui_port = os.environ["SPARK_WORKER_UI_PORT"] -spark_jupyter_port = os.environ["SPARK_JUPYTER_PORT"] spark_job_ui_port = os.environ["SPARK_JOB_UI_PORT"] storage_account_name = os.environ["STORAGE_ACCOUNT_NAME"] diff --git a/node_scripts/install/install.py b/node_scripts/install/install.py index 7ff522e9..0df0eb13 100644 --- a/node_scripts/install/install.py +++ b/node_scripts/install/install.py @@ -1,6 +1,6 @@ import os from core import config -from install import pick_master, spark, scripts, create_user +from install import pick_master, spark, scripts, create_user, plugins import wait_until_master_selected @@ -24,10 +24,12 @@ def setup_node(): if is_master: setup_as_master() + plugins.setup_plugins(is_master=True, is_worker=True) scripts.run_custom_scripts(is_master=True, is_worker=True) else: setup_as_worker() + plugins.setup_plugins(is_master=False, is_worker=True) scripts.run_custom_scripts(is_master=False, is_worker=True) open("/tmp/setup_complete", 'a').close() diff --git a/node_scripts/install/plugins.py b/node_scripts/install/plugins.py new file mode 100644 index 00000000..bac6fd97 --- /dev/null +++ b/node_scripts/install/plugins.py @@ -0,0 +1,99 @@ +import os +import json +import yaml +import subprocess +from pathlib import Path + + +log_folder = os.path.join(os.environ['DOCKER_WORKING_DIR'], 'logs','plugins') + +def _read_manifest_file(path=None): + custom_scripts = None + if not os.path.isfile(path): + print("Plugins manifest file doesn't exist at {0}".format(path)) + else: + with open(path, 'r') as stream: + try: + custom_scripts = yaml.load(stream) + except json.JSONDecodeError as err: + print("Error in plugins manifest: {0}".format(err)) + + return custom_scripts + + +def setup_plugins(is_master: bool = False, is_worker: bool = False): + + plugins_dir = _plugins_dir() + plugins_manifest = _read_manifest_file( + os.path.join(plugins_dir, 'plugins-manifest.yaml')) + + if not os.path.exists(log_folder): + os.makedirs(log_folder) + + if plugins_manifest is not None: + _setup_plugins(plugins_manifest, is_master, is_worker) + + +def _plugins_dir(): + return os.path.join(os.environ['DOCKER_WORKING_DIR'], 'plugins') + + +def _run_on_this_node(plugin_obj=None, is_master=False, is_worker=False): + if plugin_obj['runOn'] == 'master' and is_master is True: + return True + if plugin_obj['runOn'] == 'worker' and is_worker is True: + return True + if plugin_obj['runOn'] == 'all-nodes': + return True + + return False + + +def _setup_plugins(plugins_manifest, is_master=False, is_worker=False): + plugins_dir = _plugins_dir() + + if is_master: + os.environ["IS_MASTER"] = "1" + else: + os.environ["IS_MASTER"] = "0" + + if is_worker: + os.environ["IS_WORKER"] = "1" + else: + os.environ["IS_WORKER"] = "0" + + for plugin in plugins_manifest: + if _run_on_this_node(plugin, is_master, is_worker): + path = os.path.join(plugins_dir, plugin['execute']) + _run_script(plugin.get("name"), path, plugin.get('args'), plugin.get('env')) + + +def _run_script(name: str, script_path: str = None, args: dict = None, env: dict = None): + if not os.path.isfile(script_path): + print("Cannot run plugin script: {0} file does not exist".format( + script_path)) + return + file_stat = os.stat(script_path) + os.chmod(script_path, file_stat.st_mode | 0o777) + print("------------------------------------------------------------------") + print("Running plugin script:", script_path) + + my_env = os.environ.copy() + if env: + for [key, value] in env.items(): + my_env[key] = value + + if args is None: + args = [] + + out_file = open(os.path.join(log_folder, '{0}.txt'.format(name)), 'w') + try: + subprocess.call( + [script_path] + args, + env=my_env, + stdout=out_file, + stderr=out_file) + print("Finished running") + print("------------------------------------------------------------------") + except Exception as e: + print(e) diff --git a/requirements.txt b/requirements.txt index 4badd998..d70521d2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,6 +9,6 @@ paramiko==2.4.0 # Development yapf==0.20.1 -pylint==1.7.2 +pylint==1.8.2 pytest==3.1.3 pytest-xdist==1.22.0 diff --git a/tests/models/internal/test_plugin-manager.py b/tests/models/internal/test_plugin-manager.py new file mode 100644 index 00000000..dab024d6 --- /dev/null +++ b/tests/models/internal/test_plugin-manager.py @@ -0,0 +1,35 @@ +import os +import pytest +from aztk.models.plugins import PluginConfiguration +from aztk.models.plugins.internal import PluginManager +from aztk.error import InvalidPluginReferenceError + +dir_path = os.path.dirname(os.path.realpath(__file__)) +fake_plugin_dir = os.path.join(dir_path, "fake_plugins") + + +class RequiredArgPlugin(PluginConfiguration): + def __init__(self, req_arg): + super().__init__(name="required-arg") + + +def test_missing_plugin(): + plugin_manager = PluginManager() + message = "Cannot find a plugin with name .*" + with pytest.raises(InvalidPluginReferenceError, match=message): + plugin_manager.get_plugin("non-existing-plugin") + + +def test_extra_args_plugin(): + plugin_manager = PluginManager() + message = "Plugin JupyterPlugin doesn't have an argument called 'invalid'" + with pytest.raises(InvalidPluginReferenceError, match=message): + plugin_manager.get_plugin("jupyter", args=dict(invalid="foo")) + + +def test_missing_required_arg(): + plugin_manager = PluginManager() + plugin_manager.plugins["required-arg"] = RequiredArgPlugin + message = "Missing a required argument req_arg for plugin RequiredArgPlugin" + with pytest.raises(InvalidPluginReferenceError, match=message): + plugin_manager.get_plugin("required-arg") diff --git a/tests/models/plugins/test_plugin_configuration.py b/tests/models/plugins/test_plugin_configuration.py new file mode 100644 index 00000000..d7ff7c63 --- /dev/null +++ b/tests/models/plugins/test_plugin_configuration.py @@ -0,0 +1,47 @@ +from aztk.models.plugins import PluginConfiguration, PluginPort, PluginRunTarget + + +def test_create_basic_plugin(): + plugin = PluginConfiguration( + name="abc", files=["file.sh"], execute="file.sh") + assert plugin.name == "abc" + assert plugin.files == ["file.sh"] + assert plugin.execute == "file.sh" + assert plugin.args == [] + assert plugin.run_on == PluginRunTarget.Master + + +def test_create_with_args(): + plugin = PluginConfiguration( + name="abc", args=["arg1", "arg2"]) + assert plugin.name == "abc" + assert len(plugin.args) == 2 + assert plugin.args == ["arg1", "arg2"] + + +def test_plugin_with_internal_port(): + plugin = PluginConfiguration(name="abc", ports=[PluginPort(internal=1234)]) + assert plugin.name == "abc" + assert len(plugin.ports) == 1 + port = plugin.ports[0] + assert port.internal == 1234 + assert port.expose_publicly == False + assert port.public_port == None + +def test_plugin_with_auto_public_port(): + plugin = PluginConfiguration(name="abc", ports=[PluginPort(internal=1234, public=True)]) + assert plugin.name == "abc" + assert len(plugin.ports) == 1 + port = plugin.ports[0] + assert port.internal == 1234 + assert port.expose_publicly == True + assert port.public_port == 1234 + +def test_plugin_with_specified_public_port(): + plugin = PluginConfiguration(name="abc", ports=[PluginPort(internal=1234, public=4321)]) + assert plugin.name == "abc" + assert len(plugin.ports) == 1 + port = plugin.ports[0] + assert port.internal == 1234 + assert port.expose_publicly == True + assert port.public_port == 4321 diff --git a/tests/utils/test_command_builder.py b/tests/utils/test_command_builder.py new file mode 100644 index 00000000..269e12af --- /dev/null +++ b/tests/utils/test_command_builder.py @@ -0,0 +1,35 @@ +from aztk.utils.command_builder import CommandBuilder + + +def test_only_command(): + cmd = CommandBuilder("ssh") + assert cmd.to_str() == "ssh" + + +def test_with_option(): + cmd = CommandBuilder("ssh") + cmd.add_option("-L", "8080:localhost:8080") + assert cmd.to_str() == "ssh -L 8080:localhost:8080" + + +def test_with_multiple_options(): + cmd = CommandBuilder("ssh") + cmd.add_option("-L", "8080:localhost:8080") + cmd.add_option("-p", "2020") + assert cmd.to_str() == "ssh -L 8080:localhost:8080 -p 2020" + + +def test_with_arg_and_option(): + cmd = CommandBuilder("ssh") + cmd.add_argument("admin@1.2.3.4") + cmd.add_option("-p", "2020") + assert cmd.to_str() == "ssh -p 2020 admin@1.2.3.4" + + +def test_with_disabled_options(): + cmd = CommandBuilder("ssh") + + cmd.add_option("--verbose", enable=True) + cmd.add_option("-p", None) + cmd.add_option("-L", "8080:localhost:8080", enable=False) + assert cmd.to_str() == "ssh --verbose"