Skip to content
This repository has been archived by the owner on Jul 19, 2024. It is now read-only.

Commit

Permalink
Pyflyte register optionally activates schedule (flyteorg#1832)
Browse files Browse the repository at this point in the history
* Pyflyte register auto activates schedule

Signed-off-by: Ketan Umare <ketan.umare@gmail.com>

* comment addressed

Signed-off-by: Ketan Umare <ketan.umare@gmail.com>

---------

Signed-off-by: Ketan Umare <ketan.umare@gmail.com>
Signed-off-by: Future Outlier <eric901201@gmai.com>
  • Loading branch information
kumare3 authored and Future Outlier committed Oct 3, 2023
1 parent 0bea129 commit 18bd35f
Show file tree
Hide file tree
Showing 8 changed files with 132 additions and 119 deletions.
2 changes: 1 addition & 1 deletion flytekit/clis/sdk_in_container/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
import rich_click as click
from typing_extensions import OrderedDict

from flytekit.clis.sdk_in_container.constants import make_field
from flytekit.clis.sdk_in_container.run import RunCommand, RunLevelParams, WorkflowCommand
from flytekit.clis.sdk_in_container.utils import make_field
from flytekit.configuration import ImageConfig, SerializationSettings
from flytekit.core.base_task import PythonTask
from flytekit.core.workflow import PythonFunctionWorkflow
Expand Down
52 changes: 0 additions & 52 deletions flytekit/clis/sdk_in_container/constants.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,3 @@
import typing
from dataclasses import Field, dataclass, field
from types import MappingProxyType

import click
import rich_click as _click

CTX_PROJECT = "project"
CTX_DOMAIN = "domain"
CTX_VERSION = "version"
Expand All @@ -13,48 +6,3 @@
CTX_NOTIFICATIONS = "notifications"
CTX_CONFIG_FILE = "config_file"
CTX_VERBOSE = "verbose"


def make_field(o: click.Option) -> Field:
if o.multiple:
o.help = click.style("Multiple values allowed.", bold=True) + f"{o.help}"
return field(default_factory=lambda: o.default, metadata={"click.option": o})
return field(default=o.default, metadata={"click.option": o})


def get_option_from_metadata(metadata: MappingProxyType) -> click.Option:
return metadata["click.option"]


@dataclass
class PyFlyteParams:
config_file: typing.Optional[str] = None
verbose: bool = False
pkgs: typing.List[str] = field(default_factory=list)

@classmethod
def from_dict(cls, d: typing.Dict[str, typing.Any]) -> "PyFlyteParams":
return cls(**d)


project_option = _click.option(
"-p",
"--project",
required=True,
type=str,
help="Flyte project to use. You can have more than one project per repo",
)
domain_option = _click.option(
"-d",
"--domain",
required=True,
type=str,
help="This is usually development, staging, or production",
)
version_option = _click.option(
"-v",
"--version",
required=False,
type=str,
help="This is the version to apply globally for this context",
)
57 changes: 26 additions & 31 deletions flytekit/clis/sdk_in_container/launchplan.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import rich_click as click
from rich.progress import Progress

from flytekit.clis.sdk_in_container.helpers import get_and_save_remote_with_click_context
from flytekit.clis.sdk_in_container.utils import domain_option_dec, project_option_dec
from flytekit.models.launch_plan import LaunchPlanState

_launchplan_help = """
Expand All @@ -13,22 +15,8 @@


@click.command("launchplan", help=_launchplan_help)
@click.option(
"-p",
"--project",
required=False,
type=str,
default="flytesnacks",
help="Fecth launchplan from this project",
)
@click.option(
"-d",
"--domain",
required=False,
type=str,
default="development",
help="Fetch launchplan from this domain",
)
@project_option_dec
@domain_option_dec
@click.option(
"--activate/--deactivate",
required=True,
Expand Down Expand Up @@ -57,18 +45,25 @@ def launchplan(
launchplan_version: str,
):
remote = get_and_save_remote_with_click_context(ctx, project, domain)
try:
launchplan = remote.fetch_launch_plan(
project=project,
domain=domain,
name=launchplan,
version=launchplan_version,
)
state = LaunchPlanState.ACTIVE if activate else LaunchPlanState.INACTIVE
remote.client.update_launch_plan(id=launchplan.id, state=state)
click.secho(
f"\n Launchplan was set to {LaunchPlanState.enum_to_string(state)}: {launchplan.name}:{launchplan.id.version}",
fg="green",
)
except StopIteration as e:
click.secho(f"{e.value}", fg="red")
with Progress() as progress:
t1 = progress.add_task(f"[cyan] {'Activating' if activate else 'Deactivating'}...", total=1)
try:
progress.start_task(t1)
launchplan = remote.fetch_launch_plan(
project=project,
domain=domain,
name=launchplan,
version=launchplan_version,
)
progress.advance(t1)

state = LaunchPlanState.ACTIVE if activate else LaunchPlanState.INACTIVE
remote.client.update_launch_plan(id=launchplan.id, state=state)
progress.advance(t1)
progress.update(t1, completed=True, visible=False)
click.secho(
f"\n Launchplan was set to {LaunchPlanState.enum_to_string(state)}: {launchplan.name}:{launchplan.id.version}",
fg="green",
)
except StopIteration as e:
click.secho(f"{e.value}", fg="red")
20 changes: 12 additions & 8 deletions flytekit/clis/sdk_in_container/register.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from flytekit.clis.helpers import display_help_with_error
from flytekit.clis.sdk_in_container import constants
from flytekit.clis.sdk_in_container.helpers import get_and_save_remote_with_click_context, patch_image_config
from flytekit.clis.sdk_in_container.utils import domain_option_dec, project_option_dec
from flytekit.configuration import ImageConfig
from flytekit.configuration.default_images import DefaultImages
from flytekit.loggers import cli_logger
Expand All @@ -27,14 +28,8 @@


@click.command("register", help=_register_help)
@click.option(
"-p",
"--project",
required=False,
type=str,
default="flytesnacks",
help="Project to register and run this workflow in",
)
@project_option_dec
@domain_option_dec
@click.option(
"-d",
"--domain",
Expand Down Expand Up @@ -113,6 +108,13 @@
is_flag=True,
help="Execute registration in dry-run mode. Skips actual registration to remote",
)
@click.option(
"--activate-launchplans",
"--activate-launchplan",
default=False,
is_flag=True,
help="Activate newly registered Launchplans. This operation deactivates previous versions of Launchplans.",
)
@click.argument("package-or-module", type=click.Path(exists=True, readable=True, resolve_path=True), nargs=-1)
@click.pass_context
def register(
Expand All @@ -129,6 +131,7 @@ def register(
non_fast: bool,
package_or_module: typing.Tuple[str],
dry_run: bool,
activate_launchplans: bool,
):
"""
see help
Expand Down Expand Up @@ -179,6 +182,7 @@ def register(
package_or_module=package_or_module,
remote=remote,
dry_run=dry_run,
activate_launchplans=activate_launchplans,
)
except Exception as e:
raise e
34 changes: 10 additions & 24 deletions flytekit/clis/sdk_in_container/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,15 @@
from rich.progress import Progress

from flytekit import Annotations, FlyteContext, Labels, Literal
from flytekit.clis.sdk_in_container.constants import PyFlyteParams, get_option_from_metadata, make_field
from flytekit.clis.sdk_in_container.helpers import get_remote, patch_image_config
from flytekit.clis.sdk_in_container.utils import pretty_print_exception
from flytekit.clis.sdk_in_container.utils import (
PyFlyteParams,
domain_option,
get_option_from_metadata,
make_field,
pretty_print_exception,
project_option,
)
from flytekit.configuration import DefaultImages, ImageConfig
from flytekit.core import context_manager
from flytekit.core.base_task import PythonTask
Expand Down Expand Up @@ -53,28 +59,8 @@ class RunLevelParams(PyFlyteParams):
This class is used to store the parameters that are used to run a workflow / task / launchplan.
"""

project: str = make_field(
click.Option(
param_decls=["-p", "--project"],
required=False,
type=str,
default=os.getenv("FLYTE_DEFAULT_PROJECT", "flytesnacks"),
show_default=True,
help="Project to register and run this workflow in. Can also be set through envvar "
"``FLYTE_DEFAULT_PROJECT``",
)
)
domain: str = make_field(
click.Option(
param_decls=["-d", "--domain"],
required=False,
type=str,
default=os.getenv("FLYTE_DEFAULT_DOMAIN", "development"),
show_default=True,
help="Domain to register and run this workflow in, can also be set through envvar "
"``FLYTE_DEFAULT_DOMAIN``",
)
)
project: str = make_field(project_option)
domain: str = make_field(domain_option)
destination_dir: str = make_field(
click.Option(
param_decls=["--destination-dir", "destination_dir"],
Expand Down
63 changes: 63 additions & 0 deletions flytekit/clis/sdk_in_container/utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import os
import typing
from dataclasses import Field, dataclass, field
from types import MappingProxyType

import grpc
import rich_click as click
Expand All @@ -9,6 +12,44 @@
from flytekit.exceptions.user import FlyteInvalidInputException
from flytekit.loggers import cli_logger

project_option = click.Option(
param_decls=["-p", "--project"],
required=False,
type=str,
default=os.getenv("FLYTE_DEFAULT_PROJECT", "flytesnacks"),
show_default=True,
help="Project to register and run this workflow in. Can also be set through envvar " "``FLYTE_DEFAULT_PROJECT``",
)

domain_option = click.Option(
param_decls=["-d", "--domain"],
required=False,
type=str,
default=os.getenv("FLYTE_DEFAULT_DOMAIN", "development"),
show_default=True,
help="Domain to register and run this workflow in, can also be set through envvar " "``FLYTE_DEFAULT_DOMAIN``",
)

project_option_dec = click.option(
"-p",
"--project",
required=False,
type=str,
default=os.getenv("FLYTE_DEFAULT_PROJECT", "flytesnacks"),
show_default=True,
help="Project for workflow/launchplan. Can also be set through envvar " "``FLYTE_DEFAULT_PROJECT``",
)

domain_option_dec = click.option(
"-d",
"--domain",
required=False,
type=str,
default=os.getenv("FLYTE_DEFAULT_DOMAIN", "development"),
show_default=True,
help="Domain for workflow/launchplan, can also be set through envvar " "``FLYTE_DEFAULT_DOMAIN``",
)


def validate_package(ctx, param, values):
"""
Expand Down Expand Up @@ -87,3 +128,25 @@ def invoke(self, ctx: click.Context) -> typing.Any:
raise e
pretty_print_exception(e)
raise SystemExit(e) from e


def make_field(o: click.Option) -> Field:
if o.multiple:
o.help = click.style("Multiple values allowed.", bold=True) + f"{o.help}"
return field(default_factory=lambda: o.default, metadata={"click.option": o})
return field(default=o.default, metadata={"click.option": o})


def get_option_from_metadata(metadata: MappingProxyType) -> click.Option:
return metadata["click.option"]


@dataclass
class PyFlyteParams:
config_file: typing.Optional[str] = None
verbose: bool = False
pkgs: typing.List[str] = field(default_factory=list)

@classmethod
def from_dict(cls, d: typing.Dict[str, typing.Any]) -> "PyFlyteParams":
return cls(**d)
7 changes: 7 additions & 0 deletions flytekit/remote/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
NotificationList,
WorkflowExecutionGetDataResponse,
)
from flytekit.models.launch_plan import LaunchPlanState
from flytekit.models.literals import Literal, LiteralMap
from flytekit.remote.backfill import create_backfill_workflow
from flytekit.remote.entities import FlyteLaunchPlan, FlyteNode, FlyteTask, FlyteTaskNode, FlyteWorkflow
Expand Down Expand Up @@ -1957,3 +1958,9 @@ def get_extra_headers_for_protocol(native_url):
if native_url.startswith("abfs://"):
return {"x-ms-blob-type": "BlockBlob"}
return {}

