diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index d73028ff90..9d49130458 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -88,7 +88,10 @@ "console": "integratedTerminal", "preLaunchTask": "Copy_env_file_for_api_debug", "cwd": "${workspaceFolder}/api_app", - "envFile": "${workspaceFolder}/api_app/.env" + "envFile": "${workspaceFolder}/api_app/.env", + "env": { + "OTEL_RESOURCE_ATTRIBUTES": "service.name=api,service.instance.id=local_debug,service.version=dev" + } }, { "name": "E2E Extended", @@ -190,8 +193,10 @@ "cwd": "${workspaceFolder}/resource_processor", "envFile": "${workspaceFolder}/core/private.env", "env": { - "PYTHONPATH": "." - } + "PYTHONPATH": ".", + "OTEL_RESOURCE_ATTRIBUTES": "service.name=resource_processor,service.instance.id=local_debug,service.version=dev" + }, + "justMyCode": false }, { "name": "Debug Python file", diff --git a/CHANGELOG.md b/CHANGELOG.md index ddbc9f7489..b6de49e9d2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ FEATURES: ENHANCEMENTS: +* Switch from OpenCensus to OpenTelemetry for logging ([#3762](https://github.com/microsoft/AzureTRE/pull/3762)) BUG FIXES: diff --git a/api_app/.env.sample b/api_app/.env.sample index 4c6a6d3780..acc5a0056f 100644 --- a/api_app/.env.sample +++ b/api_app/.env.sample @@ -1,7 +1,8 @@ # API configuration # ----------------- # When debug is set to True, debugging information for unhandled exceptions is shown in the swagger UI and logging is more verbose -# DEBUG=True +# LOGGING_LEVEL can be set to DEBUG, INFO, WARNING, ERROR or CRITICAL +LOGGING_LEVEL="INFO" # OAUTH information - client ids etc. for the AAD Apps # ---------------------------------------------------- diff --git a/api_app/_version.py b/api_app/_version.py index 8bc9d14811..c6eae9f8a3 100644 --- a/api_app/_version.py +++ b/api_app/_version.py @@ -1 +1 @@ -__version__ = "0.16.9" +__version__ = "0.17.1" diff --git a/api_app/api/dependencies/database.py b/api_app/api/dependencies/database.py index 40c064b30f..61dfe0f901 100644 --- a/api_app/api/dependencies/database.py +++ b/api_app/api/dependencies/database.py @@ -1,4 +1,3 @@ -import logging from typing import Callable, Type from azure.cosmos.aio import CosmosClient @@ -9,10 +8,11 @@ from db.errors import UnableToAccessDatabase from db.repositories.base import BaseRepository from resources import strings +from services.logging import logger async def connect_to_db() -> CosmosClient: - logging.debug(f"Connecting to {config.STATE_STORE_ENDPOINT}") + logger.debug(f"Connecting to {config.STATE_STORE_ENDPOINT}") try: async with credentials.get_credential_async() as credential: @@ -27,10 +27,10 @@ async def connect_to_db() -> CosmosClient: cosmos_client = CosmosClient( config.STATE_STORE_ENDPOINT, primary_master_key, connection_verify=False ) - logging.debug("Connection established") + logger.debug("Connection established") return cosmos_client except Exception: - logging.exception("Connection to state store could not be established.") + logger.exception("Connection to state store could not be established.") async def get_store_key(credential) -> str: @@ -53,7 +53,7 @@ async def get_store_key(credential) -> str: async def get_db_client(app: FastAPI) -> CosmosClient: - if not app.state.cosmos_client: + if not hasattr(app.state, 'cosmos_client') or not app.state.cosmos_client: app.state.cosmos_client = await connect_to_db() return app.state.cosmos_client @@ -71,7 +71,7 @@ async def _get_repo( try: return await 
repo_type.create(client) except UnableToAccessDatabase: - logging.exception(strings.STATE_STORE_ENDPOINT_NOT_RESPONDING) + logger.exception(strings.STATE_STORE_ENDPOINT_NOT_RESPONDING) raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail=strings.STATE_STORE_ENDPOINT_NOT_RESPONDING, diff --git a/api_app/api/errors/generic_error.py b/api_app/api/errors/generic_error.py index 4faba12562..f291fac5e4 100644 --- a/api_app/api/errors/generic_error.py +++ b/api_app/api/errors/generic_error.py @@ -1,12 +1,13 @@ -import logging from resources import strings from fastapi import Request from fastapi.responses import PlainTextResponse +from services.logging import logger + async def generic_error_handler(_: Request, exception: Exception) -> PlainTextResponse: - logging.debug("=====================================") - logging.exception(exception) - logging.debug("=====================================") + logger.debug("=====================================") + logger.exception(exception) + logger.debug("=====================================") return PlainTextResponse(strings.UNABLE_TO_PROCESS_REQUEST, status_code=500) diff --git a/api_app/api/routes/airlock.py b/api_app/api/routes/airlock.py index a72a071dd8..9fa9790e97 100644 --- a/api_app/api/routes/airlock.py +++ b/api_app/api/routes/airlock.py @@ -1,4 +1,3 @@ -import logging from typing import Optional from fastapi import APIRouter, Depends, HTTPException, status as status_code, Response @@ -27,6 +26,7 @@ from services.airlock import create_review_vm, review_airlock_request, get_airlock_container_link, get_allowed_actions, save_and_publish_event_airlock_request, update_and_publish_event_airlock_request, \ enrich_requests_with_allowed_actions, get_airlock_requests_by_user_and_workspace, cancel_request +from services.logging import logger airlock_workspace_router = APIRouter(dependencies=[Depends(get_current_workspace_owner_or_researcher_user_or_airlock_manager)]) @@ -46,7 +46,7 @@ async def create_draft_request(airlock_request_input: AirlockRequestInCreate, us allowed_actions = get_allowed_actions(airlock_request, user, airlock_request_repo) return AirlockRequestWithAllowedUserActions(airlockRequest=airlock_request, allowedUserActions=allowed_actions) except (ValidationError, ValueError) as e: - logging.exception("Failed creating airlock request model instance") + logger.exception("Failed creating airlock request model instance") raise HTTPException(status_code=status_code.HTTP_400_BAD_REQUEST, detail=str(e)) @@ -69,7 +69,7 @@ async def get_all_airlock_requests_by_workspace( airlock_requests_with_allowed_user_actions = enrich_requests_with_allowed_actions(airlock_requests, user, airlock_request_repo) return AirlockRequestWithAllowedUserActionsInList(airlockRequests=airlock_requests_with_allowed_user_actions) except (ValidationError, ValueError) as e: - logging.exception("Failed retrieving all the airlock requests for a workspace") + logger.exception("Failed retrieving all the airlock requests for a workspace") raise HTTPException(status_code=status_code.HTTP_400_BAD_REQUEST, detail=str(e)) @@ -137,17 +137,17 @@ async def create_review_user_resource( response.headers["Location"] = construct_location_header(operation) return AirlockRequestAndOperationInResponse(airlockRequest=updated_resource, operation=operation) except (KeyError, TypeError, EntityDoesNotExist) as e: - logging.exception("Failed to retrieve Airlock Review configuration for workspace %s", workspace.id) + logger.exception("Failed to retrieve Airlock Review 
configuration for workspace %s", workspace.id) raise HTTPException(status_code=status_code.HTTP_422_UNPROCESSABLE_ENTITY, detail=f"Failed to retrieve Airlock Review configuration for workspace {workspace.id}.\ Please ask your TRE administrator to check the configuration. Details: {str(e)}") except (ValidationError, ValueError) as e: - logging.exception("Failed create user resource model instance due to validation error") + logger.exception("Failed create user resource model instance due to validation error") raise HTTPException(status_code=status_code.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Invalid configuration for creating user resource. Please contact your TRE administrator. \ Details: {str(e)}") except UserNotAuthorizedToUseTemplate as e: - logging.exception("User not authorized to use template") + logger.exception("User not authorized to use template") raise HTTPException(status_code=status_code.HTTP_403_FORBIDDEN, detail=str(e)) @@ -171,7 +171,7 @@ async def create_airlock_review( allowed_actions = get_allowed_actions(updated_airlock_request, user, airlock_request_repo) return AirlockRequestWithAllowedUserActions(airlockRequest=updated_airlock_request, allowedUserActions=allowed_actions) except (ValidationError, ValueError) as e: - logging.exception("Failed creating airlock review model instance") + logger.exception("Failed creating airlock review model instance") raise HTTPException(status_code=status_code.HTTP_400_BAD_REQUEST, detail=str(e)) diff --git a/api_app/api/routes/costs.py b/api_app/api/routes/costs.py index 8b5017aa92..b6684756f2 100644 --- a/api_app/api/routes/costs.py +++ b/api_app/api/routes/costs.py @@ -2,7 +2,6 @@ from dateutil.relativedelta import relativedelta from fastapi import APIRouter, Depends, Query, HTTPException, status from fastapi.responses import JSONResponse -import logging from typing import Optional from pydantic import UUID4 @@ -18,6 +17,8 @@ from resources import strings from services.authentication import get_current_admin_user, get_current_workspace_owner_or_tre_admin from services.cost_service import CostService, ServiceUnavailable, SubscriptionNotSupported, TooManyRequests, WorkspaceDoesNotExist, cost_service_factory +from services.logging import logger + costs_core_router = APIRouter(dependencies=[Depends(get_current_admin_user)]) costs_workspace_router = APIRouter(dependencies=[Depends(get_current_workspace_owner_or_tre_admin)]) @@ -79,7 +80,7 @@ async def costs( "retry-after": str(e.retry_after) }}, status_code=503, headers={"Retry-After": str(e.retry_after)}) except Exception: - logging.exception("Failed to query Azure TRE costs") + logger.exception("Failed to query Azure TRE costs") raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=strings.API_GET_COSTS_INTERNAL_SERVER_ERROR) @@ -117,5 +118,5 @@ async def workspace_costs(workspace_id: UUID4, params: CostsQueryParams = Depend "retry-after": str(e.retry_after) }}, status_code=503, headers={"Retry-After": str(e.retry_after)}) except Exception: - logging.exception("Failed to query Azure TRE costs") + logger.exception("Failed to query Azure TRE costs") raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=strings.API_GET_COSTS_INTERNAL_SERVER_ERROR) diff --git a/api_app/api/routes/health.py b/api_app/api/routes/health.py index 629a10b420..301a6fd54d 100644 --- a/api_app/api/routes/health.py +++ b/api_app/api/routes/health.py @@ -1,11 +1,10 @@ import asyncio -import logging from fastapi import APIRouter from core import credentials from 
models.schemas.status import HealthCheck, ServiceStatus, StatusEnum from resources import strings from services.health_checker import create_resource_processor_status, create_state_store_status, create_service_bus_status - +from services.logging import logger router = APIRouter() @@ -25,9 +24,9 @@ async def health_check() -> HealthCheck: sb_status, sb_message = sb rp_status, rp_message = rp if cosmos_status == StatusEnum.not_ok or sb_status == StatusEnum.not_ok or rp_status == StatusEnum.not_ok: - logging.error(f'Cosmos Status: {cosmos_status}, message: {cosmos_message}') - logging.error(f'Service Bus Status: {sb_status}, message: {sb_message}') - logging.error(f'Resource Processor Status: {rp_status}, message: {rp_message}') + logger.error(f'Cosmos Status: {cosmos_status}, message: {cosmos_message}') + logger.error(f'Service Bus Status: {sb_status}, message: {sb_message}') + logger.error(f'Resource Processor Status: {rp_status}, message: {rp_message}') services = [ServiceStatus(service=strings.COSMOS_DB, status=cosmos_status, message=cosmos_message), ServiceStatus(service=strings.SERVICE_BUS, status=sb_status, message=sb_message), diff --git a/api_app/api/routes/migrations.py b/api_app/api/routes/migrations.py index dac0740689..48fe49437f 100644 --- a/api_app/api/routes/migrations.py +++ b/api_app/api/routes/migrations.py @@ -1,5 +1,3 @@ -import logging - from fastapi import APIRouter, Depends, HTTPException, status from db.migrations.airlock import AirlockMigration from db.migrations.resources import ResourceMigration @@ -12,7 +10,7 @@ from db.migrations.workspaces import WorkspaceMigration from db.repositories.resources import ResourceRepository from models.schemas.migrations import MigrationOutList, Migration - +from services.logging import logger migrations_core_router = APIRouter(dependencies=[Depends(get_current_admin_user)]) @@ -31,39 +29,39 @@ async def migrate_database(resources_repo=Depends(get_repository(ResourceReposit airlock_migration=Depends(get_repository(AirlockMigration)),): try: migrations = list() - logging.info("PR 1030") + logger.info("PR 1030") await resources_repo.rename_field_name('resourceTemplateName', 'templateName') await resources_repo.rename_field_name('resourceTemplateVersion', 'templateVersion') await resources_repo.rename_field_name('resourceTemplateParameters', 'properties') migrations.append(Migration(issueNumber="PR 1030", status="Executed")) - logging.info("PR 1031") + logger.info("PR 1031") await resources_repo.rename_field_name('workspaceType', 'templateName') await resources_repo.rename_field_name('workspaceServiceType', 'templateName') await resources_repo.rename_field_name('userResourceType', 'templateName') migrations.append(Migration(issueNumber="PR 1031", status="Executed")) - logging.info("PR 1717 - Shared services") + logger.info("PR 1717 - Shared services") migration_status = "Executed" if await shared_services_migration.deleteDuplicatedSharedServices() else "Skipped" migrations.append(Migration(issueNumber="PR 1717", status=migration_status)) - logging.info("PR 1726 - Authentication needs to be in properties so we can update them") + logger.info("PR 1726 - Authentication needs to be in properties so we can update them") migration_status = "Executed" if await workspace_migration.moveAuthInformationToProperties() else "Skipped" migrations.append(Migration(issueNumber="PR 1726", status=migration_status)) - logging.info("PR 1406 - Extra field to support UI") + logger.info("PR 1406 - Extra field to support UI") num_rows = await 
resource_migration.add_deployment_status_field(operations_repo) migrations.append(Migration(issueNumber="1406", status=f'Updated {num_rows} resource objects')) - logging.info("PR 3066 - Archive resources history") + logger.info("PR 3066 - Archive resources history") num_rows = await resource_migration.archive_history(resource_history_repo) migrations.append(Migration(issueNumber="3066", status=f'Updated {num_rows} resource objects')) - logging.info("PR 2371 - Validate min firewall version") + logger.info("PR 2371 - Validate min firewall version") await shared_services_migration.checkMinFirewallVersion() migrations.append(Migration(issueNumber="2371", status='Firewall version meets requirement')) - logging.info("PR 2779 - Restructure Airlock requests & add createdBy field") + logger.info("PR 2779 - Restructure Airlock requests & add createdBy field") await airlock_migration.rename_field_name('requestType', 'type') await airlock_migration.rename_field_name('requestTitle', 'title') await airlock_migration.rename_field_name('user', 'updatedBy') @@ -71,19 +69,19 @@ async def migrate_database(resources_repo=Depends(get_repository(ResourceReposit num_updated = await airlock_migration.add_created_by_and_rename_in_history() migrations.append(Migration(issueNumber="2779", status=f'Renamed fields & updated {num_updated} airlock requests with createdBy')) - logging.info("PR 2883 - Support multiple reviewer VMs per Airlock request") + logger.info("PR 2883 - Support multiple reviewer VMs per Airlock request") num_updated = await airlock_migration.change_review_resources_to_dict() migrations.append(Migration(issueNumber="2883", status=f'Updated {num_updated} airlock requests with new reviewUserResources format')) - logging.info("PR 3152 - Migrate reviewDecision of Airlock Reviews") + logger.info("PR 3152 - Migrate reviewDecision of Airlock Reviews") num_updated = await airlock_migration.update_review_decision_values() migrations.append(Migration(issueNumber="3152", status=f'Updated {num_updated} airlock requests with new reviewDecision value')) - logging.info("PR 3358 - Migrate OperationSteps of Operations") + logger.info("PR 3358 - Migrate OperationSteps of Operations") num_updated = await resource_migration.migrate_step_id_of_operation_steps(operations_repo) migrations.append(Migration(issueNumber="3358", status=f'Updated {num_updated} operations')) return MigrationOutList(migrations=migrations) except Exception as e: - logging.exception("Failed to migrate database") + logger.exception("Failed to migrate database") raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) diff --git a/api_app/api/routes/resource_helpers.py b/api_app/api/routes/resource_helpers.py index 7067220b36..34cc5006f3 100644 --- a/api_app/api/routes/resource_helpers.py +++ b/api_app/api/routes/resource_helpers.py @@ -1,5 +1,4 @@ from datetime import datetime -import logging import semantic_version from copy import deepcopy from typing import Dict, Any, Optional @@ -26,6 +25,7 @@ RequestAction, ) from services.authentication import get_access_service +from services.logging import logger async def delete_validation(resource: Resource, resource_repo: ResourceRepository): @@ -75,7 +75,7 @@ async def save_and_deploy_resource( ) await resource_repo.save_item(masked_resource) except Exception: - logging.exception(f"Failed saving resource item {resource.id}") + logger.exception(f"Failed saving resource item {resource.id}") raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, 
detail=strings.STATE_STORE_ENDPOINT_NOT_RESPONDING, @@ -94,7 +94,7 @@ async def save_and_deploy_resource( return operation except Exception: await resource_repo.delete_item(resource.id) - logging.exception("Failed send resource request message") + logger.exception("Failed send resource request message") raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail=strings.SERVICE_BUS_GENERAL_ERROR_MESSAGE, @@ -189,7 +189,7 @@ async def send_uninstall_message( ) return operation except Exception: - logging.exception(f"Failed to send {resource_type} resource delete message") + logger.exception(f"Failed to send {resource_type} resource delete message") raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail=strings.SERVICE_BUS_GENERAL_ERROR_MESSAGE, @@ -240,7 +240,7 @@ async def send_custom_action_message( ) return operation except Exception: - logging.exception(f"Failed to send {resource_type} resource custom action message") + logger.exception(f"Failed to send {resource_type} resource custom action message") raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail=strings.SERVICE_BUS_GENERAL_ERROR_MESSAGE, @@ -278,7 +278,7 @@ async def get_template( detail=strings.NO_UNIQUE_CURRENT_FOR_TEMPLATE, ) except Exception as e: - logging.debug(e) + logger.debug(e) raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail=strings.STATE_STORE_ENDPOINT_NOT_RESPONDING, diff --git a/api_app/api/routes/shared_services.py b/api_app/api/routes/shared_services.py index 3311b0585c..f09c3f996f 100644 --- a/api_app/api/routes/shared_services.py +++ b/api_app/api/routes/shared_services.py @@ -1,5 +1,4 @@ import asyncio -import logging from fastapi import APIRouter, Depends, HTTPException, Header, status, Response from jsonschema.exceptions import ValidationError @@ -21,6 +20,7 @@ from .resource_helpers import enrich_resource_with_available_upgrades, send_custom_action_message, send_uninstall_message, send_resource_request_message from services.authentication import get_current_admin_user, get_current_tre_user_or_tre_admin from models.domain.request_action import RequestAction +from services.logging import logger shared_services_router = APIRouter(dependencies=[Depends(get_current_tre_user_or_tre_admin)]) @@ -56,13 +56,13 @@ async def create_shared_service(response: Response, shared_service_input: Shared try: shared_service, resource_template = await shared_services_repo.create_shared_service_item(shared_service_input, user.roles) except (ValidationError, ValueError) as e: - logging.exception("Failed create shared service model instance") + logger.exception("Failed create shared service model instance") raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) except DuplicateEntity as e: - logging.exception("Shared service already exists") + logger.exception("Shared service already exists") raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(e)) except UserNotAuthorizedToUseTemplate as e: - logging.exception("User not authorized to use template") + logger.exception("User not authorized to use template") raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(e)) operation = await save_and_deploy_resource( diff --git a/api_app/api/routes/workspaces.py b/api_app/api/routes/workspaces.py index 9a7fc9fe9c..a086812ff5 100644 --- a/api_app/api/routes/workspaces.py +++ b/api_app/api/routes/workspaces.py @@ -1,5 +1,4 @@ import asyncio -import logging from fastapi import APIRouter, Depends, 
HTTPException, Header, status, Request, Response @@ -36,6 +35,8 @@ from .resource_helpers import cascaded_update_resource, delete_validation, enrich_resource_with_available_upgrades, get_identity_role_assignments, save_and_deploy_resource, construct_location_header, send_uninstall_message, \ send_custom_action_message, send_resource_request_message, update_user_resource from models.domain.request_action import RequestAction +from services.logging import logger + workspaces_core_router = APIRouter(dependencies=[Depends(get_current_tre_user_or_tre_admin)]) workspaces_shared_router = APIRouter(dependencies=[Depends(get_current_workspace_owner_or_researcher_user_or_airlock_manager_or_tre_admin)]) @@ -102,10 +103,10 @@ async def create_workspace(workspace_create: WorkspaceInCreate, response: Respon auth_info = extract_auth_information(workspace_create.properties) workspace, resource_template = await workspace_repo.create_workspace_item(workspace_create, auth_info, user.id, user.roles) except (ValidationError, ValueError) as e: - logging.exception("Failed to create workspace model instance") + logger.exception("Failed to create workspace model instance") raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) except UserNotAuthorizedToUseTemplate as e: - logging.exception("User not authorized to use template") + logger.exception("User not authorized to use template") raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(e)) except InvalidInput as e: raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=str(e)) @@ -243,10 +244,10 @@ async def create_workspace_service(response: Response, workspace_service_input: try: workspace_service, resource_template = await workspace_service_repo.create_workspace_service_item(workspace_service_input, workspace.id, user.roles) except (ValidationError, ValueError) as e: - logging.exception("Failed create workspace service model instance") + logger.exception("Failed create workspace service model instance") raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) except UserNotAuthorizedToUseTemplate as e: - logging.exception("User not authorized to use template") + logger.exception("User not authorized to use template") raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(e)) # if template has address_space get an address space @@ -407,10 +408,10 @@ async def create_user_resource( try: user_resource, resource_template = await user_resource_repo.create_user_resource_item(user_resource_create, workspace.id, workspace_service.id, workspace_service.templateName, user.id, user.roles) except (ValidationError, ValueError) as e: - logging.exception("Failed create user resource model instance") + logger.exception("Failed create user resource model instance") raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)) except UserNotAuthorizedToUseTemplate as e: - logging.exception("User not authorized to use template") + logger.exception("User not authorized to use template") raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=str(e)) operation = await save_and_deploy_resource( diff --git a/api_app/core/config.py b/api_app/core/config.py index f8f1d81fb8..aa84020783 100644 --- a/api_app/core/config.py +++ b/api_app/core/config.py @@ -7,7 +7,7 @@ # API settings API_PREFIX = "/api" PROJECT_NAME: str = config("PROJECT_NAME", default="Azure TRE API") -DEBUG: bool = config("DEBUG", cast=bool, default=False) +LOGGING_LEVEL: str = config("LOGGING_LEVEL", 
default="INFO") ENABLE_LOCAL_DEBUGGING: bool = config("ENABLE_LOCAL_DEBUGGING", cast=bool, default=False) ENABLE_SWAGGER: bool = config("ENABLE_SWAGGER", cast=bool, default=False) VERSION = __version__ @@ -54,6 +54,9 @@ MICROSOFT_GRAPH_URL: str = config("MICROSOFT_GRAPH_URL", default="https://graph.microsoft.com") STORAGE_ENDPOINT_SUFFIX: str = config("STORAGE_ENDPOINT_SUFFIX", default="core.windows.net") +# Monitoring +APPLICATIONINSIGHTS_CONNECTION_STRING: str = config("APPLICATIONINSIGHTS_CONNECTION_STRING", default=None) + # Authentication API_CLIENT_ID: str = config("API_CLIENT_ID", default="") API_CLIENT_SECRET: str = config("API_CLIENT_SECRET", default="") diff --git a/api_app/core/credentials.py b/api_app/core/credentials.py index 42b5b692a1..8780248a63 100644 --- a/api_app/core/credentials.py +++ b/api_app/core/credentials.py @@ -22,7 +22,13 @@ def get_credential() -> TokenCredential: ManagedIdentityCredential(client_id=managed_identity) ) else: - return DefaultAzureCredential(authority=urlparse(config.AAD_AUTHORITY_URL).netloc) + return DefaultAzureCredential(authority=urlparse(config.AAD_AUTHORITY_URL).netloc, + exclude_shared_token_cache_credential=True, + exclude_workload_identity_credential=True, + exclude_developer_cli_credential=True, + exclude_managed_identity_credential=True, + exclude_powershell_credential=True + ) @asynccontextmanager @@ -36,7 +42,13 @@ async def get_credential_async() -> TokenCredential: ManagedIdentityCredentialASync(client_id=managed_identity) ) if managed_identity - else DefaultAzureCredentialASync(authority=urlparse(config.AAD_AUTHORITY_URL).netloc) + else DefaultAzureCredentialASync(authority=urlparse(config.AAD_AUTHORITY_URL).netloc, + exclude_shared_token_cache_credential=True, + exclude_workload_identity_credential=True, + exclude_developer_cli_credential=True, + exclude_managed_identity_credential=True, + exclude_powershell_credential=True + ) ) yield credential await credential.close() diff --git a/api_app/db/events.py b/api_app/db/events.py index f1fb407482..af92b2a859 100644 --- a/api_app/db/events.py +++ b/api_app/db/events.py @@ -1,10 +1,9 @@ -import logging - from azure.cosmos.aio import CosmosClient from api.dependencies.database import get_db_client from db.repositories.resources import ResourceRepository from core import config +from services.logging import logger async def bootstrap_database(app) -> bool: @@ -16,5 +15,5 @@ async def bootstrap_database(app) -> bool: await ResourceRepository.create(client) return True except Exception as e: - logging.debug(e) + logger.debug(e) return False diff --git a/api_app/db/migrations/shared_services.py b/api_app/db/migrations/shared_services.py index 8736254de2..575ac74bb2 100644 --- a/api_app/db/migrations/shared_services.py +++ b/api_app/db/migrations/shared_services.py @@ -1,9 +1,9 @@ -import logging - from azure.cosmos.aio import CosmosClient +import semantic_version + from db.repositories.shared_services import SharedServiceRepository from db.repositories.resources import IS_ACTIVE_RESOURCE -import semantic_version +from services.logging import logger class SharedServiceMigration(SharedServiceRepository): @@ -25,7 +25,7 @@ async def deleteDuplicatedSharedServices(self) -> bool: ORDER BY c.updatedWhen ASC OFFSET 1 LIMIT 10000'): template_version = semantic_version.Version(item["templateVersion"]) if (template_version < semantic_version.Version('0.3.0')): - logging.info(f'Deleting element {item["id"]}') + logger.info(f'Deleting element {item["id"]}') await self.delete_item(item["id"]) 
migrated = True diff --git a/api_app/db/migrations/workspaces.py b/api_app/db/migrations/workspaces.py index 0fbdd415c4..79209ec30a 100644 --- a/api_app/db/migrations/workspaces.py +++ b/api_app/db/migrations/workspaces.py @@ -1,9 +1,9 @@ -import logging - from azure.cosmos.aio import CosmosClient -from db.repositories.workspaces import WorkspaceRepository import semantic_version +from db.repositories.workspaces import WorkspaceRepository +from services.logging import logger + class WorkspaceMigration(WorkspaceRepository): @classmethod @@ -32,7 +32,7 @@ async def moveAuthInformationToProperties(self) -> bool: updated = True if "authInformation" in item: - logging.info(f'Upgrading authInformation in workspace {item["id"]}') + logger.info(f'Upgrading authInformation in workspace {item["id"]}') # Copy authInformation into properties item["properties"]["sp_id"] = item["authInformation"]["sp_id"] @@ -44,7 +44,7 @@ async def moveAuthInformationToProperties(self) -> bool: if updated: await self.update_item_dict(item) - logging.info(f'Upgraded authentication info for workspace id {item["id"]}') + logger.info(f'Upgraded authentication info for workspace id {item["id"]}') migrated = True return migrated diff --git a/api_app/db/repositories/airlock_requests.py b/api_app/db/repositories/airlock_requests.py index b22748f559..ba683723d6 100644 --- a/api_app/db/repositories/airlock_requests.py +++ b/api_app/db/repositories/airlock_requests.py @@ -16,7 +16,7 @@ from core import config from resources import strings from db.repositories.base import BaseRepository -import logging +from services.logging import logger class AirlockRequestRepository(BaseRepository): @@ -159,7 +159,7 @@ async def update_airlock_request( try: db_response = await self.update_airlock_request_item(original_request, updated_request, updated_by, {"previousStatus": original_request.status}) except CosmosAccessConditionFailedError: - logging.warning(f"ETag mismatch for request ID: '{original_request.id}'. Retrying.") + logger.warning(f"ETag mismatch for request ID: '{original_request.id}'. 
Retrying.") original_request = await self.get_airlock_request_by_id(original_request.id) updated_request = self._build_updated_request(original_request=original_request, new_status=new_status, request_files=request_files, status_message=status_message, airlock_review=airlock_review) db_response = await self.update_airlock_request_item(original_request, updated_request, updated_by, {"previousStatus": original_request.status}) diff --git a/api_app/db/repositories/resources_history.py b/api_app/db/repositories/resources_history.py index eba7714b0d..2a6524d62d 100644 --- a/api_app/db/repositories/resources_history.py +++ b/api_app/db/repositories/resources_history.py @@ -1,12 +1,13 @@ -import logging from typing import List import uuid from azure.cosmos.aio import CosmosClient +from pydantic import parse_obj_as + from db.errors import EntityDoesNotExist from db.repositories.base import BaseRepository from core import config from models.domain.resource import Resource, ResourceHistoryItem -from pydantic import parse_obj_as +from services.logging import logger class ResourceHistoryRepository(BaseRepository): @@ -24,23 +25,23 @@ def is_valid_uuid(resourceId): raise ValueError("Resource Id should be a valid GUID") def resource_history_query(self, resourceId: str): - logging.debug("Validate sanity of resourceId") + logger.debug("Validate sanity of resourceId") self.is_valid_uuid(resourceId) return f'SELECT * FROM c WHERE c.resourceId = "{resourceId}"' async def get_resource_history_by_resource_id(self, resource_id: str) -> List[ResourceHistoryItem]: query = self.resource_history_query(resource_id) try: - logging.info(f"Fetching history for resource {resource_id}") + logger.info(f"Fetching history for resource {resource_id}") resource_history_items = await self.query(query=query) - logging.debug(f"Got {len(resource_history_items)} history items for resource {resource_id}") + logger.debug(f"Got {len(resource_history_items)} history items for resource {resource_id}") except EntityDoesNotExist: - logging.info(f"No history for resource {resource_id}") + logger.info(f"No history for resource {resource_id}") resource_history_items = [] return parse_obj_as(List[ResourceHistoryItem], resource_history_items) async def create_resource_history_item(self, resource: Resource) -> ResourceHistoryItem: - logging.info(f"Creating a new history item for resource {resource.id}") + logger.info(f"Creating a new history item for resource {resource.id}") resource_history_item_id = str(uuid.uuid4()) resource_history_item = ResourceHistoryItem( id=resource_history_item_id, @@ -52,10 +53,10 @@ async def create_resource_history_item(self, resource: Resource) -> ResourceHist user=resource.user, templateVersion=resource.templateVersion ) - logging.info(f"Saving history item for {resource.id}") + logger.info(f"Saving history item for {resource.id}") try: await self.save_item(resource_history_item) except Exception: - logging.exception(f"Failed saving history item for {resource.id}") + logger.exception(f"Failed saving history item for {resource.id}") raise return resource_history_item diff --git a/api_app/event_grid/event_sender.py b/api_app/event_grid/event_sender.py index 6799ac7895..1821c65589 100644 --- a/api_app/event_grid/event_sender.py +++ b/api_app/event_grid/event_sender.py @@ -1,4 +1,3 @@ -import logging import re import json @@ -9,6 +8,7 @@ from core import config from models.domain.airlock_request import AirlockRequest, AirlockRequestStatus from models.domain.workspace import Workspace +from services.logging import 
logger async def send_status_changed_event(airlock_request: AirlockRequest, previous_status: Optional[AirlockRequestStatus]): @@ -24,7 +24,7 @@ async def send_status_changed_event(airlock_request: AirlockRequest, previous_st subject=f"{request_id}/statusChanged", data_version="2.0" ) - logging.info(f"Sending status changed event with request ID {request_id}, new status: {new_status}, previous status: {previous_status}") + logger.info(f"Sending status changed event with request ID {request_id}, new status: {new_status}, previous status: {previous_status}") await publish_event(status_changed_event, config.EVENT_GRID_STATUS_CHANGED_TOPIC_ENDPOINT) @@ -67,5 +67,5 @@ def to_snake_case(string: str): data_version="4.0" ) - logging.info(f"Sending airlock notification event with request ID {request_id}, status: {status}") + logger.info(f"Sending airlock notification event with request ID {request_id}, status: {status}") await publish_event(airlock_notification, config.EVENT_GRID_AIRLOCK_NOTIFICATION_TOPIC_ENDPOINT) diff --git a/api_app/main.py b/api_app/main.py index 85775af752..703b8f4d77 100644 --- a/api_app/main.py +++ b/api_app/main.py @@ -1,7 +1,4 @@ import asyncio -import logging -from opencensus.ext.azure.trace_exporter import AzureExporter -import os import uvicorn from fastapi import FastAPI @@ -9,8 +6,7 @@ from fastapi.middleware.cors import CORSMiddleware from fastapi.concurrency import asynccontextmanager -from services.tracing import RequestTracerMiddleware -from opencensus.trace.samplers import ProbabilitySampler +from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor from starlette.exceptions import HTTPException from starlette.middleware.errors import ServerErrorMiddleware @@ -21,7 +17,7 @@ from api.errors.generic_error import generic_error_handler from core import config from db.events import bootstrap_database -from services.logging import initialize_logging, telemetry_processor_callback_function +from services.logging import initialize_logging, logger from service_bus.deployment_status_updater import DeploymentStatusUpdater from service_bus.airlock_request_status_update import AirlockStatusUpdater @@ -32,7 +28,7 @@ async def lifespan(app: FastAPI): while not await bootstrap_database(app): await asyncio.sleep(5) - logging.warning("Database connection could not be established") + logger.warning("Database connection could not be established") deploymentStatusUpdater = DeploymentStatusUpdater(app) await deploymentStatusUpdater.init_repos() @@ -48,7 +44,7 @@ async def lifespan(app: FastAPI): def get_application() -> FastAPI: application = FastAPI( title=config.PROJECT_NAME, - debug=config.DEBUG, + debug=(config.LOGGING_LEVEL == "DEBUG"), description=config.API_DESCRIPTION, version=config.VERSION, docs_url=None, @@ -57,15 +53,8 @@ def get_application() -> FastAPI: lifespan=lifespan ) - try: - if os.getenv("APPLICATIONINSIGHTS_CONNECTION_STRING"): - exporter = AzureExporter(sampler=ProbabilitySampler(1.0)) - exporter.add_telemetry_processor(telemetry_processor_callback_function) - application.add_middleware(RequestTracerMiddleware, exporter=exporter) - except Exception: - logging.exception("Failed to add RequestTracerMiddleware") - application.add_middleware(ServerErrorMiddleware, handler=generic_error_handler) + # Allow local UI debugging with local API if config.ENABLE_LOCAL_DEBUGGING: application.add_middleware( @@ -82,13 +71,9 @@ def get_application() -> FastAPI: return application -if config.DEBUG: - initialize_logging(logging.DEBUG, add_console_handler=True) -else: 
- initialize_logging(logging.INFO, add_console_handler=False) - +initialize_logging() app = get_application() - +FastAPIInstrumentor.instrument_app(app) if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=8000, loop="asyncio") diff --git a/api_app/requirements.txt b/api_app/requirements.txt index 9ba99ce79e..f34c30660e 100644 --- a/api_app/requirements.txt +++ b/api_app/requirements.txt @@ -1,25 +1,24 @@ -# API -azure-core==1.29.5 aiohttp==3.9.0 +azure-core==1.29.5 azure-cosmos==4.5.1 +azure-eventgrid==4.15.0 azure-identity==1.14.1 -azure-mgmt-cosmosdb==9.3.0 azure-mgmt-compute==30.3.0 +azure-mgmt-cosmosdb==9.3.0 azure-mgmt-costmanagement==4.0.1 -azure-storage-blob==12.19.0 +azure-mgmt-resource==23.0.1 +azure-monitor-opentelemetry==1.1.0 azure-servicebus==7.11.3 -azure-eventgrid==4.15.0 +azure-storage-blob==12.19.0 fastapi==0.104.0 +fastapi-utils==0.2.1 gunicorn==21.2.0 jsonschema[format_nongpl]==4.19.1 msal==1.22.0 -opencensus-ext-azure==1.1.11 -opencensus-ext-logging==0.1.1 +opentelemetry.instrumentation.logging==0.41b0 +pandas==2.0.3 PyJWT==2.8.0 -uvicorn[standard]==0.23.2 -semantic-version==2.10.0 pytz==2022.7 python-dateutil==2.8.2 -azure-mgmt-resource==23.0.1 -pandas==2.0.3 -pydantic==1.10.13 +semantic-version==2.10.0 +uvicorn[standard]==0.23.2 diff --git a/api_app/service_bus/airlock_request_status_update.py b/api_app/service_bus/airlock_request_status_update.py index e6596a5363..e26f573303 100644 --- a/api_app/service_bus/airlock_request_status_update.py +++ b/api_app/service_bus/airlock_request_status_update.py @@ -1,6 +1,5 @@ import asyncio import json -import logging from azure.servicebus.aio import ServiceBusClient, AutoLockRenewer from azure.servicebus.exceptions import OperationTimeoutError, ServiceBusConnectionError @@ -10,6 +9,7 @@ from api.dependencies.database import get_db_client from api.dependencies.airlock import get_airlock_request_by_id_from_path from services.airlock import update_and_publish_event_airlock_request +from services.logging import logger, tracer from db.repositories.workspaces import WorkspaceRepository from models.domain.airlock_request import AirlockRequestStatus from db.repositories.airlock_requests import AirlockRequestRepository @@ -28,58 +28,61 @@ async def init_repos(self): self.airlock_request_repo = await AirlockRequestRepository.create(db_client) self.workspace_repo = await WorkspaceRepository.create(db_client) - import time + async def receive_messages(self): + with tracer.start_as_current_span("airlock_receive_messages"): + while True: + try: + async with credentials.get_credential_async() as credential: + service_bus_client = ServiceBusClient(config.SERVICE_BUS_FULLY_QUALIFIED_NAMESPACE, credential) + receiver = service_bus_client.get_queue_receiver(queue_name=config.SERVICE_BUS_STEP_RESULT_QUEUE) + logger.info(f"Looking for new messages on {config.SERVICE_BUS_STEP_RESULT_QUEUE} queue...") + async with receiver: + received_msgs = await receiver.receive_messages(max_message_count=10, max_wait_time=1) + for msg in received_msgs: + async with AutoLockRenewer() as renewer: + renewer.register(receiver, msg, max_lock_renewal_duration=60) + complete_message = await self.process_message(msg) + if complete_message: + await receiver.complete_message(msg) + else: + # could have been any kind of transient issue, we'll abandon back to the queue, and retry + await receiver.abandon_message(msg) + + await asyncio.sleep(10) + + except OperationTimeoutError: + # Timeout occurred whilst connecting to a session - this is expected and indicates no 
non-empty sessions are available + logger.debug("No sessions for this process. Will look again...") + + except ServiceBusConnectionError: + # Occasionally there will be a transient / network-level error in connecting to SB. + logger.info("Unknown Service Bus connection error. Will retry...") + + except Exception as e: + # Catch all other exceptions, log them via .exception to get the stack trace, and reconnect + logger.exception(f"Unknown exception. Will retry - {e}") - ... + async def process_message(self, msg): + with tracer.start_as_current_span("process_message") as current_span: + complete_message = False - async def receive_messages(self): - while True: try: - async with credentials.get_credential_async() as credential: - service_bus_client = ServiceBusClient(config.SERVICE_BUS_FULLY_QUALIFIED_NAMESPACE, credential) - receiver = service_bus_client.get_queue_receiver(queue_name=config.SERVICE_BUS_STEP_RESULT_QUEUE) - logging.info(f"Looking for new messages on {config.SERVICE_BUS_STEP_RESULT_QUEUE} queue...") - async with receiver: - received_msgs = await receiver.receive_messages(max_message_count=10, max_wait_time=1) - for msg in received_msgs: - async with AutoLockRenewer() as renewer: - renewer.register(receiver, msg, max_lock_renewal_duration=60) - complete_message = await self.process_message(msg) - if complete_message: - await receiver.complete_message(msg) - else: - # could have been any kind of transient issue, we'll abandon back to the queue, and retry - await receiver.abandon_message(msg) - - await asyncio.sleep(10) - - except OperationTimeoutError: - # Timeout occurred whilst connecting to a session - this is expected and indicates no non-empty sessions are available - logging.debug("No sessions for this process. Will look again...") - - except ServiceBusConnectionError: - # Occasionally there will be a transient / network-level error in connecting to SB. - logging.info("Unknown Service Bus connection error. Will retry...") - - except Exception as e: - # Catch all other exceptions, log them via .exception to get the stack trace, and reconnect - logging.exception(f"Unknown exception. 
Will retry - {e}") + message = parse_obj_as(StepResultStatusUpdateMessage, json.loads(str(msg))) - async def process_message(self, msg): - complete_message = False + current_span.set_attribute("step_id", message.id) + current_span.set_attribute("event_type", message.eventType) + current_span.set_attribute("topic", message.topic) - try: - message = parse_obj_as(StepResultStatusUpdateMessage, json.loads(str(msg))) - logging.info(f"Received step_result status update message with correlation ID {message.id}: {message}") - complete_message = await self.update_status_in_database(message) - logging.info(f"Update status in DB for {message.id}") - except (json.JSONDecodeError, ValidationError): - logging.exception(f"{strings.STEP_RESULT_MESSAGE_FORMAT_INCORRECT}: {msg.correlation_id}") - complete_message = True - except Exception: - logging.exception(f"Exception processing message: {msg.correlation_id}") + logger.info(f"Received step_result status update message with correlation ID {message.id}: {message}") + complete_message = await self.update_status_in_database(message) + logger.info(f"Update status in DB for {message.id}") + except (json.JSONDecodeError, ValidationError): + logger.exception(f"{strings.STEP_RESULT_MESSAGE_FORMAT_INCORRECT}: {msg.correlation_id}") + complete_message = True + except Exception: + logger.exception(f"Exception processing message: {msg.correlation_id}") - return complete_message + return complete_message async def update_status_in_database(self, step_result_message: StepResultStatusUpdateMessage): """ @@ -103,18 +106,18 @@ async def update_status_in_database(self, step_result_message: StepResultStatusU await update_and_publish_event_airlock_request(airlock_request=airlock_request, airlock_request_repo=self.airlock_request_repo, updated_by=airlock_request.updatedBy, workspace=workspace, new_status=new_status, request_files=request_files, status_message=status_message) result = True else: - logging.error(strings.STEP_RESULT_MESSAGE_STATUS_DOES_NOT_MATCH.format(airlock_request_id, current_status, airlock_request.status)) + logger.error(strings.STEP_RESULT_MESSAGE_STATUS_DOES_NOT_MATCH.format(airlock_request_id, current_status, airlock_request.status)) except HTTPException as e: if e.status_code == 404: # Marking as true as this message will never succeed anyways and should be removed from the queue. 
result = True - logging.exception(strings.STEP_RESULT_ID_NOT_FOUND.format(airlock_request_id)) + logger.exception(strings.STEP_RESULT_ID_NOT_FOUND.format(airlock_request_id)) if e.status_code == 400: result = True - logging.exception(strings.STEP_RESULT_MESSAGE_INVALID_STATUS.format(airlock_request_id, current_status, new_status)) + logger.exception(strings.STEP_RESULT_MESSAGE_INVALID_STATUS.format(airlock_request_id, current_status, new_status)) if e.status_code == 503: - logging.exception(strings.STATE_STORE_ENDPOINT_NOT_RESPONDING) + logger.exception(strings.STATE_STORE_ENDPOINT_NOT_RESPONDING) except Exception: - logging.exception("Failed updating request status") + logger.exception("Failed updating request status") return result diff --git a/api_app/service_bus/deployment_status_updater.py b/api_app/service_bus/deployment_status_updater.py index 8c09e83d77..b38e138927 100644 --- a/api_app/service_bus/deployment_status_updater.py +++ b/api_app/service_bus/deployment_status_updater.py @@ -1,6 +1,5 @@ import asyncio import json -import logging import uuid from pydantic import ValidationError, parse_obj_as @@ -21,6 +20,7 @@ from db.repositories.resources import ResourceRepository from models.domain.operation import DeploymentStatusUpdateMessage, Operation, OperationStep, Status from resources import strings +from services.logging import logger, tracer class DeploymentStatusUpdater(): @@ -38,51 +38,58 @@ def run(self, *args, **kwargs): asyncio.run(self.receive_messages()) async def receive_messages(self): - while True: - try: - async with credentials.get_credential_async() as credential: - service_bus_client = ServiceBusClient(config.SERVICE_BUS_FULLY_QUALIFIED_NAMESPACE, credential) - - logging.info("Looking for new session...") - # max_wait_time=1 -> don't hold the session open after processing of the message has finished - async with service_bus_client.get_queue_receiver(queue_name=config.SERVICE_BUS_DEPLOYMENT_STATUS_UPDATE_QUEUE, max_wait_time=1, session_id=NEXT_AVAILABLE_SESSION) as receiver: - logging.info(f"Got a session containing messages: {receiver.session.session_id}") - async with AutoLockRenewer() as renewer: - renewer.register(receiver, receiver.session, max_lock_renewal_duration=60) - async for msg in receiver: - complete_message = await self.process_message(msg) - if complete_message: - await receiver.complete_message(msg) - else: - # could have been any kind of transient issue, we'll abandon back to the queue, and retry - await receiver.abandon_message(msg) - logging.info(f"Closing session: {receiver.session.session_id}") - - except OperationTimeoutError: - # Timeout occurred whilst connecting to a session - this is expected and indicates no non-empty sessions are available - logging.debug("No sessions for this process. Will look again...") - - except ServiceBusConnectionError: - # Occasionally there will be a transient / network-level error in connecting to SB. - logging.info("Unknown Service Bus connection error. Will retry...") - - except Exception as e: - # Catch all other exceptions, log them via .exception to get the stack trace, and reconnect - logging.exception(f"Unknown exception. 
Will retry - {e}") + with tracer.start_as_current_span("deployment_status_receive_messages"): + while True: + try: + async with credentials.get_credential_async() as credential: + service_bus_client = ServiceBusClient(config.SERVICE_BUS_FULLY_QUALIFIED_NAMESPACE, credential) + + logger.info(f"Looking for new messages on {config.SERVICE_BUS_DEPLOYMENT_STATUS_UPDATE_QUEUE} queue...") + # max_wait_time=1 -> don't hold the session open after processing of the message has finished + async with service_bus_client.get_queue_receiver(queue_name=config.SERVICE_BUS_DEPLOYMENT_STATUS_UPDATE_QUEUE, max_wait_time=1, session_id=NEXT_AVAILABLE_SESSION) as receiver: + logger.info(f"Got a session containing messages: {receiver.session.session_id}") + async with AutoLockRenewer() as renewer: + renewer.register(receiver, receiver.session, max_lock_renewal_duration=60) + async for msg in receiver: + complete_message = await self.process_message(msg) + if complete_message: + await receiver.complete_message(msg) + else: + # could have been any kind of transient issue, we'll abandon back to the queue, and retry + await receiver.abandon_message(msg) + logger.info(f"Closing session: {receiver.session.session_id}") + + except OperationTimeoutError: + # Timeout occurred whilst connecting to a session - this is expected and indicates no non-empty sessions are available + logger.debug("No sessions for this process. Will look again...") + + except ServiceBusConnectionError: + # Occasionally there will be a transient / network-level error in connecting to SB. + logger.info("Unknown Service Bus connection error. Will retry...") + + except Exception as e: + # Catch all other exceptions, log them via .exception to get the stack trace, and reconnect + logger.exception(f"Unknown exception. 
Will retry - {e}") async def process_message(self, msg): complete_message = False message = "" - try: - message = parse_obj_as(DeploymentStatusUpdateMessage, json.loads(str(msg))) - logging.info(f"Received and parsed JSON for: {msg.correlation_id}") - complete_message = await self.update_status_in_database(message) - logging.info(f"Update status in DB for {message.operationId} - {message.status}") - except (json.JSONDecodeError, ValidationError): - logging.exception(f"{strings.DEPLOYMENT_STATUS_MESSAGE_FORMAT_INCORRECT}: {msg.correlation_id}") - except Exception: - logging.exception(f"Exception processing message: {msg.correlation_id}") + with tracer.start_as_current_span("process_message") as current_span: + try: + message = parse_obj_as(DeploymentStatusUpdateMessage, json.loads(str(msg))) + + current_span.set_attribute("step_id", message.stepId) + current_span.set_attribute("operation_id", message.operationId) + current_span.set_attribute("status", message.status) + + logger.info(f"Received and parsed JSON for: {msg.correlation_id}") + complete_message = await self.update_status_in_database(message) + logger.info(f"Update status in DB for {message.operationId} - {message.status}") + except (json.JSONDecodeError, ValidationError): + logger.exception(f"{strings.DEPLOYMENT_STATUS_MESSAGE_FORMAT_INCORRECT}: {msg.correlation_id}") + except Exception: + logger.exception(f"Exception processing message: {msg.correlation_id}") return complete_message @@ -161,11 +168,11 @@ async def update_status_in_database(self, message: DeploymentStatusUpdateMessage user=operation.user) # create + send the message - logging.info(f"Sending next step in operation to deployment queue -> step_id: {next_step.templateStepId}, action: {next_step.resourceAction}") + logger.info(f"Sending next step in operation to deployment queue -> step_id: {next_step.templateStepId}, action: {next_step.resourceAction}") content = json.dumps(resource_to_send.get_resource_request_message_payload(operation_id=operation.id, step_id=next_step.id, action=next_step.resourceAction)) await send_deployment_message(content=content, correlation_id=operation.id, session_id=resource_to_send.id, action=next_step.resourceAction) except Exception as e: - logging.exception("Unable to send update for resource in pipeline step") + logger.exception("Unable to send update for resource in pipeline step") next_step.message = repr(e) next_step.status = Status.UpdatingFailed await self.update_overall_operation_status(operation, next_step, is_last_step) @@ -176,9 +183,9 @@ async def update_status_in_database(self, message: DeploymentStatusUpdateMessage except EntityDoesNotExist: # Marking as true as this message will never succeed anyways and should be removed from the queue. 
result = True - logging.exception(strings.DEPLOYMENT_STATUS_ID_NOT_FOUND.format(message.id)) + logger.exception(strings.DEPLOYMENT_STATUS_ID_NOT_FOUND.format(message.id)) except Exception: - logging.exception("Failed to update status") + logger.exception("Failed to update status") return result diff --git a/api_app/service_bus/helpers.py b/api_app/service_bus/helpers.py index 729b9953af..55ae5c1b20 100644 --- a/api_app/service_bus/helpers.py +++ b/api_app/service_bus/helpers.py @@ -11,7 +11,7 @@ from models.schemas.resource import ResourcePatch from db.repositories.resources import ResourceRepository from core import config, credentials -import logging +from services.logging import logger from azure.cosmos.exceptions import CosmosAccessConditionFailedError @@ -36,7 +36,7 @@ async def _send_message(message: ServiceBusMessage, queue: str): async def send_deployment_message(content, correlation_id, session_id, action): resource_request_message = ServiceBusMessage(body=content, correlation_id=correlation_id, session_id=session_id) - logging.info(f"Sending resource request message with correlation ID {resource_request_message.correlation_id}, action: {action}") + logger.info(f"Sending resource request message with correlation ID {resource_request_message.correlation_id}, action: {action}") await _send_message(resource_request_message, config.SERVICE_BUS_RESOURCE_REQUEST_QUEUE) @@ -119,7 +119,7 @@ async def try_update_with_retries(num_retries: int, attempt_count: int, resource primary_parent_workspace_svc=primary_parent_workspace_svc ) except CosmosAccessConditionFailedError as e: - logging.warning(f"Etag mismatch for {resource_to_update_id}. Retrying.") + logger.warning(f"Etag mismatch for {resource_to_update_id}. Retrying.") if attempt_count < num_retries: await try_update_with_retries( num_retries=num_retries, diff --git a/api_app/service_bus/resource_request_sender.py b/api_app/service_bus/resource_request_sender.py index a0aac86aeb..e5fe83ba84 100644 --- a/api_app/service_bus/resource_request_sender.py +++ b/api_app/service_bus/resource_request_sender.py @@ -11,6 +11,7 @@ from models.domain.operation import Operation from db.repositories.operations import OperationRepository +from services.logging import tracer async def send_resource_request_message(resource: Resource, operations_repo: OperationRepository, resource_repo: ResourceRepository, user: User, resource_template_repo: ResourceTemplateRepository, resource_history_repo: ResourceHistoryRepository, action: RequestAction = RequestAction.Install, is_cascade: str = False) -> Operation: @@ -21,40 +22,46 @@ async def send_resource_request_message(resource: Resource, operations_repo: Ope :param resource: The resource to deploy. :param action: install, uninstall etc. 
""" - # Construct the resources to build an operation item for - resources_list = [] - if is_cascade: - resources_list = await resource_repo.get_resource_dependency_list(resource) - else: - resources_list.append(resource.__dict__) - - # add the operation to the db - this will create all the steps needed (if any are defined in the template) - operation = await operations_repo.create_operation_item( - resource_id=resource.id, - resource_list=resources_list, - action=action, - resource_path=resource.resourcePath, - resource_version=resource.resourceVersion, - user=user, - resource_repo=resource_repo, - resource_template_repo=resource_template_repo) - - # prep the first step to send in SB - # resource at this point is the original object with unmaskked values - first_step = operation.steps[0] - resource_to_send = await update_resource_for_step( - operation_step=first_step, - resource_repo=resource_repo, - resource_template_repo=resource_template_repo, - resource_history_repo=resource_history_repo, - root_resource=resource, - step_resource=None, - resource_to_update_id=first_step.resourceId, - primary_action=action, - user=user) - - # create + send the message - content = json.dumps(resource_to_send.get_resource_request_message_payload(operation_id=operation.id, step_id=first_step.id, action=first_step.resourceAction)) - await send_deployment_message(content=content, correlation_id=operation.id, session_id=first_step.resourceId, action=first_step.resourceAction) + with tracer.start_as_current_span("send_resource_request_message") as current_span: + current_span.set_attribute("resource_id", resource.id) + current_span.set_attribute("action", action) + + # Construct the resources to build an operation item for + resources_list = [] + if is_cascade: + resources_list = await resource_repo.get_resource_dependency_list(resource) + else: + resources_list.append(resource.__dict__) + + # add the operation to the db - this will create all the steps needed (if any are defined in the template) + operation = await operations_repo.create_operation_item( + resource_id=resource.id, + resource_list=resources_list, + action=action, + resource_path=resource.resourcePath, + resource_version=resource.resourceVersion, + user=user, + resource_repo=resource_repo, + resource_template_repo=resource_template_repo) + current_span.set_attribute("operation_id", operation.id) + + # prep the first step to send in SB + # resource at this point is the original object with unmaskked values + first_step = operation.steps[0] + current_span.set_attribute("step_id", first_step.id) + resource_to_send = await update_resource_for_step( + operation_step=first_step, + resource_repo=resource_repo, + resource_template_repo=resource_template_repo, + resource_history_repo=resource_history_repo, + root_resource=resource, + step_resource=None, + resource_to_update_id=first_step.resourceId, + primary_action=action, + user=user) + + # create + send the message + content = json.dumps(resource_to_send.get_resource_request_message_payload(operation_id=operation.id, step_id=first_step.id, action=first_step.resourceAction)) + await send_deployment_message(content=content, correlation_id=operation.id, session_id=first_step.resourceId, action=first_step.resourceAction) return operation diff --git a/api_app/services/aad_authentication.py b/api_app/services/aad_authentication.py index 61ff23f1fb..fba97619ed 100644 --- a/api_app/services/aad_authentication.py +++ b/api_app/services/aad_authentication.py @@ -1,5 +1,4 @@ import base64 -import logging from 
collections import defaultdict from enum import Enum from typing import List, Optional @@ -17,11 +16,13 @@ from resources import strings from api.dependencies.database import get_db_client_from_request from db.repositories.workspaces import WorkspaceRepository +from services.logging import logger from cryptography.hazmat.primitives.asymmetric import rsa from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import serialization + MICROSOFT_GRAPH_URL = config.MICROSOFT_GRAPH_URL.strip("/") @@ -59,7 +60,7 @@ async def __call__(self, request: Request) -> User: # Try workspace app registration if appropriate if 'workspace_id' in request.path_params and any(role in self.require_one_of_roles for role in self.WORKSPACE_ROLES_DICT.keys()): # as we have a workspace_id not given, try decoding token - logging.debug("Workspace ID was provided. Getting Workspace API app registration") + logger.debug("Workspace ID was provided. Getting Workspace API app registration") try: # get the app reg id - which might be blank if the workspace hasn't fully created yet. # if it's blank, don't use workspace auth, use core auth - and a TRE Admin can still get it @@ -69,8 +70,8 @@ async def __call__(self, request: Request) -> User: except HTTPException as h: raise h except Exception as e: - logging.debug(e) - logging.debug("Failed to decode using workspace_id, trying with TRE API app registration") + logger.debug(e) + logger.debug("Failed to decode using workspace_id, trying with TRE API app registration") pass # Try TRE API app registration if appropriate @@ -78,18 +79,18 @@ async def __call__(self, request: Request) -> User: try: decoded_token = self._decode_token(token, config.API_AUDIENCE) except jwt.exceptions.InvalidSignatureError: - logging.debug("Failed to decode using TRE API app registration (Invalid Signatrue)") + logger.debug("Failed to decode using TRE API app registration (Invalid Signatrue)") raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=strings.INVALID_SIGNATURE) except jwt.exceptions.ExpiredSignatureError: - logging.debug("Failed to decode using TRE API app registration (Expired Signature)") + logger.debug("Failed to decode using TRE API app registration (Expired Signature)") raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=strings.EXPIRED_SIGNATURE) except jwt.exceptions.InvalidTokenError: # any other token validation exception, we want to catch all of these... - logging.debug("Failed to decode using TRE API app registration (Invalid token)") + logger.debug("Failed to decode using TRE API app registration (Invalid token)") raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=strings.INVALID_TOKEN) except Exception as e: # Unexpected token decoding/validation exception. 
making sure we are not crashing (with 500) - logging.debug(e) + logger.debug(e) pass # Failed to decode token using either app registration @@ -99,14 +100,14 @@ async def __call__(self, request: Request) -> User: try: user = self._get_user_from_token(decoded_token) except Exception as e: - logging.debug(e) + logger.debug(e) raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=strings.ACCESS_UNABLE_TO_GET_ROLE_ASSIGNMENTS_FOR_USER, headers={"WWW-Authenticate": "Bearer"}) try: if not any(role in self.require_one_of_roles for role in user.roles): raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=f'{strings.ACCESS_USER_DOES_NOT_HAVE_REQUIRED_ROLE}: {self.require_one_of_roles}', headers={"WWW-Authenticate": "Bearer"}) except Exception as e: - logging.debug(e) + logger.debug(e) raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=f'{strings.ACCESS_USER_DOES_NOT_HAVE_REQUIRED_ROLE}: {self.require_one_of_roles}', headers={"WWW-Authenticate": "Bearer"}) return user @@ -115,7 +116,7 @@ async def __call__(self, request: Request) -> User: async def _fetch_ws_app_reg_id_from_ws_id(request: Request) -> str: workspace_id = None if "workspace_id" not in request.path_params: - logging.error("Neither a workspace ID nor a default app registration id were provided") + logger.error("Neither a workspace ID nor a default app registration id were provided") raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=strings.AUTH_COULD_NOT_VALIDATE_CREDENTIALS) try: workspace_id = request.path_params['workspace_id'] @@ -129,10 +130,10 @@ async def _fetch_ws_app_reg_id_from_ws_id(request: Request) -> str: return ws_app_reg_id except EntityDoesNotExist: - logging.exception(strings.WORKSPACE_DOES_NOT_EXIST) + logger.exception(strings.WORKSPACE_DOES_NOT_EXIST) raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=strings.WORKSPACE_DOES_NOT_EXIST) except Exception: - logging.exception(f"Failed to get workspace app registration ID for workspace {workspace_id}") + logger.exception(f"Failed to get workspace app registration ID for workspace {workspace_id}") raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=strings.AUTH_COULD_NOT_VALIDATE_CREDENTIALS) @staticmethod @@ -148,7 +149,7 @@ def _decode_token(self, token: str, ws_app_reg_id: str) -> dict: key_id = self._get_key_id(token) key = self._get_token_key(key_id) - logging.debug("workspace app registration id: %s", ws_app_reg_id) + logger.debug("workspace app registration id: %s", ws_app_reg_id) return jwt.decode(token, key, options={"verify_signature": True}, algorithms=['RS256'], audience=ws_app_reg_id) @staticmethod @@ -183,6 +184,7 @@ def _get_token_key(self, key_id: str) -> str: n = int.from_bytes(base64.urlsafe_b64decode(self._ensure_b64padding(key['n'])), "big") e = int.from_bytes(base64.urlsafe_b64decode(self._ensure_b64padding(key['e'])), "big") pub_key = rsa.RSAPublicNumbers(e, n).public_key(default_backend()) + # Cache the PEM formatted public key. 
AzureADAuthorization._jwt_keys[key['kid']] = pub_key.public_bytes( encoding=serialization.Encoding.PEM, @@ -203,13 +205,10 @@ def _get_msgraph_token() -> str: except Exception: result = None if not result: - logging.debug('No suitable token exists in cache, getting a new one from AAD') + logger.debug('No suitable token exists in cache, getting a new one from AAD') result = app.acquire_token_for_client(scopes=scopes) if "access_token" not in result: - logging.debug(result.get('error')) - logging.debug(result.get('error_description')) - logging.debug(result.get('correlation_id')) - raise Exception(result.get('error')) + raise Exception(f"API app registration access token cannot be retrieved. {result.get('error')}: {result.get('error_description')}") return result["access_token"] @staticmethod @@ -330,7 +329,7 @@ def _get_batch_users_by_role_assignments_body(self, roles_graph_data): def _get_app_auth_info(self, client_id: str) -> dict: graph_data = self._get_app_sp_graph_data(client_id) if 'value' not in graph_data or len(graph_data['value']) == 0: - logging.debug(graph_data) + logger.debug(graph_data) raise AuthConfigValidationError(f"{strings.ACCESS_UNABLE_TO_GET_INFO_FOR_APP} {client_id}") app_info = graph_data['value'][0] @@ -350,7 +349,7 @@ def _ms_graph_query(self, url: str, http_method: str, json=None) -> dict: while True: if not url: break - logging.debug(f"Making request to: {url}") + logger.debug(f"Making request to: {url}") if json: response = requests.request(method=http_method, url=url, json=json, headers=auth_headers) else: @@ -362,8 +361,8 @@ def _ms_graph_query(self, url: str, http_method: str, json=None) -> dict: if '@odata.nextLink' in json_response: url = json_response['@odata.nextLink'] else: - logging.error(f"MS Graph query to: {url} failed with status code {response.status_code}") - logging.error(f"Full response: {response}") + logger.error(f"MS Graph query to: {url} failed with status code {response.status_code}") + logger.error(f"Full response: {response}") return graph_data def _get_role_assignment_graph_data_for_user(self, user_id: str) -> dict: @@ -381,15 +380,15 @@ def _get_identity_type(self, id: str) -> str: request_body = {"ids": [id], "types": ["user", "servicePrincipal"]} graph_data = self._ms_graph_query(objects_endpoint, "POST", json=request_body) - logging.debug(graph_data) + logger.debug(graph_data) if "value" not in graph_data or len(graph_data["value"]) != 1: - logging.debug(graph_data) + logger.debug(graph_data) raise AuthConfigValidationError(f"{strings.ACCESS_UNABLE_TO_GET_ACCOUNT_TYPE} {id}") object_info = graph_data["value"][0] if "@odata.type" not in object_info: - logging.debug(object_info) + logger.debug(object_info) raise AuthConfigValidationError(f"{strings.ACCESS_UNABLE_TO_GET_ACCOUNT_TYPE} {id}") return object_info["@odata.type"] @@ -421,10 +420,10 @@ def get_identity_role_assignments(self, user_id: str) -> List[RoleAssignment]: raise AuthConfigValidationError(f"{strings.ACCESS_UNHANDLED_ACCOUNT_TYPE} {identity_type}") if 'value' not in graph_data: - logging.debug(graph_data) + logger.debug(graph_data) raise AuthConfigValidationError(f"{strings.ACCESS_UNABLE_TO_GET_ROLE_ASSIGNMENTS_FOR_USER} {user_id}") - logging.debug(graph_data) + logger.debug(graph_data) return [RoleAssignment(role_assignment['resourceId'], role_assignment['appRoleId']) for role_assignment in graph_data['value']] diff --git a/api_app/services/airlock.py b/api_app/services/airlock.py index c0564ac711..fc33645afb 100644 --- a/api_app/services/airlock.py +++ 
b/api_app/services/airlock.py @@ -1,5 +1,5 @@ from datetime import datetime, timedelta -import logging +from services.logging import logger from azure.storage.blob import generate_container_sas, ContainerSasPermissions, BlobServiceClient from fastapi import HTTPException, status @@ -191,7 +191,7 @@ async def create_review_vm(airlock_request: AirlockRequest, user: User, workspac if resource_already_exists: existing_resource = airlock_request.reviewUserResources[user.id] existing_resource = await user_resource_repo.get_user_resource_by_id(workspace_id=existing_resource.workspaceId, service_id=existing_resource.workspaceServiceId, resource_id=existing_resource.userResourceId) - logging.info("User already has an existing review resource") + logger.info("User already has an existing review resource") await _handle_existing_review_resource(existing_resource, user, user_resource_repo, workspace_service_repo, operation_repo, resource_template_repo, resource_history_repo) # Create the VM @@ -209,14 +209,14 @@ async def create_review_vm(airlock_request: AirlockRequest, user: User, workspac userResourceId=user_resource.id )) - logging.info(f"Airlock Request {updated_resource.id} updated to include {updated_resource.reviewUserResources}") + logger.info(f"Airlock Request {updated_resource.id} updated to include {updated_resource.reviewUserResources}") return updated_resource, operation async def _deploy_vm(airlock_request: AirlockRequest, user: User, workspace: Workspace, review_workspace_id: str, review_workspace_service_id: str, user_resource_template_name: str, user_resource_repo: UserResourceRepository, workspace_service_repo: WorkspaceServiceRepository, operation_repo: OperationRepository, resource_template_repo: ResourceTemplateRepository, resource_history_repo: ResourceHistoryRepository): - logging.info(f"Creating review VM in workspace:{review_workspace_id} service:{review_workspace_service_id} using template:{user_resource_template_name}") + logger.info(f"Creating review VM in workspace:{review_workspace_id} service:{review_workspace_service_id} using template:{user_resource_template_name}") workspace_service = await workspace_service_repo.get_workspace_service_by_id(workspace_id=review_workspace_id, service_id=review_workspace_service_id) airlock_request_sas_url = get_airlock_container_link(airlock_request, user, workspace) @@ -250,15 +250,15 @@ async def _handle_existing_review_resource(existing_resource: AirlockReviewUserR if existing_resource.isEnabled and existing_resource.deploymentStatus == "deployed" and 'azure_resource_id' in existing_resource.properties: resource_status = get_azure_resource_status(existing_resource.properties['azure_resource_id']) if "powerState" in resource_status and resource_status["powerState"] == "VM running": - logging.info("Existing review resource is enabled, in a succeeded state and running. Returning a conflict error.") + logger.info("Existing review resource is enabled, in a succeeded state and running. Returning a conflict error.") raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="A healthy review resource is already deployed for the current user. 
" "You may only have a single review resource.") # If it wasn't healthy or running, we'll delete the existing resource if not already deleted, and then create a new one - logging.info("Existing review resource is in an unhealthy state.") + logger.info("Existing review resource is in an unhealthy state.") if existing_resource.deploymentStatus != "deleted": - logging.info("Deleting existing user resource...") + logger.info("Deleting existing user resource...") _ = await delete_review_user_resource( user_resource=existing_resource, user_resource_repo=user_resource_repo, @@ -278,21 +278,21 @@ async def save_and_publish_event_airlock_request(airlock_request: AirlockRequest check_email_exists(role_assignment_details) try: - logging.debug(f"Saving airlock request item: {airlock_request.id}") + logger.debug(f"Saving airlock request item: {airlock_request.id}") airlock_request.updatedBy = user airlock_request.updatedWhen = get_timestamp() await airlock_request_repo.save_item(airlock_request) except Exception: - logging.exception(f'Failed saving airlock request {airlock_request}') + logger.exception(f'Failed saving airlock request {airlock_request}') raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail=strings.STATE_STORE_ENDPOINT_NOT_RESPONDING) try: - logging.debug(f"Sending status changed event for airlock request item: {airlock_request.id}") + logger.debug(f"Sending status changed event for airlock request item: {airlock_request.id}") await send_status_changed_event(airlock_request=airlock_request, previous_status=None) await send_airlock_notification_event(airlock_request, workspace, role_assignment_details) except Exception: await airlock_request_repo.delete_item(airlock_request.id) - logging.exception("Failed sending status_changed message") + logger.exception("Failed sending status_changed message") raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail=strings.EVENT_GRID_GENERAL_ERROR_MESSAGE) @@ -307,7 +307,7 @@ async def update_and_publish_event_airlock_request( airlock_review: Optional[AirlockReview] = None, review_user_resource: Optional[AirlockReviewUserResource] = None) -> AirlockRequest: try: - logging.debug(f"Updating airlock request item: {airlock_request.id}") + logger.debug(f"Updating airlock request item: {airlock_request.id}") updated_airlock_request = await airlock_request_repo.update_airlock_request( original_request=airlock_request, updated_by=updated_by, @@ -317,7 +317,7 @@ async def update_and_publish_event_airlock_request( airlock_review=airlock_review, review_user_resource=review_user_resource) except Exception as e: - logging.exception(f'Failed updating airlock_request item {airlock_request}') + logger.exception(f'Failed updating airlock_request item {airlock_request}') # If the validation failed, the error was not related to the saving itself if hasattr(e, 'status_code'): if e.status_code == 400: # type: ignore @@ -325,18 +325,18 @@ async def update_and_publish_event_airlock_request( raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail=strings.STATE_STORE_ENDPOINT_NOT_RESPONDING) if not new_status: - logging.debug(f"Skipping sending 'status changed' event for airlock request item: {airlock_request.id} - there is no status change") + logger.debug(f"Skipping sending 'status changed' event for airlock request item: {airlock_request.id} - there is no status change") return updated_airlock_request try: - logging.debug(f"Sending status changed event for airlock request item: {airlock_request.id}") + 
logger.debug(f"Sending status changed event for airlock request item: {airlock_request.id}") await send_status_changed_event(airlock_request=updated_airlock_request, previous_status=airlock_request.status) access_service = get_access_service() role_assignment_details = access_service.get_workspace_role_assignment_details(workspace) await send_airlock_notification_event(updated_airlock_request, workspace, role_assignment_details) return updated_airlock_request except Exception: - logging.exception("Failed sending status_changed message") + logger.exception("Failed sending status_changed message") raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail=strings.EVENT_GRID_GENERAL_ERROR_MESSAGE) @@ -346,10 +346,10 @@ def get_timestamp() -> float: def check_email_exists(role_assignment_details: defaultdict(list)): if "WorkspaceResearcher" not in role_assignment_details or not role_assignment_details["WorkspaceResearcher"]: - logging.error('Creating an airlock request but the researcher does not have an email address.') + logger.error('Creating an airlock request but the researcher does not have an email address.') raise HTTPException(status_code=status.HTTP_417_EXPECTATION_FAILED, detail=strings.AIRLOCK_NO_RESEARCHER_EMAIL) if "AirlockManager" not in role_assignment_details or not role_assignment_details["AirlockManager"]: - logging.error('Creating an airlock request but the airlock manager does not have an email address.') + logger.error('Creating an airlock request but the airlock manager does not have an email address.') raise HTTPException(status_code=status.HTTP_417_EXPECTATION_FAILED, detail=strings.AIRLOCK_NO_AIRLOCK_MANAGER_EMAIL) @@ -401,7 +401,7 @@ async def delete_review_user_resource( # disable might contain logic that we need to execute before the deletion of the resource _ = await disable_user_resource(user_resource, user, workspace_service, user_resource_repo, resource_template_repo, operations_repo, resource_history_repo) - logging.info(f"Deleting user resource {user_resource.id} in workspace service {workspace_service.id}") + logger.info(f"Deleting user resource {user_resource.id} in workspace service {workspace_service.id}") operation = await send_uninstall_message( resource=user_resource, resource_repo=user_resource_repo, @@ -410,7 +410,7 @@ async def delete_review_user_resource( resource_template_repo=resource_template_repo, resource_history_repo=resource_history_repo, user=user) - logging.info(f"Started operation {operation}") + logger.info(f"Started operation {operation}") return operation @@ -458,7 +458,7 @@ async def delete_all_review_user_resources( ) operations.append(operation) - logging.info(f"Started {len(operations)} operations on deleting user resources") + logger.info(f"Started {len(operations)} operations on deleting user resources") return operations diff --git a/api_app/services/azure_resource_status.py b/api_app/services/azure_resource_status.py index 2e04876927..6672139a3b 100644 --- a/api_app/services/azure_resource_status.py +++ b/api_app/services/azure_resource_status.py @@ -1,5 +1,5 @@ from core import config, credentials -import logging +from services.logging import logger from azure.mgmt.compute import ComputeManagementClient, models from azure.core.exceptions import ResourceNotFoundError @@ -21,7 +21,7 @@ def get_azure_resource_status(resource_id): power_state = power_states[0].display_status return {"powerState": power_state} except ResourceNotFoundError: - logging.warning(f"Unable to query resource status for {resource_id}, as 
the resource was not found.") + logger.warning(f"Unable to query resource status for {resource_id}, as the resource was not found.") return {} diff --git a/api_app/services/cost_service.py b/api_app/services/cost_service.py index 5a3ddffba4..80a475264f 100644 --- a/api_app/services/cost_service.py +++ b/api_app/services/cost_service.py @@ -3,7 +3,6 @@ from functools import lru_cache from typing import Dict, Optional, Union import pandas as pd -import logging from azure.mgmt.costmanagement import CostManagementClient from azure.mgmt.costmanagement.models import QueryGrouping, QueryAggregation, QueryDataset, QueryDefinition, \ @@ -21,6 +20,7 @@ from models.domain.costs import GranularityEnum, CostReport, WorkspaceCostReport, CostItem, WorkspaceServiceCostItem, \ CostRow from models.domain.resource import Resource +from services.logging import logger class ResultColumnDaily(Enum): @@ -326,20 +326,20 @@ def query_costs(self, tag_name: str, tag_value: str, # Given subscription {subscription_id} doesn't have valid WebDirect/AIRS offer type. # it means that the Azure subscription deosn't support cost management if "doesn't have valid WebDirect/AIRS" in e.message: - logging.exception("Subscription doesn't support cost management") + logger.exception("Subscription doesn't support cost management") raise SubscriptionNotSupported(e) else: - logging.exception("Unhandled Cost Management API error") + logger.exception("Unhandled Cost Management API error") raise e except HttpResponseError as e: - logging.exception("Cost Management API error") + logger.exception("Cost Management API error") if e.status_code == 429: # Too many requests - Request is throttled. # Retry after waiting for the time specified in the "x-ms-ratelimit-microsoft.consumption-retry-after" header. if self.RATE_LIMIT_RETRY_AFTER_HEADER_KEY in e.response.headers: raise TooManyRequests(int(e.response.headers[self.RATE_LIMIT_RETRY_AFTER_HEADER_KEY])) else: - logging.exception(f"{self.RATE_LIMIT_RETRY_AFTER_HEADER_KEY} header was not found in response") + logger.exception(f"{self.RATE_LIMIT_RETRY_AFTER_HEADER_KEY} header was not found in response") raise e elif e.status_code == 503: # Service unavailable - Service is temporarily unavailable. 
@@ -347,7 +347,7 @@ def query_costs(self, tag_name: str, tag_value: str, if self.SERVICE_UNAVAILABLE_RETRY_AFTER_HEADER_KEY in e.response.headers: raise ServiceUnavailable(int(e.response.headers[self.SERVICE_UNAVAILABLE_RETRY_AFTER_HEADER_KEY])) else: - logging.exception(f"{self.SERVICE_UNAVAILABLE_RETRY_AFTER_HEADER_KEY} header was not found in response") + logger.exception(f"{self.SERVICE_UNAVAILABLE_RETRY_AFTER_HEADER_KEY} header was not found in response") raise e else: raise e diff --git a/api_app/services/health_checker.py b/api_app/services/health_checker.py index 42a01e9ff0..a79ee90cd1 100644 --- a/api_app/services/health_checker.py +++ b/api_app/services/health_checker.py @@ -1,4 +1,3 @@ -import logging from typing import Tuple from azure.core import exceptions from azure.cosmos.aio import CosmosClient @@ -11,12 +10,13 @@ from core import config from models.schemas.status import StatusEnum from resources import strings +from services.logging import logger async def create_state_store_status(credential) -> Tuple[StatusEnum, str]: status = StatusEnum.ok message = "" - debug = True if config.DEBUG == "true" else False + debug = True if config.LOGGING_LEVEL == "DEBUG" else False try: primary_master_key = await get_store_key(credential) cosmos_client = CosmosClient(config.STATE_STORE_ENDPOINT, primary_master_key, connection_verify=debug) @@ -30,7 +30,7 @@ async def create_state_store_status(credential) -> Tuple[StatusEnum, str]: status = StatusEnum.not_ok message = strings.STATE_STORE_ENDPOINT_NOT_ACCESSIBLE except Exception: - logging.exception("Failed to query cosmos db status") + logger.exception("Failed to query cosmos db status") status = StatusEnum.not_ok message = strings.UNSPECIFIED_ERROR return status, message @@ -52,7 +52,7 @@ async def create_service_bus_status(credential) -> Tuple[StatusEnum, str]: status = StatusEnum.not_ok message = strings.SERVICE_BUS_AUTHENTICATION_ERROR except Exception: - logging.exception("Failed to query service bus status") + logger.exception("Failed to query service bus status") status = StatusEnum.not_ok message = strings.UNSPECIFIED_ERROR return status, message @@ -76,7 +76,7 @@ async def create_resource_processor_status(credential) -> Tuple[StatusEnum, str] status = StatusEnum.not_ok message = strings.RESOURCE_PROCESSOR_GENERAL_ERROR_MESSAGE except Exception: - logging.exception("Failed to query resource processor status") + logger.exception("Failed to query resource processor status") status = StatusEnum.not_ok message = strings.UNSPECIFIED_ERROR return status, message diff --git a/api_app/services/logging.py b/api_app/services/logging.py index 502b67f705..eeb9ca17cd 100644 --- a/api_app/services/logging.py +++ b/api_app/services/logging.py @@ -1,13 +1,9 @@ import logging -import os -from typing import Optional +from opentelemetry.instrumentation.logging import LoggingInstrumentor +from opentelemetry import trace +from azure.monitor.opentelemetry import configure_azure_monitor -from opencensus.ext.azure.log_exporter import AzureLogHandler -from opencensus.trace import config_integration -from opencensus.trace.samplers import AlwaysOnSampler -from opencensus.trace.tracer import Tracer - -from core.config import VERSION +from core.config import APPLICATIONINSIGHTS_CONNECTION_STRING, LOGGING_LEVEL UNWANTED_LOGGERS = [ "azure.core.pipeline.policies.http_logging_policy", @@ -18,7 +14,7 @@ "azure.identity.aio._internal.decorators", "azure.identity.aio._credentials.chained", "azure.identity", - "msal.token_cache" + "msal.token_cache", # Remove these 
once the following PR is merged: # https://github.com/Azure/azure-sdk-for-python/pull/30832 # Issue: https://github.com/microsoft/AzureTRE/issues/3766 @@ -26,101 +22,74 @@ ] LOGGERS_FOR_ERRORS_ONLY = [ - "urllib3.connectionpool", + "azure.monitor.opentelemetry.exporter.export._base", + "azure.servicebus.aio._base_handler_async", + "azure.servicebus._pyamqp.aio._cbs_async", + "azure.servicebus._pyamqp.aio._client_async", + "azure.servicebus._pyamqp.aio._connection_async", + "azure.servicebus._pyamqp.aio._link_async", + "azure.servicebus._pyamqp.aio._management_link_async", + "opentelemetry.attributes", "uamqp", - "uamqp.authentication.cbs_auth_async", + "uamqp.async_ops", "uamqp.async_ops.client_async", "uamqp.async_ops.connection_async", - "uamqp.async_ops", + "uamqp.async_ops.session_async", "uamqp.authentication", + "uamqp.authentication.cbs_auth_async", "uamqp.c_uamqp", + "uamqp.client", "uamqp.connection", "uamqp.receiver", - "uamqp.async_ops.session_async", "uamqp.sender", - "uamqp.client", - "azure.identity._persistent_cache", - "azure.servicebus.aio._base_handler_async", - "azure.servicebus._pyamqp.aio._cbs_async", - "azure.servicebus._pyamqp.aio._connection_async", - "azure.servicebus._pyamqp.aio._link_async", - "azure.servicebus._pyamqp.aio._management_link_async", - "azure.servicebus._pyamqp.aio._session_async", - "azure.servicebus._pyamqp.aio._client_async" + "urllib3.connectionpool" ] +logger = logging.getLogger("azuretre_api") +tracer = trace.get_tracer("azuretre_api") + -def disable_unwanted_loggers(): - """ - Disables the unwanted loggers. - """ +def configure_loggers(): for logger_name in LOGGERS_FOR_ERRORS_ONLY: logging.getLogger(logger_name).setLevel(logging.ERROR) for logger_name in UNWANTED_LOGGERS: - logging.getLogger(logger_name).disabled = True - - -def telemetry_processor_callback_function(envelope): - envelope.tags['ai.cloud.role'] = 'api' - envelope.tags['ai.application.ver'] = VERSION - - -class ExceptionTracebackFilter(logging.Filter): - """ - If a record contains 'exc_info', it will only show in the 'exceptions' section of Application Insights without showing - in the 'traces' section. In order to show it also in the 'traces' section, we need another log that does not contain 'exc_info'. - """ - def filter(self, record): - if record.exc_info: - logger = logging.getLogger(record.name) - _, exception_value, _ = record.exc_info - message = f"{record.getMessage()}\nException message: '{exception_value}'" - logger.log(record.levelno, message) - - return True - - -def initialize_logging(logging_level: int, correlation_id: Optional[str] = None, add_console_handler: bool = False) -> logging.LoggerAdapter: - """ - Adds the Application Insights handler for the root logger and sets the given logging level. - Creates and returns a logger adapter that integrates the correlation ID, if given, to the log messages. + logging.getLogger(logger_name).setLevel(logging.CRITICAL) - :param logging_level: The logging level to set e.g., logging.WARNING. - :param correlation_id: Optional. The correlation ID that is passed on to the operation_Id in App Insights. - :returns: A newly created logger adapter. 
- """ - logger = logging.getLogger() - disable_unwanted_loggers() +def initialize_logging() -> logging.Logger: - if add_console_handler: - console_formatter = logging.Formatter(fmt='%(module)-7s %(name)-7s %(process)-7s %(asctime)s %(levelname)-7s %(message)s') - console_handler = logging.StreamHandler() - console_handler.setFormatter(console_formatter) - logger.addHandler(console_handler) + configure_loggers() - try: - # picks up APPLICATIONINSIGHTS_CONNECTION_STRING automatically - if os.getenv("APPLICATIONINSIGHTS_CONNECTION_STRING"): - azurelog_handler = AzureLogHandler() - azurelog_handler.add_telemetry_processor(telemetry_processor_callback_function) - azurelog_handler.addFilter(ExceptionTracebackFilter()) - logger.addHandler(azurelog_handler) - except ValueError as e: - logger.error(f"Failed to set Application Insights logger handler: {e}") + logging_level = logging.INFO - config_integration.trace_integrations(['logging']) - logging.basicConfig(level=logging_level, format='%(asctime)s traceId=%(traceId)s spanId=%(spanId)s %(message)s') - Tracer(sampler=AlwaysOnSampler()) - logger.setLevel(logging_level) + if LOGGING_LEVEL == "INFO": + logging_level = logging.INFO + elif LOGGING_LEVEL == "DEBUG": + logging_level = logging.DEBUG + elif LOGGING_LEVEL == "WARNING": + logging_level = logging.WARNING + elif LOGGING_LEVEL == "ERROR": + logging_level = logging.ERROR - extra = {} + if APPLICATIONINSIGHTS_CONNECTION_STRING: + configure_azure_monitor( + logger_name="azuretre_api", + instrumentation_options={ + "azure_sdk": {"enabled": False}, + "flask": {"enabled": False}, + "django": {"enabled": False}, + "fastapi": {"enabled": True}, + "psycopg2": {"enabled": False}, + } + ) - if correlation_id: - extra = {'traceId': correlation_id} + LoggingInstrumentor().instrument( + set_logging_format=True, + log_level=logging_level, + tracer_provider=tracer._real_tracer + ) - adapter = logging.LoggerAdapter(logger, extra) - adapter.debug(f"Logger adapter initialized with extra: {extra}") + logger.info("Logging initialized with level: %s", LOGGING_LEVEL) - return adapter + return logger diff --git a/api_app/services/tracing.py b/api_app/services/tracing.py deleted file mode 100644 index cf269af625..0000000000 --- a/api_app/services/tracing.py +++ /dev/null @@ -1,90 +0,0 @@ -import logging - -from fastapi import Request -from opencensus.trace import ( - attributes_helper, - execution_context, - print_exporter, - samplers, -) -from opencensus.trace import span as span_module -from opencensus.trace import tracer as tracer_module -from opencensus.trace import utils -from opencensus.trace.propagation import trace_context_http_header_format -from starlette.types import ASGIApp -from starlette.middleware.base import BaseHTTPMiddleware - -HTTP_HOST = attributes_helper.COMMON_ATTRIBUTES["HTTP_HOST"] -HTTP_METHOD = attributes_helper.COMMON_ATTRIBUTES["HTTP_METHOD"] -HTTP_PATH = attributes_helper.COMMON_ATTRIBUTES["HTTP_PATH"] -HTTP_ROUTE = attributes_helper.COMMON_ATTRIBUTES["HTTP_ROUTE"] -HTTP_URL = attributes_helper.COMMON_ATTRIBUTES["HTTP_URL"] -HTTP_STATUS_CODE = attributes_helper.COMMON_ATTRIBUTES["HTTP_STATUS_CODE"] - -module_logger = logging.getLogger(__name__) - - -class RequestTracerMiddleware(BaseHTTPMiddleware): - def __init__( - self, - app: ASGIApp, - excludelist_paths=None, - excludelist_hostnames=None, - sampler=None, - exporter=None, - propagator=None, - ) -> None: - super().__init__(app) - self.app = app - self.excludelist_paths = excludelist_paths - self.excludelist_hostnames = 
excludelist_hostnames - self.sampler = sampler or samplers.AlwaysOnSampler() - self.exporter = exporter or print_exporter.PrintExporter() - self.propagator = ( - propagator or trace_context_http_header_format.TraceContextPropagator() - ) - - async def dispatch(self, request: Request, call_next): - - # Do not trace if the url is in the exclude list - if utils.disable_tracing_url(str(request.url), self.excludelist_paths): - return await call_next(request) - - try: - span_context = self.propagator.from_headers(request.headers) - - tracer = tracer_module.Tracer( - span_context=span_context, - sampler=self.sampler, - exporter=self.exporter, - propagator=self.propagator, - ) - except Exception: # pragma: NO COVER - module_logger.error("Failed to trace request", exc_info=True) - return await call_next(request) - - try: - span = tracer.start_span() - span.span_kind = span_module.SpanKind.SERVER - span.name = "[{}]{}".format(request.method, request.url) - - tracer.add_attribute_to_current_span(HTTP_HOST, request.url.hostname) - tracer.add_attribute_to_current_span(HTTP_METHOD, request.method) - tracer.add_attribute_to_current_span(HTTP_PATH, request.url.path) - tracer.add_attribute_to_current_span(HTTP_ROUTE, request.url.path) - tracer.add_attribute_to_current_span(HTTP_URL, str(request.url)) - - execution_context.set_opencensus_attr( - "excludelist_hostnames", self.excludelist_hostnames - ) - except Exception: # pragma: NO COVER - module_logger.error("Failed to trace request", exc_info=True) - - response = await call_next(request) - try: - tracer.add_attribute_to_current_span(HTTP_STATUS_CODE, response.status_code) - except Exception: # pragma: NO COVER - module_logger.error("Failed to trace response", exc_info=True) - finally: - tracer.end_span() - return response diff --git a/api_app/tests_ma/test_api/test_routes/test_migrations.py b/api_app/tests_ma/test_api/test_routes/test_migrations.py index 02a30b92bf..d49239d222 100644 --- a/api_app/tests_ma/test_api/test_routes/test_migrations.py +++ b/api_app/tests_ma/test_api/test_routes/test_migrations.py @@ -33,7 +33,7 @@ def _prepare(self, app, admin_user): app.dependency_overrides = {} # [POST] /migrations/ - @ patch("api.routes.migrations.logging.info") + @ patch("api.routes.migrations.logger.info") @ patch("api.routes.migrations.OperationRepository") @ patch("api.routes.migrations.ResourceMigration.archive_history") @ patch("api.routes.migrations.ResourceMigration.add_deployment_status_field") @@ -67,7 +67,7 @@ async def test_post_migrations_returns_202_on_successful(self, migrate_step_id_o assert response.status_code == status.HTTP_202_ACCEPTED # [POST] /migrations/ - @ patch("api.routes.migrations.logging.info") + @ patch("api.routes.migrations.logger.info") @ patch("api.routes.migrations.ResourceRepository.rename_field_name", side_effect=ValueError) @ patch("api.routes.migrations.SharedServiceMigration.deleteDuplicatedSharedServices") @ patch("api.routes.migrations.WorkspaceMigration.moveAuthInformationToProperties") diff --git a/api_app/tests_ma/test_db/test_migrations/test_workspace_migration.py b/api_app/tests_ma/test_db/test_migrations/test_workspace_migration.py index c9671959ad..c15310f6d3 100644 --- a/api_app/tests_ma/test_db/test_migrations/test_workspace_migration.py +++ b/api_app/tests_ma/test_db/test_migrations/test_workspace_migration.py @@ -42,8 +42,7 @@ def get_sample_old_workspace(workspace_id: str = "7ab18f7e-ee8f-4202-8d46-747818 }] -@ patch('logging.info') -async def test_workspace_migration_moves_fields(logging, 
workspace_migrator): +async def test_workspace_migration_moves_fields(workspace_migrator): workspace_migrator.query = AsyncMock(return_value=get_sample_old_workspace()) assert (await workspace_migrator.moveAuthInformationToProperties()) diff --git a/api_app/tests_ma/test_service_bus/test_airlock_request_status_update.py b/api_app/tests_ma/test_service_bus/test_airlock_request_status_update.py index cb708fccf9..66829a4a47 100644 --- a/api_app/tests_ma/test_service_bus/test_airlock_request_status_update.py +++ b/api_app/tests_ma/test_service_bus/test_airlock_request_status_update.py @@ -139,7 +139,7 @@ async def test_receiving_good_message(_, app, logging_mock, workspace_repo, airl @pytest.mark.parametrize("payload", test_data) @patch('service_bus.airlock_request_status_update.AirlockRequestRepository.create') @patch('service_bus.airlock_request_status_update.WorkspaceRepository.create') -@patch('logging.exception') +@patch('services.logging.logger.exception') @patch('fastapi.FastAPI') async def test_receiving_bad_json_logs_error(app, logging_mock, workspace_repo, airlock_request_repo, payload): service_bus_received_message_mock = ServiceBusReceivedMessageMock(payload) @@ -148,13 +148,13 @@ async def test_receiving_bad_json_logs_error(app, logging_mock, workspace_repo, complete_message = await airlockStatusUpdater.process_message(service_bus_received_message_mock) assert complete_message is True - error_message = logging_mock.call_args.args[0] - assert error_message.startswith(strings.STEP_RESULT_MESSAGE_FORMAT_INCORRECT) + expected_error_message = f"{strings.STEP_RESULT_MESSAGE_FORMAT_INCORRECT}: {service_bus_received_message_mock.correlation_id}" + logging_mock.assert_called_once_with(expected_error_message) @patch('service_bus.airlock_request_status_update.WorkspaceRepository.create') @patch('service_bus.airlock_request_status_update.AirlockRequestRepository.create') -@patch('logging.exception') +@patch('services.logging.logger.exception') @patch('service_bus.airlock_request_status_update.ServiceBusClient') @patch('fastapi.FastAPI') async def test_updating_non_existent_airlock_request_error_is_logged(app, sb_client, logging_mock, airlock_request_repo, _): @@ -172,7 +172,7 @@ async def test_updating_non_existent_airlock_request_error_is_logged(app, sb_cli @patch('service_bus.airlock_request_status_update.WorkspaceRepository.create') @patch('service_bus.airlock_request_status_update.AirlockRequestRepository.create') -@patch('logging.exception') +@patch('services.logging.logger.exception') @patch('fastapi.FastAPI') async def test_when_updating_and_state_store_exception_error_is_logged(app, logging_mock, airlock_request_repo, _): service_bus_received_message_mock = ServiceBusReceivedMessageMock(test_sb_step_result_message) @@ -188,7 +188,7 @@ async def test_when_updating_and_state_store_exception_error_is_logged(app, logg @patch('service_bus.airlock_request_status_update.WorkspaceRepository.create') @patch('service_bus.airlock_request_status_update.AirlockRequestRepository.create') -@patch('logging.error') +@patch('services.logging.logger.error') @patch('fastapi.FastAPI') async def test_when_updating_and_current_status_differs_from_status_in_state_store_error_is_logged(app, logging_mock, airlock_request_repo, _): service_bus_received_message_mock = ServiceBusReceivedMessageMock(test_sb_step_result_message) @@ -206,7 +206,7 @@ async def test_when_updating_and_current_status_differs_from_status_in_state_sto @patch('service_bus.airlock_request_status_update.WorkspaceRepository.create') 
@patch('service_bus.airlock_request_status_update.AirlockRequestRepository.create') -@patch('logging.exception') +@patch('services.logging.logger.exception') @patch('service_bus.airlock_request_status_update.ServiceBusClient') @patch('fastapi.FastAPI') async def test_when_updating_and_status_update_is_illegal_error_is_logged(app, sb_client, logging_mock, airlock_request_repo, _): diff --git a/api_app/tests_ma/test_service_bus/test_deployment_status_update.py b/api_app/tests_ma/test_service_bus/test_deployment_status_update.py index c9b89643ca..71cfe2d843 100644 --- a/api_app/tests_ma/test_service_bus/test_deployment_status_update.py +++ b/api_app/tests_ma/test_service_bus/test_deployment_status_update.py @@ -117,7 +117,7 @@ def create_sample_operation(resource_id, request_action): @pytest.mark.parametrize("payload", test_data) -@patch('logging.exception') +@patch('services.logging.logger.exception') @patch('fastapi.FastAPI') async def test_receiving_bad_json_logs_error(app, logging_mock, payload): service_bus_received_message_mock = ServiceBusReceivedMessageMock(payload) @@ -137,7 +137,7 @@ async def test_receiving_bad_json_logs_error(app, logging_mock, payload): @patch('service_bus.deployment_status_updater.ResourceTemplateRepository.create') @patch('service_bus.deployment_status_updater.OperationRepository.create') @patch('service_bus.deployment_status_updater.ResourceRepository.create') -@patch('logging.exception') +@patch('services.logging.logger.exception') @patch('fastapi.FastAPI') async def test_receiving_good_message(app, logging_mock, resource_repo, operation_repo, _, __): expected_workspace = create_sample_workspace_object(test_sb_message["id"]) @@ -160,7 +160,7 @@ async def test_receiving_good_message(app, logging_mock, resource_repo, operatio @patch('service_bus.deployment_status_updater.ResourceTemplateRepository.create') @patch('service_bus.deployment_status_updater.OperationRepository.create') @patch('service_bus.deployment_status_updater.ResourceRepository.create') -@patch('logging.exception') +@patch('services.logging.logger.exception') @patch('fastapi.FastAPI') async def test_when_updating_non_existent_workspace_error_is_logged(app, logging_mock, resource_repo, operation_repo, _, __): resource_repo.return_value.get_resource_dict_by_id.side_effect = EntityDoesNotExist @@ -181,7 +181,7 @@ async def test_when_updating_non_existent_workspace_error_is_logged(app, logging @patch('service_bus.deployment_status_updater.ResourceTemplateRepository.create') @patch('service_bus.deployment_status_updater.OperationRepository.create') @patch('service_bus.deployment_status_updater.ResourceRepository.create') -@patch('logging.exception') +@patch('services.logging.logger.exception') @patch('fastapi.FastAPI') async def test_when_updating_and_state_store_exception(app, logging_mock, resource_repo, operation_repo, _, __): resource_repo.return_value.get_resource_dict_by_id.side_effect = Exception diff --git a/config.sample.yaml b/config.sample.yaml index 403c70c874..e4c2f2d789 100644 --- a/config.sample.yaml +++ b/config.sample.yaml @@ -65,9 +65,8 @@ resource_processor: # rp_bundle_values: '{"custom_key_1":"custom_value_1","image_gallery_id":"/subscriptions//resourceGroups//providers/Microsoft.Compute/galleries/"}' developer_settings: - # Locks will not be added to stateful resources so they can be easily removed - # stateful_resources_locked: false - +# Locks will not be added to stateful resources so they can be easily removed +# stateful_resources_locked: false # This setting will enable 
your local machine to be able to # communicate with Service Bus and Cosmos. It will also allow deploying @@ -75,4 +74,5 @@ developer_settings: # enable_local_debugging: true # Used by the API and Resource processor application to change log level -# debug: true +# Can be "ERROR", "WARNING", "INFO", "DEBUG" +# logging_level: "INFO" diff --git a/core/terraform/api-webapp.tf b/core/terraform/api-webapp.tf index 902e8b374a..d284df476b 100644 --- a/core/terraform/api-webapp.tf +++ b/core/terraform/api-webapp.tf @@ -58,6 +58,8 @@ resource "azurerm_linux_web_app" "api" { RESOURCE_MANAGER_ENDPOINT = module.terraform_azurerm_environment_configuration.resource_manager_endpoint MICROSOFT_GRAPH_URL = module.terraform_azurerm_environment_configuration.microsoft_graph_endpoint STORAGE_ENDPOINT_SUFFIX = module.terraform_azurerm_environment_configuration.storage_suffix + LOGGING_LEVEL = var.logging_level + OTEL_RESOURCE_ATTRIBUTES = "service.name=api,service.version=${local.version}" } identity { diff --git a/core/terraform/main.tf b/core/terraform/main.tf index a065e9fb8c..e31d359bda 100644 --- a/core/terraform/main.tf +++ b/core/terraform/main.tf @@ -166,6 +166,7 @@ module "resource_processor_vmss_porter" { resource_processor_number_processes_per_instance = var.resource_processor_number_processes_per_instance resource_processor_vmss_sku = var.resource_processor_vmss_sku arm_environment = var.arm_environment + logging_level = var.logging_level rp_bundle_values = var.rp_bundle_values depends_on = [ diff --git a/core/terraform/resource_processor/vmss_porter/cloud-config.yaml b/core/terraform/resource_processor/vmss_porter/cloud-config.yaml index b1d1223543..56a7ff21a2 100644 --- a/core/terraform/resource_processor/vmss_porter/cloud-config.yaml +++ b/core/terraform/resource_processor/vmss_porter/cloud-config.yaml @@ -1,5 +1,5 @@ --- -#cloud-config +# cloud-config package_upgrade: true apt: sources: @@ -57,6 +57,8 @@ write_files: AZURE_ENVIRONMENT=${azure_environment} AAD_AUTHORITY_URL=${aad_authority_url} MICROSOFT_GRAPH_FQDN=${microsoft_graph_fqdn} + OTEL_RESOURCE_ATTRIBUTES=service.name=resource_processor,service.version=${resource_processor_vmss_porter_image_tag} + LOGGING_LEVEL=${logging_level} ${rp_bundle_values} - path: /etc/cron.hourly/docker-prune # An hourly cron job to have docker free disk space. 
Running this frquently @@ -77,8 +79,11 @@ runcmd: # (https://microsoft.github.io/AzureTRE/troubleshooting-faq/troubleshooting-rp/#Logs) - printf '\nalias dlf="docker logs --since 1m --follow"' >> /etc/bash.bashrc - printf '\nalias dlf1='\''dlf $(docker ps -q | head -n 1)'\''' >> /etc/bash.bashrc - - printf '\nalias rpstatus='\''tmux new-session -d "watch docker ps"; tmux split-window -p 100 -v "docker logs --since 1m --follow resource_processor1"; tmux split-window -v -p 90; tmux -2 attach-session -d'\''\n' >> /etc/bash.bashrc - + - > + printf '\nalias rpstatus='\''tmux new-session -d "watch docker ps"; \ + tmux split-window -p 100 -v "docker logs --since 1m --follow resource_processor1"; \ + tmux split-window -v -p 90; \ + tmux -2 attach-session -d'\''\n' >> /etc/bash.bashrc - export DEBIAN_FRONTEND=noninteractive - az cloud set --name ${azure_environment} - az login --identity -u ${vmss_msi_id} diff --git a/core/terraform/resource_processor/vmss_porter/data.tf b/core/terraform/resource_processor/vmss_porter/data.tf index e3fba4dbe9..4ab89b4367 100644 --- a/core/terraform/resource_processor/vmss_porter/data.tf +++ b/core/terraform/resource_processor/vmss_porter/data.tf @@ -29,6 +29,7 @@ data "template_file" "cloudconfig" { azure_environment = local.azure_environment aad_authority_url = module.terraform_azurerm_environment_configuration.active_directory_endpoint microsoft_graph_fqdn = regex("(?:(?P[^:/?#]+):)?(?://(?P[^/?#:]*))?", module.terraform_azurerm_environment_configuration.microsoft_graph_endpoint).fqdn + logging_level = var.logging_level rp_bundle_values = local.rp_bundle_values_formatted } } diff --git a/core/terraform/resource_processor/vmss_porter/variables.tf b/core/terraform/resource_processor/vmss_porter/variables.tf index fcf7acab4c..4ec2f7910e 100644 --- a/core/terraform/resource_processor/vmss_porter/variables.tf +++ b/core/terraform/resource_processor/vmss_porter/variables.tf @@ -66,7 +66,9 @@ variable "subscription_id" { type = string default = "" } - +variable "logging_level" { + type = string +} variable "rp_bundle_values" { type = map(string) } diff --git a/core/terraform/variables.tf b/core/terraform/variables.tf index 4ccf9b8428..39702b98fb 100644 --- a/core/terraform/variables.tf +++ b/core/terraform/variables.tf @@ -184,3 +184,13 @@ variable "is_cosmos_defined_throughput" { type = bool default = false } + +variable "logging_level" { + type = string + default = "INFO" + description = "The logging level for the API and Resource Processor" + validation { + condition = contains(["INFO", "DEBUG", "WARNING", "ERROR"], var.logging_level) + error_message = "logging_level must be one of ERROR, WARNING, INFO, DEBUG" + } +} diff --git a/core/version.txt b/core/version.txt index 3e2f46a3a3..d69d16e980 100644 --- a/core/version.txt +++ b/core/version.txt @@ -1 +1 @@ -__version__ = "0.9.0" +__version__ = "0.9.1" diff --git a/docs/tre-developers/api.md b/docs/tre-developers/api.md index 21e1987078..e35e0cd83d 100644 --- a/docs/tre-developers/api.md +++ b/docs/tre-developers/api.md @@ -75,7 +75,7 @@ There, you can run a query like ```cmd AppTraces -| where AppRoleName == "uvicorn" +| where AppRoleName == "api" | order by TimeGenerated desc ``` @@ -100,22 +100,6 @@ make build-and-push-api make deploy-core ``` -### Enabling DEBUG mode on the API - -For security, the API is by default configured to not show detailed error messages and stack trace when an error occurs. - -You can enable debugging via one of the two ways: - -1. 
Set `debug=true` under developer_settings section in your`config.yaml` file (see []) - -To enable debugging on an already running instance: - -1. Go to App Service for the API and select **Settings > Configuration**. -1. Click **New Application Setting**. -1. in the new dialog box set **Name=DEBUG** and **Value=true** - -![API Debug True](../assets/api_debug_true.png) - ## Using Swagger UI Swagger UI lets you send requests to the API. diff --git a/docs/tre-developers/resource-processor.md b/docs/tre-developers/resource-processor.md index 158549c81e..fee75d24fe 100644 --- a/docs/tre-developers/resource-processor.md +++ b/docs/tre-developers/resource-processor.md @@ -43,43 +43,36 @@ For more information on how to use API, refer to [API documentation](./api.md#us On Azure Portal, find an Virtual VM scale set with a name `vmss-rp-porter-${TRE_ID}`. -### Resource Processor logs in LogAnalytics +### Connect to the Resource Processor terminal -To find logs in LogAnalytics, go to your resource group, then to LogAnalytics instance, which is named like `log-${TRE_ID}`. +The processor runs in a VNET, and you cannot connect to it directly. To SSH to this instance, use Bastion. -There, you can run a query like +1. Navigate to the VMSS instance named `vmss-rp-porter-${TRE_ID}` in the Azure Portal. +1. Click on `Instances` in the left menu. +1. Click on an instance name. +1. Click on `Connect -> Bastion` in the top menu. -```cmd -AppTraces -| where AppRoleName == "runner.py" -| order by TimeGenerated desc -``` - -### SSH-ing to the instance - -The processor runs in a VNET, and you cannot connect to it directly. -To SSH to this instance, use Bastion. - -1. Find a keyvault with a name `kv-${TRE_ID}` in your resource group. -1. Copy a secret named `resource-processor-vmss-password` + ![Bastion](../assets/bastion.png "Bastion") - ![VMSS Password](../assets/vmss_password.png) +1. Set `Authentication type` to `Password from Azure Key Vault`. +1. Set Username to `adminuser`. +1. Set the Key Vault to `kv-${TRE_ID}` and Azure Key Vault Secret to `resource-processor-vmss-password`. If you don't have permissions to see the secret, add yourself to the Access Policy of this keyvault with a permission to read secrets: [![Keyvault access policy](../assets/rp_kv_access_policy.png)](../assets/rp_kv_access_policy.png) -1. Connect to the instance using Bastion. Use the username `adminuser` and the password you just copied. - ![Bastion](../assets/bastion.png "Bastion") +1. Click `Connect`. + ### Getting container logs 1. SSH into the Resource Processor VM as described above -1. Check the status of the container using `sudo docker ps` +1. Check the status of the container using `docker ps` If you see nothing (and the container was pulled) then the processor has either not started yet or it has crashed. -1. Get the logs from the container using `sudo docker logs ` command. +1. Get the logs from the container using `docker logs ` command. ### Starting container manually diff --git a/docs/troubleshooting-faq/app-insights-logs.md b/docs/troubleshooting-faq/app-insights-logs.md index 57b5abb2d4..f63743b429 100644 --- a/docs/troubleshooting-faq/app-insights-logs.md +++ b/docs/troubleshooting-faq/app-insights-logs.md @@ -5,7 +5,21 @@ Every component of TRE should send their trace logs to App Insights logging back !!! note AppTraces can take time to appear so be patient. -Go to the deployed app insights instance and select **Monitoring > Logs** where you can run the following example query to get the logs for a specific deployment. 
+To find logs in Application Insights, go to your resource group, then to the Application Insights instance, which is named like `appi-${TRE_ID}`. Click on *Logs* in the left menu. Under Queries, select either `TRE Resource Processor Logs` or `TRE API Logs`. + +![App Insights Queries](app_insights_queries.png) + +A manual query can also be created such as: + +```kusto +traces +| where cloud_RoleName == "resource_processor" +| order by timestamp desc +``` + +## Check the logs for a specific deployment + +You can run the following example query to get the logs for a specific deployment. ```kusto let tracking_id=""; @@ -22,4 +36,3 @@ Received deployment status update message with correlation ID 70b09db1-30c4-475c ``` It should also be evident from the message flow where the current processing is stuck or failed. Failed deployment status should also be available in the `GET /api/workspaces/{workspace_id}` and this is just another way to confirm it. - diff --git a/docs/troubleshooting-faq/app_insights_queries.png b/docs/troubleshooting-faq/app_insights_queries.png new file mode 100644 index 0000000000..b04e1b03d7 Binary files /dev/null and b/docs/troubleshooting-faq/app_insights_queries.png differ diff --git a/docs/troubleshooting-faq/debug-api.md b/docs/troubleshooting-faq/debug-api.md deleted file mode 100644 index b2fd2ee0a2..0000000000 --- a/docs/troubleshooting-faq/debug-api.md +++ /dev/null @@ -1,9 +0,0 @@ -# Enabling DEBUG mode on the API - -The API is by default configured to not show detailed error messages and stack trace when an error occurs. This is done to prevent leaking internal state to the outside world and to minimize information which an attacker could use against the deployed instance. - -However, you can enable debugging, by setting `DEBUG=true` in the configuration settings of the API using Azure portal. - -1. Go to App Service for the API and select **Settings > Configuration**. -1. Click **New Application Setting**. -1. in the new dialog box set **Name=DEBUG** and **Value=true** \ No newline at end of file diff --git a/docs/troubleshooting-faq/debug-logs.md b/docs/troubleshooting-faq/debug-logs.md new file mode 100644 index 0000000000..b0b01a4393 --- /dev/null +++ b/docs/troubleshooting-faq/debug-logs.md @@ -0,0 +1,7 @@ +# Setting the logging level to DEBUG on the Resource Processor and API + +For security, the API and Resource Processor are configured not to show detailed error messages and stack traces when an error occurs. + +You can enable debug logging on the API and Resource Processor by setting `logging_level: "DEBUG"` under the `developer_settings` section in your `config.yaml` file. + +Once set, you need to run `make deploy-core` to update the settings on the API and Resource Processor. You should start to see logs with severity level `0` (Verbose) appear in the Application Insights logs.
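For orientation, here is a minimal sketch of how an `api_app` module might consume the new `services.logging` helpers introduced in this change, assuming `initialize_logging()` is invoked once at application startup and `APPLICATIONINSIGHTS_CONNECTION_STRING` / `LOGGING_LEVEL` are set as described above; the `create_widget` function and its attribute key are purely illustrative:

```python
from services.logging import initialize_logging, logger, tracer

# Configures the "azuretre_api" logger and, when a connection string is present,
# the Azure Monitor exporter. In this sketch it is called once at startup.
initialize_logging()


def create_widget(resource_id: str) -> None:
    # LoggingInstrumentor injects the active trace context into log records,
    # so this span and the log line below can be correlated in Application Insights.
    with tracer.start_as_current_span("create_widget") as current_span:
        current_span.set_attribute("resource_id", resource_id)
        logger.info(f"Creating widget for resource {resource_id}")


create_widget("some-resource-id")
```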
diff --git a/mkdocs.yml b/mkdocs.yml index 71739e3e72..34047b9fb4 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -156,7 +156,7 @@ nav: - Troubleshooting FAQ: # General Troubleshooting Section for Development - troubleshooting-faq/index.md - - Enabling Debugging for the API: troubleshooting-faq/debug-api.md + - Enabling DEBUG logs: troubleshooting-faq/debug-logs.md - API logs using deployment center: troubleshooting-faq/api-logs-deployment-center.md - Checking the Service Bus: troubleshooting-faq/troubleshooting-sb.md - Checking Logs in Application Insights: troubleshooting-faq/app-insights-logs.md diff --git a/resource_processor/_version.py b/resource_processor/_version.py index a5f830a2c0..8088f75131 100644 --- a/resource_processor/_version.py +++ b/resource_processor/_version.py @@ -1 +1 @@ -__version__ = "0.7.1" +__version__ = "0.8.1" diff --git a/resource_processor/resources/commands.py b/resource_processor/resources/commands.py index 8c61220f93..551029af8a 100644 --- a/resource_processor/resources/commands.py +++ b/resource_processor/resources/commands.py @@ -1,34 +1,46 @@ import asyncio import json -import logging import base64 +import logging from urllib.parse import urlparse - from resources.helpers import get_installation_id -from shared.logging import shell_output_logger +from shared.logging import logger, shell_output_logger def azure_login_command(config): - set_cloud_command = f"az cloud set --name {config['azure_environment']}" + set_cloud_command = f"az cloud set --name {config['azure_environment']} >/dev/null " if config["vmss_msi_id"]: # Use the Managed Identity when in VMSS context - login_command = f"az login --identity -u {config['vmss_msi_id']}" + login_command = f"az login --identity -u {config['vmss_msi_id']} >/dev/null " + else: # Use a Service Principal when running locally - login_command = f"az login --service-principal --username {config['arm_client_id']} --password {config['arm_client_secret']} --tenant {config['arm_tenant_id']}" + login_command = f"az login --service-principal --username {config['arm_client_id']} --password {config['arm_client_secret']} --tenant {config['arm_tenant_id']} >/dev/null" return f"{set_cloud_command} && {login_command}" +def apply_porter_credentials_sets_command(config): + if config["vmss_msi_id"]: + # Use the Managed Identity when in VMSS context + porter_credential_sets = "porter credentials apply vmss_porter/arm_auth_local_debugging.json >/dev/null 2>&1 && porter credentials apply vmss_porter/aad_auth.json >/dev/null 2>&1" + + else: + # Use a Service Principal when running locally + porter_credential_sets = "porter credentials apply vmss_porter/arm_auth_local_debugging.json >/dev/null 2>&1 && porter credentials apply vmss_porter/aad_auth_local_debugging.json >/dev/null 2>&1" + + return f"{porter_credential_sets}" + + def azure_acr_login_command(config): acr_name = _get_acr_name(acr_fqdn=config['registry_server']) - return f"az acr login --name {acr_name}" + return f"az acr login --name {acr_name} >/dev/null " -async def build_porter_command(config, logger, msg_body, custom_action=False): - porter_parameter_keys = await get_porter_parameter_keys(config, logger, msg_body) +async def build_porter_command(config, msg_body, custom_action=False): + porter_parameter_keys = await get_porter_parameter_keys(config, msg_body) porter_parameters = "" if porter_parameter_keys is None: @@ -50,6 +62,10 @@ async def build_porter_command(config, logger, msg_body, custom_action=False): elif parameter_name in msg_body: parameter_value = 
msg_body[parameter_name] + # 4. if starts user_ then look in user object + elif parameter_name.startswith("user_") and "user" in msg_body and parameter_name[5:] in msg_body["user"]: + parameter_value = msg_body["user"][parameter_name[5:]] + # if still not found, might be a special case # (we give a chance to the method above to allow override of the special handeling done below) else: @@ -68,7 +84,7 @@ async def build_porter_command(config, logger, msg_body, custom_action=False): installation_id = get_installation_id(msg_body) - command_line = [f"{azure_login_command(config)} && {azure_acr_login_command(config)} && porter" + command_line = [f"porter" # If a custom action (i.e. not install, uninstall, upgrade) we need to use 'invoke' f"{' invoke --action' if custom_action else ''}" f" {msg_body['action']} \"{installation_id}\"" @@ -77,6 +93,7 @@ async def build_porter_command(config, logger, msg_body, custom_action=False): f" --credential-set arm_auth" f" --credential-set aad_auth" ] + return command_line @@ -86,10 +103,8 @@ async def build_porter_command_for_outputs(msg_body): return command_line -async def get_porter_parameter_keys(config, logger, msg_body): - command = [f"{azure_login_command(config)} >/dev/null && \ - {azure_acr_login_command(config)} >/dev/null && \ - porter explain --reference {config['registry_server']}/{msg_body['name']}:v{msg_body['version']} --output json"] +async def get_porter_parameter_keys(config, msg_body): + command = [f"porter explain --reference {config['registry_server']}/{msg_body['name']}:v{msg_body['version']} --output json"] proc = await asyncio.create_subprocess_shell( ''.join(command), @@ -98,7 +113,7 @@ async def get_porter_parameter_keys(config, logger, msg_body): env=config["porter_env"]) stdout, stderr = await proc.communicate() - logging.info(f'get_porter_parameter_keys exited with {proc.returncode}') + logger.debug(f'get_porter_parameter_keys exited with {proc.returncode}') result_stdout = None result_stderr = None @@ -109,7 +124,7 @@ async def get_porter_parameter_keys(config, logger, msg_body): return porter_parameter_keys if stderr: result_stderr = stderr.decode() - shell_output_logger(result_stderr, '[stderr]', logger, logging.WARN) + shell_output_logger(result_stderr, '[stderr]', logging.WARN) def get_special_porter_param_value(config, parameter_name: str, msg_body): diff --git a/resource_processor/run.sh b/resource_processor/run.sh index 307d69d389..7fd1a8837f 100755 --- a/resource_processor/run.sh +++ b/resource_processor/run.sh @@ -55,11 +55,6 @@ else echo "Porter v0 state doesn't exist." fi -# Can't be in the image since DB connection is needed. -echo "Applying credential sets..." -porter credentials apply vmss_porter/arm_auth_local_debugging.json -porter credentials apply vmss_porter/aad_auth.json - # Launch the runner echo "Starting resource processor..." 
python -u vmss_porter/runner.py diff --git a/resource_processor/shared/config.py b/resource_processor/shared/config.py index 1e6a3e5c45..e937d24552 100644 --- a/resource_processor/shared/config.py +++ b/resource_processor/shared/config.py @@ -1,94 +1,96 @@ import os - from _version import __version__ +from shared.logging import logger, tracer + VERSION = __version__ -def get_config(logger_adapter) -> dict: - config = {} - - config["registry_server"] = os.environ["REGISTRY_SERVER"] - config["tfstate_container_name"] = os.environ["TERRAFORM_STATE_CONTAINER_NAME"] - config["tfstate_resource_group_name"] = os.environ["MGMT_RESOURCE_GROUP_NAME"] - config["tfstate_storage_account_name"] = os.environ["MGMT_STORAGE_ACCOUNT_NAME"] - config["deployment_status_queue"] = os.environ["SERVICE_BUS_DEPLOYMENT_STATUS_UPDATE_QUEUE"] - config["resource_request_queue"] = os.environ["SERVICE_BUS_RESOURCE_REQUEST_QUEUE"] - config["service_bus_namespace"] = os.environ["SERVICE_BUS_FULLY_QUALIFIED_NAMESPACE"] - config["vmss_msi_id"] = os.environ.get("VMSS_MSI_ID", None) - config["number_processes"] = os.environ.get("NUMBER_PROCESSES", "1") - config["key_vault_url"] = os.environ.get("KEY_VAULT_URL", os.environ.get("KEYVAULT_URI", None)) - config["arm_environment"] = os.environ.get("ARM_ENVIRONMENT", "public") - config["azure_environment"] = os.environ.get("AZURE_ENVIRONMENT", "AzureCloud") - config["aad_authority_url"] = os.environ.get("AAD_AUTHORITY_URL", "https://login.microsoftonline.com") - config["microsoft_graph_fqdn"] = os.environ.get("MICROSOFT_GRAPH_FQDN", "graph.microsoft.com") - - try: - config["number_processes_int"] = int(config["number_processes"]) - except ValueError: - logger_adapter.info("Invalid setting for NUMBER_PROCESSES, will default to 1") - config["number_processes_int"] = 1 - - # Needed for running porter - config["arm_use_msi"] = os.environ.get("ARM_USE_MSI", "false") - config["arm_subscription_id"] = os.environ["AZURE_SUBSCRIPTION_ID"] - config["arm_client_id"] = os.environ["ARM_CLIENT_ID"] - config["arm_tenant_id"] = os.environ["AZURE_TENANT_ID"] - - if config["arm_use_msi"] == "false": - # These are needed when running locally - config["arm_client_secret"] = os.environ["ARM_CLIENT_SECRET"] - config["aad_tenant_id"] = os.environ["AAD_TENANT_ID"] - config["application_admin_client_id"] = os.environ["APPLICATION_ADMIN_CLIENT_ID"] - config["application_admin_client_secret"] = os.environ["APPLICATION_ADMIN_CLIENT_SECRET"] - - else: - config["arm_client_secret"] = "" # referenced in the credential set - - # when running in vscode devcontainer - if "DEVCONTAINER" in os.environ: - config["remote_containers_ipc"] = os.environ["REMOTE_CONTAINERS_IPC"] - - # Create env dict for porter - config["porter_env"] = { - "HOME": os.environ["HOME"], - "PATH": os.environ["PATH"], - "KEY_VAULT_URL": config["key_vault_url"], - "ARM_ENVIRONMENT": config["arm_environment"], - "AZURE_ENVIRONMENT": config["azure_environment"], - - # These are needed since they are referenced as credentials in every bundle and also in arm_auth credential set. 
- "ARM_CLIENT_ID": config["arm_client_id"], - "ARM_CLIENT_SECRET": config["arm_client_secret"], - "ARM_SUBSCRIPTION_ID": config["arm_subscription_id"], - "ARM_TENANT_ID": config["arm_tenant_id"], - } - - if config["arm_use_msi"] == "false": - config["porter_env"].update( - { - "AAD_TENANT_ID": config["aad_tenant_id"], - "APPLICATION_ADMIN_CLIENT_ID": config["application_admin_client_id"], - "APPLICATION_ADMIN_CLIENT_SECRET": config["application_admin_client_secret"], - } - ) - - # when running in vscode devcontainer - if "DEVCONTAINER" in os.environ: - config["porter_env"].update( - { - "REMOTE_CONTAINERS_IPC": config["remote_containers_ipc"] - } - ) - - # Load env vars for bundles - def envvar_to_key(name: str) -> str: - return name[len("RP_BUNDLE_"):].lower() - - config["bundle_params"] = { - envvar_to_key(env_var_name): os.getenv(env_var_name) - for env_var_name in os.environ - if env_var_name.startswith("RP_BUNDLE") - } +def get_config() -> dict: + with tracer.start_as_current_span("get_config"): + config = {} + + config["registry_server"] = os.environ["REGISTRY_SERVER"] + config["tfstate_container_name"] = os.environ["TERRAFORM_STATE_CONTAINER_NAME"] + config["tfstate_resource_group_name"] = os.environ["MGMT_RESOURCE_GROUP_NAME"] + config["tfstate_storage_account_name"] = os.environ["MGMT_STORAGE_ACCOUNT_NAME"] + config["deployment_status_queue"] = os.environ["SERVICE_BUS_DEPLOYMENT_STATUS_UPDATE_QUEUE"] + config["resource_request_queue"] = os.environ["SERVICE_BUS_RESOURCE_REQUEST_QUEUE"] + config["service_bus_namespace"] = os.environ["SERVICE_BUS_FULLY_QUALIFIED_NAMESPACE"] + config["vmss_msi_id"] = os.environ.get("VMSS_MSI_ID", None) + config["number_processes"] = os.environ.get("NUMBER_PROCESSES", "1") + config["key_vault_url"] = os.environ.get("KEY_VAULT_URL", os.environ.get("KEYVAULT_URI", None)) + config["arm_environment"] = os.environ.get("ARM_ENVIRONMENT", "public") + config["azure_environment"] = os.environ.get("AZURE_ENVIRONMENT", "AzureCloud") + config["aad_authority_url"] = os.environ.get("AAD_AUTHORITY_URL", "https://login.microsoftonline.com") + config["microsoft_graph_fqdn"] = os.environ.get("MICROSOFT_GRAPH_FQDN", "graph.microsoft.com") + + try: + config["number_processes_int"] = int(config["number_processes"]) + except ValueError: + logger.info("Invalid setting for NUMBER_PROCESSES, will default to 1") + config["number_processes_int"] = 1 + + # Needed for running porter + config["arm_use_msi"] = os.environ.get("ARM_USE_MSI", "false") + config["arm_subscription_id"] = os.environ["AZURE_SUBSCRIPTION_ID"] + config["arm_client_id"] = os.environ["ARM_CLIENT_ID"] + config["arm_tenant_id"] = os.environ["AZURE_TENANT_ID"] + + if config["arm_use_msi"] == "false": + # These are needed when running locally + config["arm_client_secret"] = os.environ["ARM_CLIENT_SECRET"] + config["aad_tenant_id"] = os.environ["AAD_TENANT_ID"] + config["application_admin_client_id"] = os.environ["APPLICATION_ADMIN_CLIENT_ID"] + config["application_admin_client_secret"] = os.environ["APPLICATION_ADMIN_CLIENT_SECRET"] + + else: + config["arm_client_secret"] = "" # referenced in the credential set + + # when running in vscode devcontainer + if "DEVCONTAINER" in os.environ: + config["remote_containers_ipc"] = os.environ["REMOTE_CONTAINERS_IPC"] + + # Create env dict for porter + config["porter_env"] = { + "HOME": os.environ["HOME"], + "PATH": os.environ["PATH"], + "KEY_VAULT_URL": config["key_vault_url"], + "ARM_ENVIRONMENT": config["arm_environment"], + "AZURE_ENVIRONMENT": 
config["azure_environment"], + + # These are needed since they are referenced as credentials in every bundle and also in arm_auth credential set. + "ARM_CLIENT_ID": config["arm_client_id"], + "ARM_CLIENT_SECRET": config["arm_client_secret"], + "ARM_SUBSCRIPTION_ID": config["arm_subscription_id"], + "ARM_TENANT_ID": config["arm_tenant_id"], + } + + if config["arm_use_msi"] == "false": + config["porter_env"].update( + { + "AAD_TENANT_ID": config["aad_tenant_id"], + "APPLICATION_ADMIN_CLIENT_ID": config["application_admin_client_id"], + "APPLICATION_ADMIN_CLIENT_SECRET": config["application_admin_client_secret"], + } + ) + + # when running in vscode devcontainer + if "DEVCONTAINER" in os.environ: + config["porter_env"].update( + { + "REMOTE_CONTAINERS_IPC": config["remote_containers_ipc"] + } + ) + + # Load env vars for bundles + def envvar_to_key(name: str) -> str: + return name[len("RP_BUNDLE_"):].lower() + + config["bundle_params"] = { + envvar_to_key(env_var_name): os.getenv(env_var_name) + for env_var_name in os.environ + if env_var_name.startswith("RP_BUNDLE") + } return config diff --git a/resource_processor/shared/logging.py b/resource_processor/shared/logging.py index 345bdfc065..aeedf39f13 100644 --- a/resource_processor/shared/logging.py +++ b/resource_processor/shared/logging.py @@ -1,17 +1,15 @@ import logging import os import re - -from opencensus.ext.azure.log_exporter import AzureLogHandler -from opencensus.trace import config_integration -from opencensus.trace.samplers import AlwaysOnSampler -from opencensus.trace.tracer import Tracer - -from shared.config import VERSION +from opentelemetry.instrumentation.logging import LoggingInstrumentor +from opentelemetry import trace +from azure.monitor.opentelemetry import configure_azure_monitor UNWANTED_LOGGERS = [ "azure.core.pipeline.policies.http_logging_policy", "azure.eventhub._eventprocessor.event_processor", + # suppressing, have support case open + "azure.servicebus._pyamqp.aio._session_async", "azure.identity.aio._credentials.managed_identity", "azure.identity.aio._credentials.environment", "azure.identity.aio._internal.get_token_mixin", @@ -39,146 +37,85 @@ "uamqp.sender", "uamqp.client", "azure.servicebus.aio._base_handler_async", - "azure.servicebus._pyamqp.aio._cbs_async", + "azure.monitor.opentelemetry.exporter.export._base", + "azure.servicebus.aio._base_handler_async", "azure.servicebus._pyamqp.aio._connection_async", "azure.servicebus._pyamqp.aio._link_async", + "opentelemetry.attributes", "azure.servicebus._pyamqp.aio._management_link_async", - "azure.servicebus._pyamqp.aio._session_async" + "azure.servicebus._pyamqp.aio._cbs_async", + "azure.servicebus._pyamqp.aio._client_async" ] -debug = os.environ.get('DEBUG', 'False').lower() in ('true', '1') - - -def disable_unwanted_loggers(): - """ - Disables the unwanted loggers. - """ - for logger_name in UNWANTED_LOGGERS: - logging.getLogger(logger_name).disabled = True - - -def telemetry_processor_callback_function(envelope): - envelope.tags['ai.cloud.role'] = 'resource_processor' - envelope.tags['ai.application.ver'] = VERSION - +logger = logging.getLogger("azuretre_resource_processor") +tracer = trace.get_tracer("azuretre_resource_processor") -def initialize_logging(logging_level: int, correlation_id: str, add_console_handler: bool = False) -> logging.LoggerAdapter: - """ - Adds the Application Insights handler for the root logger and sets the given logging level. - Creates and returns a logger adapter that integrates the correlation ID, if given, to the log messages. 
- Note: This should be called only once, otherwise duplicate log entries could be produced. - :param logging_level: The logging level to set e.g., logging.WARNING. - :param correlation_id: Optional. The correlation ID that is passed on to the operation_Id in App Insights. - :returns: A newly created logger adapter. - """ - logger = logging.getLogger() - - # When using sessions and NEXT_AVAILABLE_SESSION we see regular exceptions which are actually expected - # See https://github.com/Azure/azure-sdk-for-python/issues/9402 - # Other log entries such as 'link detach' also confuse the logs, and are expected. - # We don't want these making the logs any noisier so we raise the logging level for that logger here - # To inspect all the loggers, use -> loggers = [logging.getLogger(name) for name in logging.root.manager.loggerDict] +def configure_loggers(): for logger_name in LOGGERS_FOR_ERRORS_ONLY: logging.getLogger(logger_name).setLevel(logging.ERROR) - if add_console_handler: - console_formatter = logging.Formatter(fmt='%(module)-7s %(name)-7s %(process)-7s %(asctime)s %(levelname)-7s %(message)s') - console_handler = logging.StreamHandler() - console_handler.setFormatter(console_formatter) - logger.addHandler(console_handler) - - try: - # picks up APPLICATIONINSIGHTS_CONNECTION_STRING automatically - azurelog_handler = AzureLogHandler() - azurelog_handler.add_telemetry_processor(telemetry_processor_callback_function) - azurelog_formatter = AzureLogFormatter() - azurelog_handler.setFormatter(azurelog_formatter) - logger.addHandler(azurelog_handler) - except ValueError as e: - logger.error(f"Failed to set Application Insights logger handler: {e}") - - config_integration.trace_integrations(['logging']) - logging.basicConfig(level=logging_level, format='%(asctime)s traceId=%(traceId)s spanId=%(spanId)s %(message)s') - Tracer(sampler=AlwaysOnSampler()) - logger.setLevel(logging_level) - - extra = None - - if correlation_id: - extra = {'traceId': correlation_id} + for logger_name in UNWANTED_LOGGERS: + logging.getLogger(logger_name).setLevel(logging.CRITICAL) - adapter = logging.LoggerAdapter(logger, extra) - adapter.debug(f"Logger adapter initialized with extra: {extra}") - return adapter +def initialize_logging() -> logging.Logger: + configure_loggers() -def get_message_id_logger(correlation_id: str) -> logging.LoggerAdapter: - """ - Gets a logger that includes message id for easy correlation between log entries. - :param correlation_id: Optional. The correlation ID that is passed on to the operation_Id in App Insights. - :returns: A modified logger adapter (from the original initiated one). 
- """ - logger = logging.getLogger() - extra = None + logging_level = os.environ.get("LOGGING_LEVEL", "INFO") - if correlation_id: - extra = {'traceId': correlation_id} + if logging_level == "INFO": + logging_level = logging.INFO + elif logging_level == "DEBUG": + logging_level = logging.DEBUG + elif logging_level == "WARNING": + logging_level = logging.WARNING + elif logging_level == "ERROR": + logging_level = logging.ERROR + + if os.getenv("APPLICATIONINSIGHTS_CONNECTION_STRING"): + configure_azure_monitor( + logger_name="azuretre_resource_processor", + instrumentation_options={ + "azure_sdk": {"enabled": False}, + "flask": {"enabled": False}, + "django": {"enabled": False}, + "fastapi": {"enabled": True}, + "psycopg2": {"enabled": False}, + } + ) - adapter = logging.LoggerAdapter(logger, extra) - adapter.debug(f"Logger adapter now includes extra: {extra}") + LoggingInstrumentor().instrument( + set_logging_format=True, + log_level=logging_level, + tracer_provider=tracer._real_tracer + ) - return adapter + return logger -def shell_output_logger(console_output: str, prefix_item: str, logger: logging.LoggerAdapter, logging_level: int): - """ - Logs the shell output (stdout/err) a line at a time with an option to remove ANSI control chars. - """ +def shell_output_logger(console_output: str, prefix_item: str, logging_level: int): if not console_output: - logging.debug("shell console output is empty.") + logger.debug("shell console output is empty.") return - console_output = console_output.strip() - if (logging_level != logging.INFO - and len(console_output) < 200 and console_output.startswith("Unable to find image '") - and console_output.endswith("' locally")): - logging.debug("Image not present locally, setting log to INFO.") + and "' locally" in console_output): + console_output = console_output.strip() + console_output = re.sub(r"Unable to find image '.*' locally", '', console_output) + if console_output.startswith("\n"): + console_output = console_output[1:] + logger.debug("Image not present locally, removing text from console output.") logging_level = logging.INFO - logger.log(logging_level, f"{prefix_item} {console_output}") - - -class AzureLogFormatter(logging.Formatter): - # 7-bit C1 ANSI sequences - ansi_escape = re.compile(r''' - \x1B # ESC - (?: # 7-bit C1 Fe (except CSI) - [@-Z\\-_] - | # or [ for CSI, followed by a control sequence - \[ - [0-?]* # Parameter bytes - [ -/]* # Intermediate bytes - [@-~] # Final byte - ) - ''', re.VERBOSE) - - MAX_MESSAGE_LENGTH = 32000 - TRUNCATION_TEXT = "MESSAGE TOO LONG, TAILING..." - - def format(self, record): - s = super().format(record) - s = AzureLogFormatter.ansi_escape.sub('', s) - - # not doing this here might produce errors if we try to log empty strings. - if (s == ''): - s = "EMPTY MESSAGE!" + if (logging_level != logging.INFO + and len(console_output) < 34 + and "execution completed successfully!" in console_output): + logging_level = logging.INFO - # azure monitor is limiting the message size. 
- if (len(s) > AzureLogFormatter.MAX_MESSAGE_LENGTH): - s = f"{AzureLogFormatter.TRUNCATION_TEXT}\n{s[-1 * AzureLogFormatter.MAX_MESSAGE_LENGTH:]}" + console_output = console_output.strip() - return s + if console_output: + logger.log(logging_level, f"{prefix_item} {console_output}") diff --git a/resource_processor/tests_rp/__init__.py b/resource_processor/tests_rp/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/resource_processor/tests_rp/test_logging.py b/resource_processor/tests_rp/test_logging.py new file mode 100644 index 0000000000..3f38a57582 --- /dev/null +++ b/resource_processor/tests_rp/test_logging.py @@ -0,0 +1,31 @@ +from mock import patch +import logging +from shared.logging import shell_output_logger + + +@patch("shared.logging.logger") +def test_shell_output_logger_empty_console_output(mock_logger): + shell_output_logger("", "prefix", logging.DEBUG) + mock_logger.debug.assert_called_once_with("shell console output is empty.") + + +@patch("shared.logging.logger") +def test_shell_output_logger_image_not_present_locally(mock_logger): + console_output = "Unable to find image 'test_image' locally\nexecution completed successfully!" + shell_output_logger(console_output, "prefix", logging.DEBUG) + mock_logger.debug.assert_called_with("Image not present locally, removing text from console output.") + mock_logger.log.assert_called_with(logging.INFO, "prefix execution completed successfully!") + + +@patch("shared.logging.logger") +def test_shell_output_logger_execution_completed_successfully(mock_logger): + console_output = "execution completed successfully!" + shell_output_logger(console_output, "prefix", logging.DEBUG) + mock_logger.log.assert_called_with(logging.INFO, "prefix execution completed successfully!") + + +@patch("shared.logging.logger") +def test_shell_output_logger_normal_case(mock_logger): + console_output = "Some logs" + shell_output_logger(console_output, "prefix", logging.DEBUG) + mock_logger.log.assert_called_with(logging.DEBUG, "prefix Some logs") diff --git a/resource_processor/vmss_porter/requirements.txt b/resource_processor/vmss_porter/requirements.txt index b4ee2202ee..a7182801a6 100644 --- a/resource_processor/vmss_porter/requirements.txt +++ b/resource_processor/vmss_porter/requirements.txt @@ -1,6 +1,6 @@ -azure-servicebus==7.11.3 -opencensus-ext-azure==1.1.11 -opencensus-ext-logging==0.1.1 -azure-identity==1.14.1 aiohttp==3.9.0 azure-cli-core==2.50.0 +azure-identity==1.14.1 +azure-monitor-opentelemetry==1.1.0 +azure-servicebus==7.11.3 +opentelemetry.instrumentation.logging==0.41b0 diff --git a/resource_processor/vmss_porter/runner.py b/resource_processor/vmss_porter/runner.py index 118eafe8d4..12a12bdbfe 100644 --- a/resource_processor/vmss_porter/runner.py +++ b/resource_processor/vmss_porter/runner.py @@ -1,18 +1,17 @@ from typing import Optional from multiprocessing import Process import json -import socket import asyncio import logging import sys -from resources.commands import build_porter_command, build_porter_command_for_outputs +from resources.commands import azure_acr_login_command, azure_login_command, build_porter_command, build_porter_command_for_outputs, apply_porter_credentials_sets_command from shared.config import get_config from resources.helpers import get_installation_id from resources.httpserver import start_server -from shared.logging import disable_unwanted_loggers, initialize_logging, get_message_id_logger, shell_output_logger # pylint: disable=import-error # noqa +from shared.logging import 
initialize_logging, logger, shell_output_logger, tracer from shared.config import VERSION -from resources import strings, statuses # pylint: disable=import-error # noqa +from resources import statuses from contextlib import asynccontextmanager from azure.servicebus import ServiceBusMessage, NEXT_AVAILABLE_SESSION from azure.servicebus.exceptions import OperationTimeoutError, ServiceBusConnectionError @@ -20,19 +19,12 @@ from azure.identity.aio import DefaultAzureCredential -def set_up_logger(enable_console_logging: bool) -> logging.LoggerAdapter: - # Initialise logging - logger_adapter = initialize_logging(logging.INFO, socket.gethostname(), enable_console_logging) - disable_unwanted_loggers() - return logger_adapter - - -def set_up_config(logger_adapter: logging.LoggerAdapter) -> Optional[dict]: +def set_up_config() -> Optional[dict]: try: - config = get_config(logger_adapter) + config = get_config() return config except KeyError as e: - logger_adapter.error(f"Environment variable {e} is not set correctly...Exiting") + logger.error(f"Environment variable {e} is not set correctly...Exiting") sys.exit(1) @@ -46,7 +38,7 @@ async def default_credentials(msi_id): await credential.close() -async def receive_message(service_bus_client, logger_adapter: logging.LoggerAdapter, config: dict): +async def receive_message(service_bus_client, config: dict): """ This method is run per process. Each process will connect to service bus and try to establish a session. If messages are there, the process will continue to receive all the messages associated with that session. @@ -56,10 +48,10 @@ async def receive_message(service_bus_client, logger_adapter: logging.LoggerAdap while True: try: - logger_adapter.info("Looking for new session...") + logger.info("Looking for new session...") # max_wait_time=1 -> don't hold the session open after processing of the message has finished async with service_bus_client.get_queue_receiver(queue_name=q_name, max_wait_time=1, session_id=NEXT_AVAILABLE_SESSION) as receiver: - logger_adapter.info(f"Got a session containing messages: {receiver.session.session_id}") + logger.info(f"Got a session containing messages: {receiver.session.session_id}") async with AutoLockRenewer() as renewer: # allow a session to be auto lock renewed for up to an hour - if it's processing a message renewer.register(receiver, receiver.session, max_lock_renewal_duration=3600) @@ -70,57 +62,71 @@ async def receive_message(service_bus_client, logger_adapter: logging.LoggerAdap try: message = json.loads(str(msg)) - logger_adapter.info(f"Message received for resource_id={message['id']}, operation_id={message['operationId']}, step_id={message['stepId']}") - message_logger_adapter = get_message_id_logger(message['operationId']) # correlate messages per operation - result = await invoke_porter_action(message, service_bus_client, message_logger_adapter, config) except (json.JSONDecodeError) as e: - logging.error(f"Received bad service bus resource request message: {e}") + logger.error(f"Received bad service bus resource request message: {e}") + + with tracer.start_as_current_span("receive_message") as current_span: + current_span.set_attribute("resource_id", message["id"]) + current_span.set_attribute("action", message["action"]) + current_span.set_attribute("step_id", message["stepId"]) + current_span.set_attribute("operation_id", message["operationId"]) + logger.info(f"Message received for resource_id={message['id']}, operation_id={message['operationId']}, step_id={message['stepId']}") - if result: - 
logging.info(f"Resource request for {message} is complete") - else: - logging.error('Message processing failed!') + result = await invoke_porter_action(message, service_bus_client, config) - logger_adapter.info(f"Message for resource_id={message['id']}, operation_id={message['operationId']} processed as {result} and marked complete.") - await receiver.complete_message(msg) + if result: + logger.info(f"Resource request for {message} is complete") + else: + logger.error('Message processing failed!') - logger_adapter.info(f"Closing session: {receiver.session.session_id}") + logger.info(f"Message for resource_id={message['id']}, operation_id={message['operationId']} processed as {result} and marked complete.") + await receiver.complete_message(msg) + + logger.info(f"Closing session: {receiver.session.session_id}") except OperationTimeoutError: # Timeout occurred whilst connecting to a session - this is expected and indicates no non-empty sessions are available - logger_adapter.debug("No sessions for this process. Will look again...") + logger.debug("No sessions for this process. Will look again...") except ServiceBusConnectionError: # Occasionally there will be a transient / network-level error in connecting to SB. - logger_adapter.info("Unknown Service Bus connection error. Will retry...") + logger.info("Unknown Service Bus connection error. Will retry...") except Exception: # Catch all other exceptions, log them via .exception to get the stack trace, sleep, and reconnect - logger_adapter.exception("Unknown exception. Will retry...") + logger.exception("Unknown exception. Will retry...") -async def run_porter(command, logger_adapter: logging.LoggerAdapter, config: dict): +async def run_porter(command, config: dict): """ Run a Porter command """ + command = [ + f"{azure_login_command(config)} && ", + f"{azure_acr_login_command(config)} && ", + f"{apply_porter_credentials_sets_command(config)} && ", + *command + ] + proc = await asyncio.create_subprocess_shell( ''.join(command), stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, - env=config["porter_env"]) + env=config["porter_env"] + ) stdout, stderr = await proc.communicate() - logging.info(f'run porter exited with {proc.returncode}') + logger.debug(f'run porter exited with {proc.returncode}') result_stdout = None result_stderr = None if stdout: result_stdout = stdout.decode() - shell_output_logger(result_stdout, '[stdout]', logger_adapter, logging.INFO) + shell_output_logger(result_stdout, '[stdout]', logging.INFO) if stderr: result_stderr = stderr.decode() - shell_output_logger(result_stderr, '[stderr]', logger_adapter, logging.WARN) + shell_output_logger(result_stderr, '[stderr]', logging.WARN) return (proc.returncode, result_stdout, result_stderr) @@ -141,39 +147,41 @@ def service_bus_message_generator(sb_message: dict, status: str, deployment_mess message_dict["outputs"] = outputs resource_request_message = json.dumps(message_dict) - logger_adapter.info(f"Deployment Status Message: {resource_request_message}") + logger.debug(f"Deployment Status Message: {resource_request_message}") return resource_request_message -async def invoke_porter_action(msg_body: dict, sb_client: ServiceBusClient, message_logger_adapter: logging.LoggerAdapter, config: dict) -> bool: +async def invoke_porter_action(msg_body: dict, sb_client: ServiceBusClient, config: dict) -> bool: """ Handle resource message by invoking specified porter action (i.e. 
install, uninstall) """ + installation_id = get_installation_id(msg_body) action = msg_body["action"] - message_logger_adapter.info(f"{installation_id}: {action} action starting...") + logger.info(f"{action} action starting for {installation_id}...") sb_sender = sb_client.get_queue_sender(queue_name=config["deployment_status_queue"]) # post an update message to set the status to an 'in progress' one resource_request_message = service_bus_message_generator(msg_body, statuses.in_progress_status_string_for[action], "Job starting") await sb_sender.send_messages(ServiceBusMessage(body=resource_request_message, correlation_id=msg_body["id"], session_id=msg_body["operationId"])) - message_logger_adapter.info(f'Sent status message for {installation_id} - {statuses.in_progress_status_string_for[action]} - Job starting') + logger.info(f'Sent status message for {installation_id} - {statuses.in_progress_status_string_for[action]} - Job starting') # Build and run porter command (flagging if its a built-in action or custom so we can adapt porter command appropriately) is_custom_action = action not in ["install", "upgrade", "uninstall"] - porter_command = await build_porter_command(config, message_logger_adapter, msg_body, is_custom_action) - message_logger_adapter.debug("Starting to run porter execution command...") - returncode, _, err = await run_porter(porter_command, message_logger_adapter, config) - message_logger_adapter.debug("Finished running porter execution command.") + porter_command = await build_porter_command(config, msg_body, is_custom_action) + + logger.debug("Starting to run porter execution command...") + returncode, _, err = await run_porter(porter_command, config) + logger.debug("Finished running porter execution command.") action_completed_without_error = True # Handle command output - if returncode != 0: + if returncode != 0 and err is not None: error_message = "Error message: " + " ".join(err.split('\n')) + "; Command executed: " + " ".join(porter_command) action_completed_without_error = False if "uninstall" == action and "could not find installation" in err: - message_logger_adapter.warning("The installation doesn't exist. Treating as a successful action to allow the flow to proceed.") + logger.warning("The installation doesn't exist. Treating as a successful action to allow the flow to proceed.") action_completed_without_error = True error_message = f"A success despite of underlying error. 
{error_message}" @@ -185,11 +193,11 @@ async def invoke_porter_action(msg_body: dict, sb_client: ServiceBusClient, mess resource_request_message = service_bus_message_generator(msg_body, status_for_sb_message, error_message) # Post message on sb queue to notify receivers of action failure - message_logger_adapter.info(f"{installation_id}: Porter action failed with error = {error_message}") + logger.info(f"{installation_id}: Porter action failed with error = {error_message}") else: # Get the outputs - get_porter_outputs_successful, outputs = await get_porter_outputs(msg_body, message_logger_adapter, config) + get_porter_outputs_successful, outputs = await get_porter_outputs(msg_body, config) if get_porter_outputs_successful: status_for_sb_message = statuses.pass_status_string_for[action] @@ -202,24 +210,24 @@ async def invoke_porter_action(msg_body: dict, sb_client: ServiceBusClient, mess resource_request_message = service_bus_message_generator(msg_body, status_for_sb_message, status_message, outputs) await sb_sender.send_messages(ServiceBusMessage(body=resource_request_message, correlation_id=msg_body["id"], session_id=msg_body["operationId"])) - message_logger_adapter.info(f"Sent status message for {installation_id}: {status_for_sb_message}") + logger.info(f"Sent status message for {installation_id}: {status_for_sb_message}") # return true as want to continue processing the message return action_completed_without_error -async def get_porter_outputs(msg_body: dict, message_logger_adapter: logging.LoggerAdapter, config: dict): +async def get_porter_outputs(msg_body: dict, config: dict): """ Get outputs JSON from a Porter command """ porter_command = await build_porter_command_for_outputs(msg_body) - message_logger_adapter.debug("Starting to run porter output command...") - returncode, stdout, err = await run_porter(porter_command, message_logger_adapter, config) - message_logger_adapter.debug("Finished running porter output command.") + logger.debug("Starting to run porter output command...") + returncode, stdout, err = await run_porter(porter_command, config) + logger.debug("Finished running porter output command.") if returncode != 0: error_message = "Error context message = " + " ".join(err.split('\n')) - message_logger_adapter.info(f"{get_installation_id(msg_body)}: Failed to get outputs with error = {error_message}") + logger.info(f"{get_installation_id(msg_body)}: Failed to get outputs with error = {error_message}") return False, {} else: outputs_json = {} @@ -231,55 +239,54 @@ async def get_porter_outputs(msg_body: dict, message_logger_adapter: logging.Log if "{" in outputs_json[i]['value'] or "[" in outputs_json[i]['value']: outputs_json[i]['value'] = json.loads(outputs_json[i]['value'].replace("\\", "")) - message_logger_adapter.info(f"Got outputs as json: {outputs_json}") + logger.info(f"Got outputs as json: {outputs_json}") except ValueError: - message_logger_adapter.error(f"Got outputs invalid json: {stdout}") + logger.error(f"Got outputs invalid json: {stdout}") return True, outputs_json -async def runner(logger_adapter: logging.LoggerAdapter, config: dict): - async with default_credentials(config["vmss_msi_id"]) as credential: - service_bus_client = ServiceBusClient(config["service_bus_namespace"], credential) - await receive_message(service_bus_client, logger_adapter, config) - +async def runner(process_number: int, config: dict): + with tracer.start_as_current_span(process_number): + async with default_credentials(config["vmss_msi_id"]) as credential: + service_bus_client = 
ServiceBusClient(config["service_bus_namespace"], credential) + await receive_message(service_bus_client, config) -def start_runner_process(config: dict): - # Set up logger adapter copy for this process - logger_adapter = set_up_logger(enable_console_logging=False) - asyncio.run(runner(logger_adapter, config)) - -async def check_runners(processes: list, httpserver: Process, logger_adapter: logging.LoggerAdapter): - logger_adapter.info("Starting runners check...") +async def check_runners(processes: list, httpserver: Process): + logger.info("Starting runners check...") while True: await asyncio.sleep(30) - if all(not process.is_alive() for process in processes): - logger_adapter.error("All runner processes have failed!") + logger.error("All runner processes have failed!") httpserver.kill() if __name__ == "__main__": - logger_adapter: logging.LoggerAdapter = set_up_logger(enable_console_logging=True) - config = set_up_config(logger_adapter) + initialize_logging() + logger.info("Resource processor starting...") + with tracer.start_as_current_span("resource_processor_main"): + config = set_up_config() + + logger.info("Verifying Azure CLI and Porter functionality...") + asyncio.run(run_porter(["az account show -o table"], config)) - httpserver = Process(target=start_server) - httpserver.start() - logger_adapter.info("Started http server") + httpserver = Process(target=start_server) + httpserver.start() + logger.info("Started http server") - processes = [] - num = config["number_processes_int"] - logger_adapter.info(f"Starting {num} processes...") - for i in range(num): - logger_adapter.info(f"Starting process {str(i)}") - process = Process(target=lambda: start_runner_process(config)) - processes.append(process) - process.start() + processes = [] + num = config["number_processes_int"] + logger.info(f"Starting {num} processes...") + for i in range(num): + logger.info(f"Starting process {str(i)}") + process = Process(target=lambda: asyncio.run(runner(i, config))) + processes.append(process) + process.start() - logger_adapter.info("All proceesses have been started. Version is: %s", VERSION) + logger.info("All processes have been started. Version is: %s", VERSION) - asyncio.run(check_runners(processes, httpserver, logger_adapter)) + asyncio.run(check_runners(processes, httpserver)) - logger_adapter.warn("Exiting main...") + logger.warn("Exiting main...")
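For anyone adapting other modules to the pattern introduced in this diff, the shape is: one module-level named logger plus a tracer (as in `shared/logging.py`), and a span per unit of work with the relevant IDs attached as attributes (as in `receive_message`). The sketch below is illustrative only: `handle_message` and the example values are made up, and no Azure Monitor exporter is configured, so the span is a no-op when run standalone:

```python
import logging

from opentelemetry import trace

# One named logger and tracer shared across modules, mirroring shared/logging.py.
logger = logging.getLogger("azuretre_resource_processor")
tracer = trace.get_tracer("azuretre_resource_processor")


def handle_message(message: dict) -> None:
    # Wrap each unit of work in a span and attach the IDs as attributes so that
    # log records and traces can be correlated in Application Insights.
    with tracer.start_as_current_span("handle_message") as span:
        span.set_attribute("resource_id", message["id"])
        span.set_attribute("operation_id", message["operationId"])
        logger.info("Message received for resource_id=%s, operation_id=%s",
                    message["id"], message["operationId"])


if __name__ == "__main__":
    # Without configure_azure_monitor() this logs to the console only.
    logging.basicConfig(level=logging.INFO)
    handle_message({"id": "resource-1", "operationId": "operation-1"})
```

When `APPLICATIONINSIGHTS_CONNECTION_STRING` is set, `initialize_logging()` in `shared/logging.py` wires the same named logger and tracer up to Azure Monitor, so code written against this pattern needs no changes to emit telemetry.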