Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use auth libraries to authenticate in E2E tests #3344

Merged
merged 3 commits into from
Mar 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions cli/tre/authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def get_auth_token_client_credentials(
event_loop = asyncio.new_event_loop()
asyncio.set_event_loop(event_loop)

credential = ClientSecretCredential(aad_tenant_id, client_id, client_secret, connection_verify=verify, authority=get_authority_domain())
credential = ClientSecretCredential(aad_tenant_id, client_id, client_secret, connection_verify=verify, authority=get_aad_authority_fqdn())
token = event_loop.run_until_complete(credential.get_token(f'{api_scope}/.default'))
event_loop.run_until_complete(credential.close())

Expand All @@ -36,13 +36,13 @@ def get_public_client_application(
):
return msal.PublicClientApplication(
client_id=client_id,
authority=AuthorityBuilder(instance=get_authority_domain(), tenant=aad_tenant_id),
authority=AuthorityBuilder(instance=get_aad_authority_fqdn(), tenant=aad_tenant_id),
token_cache=token_cache)


def get_cloud() -> cloud.Cloud:
return cloud.get_active_cloud()


def get_authority_domain():
def get_aad_authority_fqdn() -> str:
return get_cloud().endpoints.active_directory.replace('https://', '')
4 changes: 2 additions & 2 deletions e2e_tests/cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ def get_cloud() -> cloud.Cloud:
return cloud.get_active_cloud()


def get_aad_authority_url() -> str:
return get_cloud().endpoints.active_directory
def get_aad_authority_fqdn() -> str:
return get_cloud().endpoints.active_directory.replace('https://', '')
42 changes: 16 additions & 26 deletions e2e_tests/helpers.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,17 @@
from json import JSONDecodeError

import asyncio
from typing import List, Optional
from contextlib import asynccontextmanager
from httpx import AsyncClient, Timeout, Response
import logging
from starlette import status
from azure.identity import ClientSecretCredential, UsernamePasswordCredential

import config
from e2e_tests import cloud


LOGGER = logging.getLogger(__name__)
TIMEOUT = Timeout(10, read=30)
AAD_AUTHORITY_URL = cloud.get_aad_authority_url()


class InstallFailedException(Exception):
Expand Down Expand Up @@ -105,29 +103,21 @@ async def check_aad_auth_redirect(endpoint, verify) -> None:


async def get_admin_token(verify) -> str:
async with AsyncClient(verify=verify) as client:
responseJson = ""
headers = {'Content-Type': "application/x-www-form-urlencoded"}
if config.TEST_ACCOUNT_CLIENT_ID != "" and config.TEST_ACCOUNT_CLIENT_SECRET != "":
# Use Client Credentials flow
payload = f"grant_type=client_credentials&client_id={config.TEST_ACCOUNT_CLIENT_ID}&client_secret={config.TEST_ACCOUNT_CLIENT_SECRET}&scope=api://{config.API_CLIENT_ID}/.default"
url = f"{AAD_AUTHORITY_URL}/{config.AAD_TENANT_ID}/oauth2/v2.0/token"

else:
# Use Resource Owner Password Credentials flow
payload = f"grant_type=password&resource={config.API_CLIENT_ID}&username={config.TEST_USER_NAME}&password={config.TEST_USER_PASSWORD}&scope=api://{config.API_CLIENT_ID}/user_impersonation&client_id={config.TEST_APP_ID}"
url = f"{AAD_AUTHORITY_URL}/{config.AAD_TENANT_ID}/oauth2/token"

response = await client.post(url, headers=headers, content=payload)
try:
responseJson = response.json()
except JSONDecodeError:
assert False, f"Failed to parse response as JSON. status_code: {response.status_code}, content: {response.content}"

assert "access_token" in responseJson, f"Failed to get access_token. content: {response.content}"
token = responseJson["access_token"]
assert token is not None, "Token not returned"
return token if (response.status_code == status.HTTP_200_OK) else ""
scope_uri = f"api://{config.API_CLIENT_ID}"
return get_token(scope_uri, verify)


def get_token(scope_uri, verify) -> str:
if config.TEST_ACCOUNT_CLIENT_ID != "" and config.TEST_ACCOUNT_CLIENT_SECRET != "":
# Logging in as an Enterprise Application: Use Client Credentials flow
credential = ClientSecretCredential(config.AAD_TENANT_ID, config.TEST_ACCOUNT_CLIENT_ID, config.TEST_ACCOUNT_CLIENT_SECRET, connection_verify=verify, authority=cloud.get_aad_authority_fqdn())
token = credential.get_token(f'{scope_uri}/.default')
else:
# Logging in as a User: Use Resource Owner Password Credentials flow
credential = UsernamePasswordCredential(config.TEST_APP_ID, config.TEST_USER_NAME, config.TEST_USER_PASSWORD, connection_verify=verify, authority=cloud.get_aad_authority_fqdn(), tenant_id=config.AAD_TENANT_ID)
token = credential.get_token(f'{scope_uri}/user_impersonation')

return token.token


def assert_status(response: Response, expected_status: List[int] = [200], message_prefix: str = "Unexpected HTTP Status"):
Expand Down
29 changes: 3 additions & 26 deletions e2e_tests/resources/workspace.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,10 @@
import logging
import config
from httpx import AsyncClient, Timeout
from json import JSONDecodeError
from starlette import status
from typing import Tuple
from e2e_tests import cloud
from e2e_tests.helpers import get_auth_header, get_full_endpoint
from e2e_tests.helpers import get_auth_header, get_full_endpoint, get_token

LOGGER = logging.getLogger(__name__)
TIMEOUT = Timeout(10, read=30)
AAD_AUTHORITY_URL = cloud.get_aad_authority_url()


async def get_workspace(client, workspace_id: str, headers) -> dict:
Expand Down Expand Up @@ -41,24 +36,6 @@ async def get_workspace_auth_details(admin_token, workspace_id, verify) -> Tuple
async with AsyncClient(verify=verify) as client:
auth_headers = get_auth_header(admin_token)
scope_uri = await get_identifier_uri(client, workspace_id, auth_headers)
access_token = get_token(scope_uri, verify)

if config.TEST_ACCOUNT_CLIENT_ID != "" and config.TEST_ACCOUNT_CLIENT_SECRET != "":
# Logging in as an Enterprise Application: Use Client Credentials flow
payload = f"grant_type=client_credentials&client_id={config.TEST_ACCOUNT_CLIENT_ID}&client_secret={config.TEST_ACCOUNT_CLIENT_SECRET}&scope={scope_uri}/.default"
url = f"{AAD_AUTHORITY_URL}/{config.AAD_TENANT_ID}/oauth2/v2.0/token"

else:
# Logging in as a User: Use Resource Owner Password Credentials flow
payload = f"grant_type=password&resource={workspace_id}&username={config.TEST_USER_NAME}&password={config.TEST_USER_PASSWORD}&scope={scope_uri}/user_impersonation&client_id={config.TEST_APP_ID}"
url = f"{AAD_AUTHORITY_URL}/{config.AAD_TENANT_ID}/oauth2/token"

response = await client.post(url, headers=auth_headers, content=payload)
try:
responseJson = response.json()
except JSONDecodeError:
raise Exception("Failed to parse response as JSON: {}".format(response.content))

if "access_token" not in responseJson or response.status_code != status.HTTP_200_OK:
raise Exception("Failed to get access_token: {}".format(response.content))

return responseJson["access_token"], scope_uri
return access_token, scope_uri