feat: processing layer (#3603)
* Fixes for API tests

* get_collection_index

* Missing files

* bunch of changes

* properly set revision_of

* failure

* failure layer

* more merge
ebezzi authored Dec 5, 2022
1 parent 7bd523a commit 637c224
Showing 19 changed files with 241 additions and 79 deletions.
1 change: 1 addition & 0 deletions Dockerfile.upload_failures
@@ -1,6 +1,7 @@
FROM public.ecr.aws/lambda/python:3.8

COPY backend/layers/processing/upload_failures .
COPY backend/layers ./backend/layers

RUN pip3 install -r requirements.txt

Empty file added backend/layers/__init__.py
Empty file.
19 changes: 11 additions & 8 deletions backend/layers/api/portal_api.py
@@ -12,7 +12,7 @@
from backend.layers.business.business_interface import BusinessLogicInterface
from backend.layers.business.entities import CollectionMetadataUpdate, CollectionQueryFilter
from backend.layers.business.exceptions import ArtifactNotFoundException, CollectionCreationException, CollectionIsPublishedException, CollectionNotFoundException, CollectionPublishException, CollectionUpdateException, CollectionVersionException, DatasetInWrongStatusException, DatasetNotFoundException, InvalidURIException, MaxFileSizeExceededException
from backend.layers.common.entities import CollectionId, CollectionMetadata, CollectionVersion, CollectionVersionId, DatasetArtifact, DatasetId, DatasetStatus, DatasetVersion, DatasetVersionId, Link, OntologyTermId
from backend.layers.common.entities import CollectionId, CollectionMetadata, CollectionVersion, CollectionVersionId, DatasetArtifact, DatasetArtifactId, DatasetId, DatasetStatus, DatasetVersion, DatasetVersionId, Link, OntologyTermId

from backend.common.utils import authorization_checks as auth
from backend.common.utils.ontology_mappings.ontology_map_loader import ontology_mappings
@@ -47,13 +47,16 @@ def get_collections_list(self, from_date: int = None, to_date: int = None, token

collections = []
for c in itertools.chain(all_published_collections, all_owned_collections):
collections.append({
collection = {
"id": c.version_id.id if c.published_at is None else c.collection_id.id,
"visibility": "PRIVATE" if c.published_at is None else "PUBLIC",
"owner": c.owner,
"created_at": c.created_at,
# "revision_of": "NA", # TODO: looks like this isn't returned right now
})
}
if c.published_at is None:
collection["revision_of"] = c.collection_id.id
collections.append(collection)


result = {"collections": collections}
return make_response(jsonify(result), 200)
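For reference, a minimal sketch of the entry this hunk now builds for an unpublished collection: the new revision_of field carries the canonical collection id, while the id field is the version id (all values below are placeholders).

# Hypothetical shape of one "collections" entry for an unpublished (PRIVATE)
# collection version, following the logic above; published entries omit revision_of.
example_private_collection = {
    "id": "<collection-version-id>",
    "visibility": "PRIVATE",
    "owner": "<owner>",
    "created_at": 1670198400,
    "revision_of": "<canonical-collection-id>",
}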
@@ -89,7 +92,7 @@ def _dataset_asset_to_response(self, dataset_artifact: DatasetArtifact, dataset_
"created_at": 0,
"dataset_id": dataset_id,
"filename": "TODO", # TODO: might need to get it from the url
"filetype": dataset_artifact.type,
"filetype": dataset_artifact.type.upper(),
"id": dataset_artifact.id.id,
"s3_uri": dataset_artifact.uri,
"updated_at": 0,
@@ -117,7 +120,7 @@ def _dataset_to_response(self, dataset: DatasetVersion):
"cell_type": None if dataset.metadata is None else self._ontology_term_ids_to_response(dataset.metadata.cell_type),
"collection_id": dataset.collection_id.id,
"created_at": dataset.created_at,
"dataset_assets": [self._dataset_asset_to_response(a, dataset.dataset_id.id) for a in dataset.artifacts],
"dataset_assets": [self._dataset_asset_to_response(a, dataset.version_id.id) for a in dataset.artifacts],
"dataset_deployments": [{"url": "TODO"}], # TODO: dataset.metadata.explorer_url,
"development_stage": None if dataset.metadata is None else self._ontology_term_ids_to_response(dataset.metadata.development_stage),
"disease": None if dataset.metadata is None else self._ontology_term_ids_to_response(dataset.metadata.disease),
@@ -128,7 +131,7 @@ def _dataset_to_response(self, dataset: DatasetVersion):
"mean_genes_per_cell": None if dataset.metadata is None else dataset.metadata.mean_genes_per_cell,
"name": "" if dataset.metadata is None else dataset.metadata.name,
"organism": None if dataset.metadata is None else self._ontology_term_ids_to_response(dataset.metadata.organism),
"processing_status": self._dataset_processing_status_to_response(dataset.status, dataset.dataset_id.id),
"processing_status": self._dataset_processing_status_to_response(dataset.status, dataset.version_id.id),
"published": True, # TODO
"published_at": dataset.canonical_dataset.published_at,
"revision": 0, # TODO this is the progressive revision number. I don't think we'll need this
@@ -385,7 +388,7 @@ def post_dataset_asset(self, dataset_id: str, asset_id: str):
raise NotFoundHTTPException(detail=f"'dataset/{dataset_id}' not found.")

try:
download_data = self.business_logic.get_dataset_artifact_download_data(DatasetVersionId(dataset_id), asset_id)
download_data = self.business_logic.get_dataset_artifact_download_data(DatasetVersionId(dataset_id), DatasetArtifactId(asset_id))
except ArtifactNotFoundException:
raise NotFoundHTTPException(detail=f"'dataset/{dataset_id}/asset/{asset_id}' not found.")

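Taken together, the portal_api.py changes standardize on wrapping raw path strings in the typed ID classes before calling into the business layer. A hedged sketch of that calling convention, mirroring post_dataset_asset above (the IDs are placeholders and business_logic stands for the business layer instance these handlers use as self.business_logic):

from backend.layers.common.entities import DatasetArtifactId, DatasetVersionId

# Hypothetical call: both arguments are typed wrappers rather than bare strings.
download_data = business_logic.get_dataset_artifact_download_data(
    DatasetVersionId("11111111-1111-1111-1111-111111111111"),
    DatasetArtifactId("22222222-2222-2222-2222-222222222222"),
)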
6 changes: 3 additions & 3 deletions backend/layers/business/business.py
@@ -112,7 +112,7 @@ def create_collection(self, owner: str, collection_metadata: CollectionMetadata)

return created_version

def get_published_collection_version(self, collection_id: CollectionId) -> Optional[CollectionVersion]:
def get_published_collection_version(self, collection_id: CollectionId) -> Optional[CollectionVersionWithDatasets]:
"""
Returns the published collection version that belongs to a canonical collection.
Returns None if no published collection exists
@@ -125,13 +125,13 @@ def get_collection_version(self, version_id: CollectionVersionId) -> CollectionV
"""
return self.database_provider.get_collection_version_with_datasets(version_id)

def get_collection_versions_from_canonical(self, collection_id: CollectionId) -> Iterable[CollectionVersion]:
def get_collection_versions_from_canonical(self, collection_id: CollectionId) -> Iterable[CollectionVersionWithDatasets]:
"""
Returns all the collection versions connected to a canonical collection
"""
return self.database_provider.get_all_versions_for_collection(collection_id)

def get_collection_version_from_canonical(self, collection_id: CollectionId) -> Optional[CollectionVersion]:
def get_collection_version_from_canonical(self, collection_id: CollectionId) -> Optional[CollectionVersionWithDatasets]:
"""
Returns the published collection version mapped to a canonical collection, if available.
Otherwise will return the active unpublished version.
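Since these methods now return CollectionVersionWithDatasets, callers receive the dataset versions already hydrated. A minimal sketch from the caller's side (the collection id is a placeholder; business_logic is a BusinessLogic instance):

from backend.layers.common.entities import CollectionId

version = business_logic.get_collection_version_from_canonical(CollectionId("some-collection-id"))
if version is not None:
    # No second lookup needed: the returned version already carries its datasets.
    for dataset in version.datasets:
        print(dataset.version_id.id, dataset.dataset_id.id)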
2 changes: 1 addition & 1 deletion backend/layers/business/business_interface.py
@@ -77,7 +77,7 @@ def get_dataset_artifacts(self, dataset_version_id: DatasetVersionId) -> Iterabl
pass

def get_dataset_artifact_download_data(
self, dataset_version_id: DatasetVersionId, artifact_id: str
self, dataset_version_id: DatasetVersionId, artifact_id: DatasetArtifactId
) -> DatasetArtifactDownloadData:
pass

46 changes: 18 additions & 28 deletions backend/layers/common/entities.py
@@ -57,6 +57,23 @@ class DatasetProcessingStatus(DatasetStatusGeneric, Enum):
FAILURE = "FAILURE"


class CollectionLinkType(str, Enum):
DOI = "doi"
RAW_DATA = "raw_data"
PROTOCOL = "protocol"
LAB_WEBSITE = "lab_website"
OTHER = "other"
DATA_SOURCE = "data_source"


class DatasetArtifactType(str, Enum):
RAW_H5AD = "raw_h5ad"
H5AD = "h5ad"
RDS = "rds"
CXG = "cxg"



@dataclass_json
@dataclass
class DatasetStatus:
@@ -115,7 +132,7 @@ def __repr__(self) -> str:
@dataclass
class DatasetArtifact:
id: DatasetArtifactId
type: str
type: DatasetArtifactType
uri: str


Expand Down Expand Up @@ -210,30 +227,3 @@ class CollectionVersion(CollectionVersionBase):
@dataclass
class CollectionVersionWithDatasets(CollectionVersionBase):
datasets: List[DatasetVersion]


class CollectionLinkType(str, Enum):
DOI = "doi"
RAW_DATA = "raw_data"
PROTOCOL = "protocol"
LAB_WEBSITE = "lab_website"
OTHER = "other"
DATA_SOURCE = "data_source"


class DatasetArtifactType(str, Enum):
"""
Enumerates DatasetArtifact file types.
H5AD - An AnnData object describing an expression matrix, post-processing by cellxgene pipeline.
Uses the .h5ad extension.
RAW_H5AD - An AnnData object describing an expression matrix, as directly uploaded by users.
Uses the .h5ad extension.
RDS - A Seurat file object describing an expression matrix. Uses the .rds extension.
CXG - A TileDb object describing a cellxgene object. Uses .cxg extension.
"""

RAW_H5AD = "raw_h5ad"
H5AD = "h5ad"
RDS = "rds"
CXG = "cxg"
31 changes: 20 additions & 11 deletions backend/layers/persistence/persistence.py
@@ -216,7 +216,7 @@ def get_collection_version_with_datasets(self, version_id: CollectionVersionId)
return self._row_to_collection_version_with_datasets(collection_version, canonical_collection, datasets)


def get_collection_mapped_version(self, collection_id: CollectionId) -> Optional[CollectionVersion]:
def get_collection_mapped_version(self, collection_id: CollectionId) -> Optional[CollectionVersionWithDatasets]:
"""
Retrieves the latest mapped version for a collection
"""
@@ -228,9 +228,10 @@ def get_collection_mapped_version(self, collection_id: CollectionId) -> Optional
version_id = version_id[0]
collection_version = session.query(CollectionVersionTable).filter_by(version_id=version_id).one()
canonical_collection = self.get_canonical_collection(collection_id)
return self._row_to_collection_version(collection_version, canonical_collection)
datasets = self._get_datasets([DatasetVersionId(str(id)) for id in collection_version.datasets])
return self._row_to_collection_version_with_datasets(collection_version, canonical_collection, datasets)

def get_all_versions_for_collection(self, collection_id: CollectionId) -> List[CollectionVersion]:
def get_all_versions_for_collection(self, collection_id: CollectionId) -> List[CollectionVersionWithDatasets]:
"""
Retrieves all versions for a specific collections, without filtering
"""
@@ -239,7 +240,8 @@ def get_all_versions_for_collection(self, collection_id: CollectionId) -> List[C
canonical_collection = self.get_canonical_collection(collection_id)
versions = list()
for i in range(len(version_rows)):
version = self._row_to_collection_version(version_rows[i], canonical_collection)
datasets = self._get_datasets([DatasetVersionId(str(id)) for id in version_rows[i].datasets])
version = self._row_to_collection_version_with_datasets(version_rows[i], canonical_collection, datasets)
versions.append(version)
return versions

@@ -259,20 +261,27 @@ def get_all_mapped_collection_versions(self, get_tombstoned: bool = False) -> It
"""
with self.db_session_manager() as session:
if get_tombstoned:
mapped_version_ids = session.query(CollectionTable.version_id).filter(CollectionTable.version_id.isnot(None)).all() # noqa
canonical_collections = session.query(CollectionTable).filter(CollectionTable.version_id.isnot(None)).all() # noqa
else:
mapped_version_ids = session.query(CollectionTable.version_id)\
canonical_collections = session.query(CollectionTable)\
.filter(CollectionTable.version_id.isnot(None))\
.filter_by(tombstoned=False)\
.all()

# TODO: Very hacky
mapped_version_ids = [i[0] for i in mapped_version_ids]

mapped_version_ids = [i.version_id for i in canonical_collections]
versions = session.query(CollectionVersionTable).filter(CollectionVersionTable.version_id.in_(mapped_version_ids)).all() # noqa

# TODO: do we need to hydrate versions with canonical collections? would require a join or many lookup calls
return [self._row_to_collection_version(v, None) for v in versions]
for version in versions:
# TODO: should be optimized using a map
canonical_row = next(cc for cc in canonical_collections if cc.version_id == version.version_id)
canonical = CanonicalCollection(
CollectionId(str(canonical_row.id)),
CollectionVersionId(str(canonical_row.version_id)),
canonical_row.originally_published_at,
canonical_row.tombstoned
)

yield self._row_to_collection_version(version, canonical)

def delete_canonical_collection(self, collection_id: CollectionId) -> None:
"""
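One way the "should be optimized using a map" TODO above could later be addressed (a hedged sketch, not part of this commit) is to index the canonical rows by version_id once, replacing the per-version next() scan:

# Hypothetical follow-up inside get_all_mapped_collection_versions:
canonical_by_version_id = {cc.version_id: cc for cc in canonical_collections}
for version in versions:
    canonical_row = canonical_by_version_id[version.version_id]
    canonical = CanonicalCollection(
        CollectionId(str(canonical_row.id)),
        CollectionVersionId(str(canonical_row.version_id)),
        canonical_row.originally_published_at,
        canonical_row.tombstoned,
    )
    yield self._row_to_collection_version(version, canonical)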
Empty file.
12 changes: 12 additions & 0 deletions backend/layers/processing/make_seurat.R
@@ -0,0 +1,12 @@
library(sceasy)

require(devtools)

h5adPath <- commandArgs(trailingOnly = TRUE)[1]

sceasy::convertFormat(h5adPath,
from="anndata",
to="seurat",
outFile = gsub(".h5ad", ".rds", h5adPath),
main_layer = "data",
target_uns_keys = c("schema_version", "title", "batch_condition", "default_embedding", "X_approximate_distribution"))
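The script takes a single positional argument, the path to an .h5ad file, and writes the converted Seurat object next to it with an .rds extension. How the processing layer invokes it is not shown in this commit; a hedged Python sketch of one plausible invocation:

import subprocess

def convert_to_seurat(local_h5ad_path: str) -> str:
    """Hypothetical wrapper: shells out to make_seurat.R via Rscript."""
    subprocess.run(
        ["Rscript", "backend/layers/processing/make_seurat.R", local_h5ad_path],
        check=True,
    )
    # The R script writes its output alongside the input, swapping the extension.
    return local_h5ad_path.replace(".h5ad", ".rds")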
Empty file.
100 changes: 100 additions & 0 deletions backend/layers/processing/upload_failures/app.py
@@ -0,0 +1,100 @@
import json
import logging
import os

from backend.common.utils.aws import delete_many_from_s3
from backend.common.utils.json import CustomJSONEncoder
from backend.common.utils.result_notification import format_failed_batch_issue_slack_alert, notify_slack
from backend.layers.business.business import BusinessLogic
from backend.layers.persistence.persistence import DatabaseProvider
from backend.layers.common.entities import DatasetProcessingStatus, DatasetStatusKey, DatasetVersionId

database_provider = DatabaseProvider()
business_logic = BusinessLogic(database_provider, None, None, None, None)


def handle_failure(event: dict, context) -> None:
dataset_id = event["dataset_id"]
trigger_slack_notification(dataset_id)
update_dataset_processing_status_to_failed(dataset_id, event["error"]["Cause"])

error_step_name = get_error_step_name(event)
object_key = os.path.join(os.environ.get("REMOTE_DEV_PREFIX", ""), dataset_id).strip("/")
# clean up artifacts depending on error; default to full clean-up if error step is unknown
if not error_step_name or error_step_name == "download-validate":
delete_many_from_s3(os.environ["ARTIFACT_BUCKET"], object_key)
if not error_step_name or error_step_name == "cxg":
cellxgene_bucket = os.getenv("CELLXGENE_BUCKET", default=f"hosted-cellxgene-{os.environ['DEPLOYMENT_STAGE']}")
delete_many_from_s3(cellxgene_bucket, object_key)


def update_dataset_processing_status_to_failed(dataset_id, error=None) -> None:
"""
This function updates the processing status for a given dataset uuid to failed
"""
try:
business_logic.update_dataset_version_status(
DatasetVersionId(dataset_id),
DatasetStatusKey.PROCESSING,
DatasetProcessingStatus.FAILURE
)
# If the dataset is not in the db, don't worry about updating its processing status
except Exception:
pass


def get_error_step_name(event: dict) -> str:
"""
This function derives the name of the step that failed
"""
error_cause_dict = json.loads(event["error"]["Cause"])
error_job_name = None
if "JobName" in error_cause_dict:
error_job_name = error_cause_dict["JobName"]
return error_job_name


def trigger_slack_notification(dataset_id):
dataset = business_logic.get_dataset_version(DatasetVersionId(dataset_id))
if dataset is None:
return
collection_id = dataset.collection_id
collection = business_logic.get_collection_version_from_canonical(collection_id)
if collection is None:
return
collection_id, collection_owner = collection.version_id, collection.owner
processing_status = dataset.status.to_json()

data = {
"blocks": [
{
"type": "header",
"text": {
"type": "plain_text",
"text": "Dataset failed to process:fire:",
"emoji": True,
},
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": f"Dataset processing job failed! @sc-oncall-eng\n"
f"*Owner*: {collection_owner}\n"
f"*Collection*: https://cellxgene.cziscience.com/collections/{collection_id}\n"
f"*Processing Status*:\n",
},
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": f"```{json.dumps(processing_status, cls=CustomJSONEncoder, indent=2, sort_keys=True)}```",
},
},
]
}
batch_alert_data = format_failed_batch_issue_slack_alert(data)
logger = logging.getLogger(__name__)
logger.info(batch_alert_data)
notify_slack(data)
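For context, a hedged sketch of the kind of event handle_failure expects from the ingestion step function: a dataset_id plus an error whose Cause is a JSON string that may carry the failed Batch JobName. All values are placeholders, and the real handler also needs ARTIFACT_BUCKET, DEPLOYMENT_STAGE, and database connectivity, so this is illustrative only:

import json

example_event = {
    "dataset_id": "44444444-4444-4444-4444-444444444444",  # placeholder dataset version id
    "error": {
        # The failed step's name drives which S3 clean-up branch runs above.
        "Cause": json.dumps({"JobName": "download-validate"}),
    },
}
# handle_failure(example_event, None)  # would notify Slack, mark FAILURE, and delete artifacts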
7 changes: 7 additions & 0 deletions backend/layers/processing/upload_failures/requirements.txt
@@ -0,0 +1,7 @@
dataclasses-json
psycopg2-binary>=2.8.5
SQLAlchemy>=1.3.17,<2
requests>=2.22.0
pyrsistent
boto3
rsa>=4.7 # not directly required, pinned by Snyk to avoid a vulnerability
Empty file.
27 changes: 27 additions & 0 deletions backend/layers/processing/upload_success/app.py
@@ -0,0 +1,27 @@
import logging

from backend.layers.business.business import BusinessLogic
from backend.layers.persistence.persistence import DatabaseProvider
from backend.layers.common.entities import DatasetProcessingStatus, DatasetStatusKey, DatasetVersionId

database_provider = DatabaseProvider()
business_logic = BusinessLogic(database_provider, None, None, None, None)

logger = logging.getLogger(__name__)


def success_handler(event: dict, context) -> None:
"""
Lambda function invoked by the ingestion step function that updates
the processing status for the specified dataset to SUCCESS
:param event: Lambda's event object
:param context: Lambda's context object
:return:
"""
dataset_id = event["dataset_id"]

business_logic.update_dataset_version_status(
DatasetVersionId(dataset_id),
DatasetStatusKey.PROCESSING,
DatasetProcessingStatus.SUCCESS
)
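A matching hedged sketch for the success path (the id is a placeholder, and a reachable database is required in practice):

example_event = {"dataset_id": "55555555-5555-5555-5555-555555555555"}
# success_handler(example_event, None)  # marks the dataset version's processing status as SUCCESS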
7 changes: 7 additions & 0 deletions backend/layers/processing/upload_success/requirements.txt
@@ -0,0 +1,7 @@
dataclasses-json
psycopg2-binary>=2.8.5
SQLAlchemy>=1.3.17,<2
requests>=2.22.0
pyrsistent
boto3
rsa>=4.7 # not directly required, pinned by Snyk to avoid a vulnerability