From fef9faa10f0305cf902e5f15cbbda2889e6224a0 Mon Sep 17 00:00:00 2001 From: Frost Ming Date: Sun, 1 Sep 2024 08:46:53 +0800 Subject: [PATCH] feat: remote development (#4942) * feat: remote development Signed-off-by: Frost Ming * feat: tail pod logs Signed-off-by: Frost Ming * fix: change the color order Signed-off-by: Frost Ming * fix: add cluster name Signed-off-by: Frost Ming * feat: connect to existing dev deployment Signed-off-by: Frost Ming * feat: develop command Signed-off-by: Frost Ming * fix: reload service when importing Signed-off-by: Frost Ming * fix: upload models for bare push bento Signed-off-by: Frost Ming --------- Signed-off-by: Frost Ming --- src/_bentoml_impl/loader.py | 1 - src/_bentoml_impl/server/serving.py | 1 - src/bentoml/_internal/bento/bento.py | 161 +++-- src/bentoml/_internal/cloud/base.py | 23 +- src/bentoml/_internal/cloud/bentocloud.py | 10 +- src/bentoml/_internal/cloud/client.py | 33 +- src/bentoml/_internal/cloud/deployment.py | 615 +++++++++++++----- .../_internal/cloud/schemas/modelschemas.py | 15 +- .../_internal/cloud/schemas/schemasv2.py | 52 +- src/bentoml/_internal/service/loader.py | 4 + src/bentoml/_internal/utils/__init__.py | 27 + .../utils/circus/watchfilesplugin.py | 11 +- src/bentoml/bentos.py | 13 +- src/bentoml_cli/cli.py | 2 + src/bentoml_cli/deployment.py | 100 ++- src/bentoml_cli/utils.py | 13 +- 16 files changed, 823 insertions(+), 258 deletions(-) diff --git a/src/_bentoml_impl/loader.py b/src/_bentoml_impl/loader.py index a93ceb9de50..b25b9761974 100644 --- a/src/_bentoml_impl/loader.py +++ b/src/_bentoml_impl/loader.py @@ -163,7 +163,6 @@ def import_service( assert ( module_name and attrs_str ), f'Invalid import target "{service_identifier}", must format as ":"' - module = importlib.import_module(module_name) root_service_name, _, depend_path = attrs_str.partition(".") root_service = t.cast("Service[t.Any]", getattr(module, root_service_name)) diff --git a/src/_bentoml_impl/server/serving.py b/src/_bentoml_impl/server/serving.py index d652305115f..ea05f250c7e 100644 --- a/src/_bentoml_impl/server/serving.py +++ b/src/_bentoml_impl/server/serving.py @@ -332,7 +332,6 @@ def serve_http( if development_mode: arbiter_kwargs["debug"] = True arbiter_kwargs["loggerconfig"] = SERVER_LOGGING_CONFIG - arbiter_kwargs["loglevel"] = "WARNING" arbiter = create_standalone_arbiter(**arbiter_kwargs) arbiter.exit_stack.enter_context( diff --git a/src/bentoml/_internal/bento/bento.py b/src/bentoml/_internal/bento/bento.py index e1234a3015c..ba7c843fb1f 100644 --- a/src/bentoml/_internal/bento/bento.py +++ b/src/bentoml/_internal/bento/bento.py @@ -53,6 +53,7 @@ from _bentoml_sdk import Service as NewService from _bentoml_sdk.service import ServiceConfig + from ..cloud.schemas.modelschemas import BentoManifestSchema from ..models import Model as StoredModel from ..service import Service from ..service.inference_api import InferenceAPI @@ -176,6 +177,49 @@ def _fs(self) -> FS: def info(self) -> BentoInfo: return self._info + @property + def entry_service(self) -> str: + return self.info.entry_service + + def get_manifest(self) -> BentoManifestSchema: + from ..cloud.schemas.modelschemas import BentoManifestSchema + from ..cloud.schemas.modelschemas import BentoRunnerResourceSchema + from ..cloud.schemas.modelschemas import BentoRunnerSchema + + info = self.info + models = [str(m.tag) for m in info.all_models] + runners = [ + BentoRunnerSchema( + name=r.name, + runnable_type=r.runnable_type, + models=r.models, + resource_config=( + BentoRunnerResourceSchema( + cpu=r.resource_config.get("cpu"), + nvidia_gpu=r.resource_config.get("nvidia.com/gpu"), + custom_resources=r.resource_config.get("custom_resources"), + ) + if r.resource_config + else None + ), + ) + for r in info.runners + ] + return BentoManifestSchema( + name=info.name, + entry_service=info.entry_service, + service=info.service, + bentoml_version=info.bentoml_version, + apis={}, + models=models, + runners=runners, + size_bytes=self.total_size(), + services=info.services, + envs=info.envs, + schema=info.schema, + version=info.version, + ) + @classmethod @inject def create( @@ -184,6 +228,8 @@ def create( version: t.Optional[str] = None, build_ctx: t.Optional[str] = None, platform: t.Optional[str] = None, + bare: bool = False, + reload: bool = False, ) -> Bento: from _bentoml_sdk.models import BentoModel @@ -203,7 +249,10 @@ def create( BentoMLContainer.model_aliases.set(build_config.model_aliases) # This also verifies that svc can be imported correctly svc = import_service( - build_config.service, working_dir=build_ctx, standalone_load=True + build_config.service, + working_dir=build_ctx, + reload=reload, + standalone_load=True, ) is_legacy = isinstance(svc, Service) # Apply default build options @@ -229,13 +278,10 @@ def create( logger.debug( 'Building BentoML service "%s" from build context "%s".', tag, build_ctx ) - bento_fs = TempFS( identifier=f"bentoml_bento_{bento_name}", temp_dir=BentoMLContainer.tmp_bento_store_dir.get(), ) - ctx_fs = fs.open_fs(encode_path_for_uri(build_ctx)) - models: list[BentoModelInfo] = [] def append_model(model: BentoModelInfo) -> None: @@ -256,57 +302,60 @@ def append_model(model: BentoModelInfo) -> None: for model in runner.models: append_model(BentoModel(model.tag).to_info()) - # create ignore specs - specs = BentoPathSpec(build_config.include, build_config.exclude) + if not bare: + ctx_fs = fs.open_fs(encode_path_for_uri(build_ctx)) - # Copy all files base on include and exclude, into `src` directory - relpaths = [s for s in build_config.include if s.startswith("../")] - if len(relpaths) != 0: - raise InvalidArgument( - "Paths outside of the build context directory cannot be included; use a symlink or copy those files into the working directory manually." - ) - bento_fs.makedir(BENTO_PROJECT_DIR_NAME) - target_fs = bento_fs.opendir(BENTO_PROJECT_DIR_NAME) - with target_fs.open(DEFAULT_BENTO_BUILD_FILE, "w") as bentofile_yaml: - build_config.to_yaml(bentofile_yaml) - ignore_specs = list(specs.from_path(build_ctx)) - - for dir_path, _, files in ctx_fs.walk(): - for f in files: - path = fs.path.combine(dir_path, f.name).lstrip("/") - if specs.includes(path, recurse_exclude_spec=ignore_specs): - if ctx_fs.getsize(path) > 10 * 1024 * 1024: - logger.warn("File size is larger than 10MiB: %s", path) - target_fs.makedirs(dir_path, recreate=True) - copy_file(ctx_fs, path, target_fs, path) - - # NOTE: we need to generate both Python and Conda - # first to make sure we can generate the Dockerfile correctly. - build_config.python.write_to_bento(bento_fs, build_ctx, platform_=platform) - build_config.conda.write_to_bento(bento_fs, build_ctx) - build_config.docker.write_to_bento(bento_fs, build_ctx, build_config.conda) - - # Create `readme.md` file - if build_config.description is None: - with bento_fs.open(BENTO_README_FILENAME, "w", encoding="utf-8") as f: - f.write(get_default_svc_readme(svc, svc_version=tag.version)) - else: - if build_config.description.startswith("file:"): - file_name = build_config.description[5:].strip() - copy_file_to_fs_folder( - file_name, bento_fs, dst_filename=BENTO_README_FILENAME + # create ignore specs + specs = BentoPathSpec(build_config.include, build_config.exclude) + + # Copy all files base on include and exclude, into `src` directory + relpaths = [s for s in build_config.include if s.startswith("../")] + if len(relpaths) != 0: + raise InvalidArgument( + "Paths outside of the build context directory cannot be included; use a symlink or copy those files into the working directory manually." ) + bento_fs.makedir(BENTO_PROJECT_DIR_NAME) + target_fs = bento_fs.opendir(BENTO_PROJECT_DIR_NAME) + with target_fs.open(DEFAULT_BENTO_BUILD_FILE, "w") as bentofile_yaml: + build_config.to_yaml(bentofile_yaml) + ignore_specs = list(specs.from_path(build_ctx)) + + for dir_path, _, files in ctx_fs.walk(): + for f in files: + path = fs.path.combine(dir_path, f.name).lstrip("/") + if specs.includes(path, recurse_exclude_spec=ignore_specs): + if ctx_fs.getsize(path) > 10 * 1024 * 1024: + logger.warn("File size is larger than 10MiB: %s", path) + target_fs.makedirs(dir_path, recreate=True) + copy_file(ctx_fs, path, target_fs, path) + + # NOTE: we need to generate both Python and Conda + # first to make sure we can generate the Dockerfile correctly. + build_config.python.write_to_bento(bento_fs, build_ctx, platform=platform) + build_config.conda.write_to_bento(bento_fs, build_ctx) + build_config.docker.write_to_bento(bento_fs, build_ctx, build_config.conda) + + # Create `readme.md` file + if build_config.description is None: + with bento_fs.open(BENTO_README_FILENAME, "w", encoding="utf-8") as f: + f.write(get_default_svc_readme(svc, svc_version=tag.version)) else: - with bento_fs.open(BENTO_README_FILENAME, "w") as f: - f.write(build_config.description) - - # Create 'apis/openapi.yaml' file - bento_fs.makedir("apis") - with bento_fs.open(fs.path.combine("apis", "openapi.yaml"), "w") as f: - yaml.dump(svc.openapi_spec, f) - if not is_legacy: - with bento_fs.open(fs.path.combine("apis", "schema.json"), "w") as f: - json.dump(svc.schema(), f, indent=2) + if build_config.description.startswith("file:"): + file_name = build_config.description[5:].strip() + copy_file_to_fs_folder( + file_name, bento_fs, dst_filename=BENTO_README_FILENAME + ) + else: + with bento_fs.open(BENTO_README_FILENAME, "w") as f: + f.write(build_config.description) + + # Create 'apis/openapi.yaml' file + bento_fs.makedir("apis") + with bento_fs.open(fs.path.combine("apis", "openapi.yaml"), "w") as f: + yaml.dump(svc.openapi_spec, f) + if not is_legacy: + with bento_fs.open(fs.path.combine("apis", "schema.json"), "w") as f: + json.dump(svc.schema(), f, indent=2) res = Bento( tag, @@ -342,6 +391,8 @@ def append_model(model: BentoModelInfo) -> None: schema=svc.schema() if not is_legacy else {}, ), ) + if bare: + return res # Create bento.yaml res.flush_info() try: @@ -559,9 +610,9 @@ def from_bento_model( class BentoServiceInfo: name: str service: str - models: t.List[BentoModelInfo] = attr.field(factory=list) - dependencies: t.List[str] = attr.field(factory=list) - config: ServiceConfig = attr.field(factory=dict) + models: t.List[BentoModelInfo] = attr.field(factory=list, eq=False) + dependencies: t.List[str] = attr.field(factory=list, eq=False) + config: ServiceConfig = attr.field(factory=dict, eq=False) @classmethod def from_service(cls, svc: NewService[t.Any]) -> BentoServiceInfo: diff --git a/src/bentoml/_internal/cloud/base.py b/src/bentoml/_internal/cloud/base.py index 40a853c5ce1..37e91511747 100644 --- a/src/bentoml/_internal/cloud/base.py +++ b/src/bentoml/_internal/cloud/base.py @@ -4,6 +4,7 @@ from contextlib import contextmanager import attrs +from rich import get_console from rich.console import Group from rich.live import Live from rich.panel import Panel @@ -87,7 +88,8 @@ class Spinner: Use it as a context manager to start the live updating. """ - def __init__(self): + def __init__(self, console: Console | None = None) -> None: + self.console = console or get_console() self.transmission_progress = Progress( TextColumn("[bold blue]{task.description}", justify="right"), BarColumn(bar_width=None), @@ -98,6 +100,7 @@ def __init__(self): TransferSpeedColumn(), "•", TimeRemainingColumn(), + console=self.console, ) self._logs: list[str] = [] @@ -106,24 +109,21 @@ def __init__(self): TimeElapsedColumn(), TextColumn("[bold purple]{task.description}"), SpinnerColumn("simpleDots"), + console=self.console, ) self._spinner_task_id: t.Optional[TaskID] = None - self._live = Live(self) - - @property - def console(self) -> "Console": - return self._live.console + self._live = Live(self, console=self.console) @contextmanager def spin(self, text: str) -> t.Generator[TaskID, None, None]: """Create a spinner as a context manager.""" + task_id = self.update(text, new=True) try: - task_id = self.update(text, new=True) yield task_id finally: - self._spinner_task_id = None - self._spinner_progress.stop_task(task_id) - self._spinner_progress.update(task_id, visible=False) + self._spinner_progress.remove_task(task_id) + if self._spinner_task_id == task_id: + self._spinner_task_id = None def update(self, text: str, new: bool = False) -> TaskID: """Update the spin text.""" @@ -149,8 +149,7 @@ def start(self) -> None: def stop(self) -> None: """Stop live updating.""" if self._spinner_task_id is not None: - self._spinner_progress.stop_task(self._spinner_task_id) - self._spinner_progress.update(self._spinner_task_id, visible=False) + self._spinner_progress.remove_task(self._spinner_task_id) self._spinner_task_id = None self._live.stop() diff --git a/src/bentoml/_internal/cloud/bentocloud.py b/src/bentoml/_internal/cloud/bentocloud.py index b7baab2a754..02920a3c229 100644 --- a/src/bentoml/_internal/cloud/bentocloud.py +++ b/src/bentoml/_internal/cloud/bentocloud.py @@ -62,13 +62,16 @@ def push_bento( bento: Bento, *, force: bool = False, + bare: bool = False, threads: int = 10, ): with self.spinner: upload_task_id = self.spinner.transmission_progress.add_task( f'Pushing Bento "{bento.tag}"', start=False, visible=False ) - self._do_push_bento(bento, upload_task_id, force=force, threads=threads) + self._do_push_bento( + bento, upload_task_id, force=force, threads=threads, bare=bare + ) self.spinner.log(f'✅ Pushed Bento "{bento.tag}"') @inject @@ -79,6 +82,7 @@ def _do_push_bento( *, force: bool = False, threads: int = 10, + bare: bool = False, rest_client: RestApiClient = Provide[BentoMLContainer.rest_api_client], bentoml_tmp_dir: str = Provide[BentoMLContainer.tmp_bento_store_dir], ): @@ -112,6 +116,7 @@ def push_model(model: Model[t.Any]) -> None: ) executor.map(push_model, models_to_push) + with self.spinner.spin(text=f'Fetching Bento repository "{name}"'): bento_repository = rest_client.v1.get_bento_repository( bento_repository_name=name @@ -197,7 +202,8 @@ def push_model(model: Model[t.Any]) -> None: labels=labels, ), ) - + if bare: + return transmission_strategy: TransmissionStrategy = "proxy" presigned_upload_url: str | None = None diff --git a/src/bentoml/_internal/cloud/client.py b/src/bentoml/_internal/cloud/client.py index 3824a3abf24..b1924b78337 100644 --- a/src/bentoml/_internal/cloud/client.py +++ b/src/bentoml/_internal/cloud/client.py @@ -48,12 +48,15 @@ from .schemas.schemasv1 import UpdateSecretSchema from .schemas.schemasv1 import UserSchema from .schemas.schemasv2 import CreateDeploymentSchema as CreateDeploymentSchemaV2 +from .schemas.schemasv2 import DeleteDeploymentFilesSchema +from .schemas.schemasv2 import DeploymentFileListSchema from .schemas.schemasv2 import DeploymentFullSchema as DeploymentFullSchemaV2 from .schemas.schemasv2 import DeploymentListSchema as DeploymentListSchemaV2 from .schemas.schemasv2 import KubePodSchema from .schemas.schemasv2 import KubePodWSResponseSchema from .schemas.schemasv2 import LogWSResponseSchema from .schemas.schemasv2 import UpdateDeploymentSchema as UpdateDeploymentSchemaV2 +from .schemas.schemasv2 import UploadDeploymentFilesSchema from .schemas.utils import schema_from_json from .schemas.utils import schema_from_object from .schemas.utils import schema_to_json @@ -775,7 +778,7 @@ def list_deployment_pods( scheme = "ws" endpoint = f"{scheme}://{url_.netloc}" with connect_ws( - url=f"{endpoint}/ws/v1/clusters/{deployment.cluster.name}/pods?{urlencode(dict(organization_name=deployment.cluster.organization_name, namespace=deployment.kube_namespace, selector=f'yatai.ai/bento-repository={target.bento.repository.name},yatai.ai/bento={target.bento.version}'))}", + url=f"{endpoint}/ws/v2/deployments/{name}/pods?{urlencode(dict(organization_name=deployment.cluster.organization_name, cluster=deployment.cluster.name))}", client=self.session, ) as ws: jsn = schema_from_object(ws.receive_json(), KubePodWSResponseSchema) @@ -799,7 +802,7 @@ def tail_logs( endpoint = f"{scheme}://{url_.netloc}" with connect_ws( - url=f"{endpoint}/ws/v1/clusters/{cluster_name}/tail?{urlencode(dict(namespace=namespace, pod_name=pod_name))}", + url=f"{endpoint}/ws/v1/clusters/{cluster_name}/tail?{urlencode(dict(namespace=namespace, pod_name=pod_name, timestamps='false'))}", client=self.session, ) as ws: req_id = str(uuid.uuid4()) @@ -848,6 +851,32 @@ def heartbeat(): stop_event.set() heartbeat_thread.join() + def upload_files( + self, name: str, files: UploadDeploymentFilesSchema, cluster: str | None = None + ) -> None: + url = urljoin(self.endpoint, f"/api/v2/deployments/{name}/files") + resp = self.session.post( + url, content=schema_to_json(files), params={"cluster": cluster} + ) + self._check_resp(resp) + + def delete_files( + self, name: str, paths: DeleteDeploymentFilesSchema, cluster: str | None = None + ) -> None: + url = urljoin(self.endpoint, f"/api/v2/deployments/{name}/files") + resp = self.session.request( + "DELETE", url, content=schema_to_json(paths), params={"cluster": cluster} + ) + self._check_resp(resp) + + def list_files( + self, name: str, cluster: str | None = None + ) -> DeploymentFileListSchema: + url = urljoin(self.endpoint, f"/api/v2/deployments/{name}/files") + resp = self.session.get(url, params={"cluster": cluster}) + self._check_resp(resp) + return schema_from_json(resp.text, DeploymentFileListSchema) + class RestApiClient: def __init__(self, endpoint: str, api_token: str) -> None: diff --git a/src/bentoml/_internal/cloud/deployment.py b/src/bentoml/_internal/cloud/deployment.py index f0477b8417f..48630bdff41 100644 --- a/src/bentoml/_internal/cloud/deployment.py +++ b/src/bentoml/_internal/cloud/deployment.py @@ -1,7 +1,10 @@ from __future__ import annotations import base64 +import contextlib +import hashlib import logging +import os import time import typing as t from os import path @@ -12,9 +15,14 @@ import rich import yaml from deepmerge.merger import Merger +from rich.console import Console from simple_di import Provide from simple_di import inject +from bentoml._internal.bento.bento import Bento + +from ..bento.build_config import BentoBuildConfig + if t.TYPE_CHECKING: from _bentoml_impl.client import AsyncHTTPClient from _bentoml_impl.client import SyncHTTPClient @@ -25,19 +33,23 @@ from ...exceptions import BentoMLException from ...exceptions import NotFound -from ..bento.bento import BentoInfo from ..configuration.containers import BentoMLContainer from ..tag import Tag from ..utils import bentoml_cattr +from ..utils import filter_control_codes from ..utils import resolve_user_filepath from .base import Spinner +from .schemas.modelschemas import BentoManifestSchema from .schemas.modelschemas import DeploymentStatus from .schemas.modelschemas import DeploymentTargetHPAConf from .schemas.schemasv2 import CreateDeploymentSchema as CreateDeploymentSchemaV2 +from .schemas.schemasv2 import DeleteDeploymentFilesSchema +from .schemas.schemasv2 import DeploymentFileListSchema from .schemas.schemasv2 import DeploymentSchema from .schemas.schemasv2 import DeploymentTargetSchema from .schemas.schemasv2 import KubePodSchema from .schemas.schemasv2 import UpdateDeploymentSchema as UpdateDeploymentSchemaV2 +from .schemas.schemasv2 import UploadDeploymentFilesSchema logger = logging.getLogger(__name__) @@ -68,6 +80,7 @@ class DeploymentConfigParameters: config_dict: dict[str, t.Any] | None = None config_file: str | t.TextIO | None = None cli: bool = False + dev: bool = False service_name: str | None = None cfg_dict: dict[str, t.Any] | None = None _param_config: dict[str, t.Any] | None = None @@ -111,6 +124,7 @@ def verify( ("access_authorization", self.access_authorization), ("envs", self.envs if self.envs else None), ("secrets", self.secrets), + ("dev", self.dev), ] if v is not None } @@ -163,12 +177,16 @@ def verify( # target is a path if self.cli: rich.print(f"building bento from [green]{bento_name}[/] ...") - bento_info = get_bento_info(project_path=bento_name, cli=self.cli) + bento_info = ensure_bento( + project_path=bento_name, bare=self.dev, cli=self.cli + ) + elif self.dev: # dev mode and bento is built + return else: if self.cli: rich.print(f"using bento [green]{bento_name}[/]...") - bento_info = get_bento_info(bento=str(bento_name), cli=self.cli) - self.cfg_dict["bento"] = bento_info.tag + bento_info = ensure_bento(bento=str(bento_name), cli=self.cli) + self.cfg_dict["bento"] = str(bento_info.tag) if self.service_name is None: self.service_name = bento_info.entry_service @@ -200,7 +218,7 @@ def get_config_dict(self, bento: str | None = None): raise BentoMLException("Bento is required") bento = self.cfg_dict.get("bento") - info = get_bento_info(bento=bento) + info = ensure_bento(bento=bento) if info.entry_service == "": # for compatibility self.service_name = "apiserver" @@ -278,21 +296,27 @@ def get_args_from_config( @inject -def get_bento_info( +def ensure_bento( project_path: str | None = None, bento: str | Tag | None = None, cli: bool = False, + bare: bool = False, + push: bool = True, + reload: bool = False, _bento_store: BentoStore = Provide[BentoMLContainer.bento_store], _cloud_client: BentoCloudClient = Provide[BentoMLContainer.bentocloud_client], -) -> BentoInfo: +) -> Bento | BentoManifestSchema: if project_path: from bentoml.bentos import build_bentofile - bento_obj = build_bentofile(build_ctx=project_path, _bento_store=_bento_store) + bento_obj = build_bentofile( + build_ctx=project_path, bare=bare, _bento_store=_bento_store, reload=reload + ) if cli: rich.print(f"🍱 Built bento [green]{bento_obj.info.tag}[/]") - _cloud_client.push_bento(bento=bento_obj) - return bento_obj.info + if push: + _cloud_client.push_bento(bento=bento_obj, bare=bare) + return bento_obj elif bento: bento = Tag.from_taglike(bento) try: @@ -310,19 +334,17 @@ def get_bento_info( if bento_obj is not None: # push to bentocloud - _cloud_client.push_bento(bento=bento_obj) - return bento_obj.info + if push: + _cloud_client.push_bento(bento=bento_obj, bare=bare) + return bento_obj if bento_schema is not None: assert bento_schema.manifest is not None if cli: rich.print( f"[bold blue]Using bento [green]{bento.name}:{bento.version}[/] from bentocloud to deploy" ) - return BentoInfo( - tag=Tag(name=bento.name, version=bento.version), - entry_service=bento_schema.manifest.entry_service, - service=bento_schema.manifest.service, - ) + bento_schema.manifest.version = bento.version + return bento_schema.manifest raise NotFound(f"bento {bento} not found in both local and cloud") else: raise BentoMLException( @@ -368,6 +390,10 @@ class DeploymentInfo: _schema: DeploymentSchema = attr.field(alias="_schema", repr=False) _urls: t.Optional[list[str]] = attr.field(alias="_urls", default=None, repr=False) + @property + def is_dev(self) -> bool: + return self._schema.manifest is not None and self._schema.manifest.dev + def to_dict(self) -> dict[str, t.Any]: return { "name": self.name, @@ -482,165 +508,429 @@ def get_async_client( @inject def wait_until_ready( self, + spinner: Spinner, timeout: int = 3600, check_interval: int = 10, - spinner: Spinner | None = None, cloud_rest_client: RestApiClient = Provide[BentoMLContainer.rest_api_client], - ) -> None: + ) -> int: from httpx import TimeoutException start_time = time.time() - if spinner is not None: - stop_tail_event = Event() - - def tail_image_builder_logs() -> None: - started_at = time.time() - wait_pod_timeout = 60 * 10 - pod: KubePodSchema | None = None - while True: - pod = cloud_rest_client.v2.get_deployment_image_builder_pod( - self.name, self.cluster - ) - if pod is None: - if time.time() - started_at > timeout: - spinner.console.print( - "🚨 [bold red]Time out waiting for image builder pod created[/bold red]" - ) - return - if stop_tail_event.wait(check_interval): - return - continue - if pod.pod_status.status == "Running": - break - if time.time() - started_at > wait_pod_timeout: + stop_tail_event = Event() + console = spinner.console + + def tail_image_builder_logs() -> None: + started_at = time.time() + wait_pod_timeout = 60 * 10 + pod: KubePodSchema | None = None + while True: + pod = cloud_rest_client.v2.get_deployment_image_builder_pod( + self.name, self.cluster + ) + if pod is None: + if time.time() - started_at > timeout: spinner.console.print( - "🚨 [bold red]Time out waiting for image builder pod running[/bold red]" + "🚨 [bold red]Time out waiting for image builder pod created[/bold red]" ) return if stop_tail_event.wait(check_interval): return + continue + if pod.pod_status.status == "Running": + break + if time.time() - started_at > wait_pod_timeout: + spinner.console.print( + "🚨 [bold red]Time out waiting for image builder pod running[/bold red]" + ) + return + if stop_tail_event.wait(check_interval): + return - is_first = True - logs_tailer = cloud_rest_client.v2.tail_logs( - cluster_name=self.cluster, - namespace=self._schema.kube_namespace, - pod_name=pod.name, - container_name="builder", - stop_event=stop_tail_event, - ) + is_first = True + logs_tailer = cloud_rest_client.v2.tail_logs( + cluster_name=self.cluster, + namespace=self._schema.kube_namespace, + pod_name=pod.name, + container_name="builder", + stop_event=stop_tail_event, + ) - for piece in logs_tailer: - decoded_bytes = base64.b64decode(piece) - decoded_str = decoded_bytes.decode("utf-8") - if is_first: - is_first = False - spinner.update("🚧 Image building...") - spinner.stop() - print(decoded_str, end="", flush=True) + for piece in logs_tailer: + decoded_bytes = base64.b64decode(piece) + decoded_str = decoded_bytes.decode("utf-8") + if is_first: + is_first = False + spinner.update("🚧 Image building...") + spinner.stop() + console.print(decoded_str, end="") - tail_thread: Thread | None = None + tail_thread: Thread | None = None - try: - status: DeploymentState | None = None - spinner.update( - f'🔄 Waiting for deployment "{self.name}" to be ready...' - ) - while time.time() - start_time < timeout: - for _ in range(3): - try: - new_status = self.get_status() - break - except TimeoutException: - spinner.update( - "⚠️ Unable to get deployment status, retrying..." - ) - else: - spinner.log( - "🚨 [bold red]Unable to contact the server, but the deployment is created. " - "You can check the status on the bentocloud website.[/bold red]" - ) - return - if ( - status is None or status.status != new_status.status - ): # on status change - status = new_status - spinner.update( - f'🔄 Waiting for deployment "{self.name}" to be ready. Current status: "{status.status}"' - ) - if status.status == DeploymentStatus.ImageBuilding.value: - if tail_thread is None: - tail_thread = Thread( - target=tail_image_builder_logs, daemon=True - ) - tail_thread.start() - elif ( - tail_thread is not None - ): # The status has changed from ImageBuilding to other - stop_tail_event.set() - tail_thread.join() - spinner.start() - - if status.status in ( - DeploymentStatus.Running.value, - DeploymentStatus.ScaledToZero.value, - ): - spinner.stop() - spinner.console.print( - f'✅ [bold green] Deployment "{self.name}" is ready:[/] {self.admin_console}' - ) - return - if status.status in [ - DeploymentStatus.Failed.value, - DeploymentStatus.ImageBuildFailed.value, - DeploymentStatus.Terminated.value, - DeploymentStatus.Terminating.value, - DeploymentStatus.Unhealthy.value, - ]: - spinner.stop() - spinner.console.print( - f'🚨 [bold red]Deployment "{self.name}" is not ready. Current status: "{status.status}"[/]' - ) - return - - time.sleep(check_interval) - - spinner.stop() - spinner.console.print( - f'🚨 [bold red]Time out waiting for Deployment "{self.name}" ready[/]' - ) - return - finally: - stop_tail_event.set() - if tail_thread is not None: - tail_thread.join() - else: + try: + status: DeploymentState | None = None + spinner.update(f'🔄 Waiting for deployment "{self.name}" to be ready...') while time.time() - start_time < timeout: - status: DeploymentState | None = None for _ in range(3): try: - status = self.get_status() + new_status = self.get_status() break except TimeoutException: - pass - if status is None: - logger.error( - f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] Unable to contact the server, but the deployment is created. You can check the status on the bentocloud website." + spinner.update("⚠️ Unable to get deployment status, retrying...") + else: + spinner.log( + "🚨 [bold red]Unable to contact the server, but the deployment is created. " + "You can check the status on the bentocloud website.[/bold red]" ) - return + return 1 + if ( + status is None or status.status != new_status.status + ): # on status change + status = new_status + spinner.update( + f'🔄 Waiting for deployment "{self.name}" to be ready. Current status: "{status.status}"' + ) + if status.status == DeploymentStatus.ImageBuilding.value: + if tail_thread is None: + tail_thread = Thread( + target=tail_image_builder_logs, daemon=True + ) + tail_thread.start() + elif ( + tail_thread is not None + ): # The status has changed from ImageBuilding to other + stop_tail_event.set() + tail_thread.join() + tail_thread = None + spinner.start() + + if status.status in [ + DeploymentStatus.Failed.value, + DeploymentStatus.ImageBuildFailed.value, + DeploymentStatus.Terminated.value, + DeploymentStatus.Terminating.value, + DeploymentStatus.Unhealthy.value, + ]: + spinner.stop() + console.print( + f'🚨 [bold red]Deployment "{self.name}" is not ready. Current status: "{status.status}"[/]' + ) + return 1 if status.status in ( DeploymentStatus.Running.value, DeploymentStatus.ScaledToZero.value, ): - logger.info( - f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] Deployment '{self.name}' is ready." + spinner.stop() + console.print( + f'✅ [bold green] Deployment "{self.name}" is ready:[/] {self.admin_console}' ) - return - logger.info( - f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] Waiting for deployment '{self.name}' to be ready. Current status: '{status.status}'." - ) + break + time.sleep(check_interval) + else: + spinner.stop() + console.print( + f'🚨 [bold red]Time out waiting for Deployment "{self.name}" ready[/]' + ) + return 1 - logger.error(f"Timed out waiting for deployment '{self.name}' to be ready.") + finally: + stop_tail_event.set() + if tail_thread is not None: + tail_thread.join() + return 0 + + @inject + def upload_files( + self, + files: t.Iterable[tuple[str, bytes]], + cloud_rest_client: RestApiClient = Provide[BentoMLContainer.rest_api_client], + ) -> None: + data = { + "files": [ + { + "path": path, + "b64_encoded_content": base64.b64encode(content).decode("utf-8"), + } + for path, content in files + ] + } + cloud_rest_client.v2.upload_files( + self.name, + bentoml_cattr.structure(data, UploadDeploymentFilesSchema), + cluster=self.cluster, + ) + + @inject + def delete_files( + self, + paths: t.Iterable[str], + cloud_rest_client: RestApiClient = Provide[BentoMLContainer.rest_api_client], + ) -> None: + data = {"paths": paths} + cloud_rest_client.v2.delete_files( + self.name, + bentoml_cattr.structure(data, DeleteDeploymentFilesSchema), + cluster=self.cluster, + ) + + @inject + def list_files( + self, + cloud_rest_client: RestApiClient = Provide[BentoMLContainer.rest_api_client], + ) -> DeploymentFileListSchema: + return cloud_rest_client.v2.list_files(self.name, cluster=self.cluster) + + @inject + def _init_deployment_files( + self, + bento_dir: str, + console: Console | None = None, + timeout: int = 600, + cloud_rest_client: RestApiClient = Provide[BentoMLContainer.rest_api_client], + ) -> str: + from ..bento.build_config import BentoPathSpec + + check_interval = 5 + start_time = time.time() + if console is None: + console = rich.get_console() + while time.time() - start_time < timeout: + pods = cloud_rest_client.v2.list_deployment_pods(self.name, self.cluster) + if any( + pod.labels.get("yatai.ai/bento-function-component-type") == "api-server" + and pod.status.phase == "Running" + and pod.pod_status.status != "Terminating" + for pod in pods + ): + break + time.sleep(check_interval) + else: + raise TimeoutError("Timeout waiting for API server pod to be ready") + + build_config = get_bento_build_config(bento_dir) + bento_spec = BentoPathSpec(build_config.include, build_config.exclude) + upload_files: list[tuple[str, bytes]] = [] + requirements_content = _build_requirements_txt(bento_dir, build_config) + ignore_patterns = bento_spec.from_path(bento_dir) + + pod_files = {file.path: file.md5 for file in self.list_files().files} + for root, _, files in os.walk(bento_dir): + for fn in files: + full_path = os.path.join(root, fn) + rel_path = os.path.relpath(full_path, bento_dir) + if ( + not bento_spec.includes( + full_path, recurse_exclude_spec=ignore_patterns + ) + and rel_path != "bentofile.yaml" + ): + continue + if rel_path == REQUIREMENTS_TXT: + continue + file_content = open(full_path, "rb").read() + file_md5 = hashlib.md5(file_content).hexdigest() + if rel_path in pod_files and pod_files[rel_path] == file_md5: + continue + console.print(f" [green]Uploading[/] {rel_path}") + upload_files.append((rel_path, file_content)) + requirements_md5 = hashlib.md5(requirements_content).hexdigest() + if requirements_md5 != pod_files.get(REQUIREMENTS_TXT, ""): + console.print(f" [green]Uploading[/] {REQUIREMENTS_TXT}") + upload_files.append((REQUIREMENTS_TXT, requirements_content)) + self.upload_files(upload_files) + return requirements_md5 + + @inject + def watch( + self, + bento_dir: str, + cloud_client: BentoCloudClient = Provide[BentoMLContainer.bentocloud_client], + ) -> None: + import watchfiles + + from ..bento.build_config import BentoPathSpec + + build_config = get_bento_build_config(bento_dir) + bento_spec = BentoPathSpec(build_config.include, build_config.exclude) + ignore_patterns = bento_spec.from_path(bento_dir) + requirements_hash: str | None = None + + default_filter = watchfiles.filters.DefaultFilter() + + def watch_filter(change: watchfiles.Change, path: str) -> bool: + if not default_filter(change, path): + return False + if os.path.relpath(path, bento_dir) == "bentofile.yaml": + return True + return bento_spec.includes(path, recurse_exclude_spec=ignore_patterns) + + console = Console(highlight=False) + bento_info = ensure_bento(bento_dir, bare=True, push=False, reload=True) + assert isinstance(bento_info, Bento) + target = self._refetch_target(False) + + spinner = Spinner(console=console) + try: + spinner.start() + upload_id = spinner.transmission_progress.add_task( + "Dummy upload task", visible=False + ) + while True: + if ( + target is None + or target.bento is None + or target.bento.manifest != bento_info.get_manifest() + ): + console.print("✨ [green bold]Bento change detected[/]") + spinner.update("🔄 Pushing Bento to BentoCloud") + cloud_client._do_push_bento(bento_info, upload_id, bare=True) # type: ignore + spinner.update("🔄 Updating deployment with new configuration") + update_config = DeploymentConfigParameters( + bento=str(bento_info.tag), + name=self.name, + cluster=self.cluster, + cli=False, + dev=True, + ) + update_config.verify() + self = Deployment.update(update_config) + target = self._refetch_target(False) + requirements_hash = self._init_deployment_files( + bento_dir, console=spinner.console + ) + elif not requirements_hash: + spinner.update("🔄 Initializing deployment files") + requirements_hash = self._init_deployment_files( + bento_dir, console=spinner.console + ) + with self._tail_logs(spinner.console): + spinner.update("👀 Watching for changes") + for changes in watchfiles.watch( + bento_dir, watch_filter=watch_filter + ): + bento_info = ensure_bento( + bento_dir, bare=True, push=False, reload=True + ) + assert isinstance(bento_info, Bento) + if ( + target is None + or target.bento is None + or target.bento.manifest != bento_info.get_manifest() + ): + # stop log tail and reset the deployment + break + + build_config = get_bento_build_config(bento_dir) + upload_files: list[tuple[str, bytes]] = [] + delete_files: list[str] = [] + + for change, path in changes: + rel_path = os.path.relpath(path, bento_dir) + if rel_path == REQUIREMENTS_TXT: + continue + if change == watchfiles.Change.deleted: + console.print(f" [red]Deleting[/] {rel_path}") + delete_files.append(rel_path) + else: + console.print(f" [green]Uploading[/] {rel_path}") + upload_files.append((rel_path, open(path, "rb").read())) + + requirements_content = _build_requirements_txt( + bento_dir, build_config + ) + if ( + new_hash := hashlib.md5(requirements_content).hexdigest() + ) != requirements_hash: + requirements_hash = new_hash + console.print(f" [green]Uploading[/] {REQUIREMENTS_TXT}") + upload_files.append( + (REQUIREMENTS_TXT, requirements_content) + ) + if upload_files: + self.upload_files(upload_files) + if delete_files: + self.delete_files(delete_files) + if (status := self.get_status().status) in [ + DeploymentStatus.Failed.value, + DeploymentStatus.ImageBuildFailed.value, + DeploymentStatus.Terminated.value, + DeploymentStatus.Terminating.value, + DeploymentStatus.Unhealthy.value, + ]: + console.print( + f'🚨 [bold red]Deployment "{self.name}" is not ready. Current status: "{status}"[/]' + ) + return + except KeyboardInterrupt: + spinner.log( + "The deployment is still running, view the dashboard:\n" + f" [blue]{self.admin_console}[/]\n\n" + "Next steps:\n" + "* Push the bento to BentoCloud:\n" + " [blue]$ bentoml build --push[/]\n\n" + "* Attach to this deployment again:\n" + f" [blue]$ bentoml develop --attach {self.name} --cluster {self.cluster}[/]\n\n" + "* Terminate the deployment:\n" + f" [blue]$ bentoml deployment terminate {self.name} --cluster {self.cluster}[/]" + ) + finally: + spinner.stop() + + @contextlib.contextmanager + @inject + def _tail_logs( + self, + console: Console, + cloud_rest_client: RestApiClient = Provide[BentoMLContainer.rest_api_client], + ) -> t.Generator[None, None, None]: + import itertools + from collections import defaultdict + + pods = cloud_rest_client.v2.list_deployment_pods(self.name, self.cluster) + stop_event = Event() + workers: list[Thread] = [] + + colors = itertools.cycle(["cyan", "yellow", "blue", "magenta", "green"]) + runner_color: dict[str, str] = defaultdict(lambda: next(colors)) + + def pod_log_worker(pod: KubePodSchema, stop_event: Event) -> None: + current = "" + color = runner_color[pod.runner_name] + for chunk in cloud_rest_client.v2.tail_logs( + cluster_name=self.cluster, + namespace=self._schema.kube_namespace, + pod_name=pod.name, + container_name="main", + stop_event=stop_event, + ): + decoded_str = base64.b64decode(chunk).decode("utf-8") + chunk = filter_control_codes(decoded_str) + if "\n" not in chunk: + current += chunk + continue + for i, line in enumerate(chunk.split("\n")): + if i == 0: + line = current + line + current = "" + if i == len(chunk.split("\n")) - 1: + current = line + break + console.print(f"[{color}]\[{pod.runner_name}][/] {line}") + if current: + console.print(f"[{color}]\[{pod.runner_name}][/] {current}") + + try: + for pod in pods: + if pod.labels.get("yatai.ai/is-bento-image-builder") == "true": + continue + thread = Thread(target=pod_log_worker, args=(pod, stop_event)) + thread.start() + workers.append(thread) + yield + finally: + stop_event.set() + for thread in workers: + thread.join() @attr.define @@ -939,3 +1229,32 @@ class InstanceTypeInfo: def to_dict(self): return {k: v for k, v in attr.asdict(self).items() if v is not None and v != ""} + + +def get_bento_build_config(bento_dir: str) -> BentoBuildConfig: + bentofile_path = os.path.join(bento_dir, "bentofile.yaml") + if not os.path.exists(bentofile_path): + return BentoBuildConfig(service="").with_defaults() + else: + # respect bentofile.yaml include and exclude + with open(bentofile_path, "r") as f: + return BentoBuildConfig.from_yaml(f).with_defaults() + + +REQUIREMENTS_TXT = "requirements.txt" + + +def _build_requirements_txt(bento_dir: str, config: BentoBuildConfig) -> bytes: + from bentoml._internal.configuration import BENTOML_VERSION + from bentoml._internal.configuration import clean_bentoml_version + + filename = config.python.requirements_txt + content = b"" + if filename and os.path.exists(fullpath := os.path.join(bento_dir, filename)): + with open(fullpath, "rb") as f: + content = f.read() + for package in config.python.packages or []: + content += f"{package}\n".encode() + bentoml_version = clean_bentoml_version(BENTOML_VERSION) + content += f"bentoml=={bentoml_version}\n".encode() + return content diff --git a/src/bentoml/_internal/cloud/schemas/modelschemas.py b/src/bentoml/_internal/cloud/schemas/modelschemas.py index 8016a62a676..21fa3b2df41 100644 --- a/src/bentoml/_internal/cloud/schemas/modelschemas.py +++ b/src/bentoml/_internal/cloud/schemas/modelschemas.py @@ -6,9 +6,9 @@ import attr -from bentoml._internal.cloud.schemas.utils import dict_options_converter - from ...bento.bento import BentoServiceInfo +from ...cloud.schemas.utils import dict_options_converter +from ...tag import Tag time_format = "%Y-%m-%d %H:%M:%S.%f" @@ -74,16 +74,21 @@ class BentoRunnerSchema: @attr.define class BentoManifestSchema: service: str - bentoml_version: str - size_bytes: int + bentoml_version: str = attr.field(eq=False) + size_bytes: int = attr.field(eq=False) entry_service: str = attr.field(default="") name: t.Optional[str] = attr.field(default=None) apis: t.Dict[str, BentoApiSchema] = attr.field(factory=dict) - models: t.List[str] = attr.field(factory=list) + models: t.List[str] = attr.field(factory=list, eq=False) runners: t.Optional[t.List[BentoRunnerSchema]] = attr.field(factory=list) services: t.List[BentoServiceInfo] = attr.field(factory=dict) envs: t.List[t.Dict[str, str]] = attr.field(factory=list) schema: t.Dict[str, t.Any] = attr.field(factory=dict) + version: t.Optional[str] = attr.field(default=None, eq=False) + + @property + def tag(self) -> Tag: + return Tag(self.name, self.version) if TYPE_CHECKING: diff --git a/src/bentoml/_internal/cloud/schemas/schemasv2.py b/src/bentoml/_internal/cloud/schemas/schemasv2.py index ecfbb91ac13..76b096f1b30 100644 --- a/src/bentoml/_internal/cloud/schemas/schemasv2.py +++ b/src/bentoml/_internal/cloud/schemas/schemasv2.py @@ -61,7 +61,14 @@ class UpdateDeploymentSchema(DeploymentConfigSchema): class CreateDeploymentSchema(UpdateDeploymentSchema): __omit_if_default__ = True __forbid_extra_keys__ = False - name: t.Optional[str] = attr.field(default=None) + name: t.Optional[str] = None + dev: bool = False + + +@attr.define +class DeploymentManifestSchema: + __forbid_extra_keys__ = False + dev: bool = False @attr.define @@ -73,6 +80,7 @@ class DeploymentSchema(ResourceSchema): creator: UserSchema cluster: ClusterSchema latest_revision: t.Optional[DeploymentRevisionSchema] + manifest: t.Optional[DeploymentManifestSchema] = None @attr.define @@ -94,6 +102,13 @@ class KubePodStatusSchema: reason: str +@attr.define +class PodStatusSchema: + __forbid_extra_keys__ = False + phase: str + ready: bool + + @attr.define class KubePodSchema: __forbid_extra_keys__ = False @@ -101,6 +116,8 @@ class KubePodSchema: namespace: str labels: t.Dict[str, str] pod_status: KubePodStatusSchema + status: PodStatusSchema + runner_name: str @attr.define @@ -124,3 +141,36 @@ class KubePodWSResponseSchema: message: str type: str payload: t.Optional[t.List[KubePodSchema]] + + +@attr.define +class UploadDeploymentFileSchema: + __forbid_extra_keys__ = False + path: str + b64_encoded_content: str + + +@attr.define +class UploadDeploymentFilesSchema: + __forbid_extra_keys__ = False + files: t.List[UploadDeploymentFileSchema] + + +@attr.define +class DeleteDeploymentFilesSchema: + __forbid_extra_keys__ = False + paths: t.List[str] + + +@attr.define +class DeploymentFileSchema: + __forbid_extra_keys__ = False + path: str + size: int + md5: str + + +@attr.define +class DeploymentFileListSchema: + __forbid_extra_keys__ = False + files: t.List[DeploymentFileSchema] diff --git a/src/bentoml/_internal/service/loader.py b/src/bentoml/_internal/service/loader.py index 496f71b0099..985d8eb938b 100644 --- a/src/bentoml/_internal/service/loader.py +++ b/src/bentoml/_internal/service/loader.py @@ -41,6 +41,7 @@ def import_service( *, working_dir: t.Optional[str] = None, standalone_load: bool = False, + reload: bool = False, model_store: ModelStore = Provide[BentoMLContainer.model_store], ) -> Service | NewService[t.Any]: """Import a Service instance from source code, by providing the svc_import_path @@ -148,10 +149,13 @@ def recover_standalone_env_change(): module_name = import_path # Import the service using the Bento's own model store + needs_reload = module_name in sys.modules try: module = importlib.import_module(module_name, package=working_dir) except ImportError as e: raise ImportServiceError(f'Failed to import module "{module_name}": {e}') + if reload and needs_reload: + importlib.reload(module) if not standalone_load: recover_standalone_env_change() diff --git a/src/bentoml/_internal/utils/__init__.py b/src/bentoml/_internal/utils/__init__.py index b9112570bb8..46c0434b941 100644 --- a/src/bentoml/_internal/utils/__init__.py +++ b/src/bentoml/_internal/utils/__init__.py @@ -552,3 +552,30 @@ def is_async_callable(obj: t.Any) -> t.TypeGuard[t.Callable[..., t.Awaitable[t.A def dict_filter_none(d: dict[str, t.Any]) -> dict[str, t.Any]: return {k: v for k, v in d.items() if v is not None} + + +CONTROL_CODES = re.compile( + r""" + \x07| # BELL + \r| # CARRIAGE_RETURN + \x1b\[H| # HOME + \x1b\[2J| # CLEAR + \x1b\[?1049h| # ENABLE_ALT_SCREEN + \x1b\[?1049l| # DISABLE_ALT_SCREEN + \x1b\[?25h| # SHOW_CURSOR + \x1b\[?25l| # HIDE_CURSOR + \x1b\[\d+A| # CURSOR_UP + \x1b\[\d+B| # CURSOR_DOWN + \x1b\[\d+C| # CURSOR_FORWARD + \x1b\[\d+D| # CURSOR_BACKWARD + \x1b\[\d+G| # CURSOR_MOVE_TO_COLUMN + \x1b\[\d+K| # ERASE_IN_LINE + \x1b\[\d+;\d+H| # CURSOR_MOVE_TO + \x1b\]0;.+?\x07 # SET_WINDOW_TITLE +""", + flags=re.VERBOSE, +) + + +def filter_control_codes(line: str) -> str: + return CONTROL_CODES.sub("", line) diff --git a/src/bentoml/_internal/utils/circus/watchfilesplugin.py b/src/bentoml/_internal/utils/circus/watchfilesplugin.py index 29b46e1e087..57448c44e5c 100644 --- a/src/bentoml/_internal/utils/circus/watchfilesplugin.py +++ b/src/bentoml/_internal/utils/circus/watchfilesplugin.py @@ -46,11 +46,12 @@ def __init__(self, *args: t.Any, **config: t.Any): watch_dirs = [self.working_dir, os.path.join(self.bentoml_home, "models")] if not is_pypi_installed_bentoml(): - # bentoml src from this __file__ - logger.info( - "BentoML is installed via development mode, adding source root to 'watch_dirs'." - ) - watch_dirs.append(t.cast(str, os.path.dirname(source_locations("bentoml")))) + bentoml_project = Path(source_locations("bentoml")).parent.parent + if bentoml_project.joinpath("pyproject.toml").exists(): + logger.info( + "BentoML is installed via development mode, adding source root to 'watch_dirs'." + ) + watch_dirs.append(str(bentoml_project / "src")) logger.info("Watching directories: %s", watch_dirs) self.watch_dirs = watch_dirs diff --git a/src/bentoml/bentos.py b/src/bentoml/bentos.py index 47ebfb0581e..21faff669ae 100644 --- a/src/bentoml/bentos.py +++ b/src/bentoml/bentos.py @@ -378,6 +378,8 @@ def build_bentofile( labels: dict[str, str] | None = None, build_ctx: str | None = None, platform: str | None = None, + bare: bool = False, + reload: bool = False, _bento_store: BentoStore = Provide[BentoMLContainer.bento_store], ) -> Bento: """ @@ -390,6 +392,8 @@ def build_bentofile( bentofile: The file path to build config yaml file version: Override the default auto generated version str build_ctx: Build context directory, when used as + bare: whether to build a bento without copying files + reload: whether to reload the service Returns: Bento: a Bento instance representing the materialized Bento saved in BentoStore @@ -407,12 +411,17 @@ def build_bentofile( build_config.labels = labels build_config.labels.update(labels) - return Bento.create( + bento = Bento.create( build_config=build_config, version=version, build_ctx=build_ctx, platform=platform, - ).save(_bento_store) + bare=bare, + reload=reload, + ) + if not bare: + return bento.save(_bento_store) + return bento def containerize(bento_tag: Tag | str, **kwargs: t.Any) -> bool: diff --git a/src/bentoml_cli/cli.py b/src/bentoml_cli/cli.py index 8e34ca3495c..517513503f9 100644 --- a/src/bentoml_cli/cli.py +++ b/src/bentoml_cli/cli.py @@ -12,6 +12,7 @@ def create_bentoml_cli() -> click.Command: from bentoml_cli.containerize import containerize_command from bentoml_cli.deployment import deploy_command from bentoml_cli.deployment import deployment_command + from bentoml_cli.deployment import develop_command from bentoml_cli.env import env_command from bentoml_cli.models import model_command from bentoml_cli.secret import secret_command @@ -45,6 +46,7 @@ def bentoml_cli(): bentoml_cli.add_subcommands(serve_command) bentoml_cli.add_command(containerize_command) bentoml_cli.add_command(deploy_command) + bentoml_cli.add_command(develop_command) bentoml_cli.add_command(deployment_command) bentoml_cli.add_command(secret_command) diff --git a/src/bentoml_cli/deployment.py b/src/bentoml_cli/deployment.py index b9c2a0c0e20..da78e9062d5 100644 --- a/src/bentoml_cli/deployment.py +++ b/src/bentoml_cli/deployment.py @@ -1,23 +1,30 @@ from __future__ import annotations import json +import logging import typing as t from http import HTTPStatus import click import rich +import rich.style import yaml +from rich.console import Console from rich.syntax import Syntax from rich.table import Table from bentoml._internal.cloud.base import Spinner from bentoml._internal.cloud.deployment import Deployment from bentoml._internal.cloud.deployment import DeploymentConfigParameters +from bentoml._internal.cloud.deployment import DeploymentInfo +from bentoml._internal.cloud.schemas.modelschemas import DeploymentStatus from bentoml._internal.cloud.schemas.modelschemas import DeploymentStrategy from bentoml._internal.utils import rich_console as console from bentoml.exceptions import BentoMLException from bentoml_cli.utils import BentoMLCommandGroup +logger = logging.getLogger("bentoml.cli.deployment") + if t.TYPE_CHECKING: TupleStrAny = tuple[str, ...] else: @@ -188,6 +195,55 @@ def decorate(f: t.Callable[..., t.Any]) -> t.Callable[..., t.Any]: return decorate +@click.command(name="develop") +@click.argument( + "bento_dir", + type=click.Path(exists=True, file_okay=False, dir_okay=True, readable=True), + default=".", +) +@click.option( + "--attach", help="Attach to the given deployment instead of creating a new one." +) +@shared_decorator +def develop_command(bento_dir: str, cluster: str | None, attach: str): + """Create or attach to a development deployment and watch for local file changes""" + import questionary + + console = Console(highlight=False) + if attach: + deployment = Deployment.get(attach) + else: + with console.status("Fetching deployments..."): + deployments = [ + d + for d in Deployment.list(cluster=cluster) + if d.is_dev + and d.get_status(False).status + in [ + DeploymentStatus.Deploying.value, + DeploymentStatus.Running.value, + DeploymentStatus.ScaledToZero.value, + DeploymentStatus.Failed.value, + ] + ] + + chosen = questionary.select( + message="Select a deployment to attach to or create a new one", + choices=[{"name": d.name, "value": d} for d in deployments] + + [{"name": "Create a new deployment", "value": "new"}], + ).ask() + + if chosen == "new": + deployment = create_deployment( + bento=bento_dir, cluster=cluster, dev=True, wait=False + ) + elif chosen is None: + return + else: + deployment = t.cast(DeploymentInfo, chosen) + deployment.watch(bento_dir) + + @click.group(name="deployment", cls=BentoMLCommandGroup) def deployment_command(): """Deployment Subcommands Groups""" @@ -678,21 +734,22 @@ def list_instance_types( # type: ignore def create_deployment( - bento: str | None, - name: str | None, - cluster: str | None, - access_authorization: bool | None, - scaling_min: int | None, - scaling_max: int | None, - instance_type: str | None, - strategy: str | None, - env: tuple[str] | None, - secret: tuple[str] | None, - config_file: str | t.TextIO | None, - config_dict: str | None, - wait: bool, - timeout: int, -) -> None: + bento: str | None = None, + name: str | None = None, + cluster: str | None = None, + access_authorization: bool | None = None, + scaling_min: int | None = None, + scaling_max: int | None = None, + instance_type: str | None = None, + strategy: str | None = None, + env: tuple[str] | None = None, + secret: tuple[str] | None = None, + config_file: str | t.TextIO | None = None, + config_dict: str | None = None, + wait: bool = True, + timeout: int = 3600, + dev: bool = False, +) -> DeploymentInfo: cfg_dict = None if config_dict is not None and config_dict != "": cfg_dict = json.loads(config_dict) @@ -710,16 +767,20 @@ def create_deployment( if env is not None else None ), - secrets=secret, + secrets=list(secret) if secret is not None else None, config_file=config_file, config_dict=cfg_dict, cli=True, + dev=dev, ) + try: config_params.verify() except BentoMLException as e: raise_deployment_config_error(e, "create") - with Spinner() as spinner: + + console = Console(highlight=False) + with Spinner(console=console) as spinner: spinner.update("Creating deployment on BentoCloud") deployment = Deployment.create(deployment_config_params=config_params) spinner.log( @@ -730,4 +791,7 @@ def create_deployment( spinner.update( "[bold blue]Waiting for deployment to be ready, you can use --no-wait to skip this process[/]", ) - deployment.wait_until_ready(timeout=timeout, spinner=spinner) + retcode = deployment.wait_until_ready(timeout=timeout, spinner=spinner) + if retcode != 0: + raise SystemExit(retcode) + return deployment diff --git a/src/bentoml_cli/utils.py b/src/bentoml_cli/utils.py index 9dace2353bf..0eeedac961f 100644 --- a/src/bentoml_cli/utils.py +++ b/src/bentoml_cli/utils.py @@ -21,8 +21,9 @@ from click import Parameter P = t.ParamSpec("P") + R = t.TypeVar("R") - F = t.Callable[P, t.Any] + F = t.Callable[P, R] class ClickFunctionWrapper(t.Protocol[P]): __name__: str @@ -41,14 +42,14 @@ def __call__( # pylint: disable=no-method-argument def kwargs_transformers( - f: F[t.Any] | None = None, + f: F[P, R] | None = None, *, - transformer: F[t.Any], + transformer: F[P, R], pass_click_context: bool = False, -) -> F[t.Any]: - def decorator(_f: F[t.Any]) -> t.Callable[P, t.Any]: +) -> F[P, R]: + def decorator(_f: F[P, R]) -> F[P, R]: @functools.wraps(_f) - def wrapper(*args: P.args, **kwargs: P.kwargs) -> t.Any: + def wrapper(*args: P.args, **kwargs: P.kwargs) -> R: transformed = {k: transformer(v) for k, v in kwargs.items()} if pass_click_context: return _f(click.get_current_context(), *args, **transformed)