Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enumerating Airlock request files after request submission #2504

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
35b8a1d
add method to enumerate airlock request files
yuvalyaron Aug 22, 2022
dd312c9
add request_files to the output event of BlobCreatedTrigger
yuvalyaron Aug 22, 2022
fe0fce2
consume request_files in the api
yuvalyaron Aug 23, 2022
1771410
Merge branch 'main' of https://github.com/microsoft/AzureTRE into fea…
yuvalyaron Aug 23, 2022
1a1a1be
fix merge conflict and add log
yuvalyaron Aug 23, 2022
f272be5
clean code in BlobCreatedTrigger
yuvalyaron Aug 23, 2022
7f05002
update api and airlock processor versions
yuvalyaron Aug 23, 2022
6ad8ef1
refine comment
yuvalyaron Aug 24, 2022
979f5c2
enumerate request files on failures
yuvalyaron Aug 24, 2022
d354f6a
add test for get_request_files in StatusChangedQueueTrigger
yuvalyaron Aug 24, 2022
f537857
add test for get_request_files in BlobCreatedTrigger
yuvalyaron Aug 24, 2022
0919968
Merge branch 'main' of https://github.com/microsoft/AzureTRE into fea…
yuvalyaron Aug 24, 2022
ed1e071
refine log
yuvalyaron Aug 24, 2022
ac47739
remove empty lines
yuvalyaron Aug 24, 2022
c46f11d
update changelog
yuvalyaron Aug 24, 2022
4d03250
revet file enumeration in BlobCreatedTrigger
yuvalyaron Aug 25, 2022
4671a13
add file enumeration to StatusChangedQueueTrigger
yuvalyaron Aug 28, 2022
6d56239
add support for concurrent updates of airlock requests and for file e…
yuvalyaron Aug 28, 2022
bad2a31
add test for file enumeration in StatusChangedQueueTrigger
yuvalyaron Aug 28, 2022
69e2058
add ETag test case for update_airlock_request
yuvalyaron Aug 28, 2022
4355c9a
Merge branch 'main' of https://github.com/microsoft/AzureTRE into fea…
yuvalyaron Aug 28, 2022
a16d09a
update api version
yuvalyaron Aug 28, 2022
d4d1ef6
Merge branch 'main' of https://github.com/microsoft/AzureTRE into fea…
yuvalyaron Aug 29, 2022
c6e1a65
Merge branch 'main' into feature/2267-enumerating-airlock-files-when-…
yuvalyaron Aug 30, 2022
660732c
update api version
yuvalyaron Aug 30, 2022
a529aec
Merge branch 'main' into feature/2267-enumerating-airlock-files-when-…
yuvalyaron Aug 30, 2022
b9e3264
Merge branch 'main' into feature/2267-enumerating-airlock-files-when-…
yuvalyaron Aug 30, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ ENHANCEMENTS:
* Block anonymous access to 2 storage accounts ([#2524](https://github.com/microsoft/AzureTRE/pull/2524))
* Gitea shared service support app-service standard SKUs ([#2523](https://github.com/microsoft/AzureTRE/pull/2523))
* Keyvault diagnostic settings in base workspace ([#2521](https://github.com/microsoft/AzureTRE/pull/2521))
* Airlock requests contain a field with information about the files that were submitted ([#2504](https://github.com/microsoft/AzureTRE/pull/2504))

BUG FIXES:

Expand Down
35 changes: 28 additions & 7 deletions airlock_processor/StatusChangedQueueTrigger/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,18 @@ def __init__(self, source_account_name: str, dest_account_name: str):
def main(msg: func.ServiceBusMessage, outputEvent: func.Out[func.EventGridOutputEvent]):
try:
request_properties = extract_properties(msg)
handle_status_changed(request_properties)
request_files = get_request_files(request_properties) if request_properties.status == constants.STAGE_SUBMITTED else None
handle_status_changed(request_properties, outputEvent, request_files)

except NoFilesInRequestException:
report_failure(outputEvent, request_properties, failure_reason=constants.NO_FILES_IN_REQUEST_MESSAGE)
set_output_event_to_report_failure(outputEvent, request_properties, failure_reason=constants.NO_FILES_IN_REQUEST_MESSAGE, request_files=request_files)
except TooManyFilesInRequestException:
report_failure(outputEvent, request_properties, failure_reason=constants.TOO_MANY_FILES_IN_REQUEST_MESSAGE)
set_output_event_to_report_failure(outputEvent, request_properties, failure_reason=constants.TOO_MANY_FILES_IN_REQUEST_MESSAGE, request_files=request_files)
except Exception:
report_failure(outputEvent, request_properties, failure_reason=constants.UNKNOWN_REASON_MESSAGE)
set_output_event_to_report_failure(outputEvent, request_properties, failure_reason=constants.UNKNOWN_REASON_MESSAGE, request_files=request_files)


def handle_status_changed(request_properties: RequestProperties):
def handle_status_changed(request_properties: RequestProperties, outputEvent: func.Out[func.EventGridOutputEvent], request_files):
new_status = request_properties.status
req_id = request_properties.request_id
ws_id = request_properties.workspace_id
Expand All @@ -65,6 +66,9 @@ def handle_status_changed(request_properties: RequestProperties):
blob_operations.create_container(account_name, req_id)
return

if new_status == constants.STAGE_SUBMITTED:
set_output_event_to_report_request_files(outputEvent, request_properties, request_files)

if (is_require_data_copy(new_status)):
logging.info('Request with id %s. requires data copy between storage accounts', req_id)
containers_metadata = get_source_dest_for_copy(new_status, request_type, ws_id)
Expand Down Expand Up @@ -148,13 +152,30 @@ def get_source_dest_for_copy(new_status: str, request_type: str, short_workspace
return ContainersCopyMetadata(source_account_name, dest_account_name)


def report_failure(outputEvent, request_properties, failure_reason):
def set_output_event_to_report_failure(outputEvent, request_properties, failure_reason, request_files):
logging.exception(f"Failed processing Airlock request with ID: '{request_properties.request_id}', changing request status to '{constants.STAGE_FAILED}'.")
outputEvent.set(
func.EventGridOutputEvent(
id=str(uuid.uuid4()),
data={"completed_step": request_properties.status, "new_status": constants.STAGE_FAILED, "request_id": request_properties.request_id, "error_message": failure_reason},
data={"completed_step": request_properties.status, "new_status": constants.STAGE_FAILED, "request_id": request_properties.request_id, "request_files": request_files, "error_message": failure_reason},
subject=request_properties.request_id,
event_type="Airlock.StepResult",
event_time=datetime.datetime.utcnow(),
data_version=constants.STEP_RESULT_EVENT_DATA_VERSION))


def set_output_event_to_report_request_files(outputEvent, request_properties, request_files):
outputEvent.set(
func.EventGridOutputEvent(
id=str(uuid.uuid4()),
data={"completed_step": request_properties.status, "request_id": request_properties.request_id, "request_files": request_files},
subject=request_properties.request_id,
event_type="Airlock.StepResult",
event_time=datetime.datetime.utcnow(),
data_version=constants.STEP_RESULT_EVENT_DATA_VERSION))


def get_request_files(request_properties):
containers_metadata = get_source_dest_for_copy(request_properties.status, request_properties.type, request_properties.workspace_id)
storage_account_name = containers_metadata.source_account_name
return blob_operations.get_request_files(account_name=storage_account_name, request_id=request_properties.request_id)
2 changes: 1 addition & 1 deletion airlock_processor/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.4.5"
__version__ = "0.4.6"
11 changes: 11 additions & 0 deletions airlock_processor/shared_code/blob_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,17 @@ def create_container(account_name: str, request_id: str):
logging.info(f'Did not create a new container. Container already exists for request id: {request_id}.')


def get_request_files(account_name: str, request_id: str) -> list:
files = []
blob_service_client = BlobServiceClient(account_url=get_account_url(account_name), credential=get_credential())
container_client = blob_service_client.get_container_client(container=request_id)

for blob in container_client.list_blobs():
files.append({"name": blob.name, "size": blob.size})

return files


def copy_data(source_account_name: str, destination_account_name: str, request_id: str):
credential = get_credential()
container_name = request_id
Expand Down
49 changes: 48 additions & 1 deletion airlock_processor/tests/test_status_change_queue_trigger.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
from json import JSONDecodeError
import os
import unittest
from unittest import mock
from unittest.mock import MagicMock, patch

from pydantic import ValidationError
from StatusChangedQueueTrigger import extract_properties, get_source_dest_for_copy, is_require_data_copy
from StatusChangedQueueTrigger import get_request_files, main, extract_properties, get_source_dest_for_copy, is_require_data_copy
from azure.functions.servicebus import ServiceBusMessage
from shared_code import constants


class TestPropertiesExtraction(unittest.TestCase):
Expand Down Expand Up @@ -62,6 +66,49 @@ def test_wrong_type_raises_when_getting_storage_account_properties(self):
self.assertRaises(Exception, get_source_dest_for_copy, "accepted", "somethingelse")


class TestFileEnumeration(unittest.TestCase):
guybartal marked this conversation as resolved.
Show resolved Hide resolved
@patch("StatusChangedQueueTrigger.set_output_event_to_report_request_files")
@patch("StatusChangedQueueTrigger.get_request_files")
@patch("StatusChangedQueueTrigger.is_require_data_copy", return_value=False)
@mock.patch.dict(os.environ, {"TRE_ID": "tre-id"}, clear=True)
def test_get_request_files_should_be_called_on_submit_stage(self, _, mock_get_request_files, mock_set_output_event_to_report_request_files):
message_body = "{ \"data\": { \"request_id\":\"123\",\"status\":\"submitted\" , \"type\":\"import\", \"workspace_id\":\"ws1\" }}"
message = _mock_service_bus_message(body=message_body)
main(msg=message, outputEvent=MagicMock())
self.assertTrue(mock_get_request_files.called)
self.assertTrue(mock_set_output_event_to_report_request_files.called)

@patch("StatusChangedQueueTrigger.set_output_event_to_report_failure")
@patch("StatusChangedQueueTrigger.get_request_files")
@patch("StatusChangedQueueTrigger.handle_status_changed")
def test_get_request_files_should_not_be_called_if_new_status_is_not_submit(self, _, mock_get_request_files, mock_set_output_event_to_report_failure):
message_body = "{ \"data\": { \"request_id\":\"123\",\"status\":\"fake-status\" , \"type\":\"import\", \"workspace_id\":\"ws1\" }}"
message = _mock_service_bus_message(body=message_body)
main(msg=message, outputEvent=MagicMock())
self.assertFalse(mock_get_request_files.called)
self.assertFalse(mock_set_output_event_to_report_failure.called)

@patch("StatusChangedQueueTrigger.set_output_event_to_report_failure")
@patch("StatusChangedQueueTrigger.get_request_files")
@patch("StatusChangedQueueTrigger.handle_status_changed", side_effect=Exception)
def test_get_request_files_should_be_called_when_failing_during_submit_stage(self, _, mock_get_request_files, mock_set_output_event_to_report_failure):
message_body = "{ \"data\": { \"request_id\":\"123\",\"status\":\"submitted\" , \"type\":\"import\", \"workspace_id\":\"ws1\" }}"
message = _mock_service_bus_message(body=message_body)
main(msg=message, outputEvent=MagicMock())
self.assertTrue(mock_get_request_files.called)
self.assertTrue(mock_set_output_event_to_report_failure.called)

@patch("StatusChangedQueueTrigger.blob_operations.get_request_files")
@mock.patch.dict(os.environ, {"TRE_ID": "tre-id"}, clear=True)
def test_get_request_files_called_with_correct_storage_account(self, mock_get_request_files):
source_storage_account_for_submitted_stage = constants.STORAGE_ACCOUNT_NAME_EXPORT_INTERNAL + 'ws1'
message_body = "{ \"data\": { \"request_id\":\"123\",\"status\":\"submitted\" , \"type\":\"export\", \"workspace_id\":\"ws1\" }}"
message = _mock_service_bus_message(body=message_body)
request_properties = extract_properties(message)
get_request_files(request_properties)
mock_get_request_files.assert_called_with(account_name=source_storage_account_for_submitted_stage, request_id=request_properties.request_id)


def _mock_service_bus_message(body: str):
encoded_body = str.encode(body, "utf-8")
message = ServiceBusMessage(body=encoded_body, message_id="123", user_properties={})
Expand Down
2 changes: 1 addition & 1 deletion api_app/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.4.21"
__version__ = "0.4.22"
6 changes: 3 additions & 3 deletions api_app/api/routes/airlock_resource_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from fastapi import HTTPException
from starlette import status
from db.repositories.airlock_requests import AirlockRequestRepository
from models.domain.airlock_request import AirlockActions, AirlockRequest, AirlockRequestStatus, AirlockRequestType, AirlockReview
from models.domain.airlock_request import AirlockActions, AirlockFile, AirlockRequest, AirlockRequestStatus, AirlockRequestType, AirlockReview
from event_grid.event_sender import send_status_changed_event, send_airlock_notification_event
from models.domain.authentication import User
from models.domain.workspace import Workspace
Expand Down Expand Up @@ -42,10 +42,10 @@ async def save_and_publish_event_airlock_request(airlock_request: AirlockRequest
raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail=strings.EVENT_GRID_GENERAL_ERROR_MESSAGE)


async def update_and_publish_event_airlock_request(airlock_request: AirlockRequest, airlock_request_repo: AirlockRequestRepository, user: User, new_status: AirlockRequestStatus, workspace: Workspace, error_message: str = None, airlock_review: AirlockReview = None):
async def update_and_publish_event_airlock_request(airlock_request: AirlockRequest, airlock_request_repo: AirlockRequestRepository, user: User, new_status: AirlockRequestStatus, workspace: Workspace, request_files: List[AirlockFile] = None, error_message: str = None, airlock_review: AirlockReview = None):
try:
logging.debug(f"Updating airlock request item: {airlock_request.id}")
updated_airlock_request = airlock_request_repo.update_airlock_request(airlock_request=airlock_request, new_status=new_status, user=user, error_message=error_message, airlock_review=airlock_review)
updated_airlock_request = airlock_request_repo.update_airlock_request(original_request=airlock_request, user=user, new_status=new_status, request_files=request_files, error_message=error_message, airlock_review=airlock_review)
except Exception as e:
logging.error(f'Failed updating airlock_request item {airlock_request}: {e}')
# If the validation failed, the error was not related to the saving itself
Expand Down
58 changes: 40 additions & 18 deletions api_app/db/repositories/airlock_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,19 @@
from datetime import datetime
from typing import List
from pydantic import UUID4
from azure.cosmos.exceptions import CosmosResourceNotFoundError
from azure.cosmos.exceptions import CosmosResourceNotFoundError, CosmosAccessConditionFailedError
from azure.cosmos import CosmosClient
from starlette import status
from fastapi import HTTPException
from pydantic import parse_obj_as
from models.domain.authentication import User
from db.errors import EntityDoesNotExist
from models.domain.airlock_request import AirlockRequest, AirlockRequestStatus, AirlockReview, AirlockReviewDecision, AirlockRequestHistoryItem, AirlockRequestType
from models.domain.airlock_request import AirlockFile, AirlockRequest, AirlockRequestStatus, AirlockReview, AirlockReviewDecision, AirlockRequestHistoryItem, AirlockRequestType
from models.schemas.airlock_request import AirlockRequestInCreate, AirlockReviewInCreate
from core import config
from resources import strings
from db.repositories.base import BaseRepository
import logging


class AirlockRequestRepository(BaseRepository):
Expand Down Expand Up @@ -43,7 +44,7 @@ def update_airlock_request_item(self, original_request: AirlockRequest, new_requ
new_request.user = user
new_request.updatedWhen = self.get_timestamp()

self.update_item(new_request)
self.upsert_item_with_etag(new_request, new_request.etag)
return new_request

@staticmethod
Expand Down Expand Up @@ -126,21 +127,17 @@ def get_airlock_request_by_id(self, airlock_request_id: UUID4) -> AirlockRequest
raise EntityDoesNotExist
return parse_obj_as(AirlockRequest, airlock_requests)

def update_airlock_request(self, airlock_request: AirlockRequest, new_status: AirlockRequestStatus, user: User, error_message: str = None, airlock_review: AirlockReview = None) -> AirlockRequest:
current_status = airlock_request.status
if self.validate_status_update(current_status, new_status):
updated_request = copy.deepcopy(airlock_request)
updated_request.status = new_status
if new_status == AirlockRequestStatus.Failed:
updated_request.errorMessage = error_message
if airlock_review is not None:
if updated_request.reviews is None:
updated_request.reviews = [airlock_review]
else:
updated_request.reviews.append(airlock_review)
yuvalyaron marked this conversation as resolved.
Show resolved Hide resolved
return self.update_airlock_request_item(airlock_request, updated_request, user, {"previousStatus": current_status})
else:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=strings.AIRLOCK_REQUEST_ILLEGAL_STATUS_CHANGE)
def update_airlock_request(self, original_request: AirlockRequest, user: User, new_status: AirlockRequestStatus = None, request_files: List[AirlockFile] = None, error_message: str = None, airlock_review: AirlockReview = None) -> AirlockRequest:
updated_request = self._build_updated_request(original_request=original_request, new_status=new_status, request_files=request_files, error_message=error_message, airlock_review=airlock_review)
try:
db_response = self.update_airlock_request_item(original_request, updated_request, user, {"previousStatus": original_request.status})
except CosmosAccessConditionFailedError:
guybartal marked this conversation as resolved.
Show resolved Hide resolved
logging.warning(f"ETag mismatch for request ID: '{original_request.id}'. Retrying.")
original_request = self.get_airlock_request_by_id(original_request.id)
updated_request = self._build_updated_request(original_request=original_request, new_status=new_status, request_files=request_files, error_message=error_message, airlock_review=airlock_review)
db_response = self.update_airlock_request_item(original_request, updated_request, user, {"previousStatus": original_request.status})

return db_response

def get_airlock_request_spec_params(self):
return self.get_resource_base_spec_params()
Expand All @@ -158,3 +155,28 @@ def create_airlock_review_item(self, airlock_review_input: AirlockReviewInCreate
)

return airlock_review

def _build_updated_request(self, original_request: AirlockRequest, new_status: AirlockRequestStatus = None, request_files: List[AirlockFile] = None, error_message: str = None, airlock_review: AirlockReview = None) -> AirlockRequest:
updated_request = copy.deepcopy(original_request)

if new_status is not None:
self._validate_status_update(current_status=original_request.status, new_status=new_status)
updated_request.status = new_status

if error_message is not None:
updated_request.errorMessage = error_message

if request_files is not None:
updated_request.files = request_files

if airlock_review is not None:
if updated_request.reviews is None:
updated_request.reviews = [airlock_review]
else:
updated_request.reviews.append(airlock_review)

return updated_request

def _validate_status_update(self, current_status, new_status):
if not self.validate_status_update(current_status=current_status, new_status=new_status):
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=strings.AIRLOCK_REQUEST_ILLEGAL_STATUS_CHANGE)
3 changes: 3 additions & 0 deletions api_app/db/repositories/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ def update_item_with_etag(self, item: BaseModel, etag: str) -> BaseModel:
self.container.replace_item(item=item.id, body=item.dict(), etag=etag, match_condition=MatchConditions.IfNotModified)
return self.read_item_by_id(item.id)

def upsert_item_with_etag(self, item: BaseModel, etag: str) -> BaseModel:
return self.container.upsert_item(body=item.dict(), etag=etag, match_condition=MatchConditions.IfNotModified)

def update_item_dict(self, item_dict: dict):
self.container.upsert_item(body=item_dict)

Expand Down
5 changes: 4 additions & 1 deletion api_app/models/domain/airlock_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@
from pydantic.types import UUID4
from pydantic.schema import Optional
from models.domain.azuretremodel import AzureTREModel
from typing import List
from models.domain.airlock_request import AirlockFile


class EventGridMessageData(AzureTREModel):
completed_step: str = Field(title="", description="")
new_status: str = Field(title="", description="")
new_status: Optional[str] = Field(title="", description="")
request_id: str = Field(title="", description="")
request_files: Optional[List[AirlockFile]] = Field(title="", description="")
error_message: Optional[str] = Field(title="", description="")


Expand Down
Loading