Skip to content
This repository has been archived by the owner on Feb 3, 2021. It is now read-only.

Commit

Permalink
Fix: spark debug tool filter out .venv, make debug tool testable (#612)
Browse files Browse the repository at this point in the history
* filter out .venv

* add NodeOutput model

* add debug tool integration test

* add test for debug tool

* split condition

* revert style change

* remove debug print

* whitespace

* remove other model implementation

* fix cluster copy

* fix cluster run and cluster copy
  • Loading branch information
jafreck committed Jun 20, 2018
1 parent 883980d commit 4e0b1ec
Show file tree
Hide file tree
Showing 11 changed files with 127 additions and 56 deletions.
2 changes: 1 addition & 1 deletion aztk/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ def __cluster_run(self, cluster_id, command, internal, container_name=None, time
finally:
self.__delete_user_on_pool(generated_username, pool.id, nodes)

def __cluster_copy(self, cluster_id, source_path, destination_path, container_name=None, internal=False, get=False, timeout=None):
def __cluster_copy(self, cluster_id, source_path, destination_path=None, container_name=None, internal=False, get=False, timeout=None):
pool, nodes = self.__get_pool_details(cluster_id)
nodes = list(nodes)
if internal:
Expand Down
1 change: 1 addition & 0 deletions aztk/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from .remote_login import RemoteLogin
from .ssh_log import SSHLog
from .vm_image import VmImage
from .node_output import NodeOutput
from .software import Software
from .cluster import Cluster
from .scheduling_target import SchedulingTarget
Expand Down
8 changes: 8 additions & 0 deletions aztk/models/node_output.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from tempfile import SpooledTemporaryFile
from typing import Union

class NodeOutput:
def __init__(self, id: str, output: Union[SpooledTemporaryFile, str] = None, error: Exception = None):
self.id = id
self.output = output
self.error = error
10 changes: 5 additions & 5 deletions aztk/spark/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,25 +184,25 @@ def node_run(self, cluster_id: str, node_id: str, command: str, host=False, inte
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))

def cluster_copy(self, cluster_id: str, source_path: str, destination_path: str, host: bool = False, internal: bool = False, timeout=None):
def cluster_copy(self, cluster_id: str, source_path: str, destination_path: str, host: bool = False, internal: bool = False, timeout: int = None):
try:
container_name = None if host else 'spark'
return self.__cluster_copy(cluster_id,
source_path,
destination_path,
destination_path=destination_path,
container_name=container_name,
get=False,
internal=internal,
timeout=timeout)
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))