def activate_launchplan(self, ident: Identifier):
"""
Given a launchplan, activate it, all previous versions are deactivated.
"""
self.client.update_launch_plan(id=ident, state=LaunchPlanState.ACTIVE)
16 changes: 13 additions & 3 deletions flytekit/tools/repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ def load_packages_and_modules(
return registrable_entities


def secho(i: Identifier, state: str = "success", reason: str = None):
def secho(i: Identifier, state: str = "success", reason: str = None, op: str = "Registration"):
state_ind = "[ ]"
fg = "white"
nl = False
Expand All @@ -198,7 +198,7 @@ def secho(i: Identifier, state: str = "success", reason: str = None):
nl = True
reason = "skipped!"
click.secho(
click.style(f"{state_ind}", fg=fg) + f" Registration {i.name} type {i.resource_type_name()} {reason}",
click.style(f"{state_ind}", fg=fg) + f" {op} {i.name} type {i.resource_type_name()} {reason}",
dim=True,
nl=nl,
)
Expand All @@ -218,6 +218,7 @@ def register(
package_or_module: typing.Tuple[str],
remote: FlyteRemote,
dry_run: bool = False,
activate_launchplans: bool = False,
):
detected_root = find_common_root(package_or_module)
click.secho(f"Detected Root {detected_root}, using this to create deployable package...", fg="yellow")
Expand Down Expand Up @@ -262,14 +263,23 @@ def register(
return

for cp_entity in registrable_entities:
og_id = cp_entity.id if isinstance(cp_entity, launch_plan.LaunchPlan) else cp_entity.template.id
is_lp = False
if isinstance(cp_entity, launch_plan.LaunchPlan):
og_id = cp_entity.id
is_lp = True
else:
og_id = cp_entity.template.id
secho(og_id, "")
try:
if not dry_run:
i = remote.raw_register(
cp_entity, serialization_settings, version=version, create_default_launchplan=False
)
secho(i)
if is_lp and activate_launchplans:
secho(og_id, "", op="Activation")
remote.activate_launchplan(i)
secho(i, reason="activated", op="Activation")
else:
secho(og_id, reason="Dry run Mode!")
except RegistrationSkipped:
Expand Down

0 comments on commit 18bd35f

Please sign in to comment.