Skip to content
This repository has been archived by the owner on Feb 3, 2021. It is now read-only.

Commit

Permalink
Feature: spark debug tool (#455)
Browse files Browse the repository at this point in the history
* start implementation of cluster debug utility

* update debug program

* update debug

* fix output directory structure

* cleanup output, add error checking

* sort imports

* start untar

* extract tar

* add debug.py to pylintc ignore, line too long

* crlf->lf

* add app logs

* call get_spark_app_logs, typos

* add docs

* remove debug.py from pylintrc ignore

* added debug.py back to pylint ignore

* change pylint ignore

* remove commented log

* update cluster_run

* refactor cluster_copy

* update debug, add spinner for run and copy

* make new sdk cluster_download endpoint
  • Loading branch information
jafreck committed Apr 9, 2018
1 parent 61e7c59 commit 44a0765
Show file tree
Hide file tree
Showing 10 changed files with 383 additions and 71 deletions.
44 changes: 26 additions & 18 deletions aztk/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,43 +229,48 @@ def __delete_user_on_pool(self, username, pool_id, nodes):
concurrent.futures.wait(futures)


def __cluster_run(self, cluster_id, container_name, command, internal):
def __cluster_run(self, cluster_id, command, internal, container_name=None):
pool, nodes = self.__get_pool_details(cluster_id)
nodes = [node for node in nodes]
if internal:
cluster_nodes = [models.RemoteLogin(ip_address=node.ip_address, port="22") for node in nodes]
cluster_nodes = [(node, models.RemoteLogin(ip_address=node.ip_address, port="22")) for node in nodes]
else:
cluster_nodes = [self.__get_remote_login_settings(pool.id, node.id) for node in nodes]
cluster_nodes = [(node, self.__get_remote_login_settings(pool.id, node.id)) for node in nodes]
try:
ssh_key = self.__create_user_on_pool('aztk', pool.id, nodes)
asyncio.get_event_loop().run_until_complete(ssh_lib.clus_exec_command(command,
container_name,
'aztk',
cluster_nodes,
ssh_key=ssh_key.exportKey().decode('utf-8')))
output = asyncio.get_event_loop().run_until_complete(ssh_lib.clus_exec_command(command,
'aztk',
cluster_nodes,
ssh_key=ssh_key.exportKey().decode('utf-8'),
container_name=container_name))
return output
except OSError as exc:
raise exc
finally:
self.__delete_user_on_pool('aztk', pool.id, nodes)

def __cluster_copy(self, cluster_id, container_name, source_path, destination_path, internal):
def __cluster_copy(self, cluster_id, source_path, destination_path, container_name=None, internal=False, get=False):
pool, nodes = self.__get_pool_details(cluster_id)
nodes = [node for node in nodes]
if internal:
cluster_nodes = [models.RemoteLogin(ip_address=node.ip_address, port="22") for node in nodes]
cluster_nodes = [(node, models.RemoteLogin(ip_address=node.ip_address, port="22")) for node in nodes]
else:
cluster_nodes = [self.__get_remote_login_settings(pool.id, node.id) for node in nodes]
cluster_nodes = [(node, self.__get_remote_login_settings(pool.id, node.id)) for node in nodes]
try:
ssh_key = self.__create_user_on_pool('aztk', pool.id, nodes)
asyncio.get_event_loop().run_until_complete(ssh_lib.clus_copy(container_name=container_name,
username='aztk',
nodes=cluster_nodes,
source_path=source_path,
destination_path=destination_path,
ssh_key=ssh_key.exportKey().decode('utf-8')))
self.__delete_user_on_pool('aztk', pool.id, nodes)
output = asyncio.get_event_loop().run_until_complete(
ssh_lib.clus_copy(container_name=container_name,
username='aztk',
nodes=cluster_nodes,
source_path=source_path,
destination_path=destination_path,
ssh_key=ssh_key.exportKey().decode('utf-8'),
get=get))
return output
except (OSError, batch_error.BatchErrorException) as exc:
raise exc
finally:
self.__delete_user_on_pool('aztk', pool.id, nodes)

def __submit_job(self,
job_configuration,
Expand Down Expand Up @@ -388,5 +393,8 @@ def cluster_run(self, cluster_id, command):
def cluster_copy(self, cluster_id, source_path, destination_path):
raise NotImplementedError()

def cluster_download(self, cluster_id, source_path, destination_path):
raise NotImplementedError()

def submit_job(self, job):
raise NotImplementedError()
24 changes: 20 additions & 4 deletions aztk/spark/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from aztk.spark.helpers import submit as cluster_submit_helper
from aztk.spark.helpers import job_submission as job_submit_helper
from aztk.spark.helpers import get_log as get_log_helper
from aztk.spark.helpers import cluster_diagnostic_helper
from aztk.spark.utils import util
from aztk.internal.cluster_data import NodeData
import yaml
Expand Down Expand Up @@ -146,15 +147,23 @@ def get_application_status(self, cluster_id: str, app_name: str):
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))