def cluster_download(self, cluster_id: str, source_path: str, destination_path: str, host: bool = False, internal: bool = False, timeout=None):
def cluster_download(self, cluster_id: str, source_path: str, destination_path: str = None, host: bool = False, internal: bool = False, timeout: int = None):
try:
container_name = None if host else 'spark'
return self.__cluster_copy(cluster_id,
source_path,
destination_path,
destination_path=destination_path,
container_name=container_name,
get=True,
internal=internal,
Expand Down Expand Up @@ -333,7 +333,7 @@ def wait_until_all_jobs_finished(self, jobs):
for job in jobs:
self.wait_until_job_finished(job)

def run_cluster_diagnostics(self, cluster_id, output_directory):
def run_cluster_diagnostics(self, cluster_id, output_directory=None):
try:
output = cluster_diagnostic_helper.run(self, cluster_id, output_directory)
return output
Expand Down
19 changes: 12 additions & 7 deletions aztk/spark/helpers/cluster_diagnostic_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,22 @@
from aztk import models as aztk_models
import azure.batch.models as batch_models

def run(spark_client, cluster_id, output_directory):
def run(spark_client, cluster_id, output_directory=None):
# copy debug program to each node
spark_client.cluster_copy(cluster_id, os.path.abspath("./aztk/spark/utils/debug.py"), "/tmp/debug.py", host=True)
output = spark_client.cluster_copy(cluster_id, os.path.abspath("./aztk/spark/utils/debug.py"), "/tmp/debug.py", host=True)
ssh_cmd = _build_diagnostic_ssh_command()
run_output = spark_client.cluster_run(cluster_id, ssh_cmd, host=True)
local_path = os.path.join(os.path.abspath(output_directory), "debug", "debug.zip")
remote_path = "/tmp/debug.zip"
output = spark_client.cluster_download(cluster_id, remote_path, local_path, host=True)
# write run output to debug/ directory
with open(os.path.join(os.path.dirname(local_path), "debug-output.txt"), 'w', encoding="UTF-8") as f:
[f.write(line + '\n') for node_id, result in run_output for line in result]
if output_directory:
local_path = os.path.join(os.path.abspath(output_directory), "debug.zip")
output = spark_client.cluster_download(cluster_id, remote_path, local_path, host=True)

# write run output to debug/ directory
with open(os.path.join(os.path.dirname(local_path), "debug-output.txt"), 'w', encoding="UTF-8") as f:
[f.write(line + '\n') for node_output in run_output for line in node_output.output]
else:
output = spark_client.cluster_download(cluster_id, remote_path, host=True)

return output


Expand Down
15 changes: 9 additions & 6 deletions aztk/spark/utils/debug.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from subprocess import STDOUT, CalledProcessError, check_output
from zipfile import ZIP_DEFLATED, ZipFile

import docker # pylint: disable=import-error
import docker # pylint: disable=import-error


def main():
Expand Down Expand Up @@ -86,7 +86,7 @@ def get_docker_containers(docker_client):
# get docker container logs
logs.append((container.name + "/docker.log", container.logs()))
logs.append(get_docker_process_status(container))
if container.name == "spark": #TODO: find a more robust way to get specific info off specific containers
if container.name == "spark": #TODO: find a more robust way to get specific info off specific containers
logs.extend(get_container_aztk_script(container))
logs.extend(get_spark_logs(container))
logs.extend(get_spark_app_logs(container))
Expand All @@ -112,7 +112,7 @@ def get_docker_process_status(container):
def get_container_aztk_script(container):
aztk_path = "/mnt/batch/tasks/startup/wd"
try:
stream, _ = container.get_archive(aztk_path) # second item is stat info
stream, _ = container.get_archive(aztk_path) # second item is stat info
return extract_tar_in_memory(container, stream)
except docker.errors.APIError as e:
return (container.name + "/" + "aztk-scripts.err", e.__str__())
Expand All @@ -121,7 +121,7 @@ def get_container_aztk_script(container):
def get_spark_logs(container):
spark_logs_path = "/home/spark-current/logs"
try:
stream, _ = container.get_archive(spark_logs_path) # second item is stat info
stream, _ = container.get_archive(spark_logs_path) # second item is stat info
return extract_tar_in_memory(container, stream)
except docker.errors.APIError as e:
return [(container.name + "/" + "spark-logs.err", e.__str__())]
Expand All @@ -139,9 +139,12 @@ def get_spark_app_logs(container):
def filter_members(members):
skip_files = ["id_rsa", "id_rsa.pub", "docker.log"]
skip_extensions = [".pyc", ".zip"]
skip_directories = [".venv"]
for tarinfo in members:
if (os.path.splitext(tarinfo.name)[1] not in skip_extensions and
os.path.basename(tarinfo.name) not in skip_files):
member_path = os.path.normpath(tarinfo.name).split(os.sep)
if (not any(directory in skip_directories for directory in member_path)
and os.path.basename(tarinfo.name) not in skip_files
and os.path.splitext(tarinfo.name)[1] not in skip_extensions):
yield tarinfo


Expand Down
38 changes: 23 additions & 15 deletions aztk/utils/ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from concurrent.futures import ThreadPoolExecutor

from aztk.error import AztkError

from aztk.models import NodeOutput

class ForwardServer(SocketServer.ThreadingTCPServer):
daemon_threads = True
Expand Down Expand Up @@ -110,15 +110,15 @@ def node_exec_command(node_id, command, username, hostname, port, ssh_key=None,
try:
client = connect(hostname=hostname, port=port, username=username, password=password, pkey=ssh_key, timeout=timeout)
except AztkError as e:
return (node_id, e)
return NodeOutput(node_id, e)
if container_name:
cmd = 'sudo docker exec 2>&1 -t {0} /bin/bash -c \'set -e; set -o pipefail; {1}; wait\''.format(container_name, command)
else:
cmd = '/bin/bash 2>&1 -c \'set -e; set -o pipefail; {0}; wait\''.format(command)
stdin, stdout, stderr = client.exec_command(cmd, get_pty=True)
output = [line.decode('utf-8') for line in stdout.read().splitlines()]
output = stdout.read().decode("utf-8")
client.close()
return (node_id, output)
return NodeOutput(node_id, output, None)


async def clus_exec_command(command, username, nodes, ports=None, ssh_key=None, password=None, container_name=None, timeout=None):
Expand All @@ -141,16 +141,23 @@ def copy_from_node(node_id, source_path, destination_path, username, hostname, p
try:
client = connect(hostname=hostname, port=port, username=username, password=password, pkey=ssh_key, timeout=timeout)
except AztkError as e:
return (node_id, False, e)
return NodeOutput(node_id, False, e)
sftp_client = client.open_sftp()
try:
destination_path = os.path.join(os.path.dirname(destination_path), node_id, os.path.basename(destination_path))
os.makedirs(os.path.dirname(destination_path), exist_ok=True)
with open(destination_path, 'wb') as f: #SpooledTemporaryFile instead??
if destination_path:
destination_path = os.path.join(os.path.dirname(destination_path), node_id, os.path.basename(destination_path))
os.makedirs(os.path.dirname(destination_path), exist_ok=True)
with open(destination_path, 'wb') as f:
sftp_client.getfo(source_path, f)
else:
import tempfile
# create 2mb temporary file
f = tempfile.SpooledTemporaryFile(2*1024**3)
sftp_client.getfo(source_path, f)
return (node_id, True, None)

return NodeOutput(node_id, f, None)
except OSError as e:
return (node_id, False, e)
return (node_id, None, e)
finally:
sftp_client.close()
client.close()
Expand All @@ -160,7 +167,7 @@ def node_copy(node_id, source_path, destination_path, username, hostname, port,
try:
client = connect(hostname=hostname, port=port, username=username, password=password, pkey=ssh_key, timeout=timeout)
except AztkError as e:
return (node_id, False, e)
return NodeOutput(node_id, None, e)
sftp_client = client.open_sftp()
try:
if container_name:
Expand All @@ -170,15 +177,16 @@ def node_copy(node_id, source_path, destination_path, username, hostname, port,
# move to correct destination on container
docker_command = 'sudo docker cp {0} {1}:{2}'.format(tmp_file, container_name, destination_path)
_, stdout, _ = client.exec_command(docker_command, get_pty=True)
output = [line.decode('utf-8') for line in stdout.read().splitlines()]
output = stdout.read().decode('utf-8')
# clean up
sftp_client.remove(tmp_file)
return (node_id, True, None)
return NodeOutput(node_id, output, None)
else:
output = sftp_client.put(source_path, destination_path).__str__()
return (node_id, True, None)
return NodeOutput(node_id, output, None)
except (IOError, PermissionError) as e:
return (node_id, False, e)
raise e
return NodeOutput(node_id, None, e)
finally:
sftp_client.close()
client.close()
Expand Down
14 changes: 2 additions & 12 deletions aztk_cli/spark/endpoints/cluster/cluster_copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,5 @@ def execute(args: typing.NamedTuple):
destination_path=args.dest_path,
internal=args.internal
)
[print_copy_result(node_id, result, err) for node_id, result, err in copy_output]
sys.exit(0 if all([result for _, result, _ in copy_output]) else 1)


def print_copy_result(node_id, success, err):
log.print("-" * (len(node_id) + 6))
log.print("| %s |", node_id)
log.print("-" * (len(node_id) + 6))
if success:
log.print("Copy successful")
else:
log.print(err)
[utils.log_node_copy_output(node_output) for node_output in copy_output]
sys.exit(0 if not any([node_output.error for node_output in copy_output]) else 1)
2 changes: 1 addition & 1 deletion aztk_cli/spark/endpoints/cluster/cluster_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,4 @@ def execute(args: typing.NamedTuple):
else:
results = spark_client.cluster_run(args.cluster_id, args.command, args.host, args.internal)

[utils.log_execute_result(node_id, result) for node_id, result in results]
[utils.log_node_run_output(node_output) for node_output in results]
25 changes: 17 additions & 8 deletions aztk_cli/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -463,12 +463,21 @@ def log_property(label: str, value: str):
log.info("{0:30} {1}".format(label, value))


def log_execute_result(node_id, result):
log.info("-" * (len(node_id) + 4))
log.info("| %s |", node_id)
log.info("-" * (len(node_id) + 4))
if isinstance(result, Exception):
log.info("%s\n", result)
def log_node_copy_output(node_output):
log.info("-" * (len(node_output.id) + 4))
log.info("| %s |", node_output.id)
log.info("-" * (len(node_output.id) + 4))
if node_output.error:
log.error(node_output.error)
else:
for line in result:
log.print(line)
log.print("Copy successful")


def log_node_run_output(node_output):
log.info("-" * (len(node_output.id) + 4))
log.info("| %s |", node_output.id)
log.info("-" * (len(node_output.id) + 4))
if node_output.error:
log.error("%s\n", node_output.error)
else:
log.print(node_output.output)
49 changes: 48 additions & 1 deletion tests/integration_tests/spark/sdk/cluster/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import os
import time
from datetime import datetime
from zipfile import ZipFile

import azure.batch.models as batch_models
from azure.batch.models import BatchErrorException
Expand Down Expand Up @@ -56,6 +57,7 @@ def clean_up_cluster(cluster_id):
# pass in the event that the cluster does not exist
pass


def ensure_spark_master(cluster_id):
results = spark_client.cluster_run(cluster_id,
"if $AZTK_IS_MASTER ; then $SPARK_HOME/sbin/spark-daemon.sh status org.apache.spark.deploy.master.Master 1 ;" \
Expand All @@ -66,6 +68,7 @@ def ensure_spark_master(cluster_id):
print(result[0])
assert result[0] in ["org.apache.spark.deploy.master.Master is running.", "AZTK_IS_MASTER is false"]


def ensure_spark_worker(cluster_id):
results = spark_client.cluster_run(cluster_id,
"if $AZTK_IS_WORKER ; then $SPARK_HOME/sbin/spark-daemon.sh status org.apache.spark.deploy.worker.Worker 1 ;" \
Expand All @@ -75,10 +78,12 @@ def ensure_spark_worker(cluster_id):
raise result
assert result[0] in ["org.apache.spark.deploy.worker.Worker is running.", "AZTK_IS_WORKER is false"]


def ensure_spark_processes(cluster_id):
ensure_spark_master(cluster_id)
ensure_spark_worker(cluster_id)


def wait_for_all_nodes(cluster_id, nodes):
while True:
for node in nodes:
Expand All @@ -89,6 +94,7 @@ def wait_for_all_nodes(cluster_id, nodes):
continue
break


def test_create_cluster():
test_id = "test-create-"
# TODO: make Cluster Configuration more robust, test each value
Expand Down Expand Up @@ -383,7 +389,7 @@ def test_delete_cluster():
clean_up_cluster(cluster_configuration.cluster_id)

def test_spark_processes_up():
test_id = "test-spark-processes-up"
test_id = "test-spark-processes-up-"
cluster_configuration = aztk.spark.models.ClusterConfiguration(
cluster_id=test_id+base_cluster_id,
vm_count=2,
Expand All @@ -408,3 +414,44 @@ def test_spark_processes_up():

finally:
clean_up_cluster(cluster_configuration.cluster_id)


def test_debug_tool():
test_id = "debug-tool-"
cluster_configuration = aztk.spark.models.ClusterConfiguration(
cluster_id=test_id+base_cluster_id,
size=2,
size_low_priority=0,
vm_size="standard_f2",
subnet_id=None,
custom_scripts=None,
file_shares=None,
toolkit=aztk.spark.models.SparkToolkit(version="2.3.0"),
spark_configuration=None
)
expected_members = [
"df.txt",
"hostname.txt",
"docker-images.txt",
"docker-containers.txt",
"spark/docker.log",
"spark/ps_aux.txt",
"spark/logs",
"spark/wd"
]
try:
cluster = spark_client.create_cluster(cluster_configuration, wait=True)
nodes = [node for node in cluster.nodes]
wait_for_all_nodes(cluster.id, nodes)
cluster_output = spark_client.run_cluster_diagnostics(cluster_id=cluster.id)
for node_output in cluster_output:
node_output.output.seek(0) # tempfile requires seek 0 before reading
debug_zip = ZipFile(node_output.output)
assert node_output.id in [node.id for node in nodes]
assert node_output.error is None
assert any(member in name for name in debug_zip.namelist() for member in expected_members)
except (AztkError, BatchErrorException):
assert False

finally:
clean_up_cluster(cluster_configuration.cluster_id)

0 comments on commit 4e0b1ec

Please sign in to comment.