Feature: Support passing of remote executables via aztk spark cluster submit (#549)
lachiemurray authored and timotheeguerin committed May 24, 2018
1 parent 66037fd commit f6735cc
Showing 5 changed files with 34 additions and 18 deletions.

aztk/node_scripts/submit.py (4 changes: 2 additions & 2 deletions)

@@ -114,7 +114,7 @@ def __app_submit_cmd(
     spark_submit_cmd.add_option('--executor-cores', str(executor_cores))
 
     spark_submit_cmd.add_argument(
-        os.environ['AZ_BATCH_TASK_WORKING_DIR'] + '/' + app + ' ' +
+        os.path.expandvars(app) + ' ' +
         ' '.join(['\'' + str(app_arg) + '\'' for app_arg in (app_args or [])]))
 
     with open("spark-submit.txt", mode="w", encoding="UTF-8") as stream:
@@ -156,7 +156,7 @@ def recieve_submit_request(application_file_path):
 
     cmd = __app_submit_cmd(
         name=application['name'],
-        app=os.path.basename(application['application']),
+        app=application['application'],
         app_args=application['application_args'],
         main_class=application['main_class'],
         jars=application['jars'],
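
The node-side change replaces a hard-coded working-directory prefix with `os.path.expandvars`. Paths rewritten by the submit helper to `$AZ_BATCH_TASK_WORKING_DIR/<basename>` still resolve to the task directory on the node, while remote URIs contain no variables and pass through untouched. A minimal sketch of that behavior (the directory value is illustrative):

```python
import os

# Simulate the Azure Batch task environment; the value is illustrative.
os.environ["AZ_BATCH_TASK_WORKING_DIR"] = "/mnt/batch/tasks/job-1/task-1/wd"

# Uploaded app: the helper rewrote the path around the env var,
# so expandvars resolves it to the task working directory.
print(os.path.expandvars("$AZ_BATCH_TASK_WORKING_DIR/pi.py"))
# /mnt/batch/tasks/job-1/task-1/wd/pi.py

# Remote app: no variables to expand, so the URI passes through unchanged.
print(os.path.expandvars("wasbs://container@account.blob.core.windows.net/pi.py"))
# wasbs://container@account.blob.core.windows.net/pi.py
```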

aztk/spark/client.py (4 changes: 2 additions & 2 deletions)

@@ -109,9 +109,9 @@ def get_remote_login_settings(self, cluster_id: str, node_id: str):
         except batch_error.BatchErrorException as e:
             raise error.AztkError(helpers.format_batch_exception(e))
 
-    def submit(self, cluster_id: str, application: models.ApplicationConfiguration, wait: bool = False):
+    def submit(self, cluster_id: str, application: models.ApplicationConfiguration, remote: bool = False, wait: bool = False):
         try:
-            cluster_submit_helper.submit_application(self, cluster_id, application, wait)
+            cluster_submit_helper.submit_application(self, cluster_id, application, remote, wait)
         except batch_error.BatchErrorException as e:
             raise error.AztkError(helpers.format_batch_exception(e))
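
The SDK surface simply grows an optional `remote` keyword that is threaded through to the helper. A hedged sketch of how a caller might use it; the `client` instance and any `ApplicationConfiguration` fields beyond those visible in this diff are assumptions:

```python
from aztk.spark import models  # assumed import path for the SDK models


def submit_remote_app(client, cluster_id: str) -> None:
    # Sketch only: `client` is an aztk Spark client constructed elsewhere.
    app = models.ApplicationConfiguration(
        name="pipy",
        application="wasbs://container@account.blob.core.windows.net/pi.py",
        application_args=["100"],
    )
    # remote=True skips the blob upload; the URI is handed to
    # spark-submit on the master node as-is.
    client.submit(cluster_id, app, remote=True, wait=False)
```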

aztk/spark/helpers/submit.py (25 changes: 14 additions & 11 deletions)

@@ -17,17 +17,21 @@ def __get_node(spark_client, node_id: str, cluster_id: str) -> batch_models.Comp
     return spark_client.batch_client.compute_node.get(cluster_id, node_id)
 
 
-def generate_task(spark_client, container_id, application):
+def generate_task(spark_client, container_id, application, remote=False):
     resource_files = []
 
-    app_resource_file = helpers.upload_file_to_container(container_name=container_id,
-                                                         application_name=application.name,
-                                                         file_path=application.application,
-                                                         blob_client=spark_client.blob_client,
-                                                         use_full_path=False)
+    # The application provided is not hosted remotely and therefore must be uploaded
+    if not remote:
+        app_resource_file = helpers.upload_file_to_container(container_name=container_id,
+                                                             application_name=application.name,
+                                                             file_path=application.application,
+                                                             blob_client=spark_client.blob_client,
+                                                             use_full_path=False)
 
-    # Upload application file
-    resource_files.append(app_resource_file)
+        # Upload application file
+        resource_files.append(app_resource_file)
+
+        application.application = '$AZ_BATCH_TASK_WORKING_DIR/' + os.path.basename(application.application)
 
     # Upload dependent JARS
     jar_resource_file_paths = []
@@ -64,7 +68,6 @@ def generate_task(spark_client, container_id, application):
     resource_files.append(files_resource_file_path)
 
     # Upload application definition
-    application.application = os.path.basename(application.application)
     application.jars = [os.path.basename(jar) for jar in application.jars]
     application.py_files = [os.path.basename(py_files) for py_files in application.py_files]
     application.files = [os.path.basename(files) for files in application.files]
@@ -112,11 +115,11 @@ def affinitize_task_to_master(spark_client, cluster_id, task):
     return task
 
 
-def submit_application(spark_client, cluster_id, application, wait: bool = False):
+def submit_application(spark_client, cluster_id, application, remote: bool = False, wait: bool = False):
     """
     Submit a spark app
     """
-    task = generate_task(spark_client, cluster_id, application)
+    task = generate_task(spark_client, cluster_id, application, remote)
     task = affinitize_task_to_master(spark_client, cluster_id, task)
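
The upload branch now also owns the path rewrite: local files are uploaded and `application.application` is rewritten to a `$AZ_BATCH_TASK_WORKING_DIR`-relative path, while remote paths are left untouched (which is why the unconditional `os.path.basename` rewrite further down is deleted). A condensed, illustrative sketch of the two outcomes; `resolve_app_path` is a made-up name and the upload itself is omitted:

```python
import os


def resolve_app_path(app_path: str, remote: bool) -> str:
    # Hypothetical condensation of generate_task's path handling.
    if not remote:
        # Local file: uploaded as a resource file, then referenced
        # relative to the task working directory on the node.
        return '$AZ_BATCH_TASK_WORKING_DIR/' + os.path.basename(app_path)
    # Remote file: passed through for the node to expand (a no-op).
    return app_path


print(resolve_app_path("examples/src/main/python/pi.py", remote=False))
# $AZ_BATCH_TASK_WORKING_DIR/pi.py
print(resolve_app_path("wasbs://container@account/pi.py", remote=True))
# wasbs://container@account/pi.py
```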

aztk_cli/spark/endpoints/cluster/cluster_submit.py (12 changes: 10 additions & 2 deletions)

@@ -70,9 +70,16 @@ def setup_parser(parser: argparse.ArgumentParser):
                         help='Path to the file you wish to output to. If not \
                             specified, output is printed to stdout')
 
+    parser.add_argument('--remote', action='store_true',
+                        help='Do not upload the app to the cluster, assume it is \
+                            already accessible at the given path')
+
     parser.add_argument('app',
-                        help='App jar OR python file to execute. Use absolute \
-                            path to reference file.')
+                        help='App jar OR python file to execute. A path to a local \
+                            file is expected, unless used in conjunction with \
+                            the --remote flag. When the --remote flag is set, a \
+                            remote path that is accessible from the cluster is \
+                            expected. Remote paths are not validated up-front.')
 
     parser.add_argument('app_args', nargs='*',
                         help='Arguments for the application')
@@ -146,6 +153,7 @@ def execute(args: typing.NamedTuple):
             executor_cores=args.executor_cores,
             max_retry_count=args.max_retry_count
         ),
+        remote=args.remote,
         wait=False
     )
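
`--remote` is wired as a plain boolean: `action='store_true'` makes `args.remote` default to `False` and flip to `True` when the flag appears, so `execute` can forward it directly into `submit`. A self-contained sketch of that parsing behavior:

```python
import argparse

# Minimal reproduction of the flag wiring; the real parser defines
# many more options.
parser = argparse.ArgumentParser()
parser.add_argument('--remote', action='store_true',
                    help='Do not upload the app to the cluster')
parser.add_argument('app', help='App jar OR python file to execute')

print(parser.parse_args(['pi.py']).remote)              # False
print(parser.parse_args(['--remote', 'pi.py']).remote)  # True
```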

docs/20-spark-submit.md (7 changes: 6 additions & 1 deletion)

@@ -9,11 +9,16 @@ Run a Spark job:
 aztk spark cluster submit --id <name_of_spark_cluster> --name <name_of_spark_job> <executable> <executable_params>
 ```
 
-For example, run a local pi.py file on a Spark cluster
+For example, to run a local pi.py file on a Spark cluster, simply specify the local path of the file:
 ```sh
 aztk spark cluster submit --id spark --name pipy examples/src/main/python/pi.py 100
 ```
 
+To run a remotely hosted pi.py file on a Spark cluster, specify the remote path of the file and use the '--remote' flag:
+```sh
+aztk spark cluster submit --id spark --name pipy --remote wasbs://path@remote/pi.py 100
+```
+
 NOTE: The job name (--name) must be at least 3 characters long, can only contain alphanumeric characters including hyphens but excluding underscores, and cannot contain uppercase letters. Each job you submit **must** have a unique name.
 
 ## Monitoring job