def cluster_run(self, cluster_id: str, command: str, internal: bool = False):
def cluster_run(self, cluster_id: str, command: str, host=False, internal: bool = False):
try:
return self.__cluster_run(cluster_id, 'spark', command, internal)
return self.__cluster_run(cluster_id, command, internal, container_name='spark' if not host else None)
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))

def cluster_copy(self, cluster_id: str, source_path: str, destination_path: str, internal: bool = False):
def cluster_copy(self, cluster_id: str, source_path: str, destination_path: str, host: bool = False, internal: bool = False):
try:
return self.__cluster_copy(cluster_id, 'spark', source_path, destination_path, internal)
container_name = None if host else 'spark'
return self.__cluster_copy(cluster_id, source_path, destination_path, container_name=container_name, get=False, internal=internal)
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))

def cluster_download(self, cluster_id: str, source_path: str, destination_path: str, host: bool = False, internal: bool = False):
try:
container_name = None if host else 'spark'
return self.__cluster_copy(cluster_id, source_path, destination_path, container_name=container_name, get=True, internal=internal)
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))

Expand Down Expand Up @@ -272,3 +281,10 @@ def wait_until_job_finished(self, job_id):
def wait_until_all_jobs_finished(self, jobs):
for job in jobs:
self.wait_until_job_finished(job)

def run_cluster_diagnostics(self, cluster_id, output_directory):
try:
output = cluster_diagnostic_helper.run(self, cluster_id, output_directory)
return output
except batch_error.BatchErrorException as e:
raise error.AztkError(helpers.format_batch_exception(e))
26 changes: 26 additions & 0 deletions aztk/spark/helpers/cluster_diagnostic_helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import os
from aztk.utils import ssh
from aztk.utils.command_builder import CommandBuilder
from aztk import models as aztk_models
import azure.batch.models as batch_models

def run(spark_client, cluster_id, output_directory):
# copy debug program to each node
spark_client.cluster_copy(cluster_id, os.path.abspath("./aztk/spark/utils/debug.py"), "/tmp/debug.py", host=True)
ssh_cmd = _build_diagnostic_ssh_command()
run_output = spark_client.cluster_run(cluster_id, ssh_cmd, host=True)
local_path = os.path.join(os.path.abspath(output_directory), "debug", "debug.zip")
remote_path = "/tmp/debug.zip"
output = spark_client.cluster_download(cluster_id, remote_path, local_path, host=True)
# write run output to debug/ directory
with open(os.path.join(os.path.dirname(local_path), "debug-output.txt"), 'w', encoding="UTF-8") as f:
[f.write(line + '\n') for node_id, result in run_output for line in result]
return output


def _build_diagnostic_ssh_command():
return "sudo rm -rf /tmp/debug.zip; "\
"sudo apt-get install -y python3-pip; "\
"sudo -H pip3 install --upgrade pip; "\
"sudo -H pip3 install docker; "\
"sudo python3 /tmp/debug.py"
160 changes: 160 additions & 0 deletions aztk/spark/utils/debug.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
"""
Diagnostic program that runs on each node in the cluster
This program must be run with sudo
"""
import io
import json
import os
import socket
import tarfile
from subprocess import STDOUT, CalledProcessError, check_output
from zipfile import ZIP_DEFLATED, ZipFile

import docker # pylint: disable=import-error


def main():
zipf = create_zip_archive()

