From 32de752d53a726eb68cd9a2261d381a89943962c Mon Sep 17 00:00:00 2001
From: Jacob Freck
Date: Thu, 5 Apr 2018 12:21:56 -0700
Subject: [PATCH] Feature: Spark add output logs flag (#468)

* add output flag to cluster submit

* add output flag to cluster app-logs

* add output flag to job get-app-logs

* sort imports

* make spinner context
---
 .../endpoints/cluster/cluster_add_user.py     |  4 ++--
 .../endpoints/cluster/cluster_app_logs.py     | 20 ++++++++++----
 .../spark/endpoints/cluster/cluster_copy.py   |  1 +
 .../spark/endpoints/cluster/cluster_create.py | 23 ++++++++-----------
 .../spark/endpoints/cluster/cluster_delete.py |  3 ++-
 .../spark/endpoints/cluster/cluster_get.py    |  4 ++--
 .../spark/endpoints/cluster/cluster_list.py   |  3 ++-
 .../spark/endpoints/cluster/cluster_run.py    |  4 +++-
 .../spark/endpoints/cluster/cluster_ssh.py    | 10 ++++----
 .../spark/endpoints/cluster/cluster_submit.py | 23 ++++++++++++++++---
 aztk_cli/spark/endpoints/init.py              |  3 ++-
 aztk_cli/spark/endpoints/job/delete.py        |  3 ++-
 aztk_cli/spark/endpoints/job/get.py           |  7 +++---
 aztk_cli/spark/endpoints/job/get_app.py       |  7 +++---
 aztk_cli/spark/endpoints/job/get_app_logs.py  | 17 +++++++++++---
 aztk_cli/spark/endpoints/job/list.py          |  7 +++---
 aztk_cli/spark/endpoints/job/list_apps.py     |  5 ++--
 aztk_cli/spark/endpoints/job/stop.py          |  7 +++---
 aztk_cli/spark/endpoints/job/stop_app.py      |  7 +++---
 aztk_cli/spark/endpoints/job/submit.py        |  5 ++--
 aztk_cli/spark/endpoints/spark.py             |  3 ++-
 aztk_cli/utils.py                             |  6 +++++
 22 files changed, 113 insertions(+), 59 deletions(-)

diff --git a/aztk_cli/spark/endpoints/cluster/cluster_add_user.py b/aztk_cli/spark/endpoints/cluster/cluster_add_user.py
index 5af9b0c5..436d28a3 100644
--- a/aztk_cli/spark/endpoints/cluster/cluster_add_user.py
+++ b/aztk_cli/spark/endpoints/cluster/cluster_add_user.py
@@ -1,8 +1,8 @@
 import argparse
 import typing
+
 import aztk.spark
-from aztk_cli import log
-from aztk_cli import utils, config
+from aztk_cli import config, log, utils
 
 
 def setup_parser(parser: argparse.ArgumentParser):
diff --git a/aztk_cli/spark/endpoints/cluster/cluster_app_logs.py b/aztk_cli/spark/endpoints/cluster/cluster_app_logs.py
index 8c4b6536..4a27be31 100644
--- a/aztk_cli/spark/endpoints/cluster/cluster_app_logs.py
+++ b/aztk_cli/spark/endpoints/cluster/cluster_app_logs.py
@@ -1,8 +1,10 @@
 import argparse
+import os
 import typing
+
 import aztk
+from aztk_cli import config, utils
 
-from aztk_cli import utils, config
 
 def setup_parser(parser: argparse.ArgumentParser):
     parser.add_argument('--id',
@@ -14,7 +16,12 @@ def setup_parser(parser: argparse.ArgumentParser):
                         required=True,
                         help='The unique id of your job name')
 
-    parser.add_argument('--tail', dest='tail', action='store_true')
+    output_group = parser.add_mutually_exclusive_group()
+
+    output_group.add_argument('--output',
+                              help='Path to the file you wish to output to. If not \
+                                    specified, output is printed to stdout')
+    output_group.add_argument('--tail', dest='tail', action='store_true')
 
 
 def execute(args: typing.NamedTuple):
@@ -23,5 +30,10 @@ def execute(args: typing.NamedTuple):
     if args.tail:
         utils.stream_logs(client=spark_client, cluster_id=args.cluster_id, application_name=args.app_name)
     else:
-        app_logs = spark_client.get_application_log(cluster_id=args.cluster_id, application_name=args.app_name)
-        print(app_logs.log)
+        app_log = spark_client.get_application_log(cluster_id=args.cluster_id, application_name=args.app_name)
+        if args.output:
+            with utils.Spinner():
+                with open(os.path.abspath(os.path.expanduser(args.output)), "w", encoding="UTF-8") as f:
+                    f.write(app_log.log)
+        else:
+            print(app_log.log)
diff --git a/aztk_cli/spark/endpoints/cluster/cluster_copy.py b/aztk_cli/spark/endpoints/cluster/cluster_copy.py
index b82f50af..70e06d52 100644
--- a/aztk_cli/spark/endpoints/cluster/cluster_copy.py
+++ b/aztk_cli/spark/endpoints/cluster/cluster_copy.py
@@ -1,5 +1,6 @@
 import argparse
 import typing
+
 import aztk.spark
 from aztk_cli import config
 
diff --git a/aztk_cli/spark/endpoints/cluster/cluster_create.py b/aztk_cli/spark/endpoints/cluster/cluster_create.py
index 6726b6d2..628a8834 100644
--- a/aztk_cli/spark/endpoints/cluster/cluster_create.py
+++ b/aztk_cli/spark/endpoints/cluster/cluster_create.py
@@ -1,11 +1,11 @@
-import os
 import argparse
+import os
 import typing
+
 import aztk.spark
 from aztk.spark.models import ClusterConfiguration, UserConfiguration
-from aztk_cli import log
+from aztk_cli import config, log, utils
 from aztk_cli.config import load_aztk_spark_config
-from aztk_cli import utils, config
 
 
 def setup_parser(parser: argparse.ArgumentParser):
@@ -70,19 +70,14 @@ def execute(args: typing.NamedTuple):
         cluster_conf.user_configuration = None
 
     utils.print_cluster_conf(cluster_conf, wait)
-    spinner = utils.Spinner()
-    spinner.start()
-
-    # create spark cluster
-    cluster = spark_client.create_cluster(
-        cluster_conf,
-        wait=wait
-    )
-
-    spinner.stop()
+    with utils.Spinner():
+        # create spark cluster
+        cluster = spark_client.create_cluster(
+            cluster_conf,
+            wait=wait
+        )
 
     if wait:
         log.info("Cluster %s created successfully.", cluster.id)
     else:
         log.info("Cluster %s is being provisioned.", cluster.id)
-
diff --git a/aztk_cli/spark/endpoints/cluster/cluster_delete.py b/aztk_cli/spark/endpoints/cluster/cluster_delete.py
index f3230a9d..354ce69c 100644
--- a/aztk_cli/spark/endpoints/cluster/cluster_delete.py
+++ b/aztk_cli/spark/endpoints/cluster/cluster_delete.py
@@ -1,7 +1,8 @@
 import argparse
 import typing
+
 import aztk
-from aztk_cli import log, config
+from aztk_cli import config, log
 
 
 def setup_parser(parser: argparse.ArgumentParser):
diff --git a/aztk_cli/spark/endpoints/cluster/cluster_get.py b/aztk_cli/spark/endpoints/cluster/cluster_get.py
index c611c4e9..583fd9a3 100644
--- a/aztk_cli/spark/endpoints/cluster/cluster_get.py
+++ b/aztk_cli/spark/endpoints/cluster/cluster_get.py
@@ -1,8 +1,8 @@
 import argparse
 import typing
+
 import aztk
-from aztk_cli import log
-from aztk_cli import utils, config
+from aztk_cli import config, log, utils
 
 
 def setup_parser(parser: argparse.ArgumentParser):
diff --git a/aztk_cli/spark/endpoints/cluster/cluster_list.py b/aztk_cli/spark/endpoints/cluster/cluster_list.py
index 5f1b57f5..6dd33900 100644
--- a/aztk_cli/spark/endpoints/cluster/cluster_list.py
+++ b/aztk_cli/spark/endpoints/cluster/cluster_list.py
@@ -1,7 +1,8 @@
 import argparse
 import typing
+
 import aztk
-from aztk_cli import utils, config
+from aztk_cli import config, utils
 
 
 def setup_parser(_: argparse.ArgumentParser):
diff --git a/aztk_cli/spark/endpoints/cluster/cluster_run.py b/aztk_cli/spark/endpoints/cluster/cluster_run.py
index 6c214b23..e5298d26 100644
--- a/aztk_cli/spark/endpoints/cluster/cluster_run.py
+++ b/aztk_cli/spark/endpoints/cluster/cluster_run.py
@@ -1,7 +1,9 @@
 import argparse
 import typing
+
 import aztk.spark
-from aztk_cli import utils, config
+from aztk_cli import config, utils
+
 
 def setup_parser(parser: argparse.ArgumentParser):
     parser.add_argument('--id',
diff --git a/aztk_cli/spark/endpoints/cluster/cluster_ssh.py b/aztk_cli/spark/endpoints/cluster/cluster_ssh.py
index 514629e6..8c8a57b6 100644
--- a/aztk_cli/spark/endpoints/cluster/cluster_ssh.py
+++ b/aztk_cli/spark/endpoints/cluster/cluster_ssh.py
@@ -1,11 +1,12 @@
 import argparse
 import typing
-from aztk_cli import log
-from aztk_cli import utils, config
-from aztk_cli.config import SshConfig
-import aztk
+
 import azure.batch.models.batch_error as batch_error
+
+import aztk
 from aztk.models import ClusterConfiguration
+from aztk_cli import config, log, utils
+from aztk_cli.config import SshConfig
 
 
 def setup_parser(parser: argparse.ArgumentParser):
@@ -106,4 +107,3 @@ def print_plugin_ports(cluster_config: ClusterConfiguration):
             url = "{0}{1}".format(http_prefix, port.public_port)
             utils.log_property(label, url)
 
-
diff --git a/aztk_cli/spark/endpoints/cluster/cluster_submit.py b/aztk_cli/spark/endpoints/cluster/cluster_submit.py
index b8d7177b..d5836364 100644
--- a/aztk_cli/spark/endpoints/cluster/cluster_submit.py
+++ b/aztk_cli/spark/endpoints/cluster/cluster_submit.py
@@ -1,8 +1,10 @@
 import argparse
+import os
 import sys
 import typing
-from aztk_cli import utils, config, log
+
 import aztk.spark
+from aztk_cli import config, log, utils
 
 
 def setup_parser(parser: argparse.ArgumentParser):
@@ -64,6 +66,10 @@ def setup_parser(parser: argparse.ArgumentParser):
                         help='Number of times the Spark job may be retried \
                             if there is a failure')
 
+    parser.add_argument('--output',
+                        help='Path to the file you wish to output to. If not \
+                            specified, output is printed to stdout')
+
     parser.add_argument('app',
                         help='App jar OR python file to execute. Use absolute \
                             path to reference file.')
@@ -73,6 +79,9 @@ def setup_parser(parser: argparse.ArgumentParser):
 
 
 def execute(args: typing.NamedTuple):
+    if not args.wait and args.output:
+        raise aztk.error.AztkError("--output flag requires --wait flag")
+
     spark_client = aztk.spark.Client(config.load_aztk_secrets())
     jars = []
     py_files = []
@@ -141,6 +150,14 @@ def execute(args: typing.NamedTuple):
     )
 
     if args.wait:
-        exit_code = utils.stream_logs(client=spark_client, cluster_id=args.cluster_id, application_name=args.name)
-        sys.exit(exit_code)
+        if not args.output:
+            exit_code = utils.stream_logs(client=spark_client, cluster_id=args.cluster_id, application_name=args.name)
+        else:
+            with utils.Spinner():
+                spark_client.wait_until_application_done(cluster_id=args.cluster_id, task_id=args.name)
+                application_log = spark_client.get_application_log(cluster_id=args.cluster_id, application_name=args.name)
+                with open(os.path.abspath(os.path.expanduser(args.output)), "w", encoding="UTF-8") as f:
+                    f.write(application_log.log)
+                exit_code = application_log.exit_code
+        sys.exit(exit_code)
 
diff --git a/aztk_cli/spark/endpoints/init.py b/aztk_cli/spark/endpoints/init.py
index 4b7b9465..04313553 100644
--- a/aztk_cli/spark/endpoints/init.py
+++ b/aztk_cli/spark/endpoints/init.py
@@ -1,7 +1,8 @@
-import os
 import argparse
+import os
 import typing
 from distutils.dir_util import copy_tree
+
 import aztk.utils.constants as constants
 
 
diff --git a/aztk_cli/spark/endpoints/job/delete.py b/aztk_cli/spark/endpoints/job/delete.py
index bd9b931e..5d5cb5bd 100644
--- a/aztk_cli/spark/endpoints/job/delete.py
+++ b/aztk_cli/spark/endpoints/job/delete.py
@@ -1,7 +1,8 @@
 import argparse
 import typing
+
 import aztk.spark
-from aztk_cli import log, config
+from aztk_cli import config, log
 
 
 def setup_parser(parser: argparse.ArgumentParser):
diff --git a/aztk_cli/spark/endpoints/job/get.py b/aztk_cli/spark/endpoints/job/get.py
index 8715a1f8..026a3cc7 100644
--- a/aztk_cli/spark/endpoints/job/get.py
+++ b/aztk_cli/spark/endpoints/job/get.py
@@ -1,9 +1,10 @@
 import argparse
-import typing
 import time
+import typing
+
 import aztk.spark
-from aztk_cli import config
-from aztk_cli import utils
+from aztk_cli import config, utils
 
+
 def setup_parser(parser: argparse.ArgumentParser):
     parser.add_argument('--id',
diff --git a/aztk_cli/spark/endpoints/job/get_app.py b/aztk_cli/spark/endpoints/job/get_app.py
index 0ae25c82..1405432c 100644
--- a/aztk_cli/spark/endpoints/job/get_app.py
+++ b/aztk_cli/spark/endpoints/job/get_app.py
@@ -1,9 +1,10 @@
 import argparse
-import typing
 import time
+import typing
+
 import aztk.spark
-from aztk_cli import config
-from aztk_cli import utils
+from aztk_cli import config, utils
 
+
 def setup_parser(parser: argparse.ArgumentParser):
     parser.add_argument('--id',
diff --git a/aztk_cli/spark/endpoints/job/get_app_logs.py b/aztk_cli/spark/endpoints/job/get_app_logs.py
index 3967f8cb..d6c5e0cf 100644
--- a/aztk_cli/spark/endpoints/job/get_app_logs.py
+++ b/aztk_cli/spark/endpoints/job/get_app_logs.py
@@ -1,7 +1,10 @@
 import argparse
+import os
 import typing
+
 import aztk.spark
-from aztk_cli import utils, config
+from aztk_cli import config, utils
 
+
 def setup_parser(parser: argparse.ArgumentParser):
     parser.add_argument('--id',
@@ -12,9 +15,17 @@ def setup_parser(parser: argparse.ArgumentParser):
                         dest='app_name',
                         required=True,
                         help='The unique id of your job name')
+    parser.add_argument('--output',
+                        help='Path to the file you wish to output to. If not \
+                            specified, output is printed to stdout')
 
 
 def execute(args: typing.NamedTuple):
     spark_client = aztk.spark.Client(config.load_aztk_secrets())
-    app_logs = spark_client.get_job_application_log(args.job_id, args.app_name)
-    print(app_logs.log)
+    app_log = spark_client.get_job_application_log(args.job_id, args.app_name)
+    if args.output:
+        with utils.Spinner():
+            with open(os.path.abspath(os.path.expanduser(args.output)), "w", encoding="UTF-8") as f:
+                f.write(app_log.log)
+    else:
+        print(app_log.log)
diff --git a/aztk_cli/spark/endpoints/job/list.py b/aztk_cli/spark/endpoints/job/list.py
index 52fd34b8..0be7541b 100644
--- a/aztk_cli/spark/endpoints/job/list.py
+++ b/aztk_cli/spark/endpoints/job/list.py
@@ -1,9 +1,10 @@
 import argparse
-import typing
 import time
+import typing
+
 import aztk.spark
-from aztk_cli import config
-from aztk_cli import utils
+from aztk_cli import config, utils
 
+
 def setup_parser(_: argparse.ArgumentParser):
     # No arguments for list yet
diff --git a/aztk_cli/spark/endpoints/job/list_apps.py b/aztk_cli/spark/endpoints/job/list_apps.py
index b70d8d20..6db5af97 100644
--- a/aztk_cli/spark/endpoints/job/list_apps.py
+++ b/aztk_cli/spark/endpoints/job/list_apps.py
@@ -1,8 +1,9 @@
 import argparse
 import typing
+
 import aztk.spark
-from aztk_cli import config
-from aztk_cli import utils
+from aztk_cli import config, utils
 
+
 def setup_parser(parser: argparse.ArgumentParser):
     parser.add_argument('--id',
diff --git a/aztk_cli/spark/endpoints/job/stop.py b/aztk_cli/spark/endpoints/job/stop.py
index bdcf2a89..d261d747 100644
--- a/aztk_cli/spark/endpoints/job/stop.py
+++ b/aztk_cli/spark/endpoints/job/stop.py
@@ -1,9 +1,10 @@
 import argparse
-import typing
 import time
+import typing
+
 import aztk.spark
-from aztk_cli import config
-from aztk_cli import utils
+from aztk_cli import config, utils
 
+
 def setup_parser(parser: argparse.ArgumentParser):
     parser.add_argument('--id',
diff --git a/aztk_cli/spark/endpoints/job/stop_app.py b/aztk_cli/spark/endpoints/job/stop_app.py
index aa06225a..da3e297c 100644
--- a/aztk_cli/spark/endpoints/job/stop_app.py
+++ b/aztk_cli/spark/endpoints/job/stop_app.py
@@ -1,10 +1,9 @@
 import argparse
-import typing
 import time
+import typing
+
 import aztk.spark
-from aztk_cli import config
-from aztk_cli import utils
-from aztk_cli import log
+from aztk_cli import config, log, utils
 
 
 def setup_parser(parser: argparse.ArgumentParser):
diff --git a/aztk_cli/spark/endpoints/job/submit.py b/aztk_cli/spark/endpoints/job/submit.py
index 6dd70990..c0b8999a 100644
--- a/aztk_cli/spark/endpoints/job/submit.py
+++ b/aztk_cli/spark/endpoints/job/submit.py
@@ -1,8 +1,9 @@
 import argparse
-import typing
 import time
+import typing
+
 import aztk.spark
-from aztk_cli import config, utils, log
+from aztk_cli import config, log, utils
 from aztk_cli.config import JobConfig, load_aztk_spark_config
 
 
diff --git a/aztk_cli/spark/endpoints/spark.py b/aztk_cli/spark/endpoints/spark.py
index d4d28b9e..39eae266 100644
--- a/aztk_cli/spark/endpoints/spark.py
+++ b/aztk_cli/spark/endpoints/spark.py
@@ -1,9 +1,10 @@
 import argparse
 import typing
 
+from . import init
 from .cluster import cluster
 from .job import job
-from . import init
+
 
 def setup_parser(parser: argparse.ArgumentParser):
     subparsers = parser.add_subparsers(
diff --git a/aztk_cli/utils.py b/aztk_cli/utils.py
index f39d837e..adee7f00 100644
--- a/aztk_cli/utils.py
+++ b/aztk_cli/utils.py
@@ -379,6 +379,12 @@ def __init__(self, delay=None):
         self.spinner_generator = self.spinning_cursor()
         if delay and float(delay):
             self.delay = delay
+    def __enter__(self):
+        return self.start()
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        return self.stop()
+
     def spinner_task(self):
         while self.busy:
             sys.stdout.write(next(self.spinner_generator))
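--
For reference, a minimal sketch of the code path the new --output flag enables
(the cluster id, application name, and output path are illustrative, not taken
from the patch):

    import os

    import aztk.spark
    from aztk_cli import config, utils

    spark_client = aztk.spark.Client(config.load_aztk_secrets())

    # Spinner is now usable as a context manager: __enter__ calls start() and
    # __exit__ calls stop(), so the spinner is stopped even if an error occurs.
    with utils.Spinner():
        app_log = spark_client.get_application_log(cluster_id="my-cluster", application_name="my-app")
        with open(os.path.abspath(os.path.expanduser("my-app.log")), "w", encoding="UTF-8") as f:
            f.write(app_log.log)

The equivalent CLI invocation would be something like:

    aztk spark cluster app-logs --id my-cluster --name my-app --output my-app.log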