Skip to content

Commit

Permalink
Allow to deploy and undeploy Pipelines on Deepset Cloud (#2285)
Browse files Browse the repository at this point in the history
* add deploy_on_deepset_cloud and undeploy_on_deepset_cloud

* increase polling interval to 5 seconds

* Update Documentation & Code Style

* improve logging

* move transitioning logic to PipelineClient

* use enum for Pipeline states

* improve docstrings

* Update Documentation & Code Style

* tests added

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
  • Loading branch information
tstadel and github-actions[bot] authored Mar 10, 2022
1 parent e85b948 commit fd46a42
Show file tree
Hide file tree
Showing 4 changed files with 737 additions and 3 deletions.
58 changes: 58 additions & 0 deletions docs/_src/api/api/pipelines.md
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,64 @@ If not specified, will be read from DEEPSET_CLOUD_API_KEY environment variable.
If not specified, will be read from DEEPSET_CLOUD_API_ENDPOINT environment variable.
- `overwrite`: Whether to overwrite the config if it already exists. Otherwise an error is being raised.

<a id="base.BasePipeline.deploy_on_deepset_cloud"></a>

#### deploy\_on\_deepset\_cloud

```python
@classmethod
def deploy_on_deepset_cloud(cls, pipeline_config_name: str, workspace: str = "default", api_key: Optional[str] = None, api_endpoint: Optional[str] = None, timeout: int = 60)
```

Deploys the pipelines of a pipeline config on Deepset Cloud.

Blocks until pipelines are successfully deployed, deployment failed or timeout exceeds.
If pipelines are already deployed no action will be taken and an info will be logged.
If timeout exceeds a TimeoutError will be raised.
If deployment fails a DeepsetCloudError will be raised.

Pipeline config must be present on Deepset Cloud. See save_to_deepset_cloud() for more information.

**Arguments**:

- `pipeline_config_name`: name of the config file inside the Deepset Cloud workspace.
- `workspace`: workspace in Deepset Cloud
- `api_key`: Secret value of the API key.
If not specified, will be read from DEEPSET_CLOUD_API_KEY environment variable.
- `api_endpoint`: The URL of the Deepset Cloud API.
If not specified, will be read from DEEPSET_CLOUD_API_ENDPOINT environment variable.
- `timeout`: The time in seconds to wait until deployment completes.
If the timeout is exceeded an error will be raised.

<a id="base.BasePipeline.undeploy_on_deepset_cloud"></a>

#### undeploy\_on\_deepset\_cloud

```python
@classmethod
def undeploy_on_deepset_cloud(cls, pipeline_config_name: str, workspace: str = "default", api_key: Optional[str] = None, api_endpoint: Optional[str] = None, timeout: int = 60)
```

Undeploys the pipelines of a pipeline config on Deepset Cloud.

Blocks until pipelines are successfully undeployed, undeployment failed or timeout exceeds.
If pipelines are already undeployed no action will be taken and an info will be logged.
If timeout exceeds a TimeoutError will be raised.
If deployment fails a DeepsetCloudError will be raised.

Pipeline config must be present on Deepset Cloud. See save_to_deepset_cloud() for more information.

**Arguments**:

- `pipeline_config_name`: name of the config file inside the Deepset Cloud workspace.
- `workspace`: workspace in Deepset Cloud
- `api_key`: Secret value of the API key.
If not specified, will be read from DEEPSET_CLOUD_API_KEY environment variable.
- `api_endpoint`: The URL of the Deepset Cloud API.
If not specified, will be read from DEEPSET_CLOUD_API_ENDPOINT environment variable.
- `timeout`: The time in seconds to wait until undeployment completes.
If the timeout is exceeded an error will be raised.

<a id="base.Pipeline"></a>

## Pipeline
Expand Down
60 changes: 60 additions & 0 deletions haystack/pipelines/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,66 @@ def save_to_deepset_cloud(
client.save_pipeline_config(config=config, pipeline_config_name=pipeline_config_name)
logger.info(f"Pipeline config '{pipeline_config_name}' successfully created.")

@classmethod
def deploy_on_deepset_cloud(
cls,
pipeline_config_name: str,
workspace: str = "default",
api_key: Optional[str] = None,
api_endpoint: Optional[str] = None,
timeout: int = 60,
):
"""
Deploys the pipelines of a pipeline config on Deepset Cloud.
Blocks until pipelines are successfully deployed, deployment failed or timeout exceeds.
If pipelines are already deployed no action will be taken and an info will be logged.
If timeout exceeds a TimeoutError will be raised.
If deployment fails a DeepsetCloudError will be raised.
Pipeline config must be present on Deepset Cloud. See save_to_deepset_cloud() for more information.
:param pipeline_config_name: name of the config file inside the Deepset Cloud workspace.
:param workspace: workspace in Deepset Cloud
:param api_key: Secret value of the API key.
If not specified, will be read from DEEPSET_CLOUD_API_KEY environment variable.
:param api_endpoint: The URL of the Deepset Cloud API.
If not specified, will be read from DEEPSET_CLOUD_API_ENDPOINT environment variable.
:param timeout: The time in seconds to wait until deployment completes.
If the timeout is exceeded an error will be raised.
"""
client = DeepsetCloud.get_pipeline_client(api_key=api_key, api_endpoint=api_endpoint, workspace=workspace)
client.deploy(pipeline_config_name=pipeline_config_name, timeout=timeout)

@classmethod
def undeploy_on_deepset_cloud(
cls,
pipeline_config_name: str,
workspace: str = "default",
api_key: Optional[str] = None,
api_endpoint: Optional[str] = None,
timeout: int = 60,
):
"""
Undeploys the pipelines of a pipeline config on Deepset Cloud.
Blocks until pipelines are successfully undeployed, undeployment failed or timeout exceeds.
If pipelines are already undeployed no action will be taken and an info will be logged.
If timeout exceeds a TimeoutError will be raised.
If deployment fails a DeepsetCloudError will be raised.
Pipeline config must be present on Deepset Cloud. See save_to_deepset_cloud() for more information.
:param pipeline_config_name: name of the config file inside the Deepset Cloud workspace.
:param workspace: workspace in Deepset Cloud
:param api_key: Secret value of the API key.
If not specified, will be read from DEEPSET_CLOUD_API_KEY environment variable.
:param api_endpoint: The URL of the Deepset Cloud API.
If not specified, will be read from DEEPSET_CLOUD_API_ENDPOINT environment variable.
:param timeout: The time in seconds to wait until undeployment completes.
If the timeout is exceeded an error will be raised.
"""
client = DeepsetCloud.get_pipeline_client(api_key=api_key, api_endpoint=api_endpoint, workspace=workspace)
client.undeploy(pipeline_config_name=pipeline_config_name, timeout=timeout)


class Pipeline(BasePipeline):
"""
Expand Down
217 changes: 214 additions & 3 deletions haystack/utils/deepsetcloud.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from __future__ import annotations
from enum import Enum
import logging
import os
from typing import Any, Dict, Generator, List, Optional, Union
import time
from typing import Any, Dict, Generator, List, Optional, Tuple, Union

try:
from typing import Literal
Expand All @@ -12,6 +15,41 @@

DEFAULT_API_ENDPOINT = f"DC_API_PLACEHOLDER/v1" # TODO


class PipelineStatus(Enum):
UNDEPLOYED: str = "UNDEPLOYED"
DEPLOYED_UNHEALTHY: str = "DEPLOYED_UNHEALTHY"
DEPLOYED: str = "DEPLOYED"
DEPLOYMENT_IN_PROGRESS: str = "DEPLOYMENT_IN_PROGRESS"
UNDEPLOYMENT_IN_PROGRESS: str = "UNDEPLOYMENT_IN_PROGRESS"
DEPLOYMENT_SCHEDULED: str = "DEPLOYMENT_SCHEDULED"
UNDEPLOYMENT_SCHEDULED: str = "UNDEPLOYMENT_SCHEDULED"
UKNOWN: str = "UNKNOWN"

@classmethod
def from_str(cls, status_string: str) -> PipelineStatus:
return cls.__dict__.get(status_string, PipelineStatus.UKNOWN)


SATISFIED_STATES_KEY = "satisfied_states"
VALID_INITIAL_STATES_KEY = "valid_initial_states"
VALID_TRANSITIONING_STATES_KEY = "valid_transitioning_states"
PIPELINE_STATE_TRANSITION_INFOS: Dict[PipelineStatus, Dict[str, List[PipelineStatus]]] = {
PipelineStatus.UNDEPLOYED: {
SATISFIED_STATES_KEY: [PipelineStatus.UNDEPLOYED],
VALID_INITIAL_STATES_KEY: [PipelineStatus.DEPLOYED, PipelineStatus.DEPLOYED_UNHEALTHY],
VALID_TRANSITIONING_STATES_KEY: [
PipelineStatus.UNDEPLOYMENT_SCHEDULED,
PipelineStatus.UNDEPLOYMENT_IN_PROGRESS,
],
},
PipelineStatus.DEPLOYED: {
SATISFIED_STATES_KEY: [PipelineStatus.DEPLOYED, PipelineStatus.DEPLOYED_UNHEALTHY],
VALID_INITIAL_STATES_KEY: [PipelineStatus.UNDEPLOYED],
VALID_TRANSITIONING_STATES_KEY: [PipelineStatus.DEPLOYMENT_SCHEDULED, PipelineStatus.DEPLOYMENT_IN_PROGRESS],
},
}

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -393,7 +431,11 @@ def list_pipeline_configs(self, workspace: Optional[str] = None, headers: dict =
return generator

def save_pipeline_config(
self, config: dict, pipeline_config_name: str, workspace: Optional[str] = None, headers: dict = None
self,
config: dict,
pipeline_config_name: Optional[str] = None,
workspace: Optional[str] = None,
headers: dict = None,
):
config["name"] = pipeline_config_name
workspace_url = self._build_workspace_url(workspace=workspace)
Expand All @@ -403,7 +445,11 @@ def save_pipeline_config(
logger.warning(f"Unexpected response from saving pipeline config: {response}")

def update_pipeline_config(
self, config: dict, pipeline_config_name: str, workspace: Optional[str] = None, headers: dict = None
self,
config: dict,
pipeline_config_name: Optional[str] = None,
workspace: Optional[str] = None,
headers: dict = None,
):
config["name"] = pipeline_config_name
pipeline_url = self._build_pipeline_url(workspace=workspace, pipeline_config_name=pipeline_config_name)
Expand All @@ -412,6 +458,171 @@ def update_pipeline_config(
if "name" not in response or response["name"] != pipeline_config_name:
logger.warning(f"Unexpected response from updating pipeline config: {response}")

def deploy(
self, pipeline_config_name: Optional[str] = None, workspace: str = None, headers: dict = None, timeout: int = 60
):
"""
Deploys the pipelines of a pipeline config on Deepset Cloud.
Blocks until pipelines are successfully deployed, deployment failed or timeout exceeds.
If pipelines are already deployed no action will be taken and an info will be logged.
If timeout exceeds a TimeoutError will be raised.
If deployment fails a DeepsetCloudError will be raised.
:param pipeline_config_name: name of the config file inside the Deepset Cloud workspace.
:param workspace: workspace in Deepset Cloud
:param headers: Headers to pass to API call
:param timeout: The time in seconds to wait until deployment completes.
If the timeout is exceeded an error will be raised.
"""
status, changed = self._transition_pipeline_state(
target_state=PipelineStatus.DEPLOYED,
timeout=timeout,
pipeline_config_name=pipeline_config_name,
workspace=workspace,
headers=headers,
)

if status == PipelineStatus.DEPLOYED:
if changed:
logger.info(f"Pipeline config '{pipeline_config_name}' successfully deployed.")
else:
logger.info(f"Pipeline config '{pipeline_config_name}' is already deployed.")
elif status == PipelineStatus.DEPLOYED_UNHEALTHY:
logger.warning(
f"Deployment of pipeline config '{pipeline_config_name}' succeeded. But '{pipeline_config_name}' is unhealthy."
)
elif status in [PipelineStatus.UNDEPLOYMENT_IN_PROGRESS, PipelineStatus.UNDEPLOYMENT_SCHEDULED]:
raise DeepsetCloudError(
f"Deployment of pipline config '{pipeline_config_name}' aborted. Undeployment was requested."
)
elif status == PipelineStatus.UNDEPLOYED:
raise DeepsetCloudError(f"Deployment of pipeline config '{pipeline_config_name}' failed.")
else:
raise DeepsetCloudError(
f"Deployment of pipeline config '{pipeline_config_name} ended in unexpected status: {status.value}"
)

def undeploy(
self, pipeline_config_name: Optional[str] = None, workspace: str = None, headers: dict = None, timeout: int = 60
):
"""
Undeploys the pipelines of a pipeline config on Deepset Cloud.
Blocks until pipelines are successfully undeployed, undeployment failed or timeout exceeds.
If pipelines are already undeployed no action will be taken and an info will be logged.
If timeout exceeds a TimeoutError will be raised.
If deployment fails a DeepsetCloudError will be raised.
:param pipeline_config_name: name of the config file inside the Deepset Cloud workspace.
:param workspace: workspace in Deepset Cloud
:param headers: Headers to pass to API call
:param timeout: The time in seconds to wait until undeployment completes.
If the timeout is exceeded an error will be raised.
"""
status, changed = self._transition_pipeline_state(
target_state=PipelineStatus.UNDEPLOYED,
timeout=timeout,
pipeline_config_name=pipeline_config_name,
workspace=workspace,
headers=headers,
)

if status == PipelineStatus.UNDEPLOYED:
if changed:
logger.info(f"Pipeline config '{pipeline_config_name}' successfully undeployed.")
else:
logger.info(f"Pipeline config '{pipeline_config_name}' is already undeployed.")
elif status in [PipelineStatus.DEPLOYMENT_IN_PROGRESS, PipelineStatus.DEPLOYMENT_SCHEDULED]:
raise DeepsetCloudError(
f"Undeployment of pipline config '{pipeline_config_name}' aborted. Deployment was requested."
)
elif status in [PipelineStatus.DEPLOYED, PipelineStatus.DEPLOYED_UNHEALTHY]:
raise DeepsetCloudError(f"Undeployment of pipeline config '{pipeline_config_name}' failed.")
else:
raise DeepsetCloudError(
f"Undeployment of pipeline config '{pipeline_config_name} ended in unexpected status: {status.value}"
)

def _transition_pipeline_state(
self,
target_state: Literal[PipelineStatus.DEPLOYED, PipelineStatus.UNDEPLOYED],
timeout: int = 60,
pipeline_config_name: Optional[str] = None,
workspace: str = None,
headers: dict = None,
) -> Tuple[PipelineStatus, bool]:
"""
Transitions the pipeline config state to desired target_state on Deepset Cloud.
:param target_state: the target state of the Pipeline config.
:param pipeline_config_name: name of the config file inside the Deepset Cloud workspace.
:param workspace: workspace in Deepset Cloud
:param headers: Headers to pass to API call
:param timeout: The time in seconds to wait until undeployment completes.
If the timeout is exceeded an error will be raised.
"""
pipeline_info = self.get_pipeline_config_info(
pipeline_config_name=pipeline_config_name, workspace=workspace, headers=headers
)
if pipeline_info is None:
raise DeepsetCloudError(f"Pipeline config '{pipeline_config_name}' does not exist.")

transition_info = PIPELINE_STATE_TRANSITION_INFOS[target_state]
satisfied_states = transition_info[SATISFIED_STATES_KEY]
valid_transitioning_states = transition_info[VALID_TRANSITIONING_STATES_KEY]
valid_initial_states = transition_info[VALID_INITIAL_STATES_KEY]

status = PipelineStatus.from_str(pipeline_info["status"])
if status in satisfied_states:
return status, False

if status not in valid_initial_states:
raise DeepsetCloudError(
f"Pipeline config '{pipeline_config_name}' is in invalid state '{status.value}' to be transitioned to '{target_state.value}'."
)

if target_state == PipelineStatus.DEPLOYED:
res = self._deploy(pipeline_config_name=pipeline_config_name, workspace=workspace, headers=headers)
status = PipelineStatus.from_str(res["status"])
elif target_state == PipelineStatus.UNDEPLOYED:
res = self._undeploy(pipeline_config_name=pipeline_config_name, workspace=workspace, headers=headers)
status = PipelineStatus.from_str(res["status"])
else:
raise NotImplementedError(f"Transitioning to state '{target_state.value}' is not implemented.")

start_time = time.time()
while status in valid_transitioning_states:
if time.time() - start_time > timeout:
raise TimeoutError(
f"Transitioning of '{pipeline_config_name}' to state '{target_state.value}' timed out."
)
pipeline_info = self.get_pipeline_config_info(
pipeline_config_name=pipeline_config_name, workspace=workspace, headers=headers
)
if pipeline_info is None:
raise DeepsetCloudError(f"Pipeline config '{pipeline_config_name}' does not exist anymore.")
status = PipelineStatus.from_str(pipeline_info["status"])
if status in valid_transitioning_states:
logger.info(f"Current status of '{pipeline_config_name}' is: '{status}'")
time.sleep(5)

return status, True

def _deploy(
self, pipeline_config_name: Optional[str] = None, workspace: Optional[str] = None, headers: dict = None
) -> dict:
pipeline_url = self._build_pipeline_url(workspace=workspace, pipeline_config_name=pipeline_config_name)
deploy_url = f"{pipeline_url}/deploy"
response = self.client.post(url=deploy_url, headers=headers).json()
return response

def _undeploy(
self, pipeline_config_name: Optional[str] = None, workspace: Optional[str] = None, headers: dict = None
) -> dict:
pipeline_url = self._build_pipeline_url(workspace=workspace, pipeline_config_name=pipeline_config_name)
undeploy_url = f"{pipeline_url}/undeploy"
response = self.client.post(url=undeploy_url, headers=headers).json()
return response

def _build_pipeline_url(self, workspace: Optional[str] = None, pipeline_config_name: Optional[str] = None):
if pipeline_config_name is None:
pipeline_config_name = self.pipeline_config_name
Expand Down
Loading

0 comments on commit fd46a42

Please sign in to comment.