# general node diagnostics
zipf.writestr("hostname.txt", data=get_hostname())
zipf.writestr("df.txt", data=get_disk_free())

# docker container diagnostics
docker_client = docker.from_env()
for filename, data in get_docker_diagnostics(docker_client):
zipf.writestr(filename, data=data)

zipf.close()


def create_zip_archive():
zip_file_path = "/tmp/debug.zip"
return ZipFile(zip_file_path, "w", ZIP_DEFLATED)


def get_hostname():
return socket.gethostname()


def cmd_check_output(cmd):
try:
output = check_output(cmd, shell=True, stderr=STDOUT)
except CalledProcessError as e:
return "CMD: {0}\n"\
"returncode: {1}"\
"output: {2}".format(e.cmd, e.returncode, e.output)
else:
return output


def get_disk_free():
return cmd_check_output("df -h")


def get_docker_diagnostics(docker_client):
'''
returns list of tuples (filename, data) to be written in the zip
'''
output = []
output.append(get_docker_images(docker_client))
logs = get_docker_containers(docker_client)
for item in logs:
output.append(item)

return output


def get_docker_images(docker_client):
output = ""
try:
images = docker_client.images.list()
for image in images:
output += json.dumps(image.attrs, sort_keys=True, indent=4)
return ("docker-images.txt", output)
except docker.errors.APIError as e:
return ("docker-images.err", e.__str__())


def get_docker_containers(docker_client):
container_attrs = ""
logs = []
try:
containers = docker_client.containers.list()
for container in containers:
container_attrs += json.dumps(container.attrs, sort_keys=True, indent=4)
# get docker container logs
logs.append((container.name + "/docker.log", container.logs()))
logs.append(get_docker_process_status(container))
if container.name == "spark": #TODO: find a more robust way to get specific info off specific containers
logs.extend(get_container_aztk_script(container))
logs.extend(get_spark_logs(container))
logs.extend(get_spark_app_logs(container))

logs.append(("docker-containers.txt", container_attrs))
return logs
except docker.errors.APIError as e:
return [("docker-containers.err", e.__str__())]


def get_docker_process_status(container):
try:
exit_code, output = container.exec_run("ps -auxw", tty=True, privileged=True)
out_file_name = container.name + "/ps_aux.txt"
if exit_code == 0:
return (out_file_name, output)
else:
return (out_file_name, "exit_code: {0}\n{1}".format(exit_code, output))
except docker.errors.APIError as e:
return (container.name + "ps_aux.err", e.__str__())


def get_container_aztk_script(container):
aztk_path = "/mnt/batch/tasks/startup/wd"
try:
stream, _ = container.get_archive(aztk_path) # second item is stat info
return extract_tar_in_memory(container, stream)
except docker.errors.APIError as e:
return (container.name + "/" + "aztk-scripts.err", e.__str__())


def get_spark_logs(container):
spark_logs_path = "/home/spark-current/logs"
try:
stream, _ = container.get_archive(spark_logs_path) # second item is stat info
return extract_tar_in_memory(container, stream)
except docker.errors.APIError as e:
return [(container.name + "/" + "spark-logs.err", e.__str__())]


def get_spark_app_logs(container):
spark_app_logs_path = "/home/spark-current/work"
try:
stream, _ = container.get_archive(spark_app_logs_path)
return extract_tar_in_memory(container, stream)
except docker.errors.APIError as e:
return [(container.name + "/" + "spark-work-logs.err", e.__str__())]


def filter_members(members):
skip_files = ["id_rsa", "id_rsa.pub", "docker.log"]
skip_extensions = [".pyc", ".zip"]
for tarinfo in members:
if (os.path.splitext(tarinfo.name)[1] not in skip_extensions and
os.path.basename(tarinfo.name) not in skip_files):
yield tarinfo


def extract_tar_in_memory(container, data):
data = io.BytesIO(b''.join([item for item in data]))
tarf = tarfile.open(fileobj=data)
logs = []
for member in filter_members(tarf):
file_bytes = tarf.extractfile(member)
if file_bytes is not None:
logs.append((container.name + "/" + member.name, b''.join(file_bytes.readlines())))
return logs


if __name__ == "__main__":
main()
Loading

0 comments on commit 44a0765

Please sign in to comment.