Skip to content
This repository has been archived by the owner on Jul 19, 2024. It is now read-only.

Commit

Permalink
Pyflyte meta inputs (flyteorg#1823)
Browse files Browse the repository at this point in the history
* Re-orgining pyflyte run

Signed-off-by: Ketan Umare <ketan.umare@gmail.com>

* Pyflyte beautified and simplified

Signed-off-by: Ketan Umare <ketan.umare@gmail.com>

* fixed unit test

Signed-off-by: Ketan Umare <ketan.umare@gmail.com>

* Added Launch options

Signed-off-by: Ketan Umare <ketan.umare@gmail.com>

* lint fix

Signed-off-by: Ketan Umare <ketan.umare@gmail.com>

* test fix

Signed-off-by: Ketan Umare <ketan.umare@gmail.com>

* fixing docs failure

Signed-off-by: Ketan Umare <ketan.umare@gmail.com>

---------

Signed-off-by: Ketan Umare <ketan.umare@gmail.com>
Signed-off-by: Future Outlier <eric901201@gmai.com>
  • Loading branch information
kumare3 authored and Future Outlier committed Oct 3, 2023
1 parent f3368c9 commit d617ea6
Show file tree
Hide file tree
Showing 14 changed files with 1,224 additions and 945 deletions.
2 changes: 1 addition & 1 deletion flytekit/clis/sdk_in_container/backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import rich_click as click

from flytekit.clis.sdk_in_container.helpers import get_and_save_remote_with_click_context
from flytekit.clis.sdk_in_container.run import DateTimeType, DurationParamType
from flytekit.interaction.click_types import DateTimeType, DurationParamType

_backfill_help = """
The backfill command generates and registers a new workflow based on the input launchplan to run an
Expand Down
100 changes: 37 additions & 63 deletions flytekit/clis/sdk_in_container/build.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,30 @@
import os
import pathlib
import typing
from dataclasses import dataclass

import rich_click as click
from typing_extensions import OrderedDict

from flytekit.clis.sdk_in_container.constants import CTX_MODULE, CTX_PROJECT_ROOT
from flytekit.clis.sdk_in_container.run import RUN_LEVEL_PARAMS_KEY, get_entities_in_file, load_naive_entity
from flytekit.clis.sdk_in_container.constants import make_field
from flytekit.clis.sdk_in_container.run import RunCommand, RunLevelParams, WorkflowCommand
from flytekit.configuration import ImageConfig, SerializationSettings
from flytekit.core.base_task import PythonTask
from flytekit.core.workflow import PythonFunctionWorkflow
from flytekit.tools.script_mode import _find_project_root
from flytekit.tools.translator import get_serializable


def get_workflow_command_base_params() -> typing.List[click.Option]:
"""
Return the set of base parameters added to every pyflyte build workflow subcommand.
"""
return [
@dataclass
class BuildParams(RunLevelParams):

fast: bool = make_field(
click.Option(
param_decls=["--fast"],
required=False,
is_flag=True,
default=False,
show_default=True,
help="Use fast serialization. The image won't contain the source code. The value is false by default.",
),
]
)
)


def build_command(ctx: click.Context, entity: typing.Union[PythonFunctionWorkflow, PythonTask]):
Expand All @@ -37,84 +35,60 @@ def build_command(ctx: click.Context, entity: typing.Union[PythonFunctionWorkflo
def _build(*args, **kwargs):
m = OrderedDict()
options = None
run_level_params = ctx.obj[RUN_LEVEL_PARAMS_KEY]
build_params: BuildParams = ctx.obj

project, domain = run_level_params.get("project"), run_level_params.get("domain")
serialization_settings = SerializationSettings(
project=project,
domain=domain,
project=build_params.project,
domain=build_params.domain,
image_config=ImageConfig.auto_default_image(),
)
if not run_level_params.get("fast"):
serialization_settings.source_root = ctx.obj[RUN_LEVEL_PARAMS_KEY].get(CTX_PROJECT_ROOT)
if not build_params.fast:
serialization_settings.source_root = build_params.computed_params.project_root

_ = get_serializable(m, settings=serialization_settings, entity=entity, options=options)

return _build


class WorkflowCommand(click.MultiCommand):
class BuildWorkflowCommand(WorkflowCommand):
"""
click multicommand at the python file layer, subcommands should be all the workflows in the file.
"""

def __init__(self, filename: str, *args, **kwargs):
super().__init__(*args, **kwargs)
self._filename = pathlib.Path(filename).resolve()

def list_commands(self, ctx):
entities = get_entities_in_file(self._filename.__str__(), False)
return entities.all()

def get_command(self, ctx, exe_entity):
"""
This command uses the filename with which this command was created, and the string name of the entity passed
after the Python filename on the command line, to load the Python object, and then return the Command that
click should run.
:param ctx: The click Context object.
:param exe_entity: string of the flyte entity provided by the user. Should be the name of a workflow, or task
function.
:return:
"""
rel_path = os.path.relpath(self._filename)
if rel_path.startswith(".."):
raise ValueError(
f"You must call pyflyte from the same or parent dir, {self._filename} not under {os.getcwd()}"
)

project_root = _find_project_root(self._filename)
rel_path = self._filename.relative_to(project_root)
module = os.path.splitext(rel_path)[0].replace(os.path.sep, ".")

ctx.obj[RUN_LEVEL_PARAMS_KEY][CTX_PROJECT_ROOT] = project_root
ctx.obj[RUN_LEVEL_PARAMS_KEY][CTX_MODULE] = module

entity = load_naive_entity(module, exe_entity, project_root)

def _create_command(
self,
ctx: click.Context,
entity_name: str,
run_level_params: RunLevelParams,
loaded_entity: typing.Any,
is_workflow: bool,
):
cmd = click.Command(
name=exe_entity,
callback=build_command(ctx, entity),
help=f"Build an image for {module}.{exe_entity}.",
name=entity_name,
callback=build_command(ctx, loaded_entity),
help=f"Build an image for {run_level_params.computed_params.module}.{entity_name}.",
)
return cmd


class BuildCommand(click.MultiCommand):
class BuildCommand(RunCommand):
"""
A click command group for building a image for flyte workflows & tasks in a file.
"""

def __init__(self, *args, **kwargs):
params = get_workflow_command_base_params()
super().__init__(*args, params=params, **kwargs)
params = BuildParams.options()
kwargs["params"] = params
super().__init__(*args, **kwargs)

def list_commands(self, ctx):
return [str(p) for p in pathlib.Path(".").glob("*.py") if str(p) != "__init__.py"]
def list_commands(self, ctx, *args, **kwargs):
return super().list_commands(ctx, add_remote=False)

def get_command(self, ctx, filename):
if ctx.obj:
ctx.obj[RUN_LEVEL_PARAMS_KEY] = ctx.params
return WorkflowCommand(filename, name=filename, help="Build an image for [workflow|task]")
if ctx.obj is None:
ctx.obj = {}
ctx.obj = BuildParams.from_dict(ctx.params)
return BuildWorkflowCommand(filename, name=filename, help=f"Build an image for [workflow|task] from {filename}")


_build_help = """
Expand Down
31 changes: 27 additions & 4 deletions flytekit/clis/sdk_in_container/constants.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
import typing
from dataclasses import Field, dataclass, field
from types import MappingProxyType

import click
import rich_click as _click

CTX_PROJECT = "project"
Expand All @@ -7,11 +12,29 @@
CTX_PACKAGES = "pkgs"
CTX_NOTIFICATIONS = "notifications"
CTX_CONFIG_FILE = "config_file"
CTX_PROJECT_ROOT = "project_root"
CTX_MODULE = "module"
CTX_VERBOSE = "verbose"
CTX_COPY_ALL = "copy_all"
CTX_FILE_NAME = "file_name"


def make_field(o: click.Option) -> Field:
if o.multiple:
o.help = click.style("Multiple values allowed.", bold=True) + f"{o.help}"
return field(default_factory=lambda: o.default, metadata={"click.option": o})
return field(default=o.default, metadata={"click.option": o})


def get_option_from_metadata(metadata: MappingProxyType) -> click.Option:
return metadata["click.option"]


@dataclass
class PyFlyteParams:
config_file: typing.Optional[str] = None
verbose: bool = False
pkgs: typing.List[str] = field(default_factory=list)

@classmethod
def from_dict(cls, d: typing.Dict[str, typing.Any]) -> "PyFlyteParams":
return cls(**d)


project_option = _click.option(
Expand Down
27 changes: 17 additions & 10 deletions flytekit/clis/sdk_in_container/helpers.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import typing
from dataclasses import replace
from typing import Optional

Expand All @@ -11,6 +12,19 @@
FLYTE_REMOTE_INSTANCE_KEY = "flyte_remote"


def get_remote(cfg_file_path: typing.Optional[str], project: str, domain: str) -> FlyteRemote:
cfg_file = get_config_file(cfg_file_path)
if cfg_file is None:
cfg_obj = Config.for_sandbox()
cli_logger.info("No config files found, creating remote with sandbox config")
else:
cfg_obj = Config.auto(cfg_file_path)
cli_logger.info(
f"Creating remote with config {cfg_obj}" + (f" with file {cfg_file_path}" if cfg_file_path else "")
)
return FlyteRemote(cfg_obj, default_project=project, default_domain=domain)


def get_and_save_remote_with_click_context(
ctx: click.Context, project: str, domain: str, save: bool = True
) -> FlyteRemote:
Expand All @@ -24,17 +38,10 @@ def get_and_save_remote_with_click_context(
:param save: If false, will not mutate the context.obj dict
:return: FlyteRemote instance
"""
if ctx.obj.get(FLYTE_REMOTE_INSTANCE_KEY) is not None:
return ctx.obj[FLYTE_REMOTE_INSTANCE_KEY]
cfg_file_location = ctx.obj.get(CTX_CONFIG_FILE)
cfg_file = get_config_file(cfg_file_location)
if cfg_file is None:
cfg_obj = Config.for_sandbox()
cli_logger.info("No config files found, creating remote with sandbox config")
else:
cfg_obj = Config.auto(cfg_file_location)
cli_logger.info(
f"Creating remote with config {cfg_obj}" + (f" with file {cfg_file_location}" if cfg_file_location else "")
)
r = FlyteRemote(cfg_obj, default_project=project, default_domain=domain)
r = get_remote(cfg_file_location, project, domain)
if save:
ctx.obj[FLYTE_REMOTE_INSTANCE_KEY] = r
return r
Expand Down
69 changes: 1 addition & 68 deletions flytekit/clis/sdk_in_container/pyflyte.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import os
import typing

import grpc
import rich_click as click
from google.protobuf.json_format import MessageToJson

from flytekit import configuration
from flytekit.clis.sdk_in_container.backfill import backfill
Expand All @@ -18,77 +16,12 @@
from flytekit.clis.sdk_in_container.run import run
from flytekit.clis.sdk_in_container.serialize import serialize
from flytekit.clis.sdk_in_container.serve import serve
from flytekit.clis.sdk_in_container.utils import ErrorHandlingCommand, validate_package
from flytekit.configuration.file import FLYTECTL_CONFIG_ENV_VAR, FLYTECTL_CONFIG_ENV_VAR_OVERRIDE
from flytekit.configuration.internal import LocalSDK
from flytekit.exceptions.base import FlyteException
from flytekit.exceptions.user import FlyteInvalidInputException
from flytekit.loggers import cli_logger


def validate_package(ctx, param, values):
pkgs = []
for val in values:
if "/" in val or "-" in val or "\\" in val:
raise click.BadParameter(
f"Illegal package value {val} for parameter: {param}. Expected for the form [a.b.c]"
)
elif "," in val:
pkgs.extend(val.split(","))
else:
pkgs.append(val)
cli_logger.debug(f"Using packages: {pkgs}")
return pkgs


def pretty_print_grpc_error(e: grpc.RpcError):
if isinstance(e, grpc._channel._InactiveRpcError): # noqa
click.secho(f"RPC Failed, with Status: {e.code()}", fg="red", bold=True)
click.secho(f"\tdetails: {e.details()}", fg="magenta", bold=True)
click.secho(f"\tDebug string {e.debug_error_string()}", dim=True)
return


def pretty_print_exception(e: Exception):
if isinstance(e, click.exceptions.Exit):
raise e

if isinstance(e, click.ClickException):
click.secho(e.message, fg="red")
raise e

if isinstance(e, FlyteException):
click.secho(f"Failed with Exception Code: {e._ERROR_CODE}", fg="red") # noqa
if isinstance(e, FlyteInvalidInputException):
click.secho("Request rejected by the API, due to Invalid input.", fg="red")
click.secho(f"\tInput Request: {MessageToJson(e.request)}", dim=True)

cause = e.__cause__
if cause:
if isinstance(cause, grpc.RpcError):
pretty_print_grpc_error(cause)
else:
click.secho(f"Underlying Exception: {cause}")
return

if isinstance(e, grpc.RpcError):
pretty_print_grpc_error(e)
return

click.secho(f"Failed with Unknown Exception {type(e)} Reason: {e}", fg="red") # noqa


class ErrorHandlingCommand(click.RichGroup):
def invoke(self, ctx: click.Context) -> typing.Any:
try:
return super().invoke(ctx)
except Exception as e:
if CTX_VERBOSE in ctx.obj and ctx.obj[CTX_VERBOSE]:
click.secho("Verbose mode on")
raise e
pretty_print_exception(e)
raise SystemExit(e) from e


@click.group("pyflyte", invoke_without_command=True, cls=ErrorHandlingCommand)
@click.option(
"--verbose", required=False, default=False, is_flag=True, help="Show verbose messages and exception traces"
Expand Down
Loading

0 comments on commit d617ea6

Please sign in to comment.