Skip to content

Commit

Permalink
Merge pull request #302 from populationgenomics/sept-23-upstream-1-c9…
Browse files Browse the repository at this point in the history
…db002

Sept 23 upstream 1 (c9db002)
  • Loading branch information
illusional authored Sep 19, 2023
2 parents abaf45c + a1f10dd commit eb6b29c
Show file tree
Hide file tree
Showing 631 changed files with 4,813 additions and 3,226 deletions.
65 changes: 58 additions & 7 deletions auth/auth/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,17 @@

import aiohttp
import aiohttp_session
import kubernetes_asyncio.client
import kubernetes_asyncio.client.rest
import kubernetes_asyncio.config
import uvloop
from aiohttp import web
from prometheus_async.aio.web import server_stats # type: ignore

from gear import (
AuthClient,
Database,
K8sCache,
Transaction,
check_csrf_token,
create_session,
Expand Down Expand Up @@ -53,6 +57,9 @@

CLOUD = get_global_config()['cloud']
ORGANIZATION_DOMAIN = os.environ['HAIL_ORGANIZATION_DOMAIN']
DEFAULT_NAMESPACE = os.environ['HAIL_DEFAULT_NAMESPACE']

is_test_deployment = DEFAULT_NAMESPACE != 'default'

deploy_config = get_deploy_config()

Expand Down Expand Up @@ -124,7 +131,14 @@ async def check_valid_new_user(tx: Transaction, username, login_id, is_developer


async def insert_new_user(
db: Database, username: str, login_id: Optional[str], is_developer: bool, is_service_account: bool
db: Database,
username: str,
login_id: Optional[str],
is_developer: bool,
is_service_account: bool,
*,
hail_identity: Optional[str] = None,
hail_credentials_secret_name: Optional[str] = None,
) -> bool:
@transaction(db)
async def _insert(tx):
Expand All @@ -134,10 +148,18 @@ async def _insert(tx):

await tx.execute_insertone(
'''
INSERT INTO users (state, username, login_id, is_developer, is_service_account)
VALUES (%s, %s, %s, %s, %s);
INSERT INTO users (state, username, login_id, is_developer, is_service_account, hail_identity, hail_credentials_secret_name)
VALUES (%s, %s, %s, %s, %s, %s, %s);
''',
('creating', username, login_id, is_developer, is_service_account),
(
'creating',
username,
login_id,
is_developer,
is_service_account,
hail_identity,
hail_credentials_secret_name,
),
)

await _insert() # pylint: disable=no-value-for-parameter
Expand Down Expand Up @@ -367,8 +389,29 @@ async def create_user(request: web.Request, userdata): # pylint: disable=unused
is_developer = body['is_developer']
is_service_account = body['is_service_account']

hail_identity = body.get('hail_identity')
hail_credentials_secret_name = body.get('hail_credentials_secret_name')
if (hail_identity or hail_credentials_secret_name) and not is_test_deployment:
raise web.HTTPBadRequest(text='Cannot specify an existing hail identity for a new user')
if hail_credentials_secret_name:
try:
k8s_cache: K8sCache = request.app['k8s_cache']
await k8s_cache.read_secret(hail_credentials_secret_name, DEFAULT_NAMESPACE)
except kubernetes_asyncio.client.rest.ApiException as e:
raise web.HTTPBadRequest(
text=f'hail credentials secret name specified but was not found in namespace {DEFAULT_NAMESPACE}: {hail_credentials_secret_name}'
) from e

try:
await insert_new_user(db, username, login_id, is_developer, is_service_account)
await insert_new_user(
db,
username,
login_id,
is_developer,
is_service_account,
hail_identity=hail_identity,
hail_credentials_secret_name=hail_credentials_secret_name,
)
except AuthUserError as e:
raise e.http_response()

Expand Down Expand Up @@ -750,12 +793,20 @@ async def on_startup(app):
app['client_session'] = httpx.client_session()
app['flow_client'] = get_flow_client('/auth-oauth2-client-secret/client_secret.json')

kubernetes_asyncio.config.load_incluster_config()
app['k8s_client'] = kubernetes_asyncio.client.CoreV1Api()
app['k8s_cache'] = K8sCache(app['k8s_client'])


async def on_cleanup(app):
try:
await app['db'].async_close()
k8s_client: kubernetes_asyncio.client.CoreV1Api = app['k8s_client']
await k8s_client.api_client.rest_client.pool_manager.close()
finally:
await app['client_session'].close()
try:
await app['db'].async_close()
finally:
await app['client_session'].close()


class AuthAccessLogger(AccessLogger):
Expand Down
122 changes: 10 additions & 112 deletions auth/auth/driver/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@
import logging
import os
import random
import secrets
from typing import Any, Awaitable, Callable, Dict, List, Optional
from typing import Any, Awaitable, Callable, Dict, List

import aiohttp
import kubernetes_asyncio.client
Expand All @@ -17,7 +16,6 @@
from gear.cloud_config import get_gcp_config, get_global_config
from hailtop import aiotools, httpx
from hailtop import batch_client as bc
from hailtop.auth.sql_config import SQLConfig, create_secret_data_from_config
from hailtop.utils import secret_alnum_string, time_msecs

log = logging.getLogger('auth.driver')
Expand All @@ -34,7 +32,7 @@ class DatabaseConflictError(Exception):


class EventHandler:
def __init__(self, handler, event=None, bump_secs=60.0, min_delay_secs=0.1):
def __init__(self, handler, event=None, bump_secs=5.0, min_delay_secs=0.1):
self.handler = handler
if event is None:
event = asyncio.Event()
Expand Down Expand Up @@ -234,86 +232,6 @@ async def delete(self):
self.app_obj_id = None


class DatabaseResource:
def __init__(self, db_instance, name=None):
self.db_instance = db_instance
self.name = name
self.password = None

async def create(self, name):
assert self.name is None

if is_test_deployment:
return

await self._delete(name)

self.password = secrets.token_urlsafe(16)
await self.db_instance.just_execute(
f'''
CREATE DATABASE `{name}`;
CREATE USER '{name}'@'%' IDENTIFIED BY '{self.password}';
GRANT ALL ON `{name}`.* TO '{name}'@'%';
'''
)
self.name = name

def secret_data(self):
with open('/database-server-config/sql-config.json', 'r', encoding='utf-8') as f:
server_config = SQLConfig.from_json(f.read())
with open('/database-server-config/server-ca.pem', 'r', encoding='utf-8') as f:
server_ca = f.read()
client_cert: Optional[str]
client_key: Optional[str]
if server_config.using_mtls():
with open('/database-server-config/client-cert.pem', 'r', encoding='utf-8') as f:
client_cert = f.read()
with open('/database-server-config/client-key.pem', 'r', encoding='utf-8') as f:
client_key = f.read()
else:
client_cert = None
client_key = None

if is_test_deployment:
return create_secret_data_from_config(server_config, server_ca, client_cert, client_key)

assert self.name is not None
assert self.password is not None

config = SQLConfig(
host=server_config.host,
port=server_config.port,
user=self.name,
password=self.password,
instance=server_config.instance,
connection_name=server_config.connection_name,
db=self.name,
ssl_ca='/sql-config/server-ca.pem',
ssl_cert='/sql-config/client-cert.pem' if client_cert is not None else None,
ssl_key='/sql-config/client-key.pem' if client_key is not None else None,
ssl_mode='VERIFY_CA',
)
return create_secret_data_from_config(config, server_ca, client_cert, client_key)

async def _delete(self, name):
if is_test_deployment:
return

# no DROP USER IF EXISTS in current db version
row = await self.db_instance.execute_and_fetchone('SELECT 1 FROM mysql.user WHERE User = %s;', (name,))
if row is not None:
await self.db_instance.just_execute(f"DROP USER '{name}';")

await self.db_instance.just_execute(f'DROP DATABASE IF EXISTS `{name}`;')

async def delete(self):
if self.name is None:
return
await self._delete(self.name)
self.name = None


class K8sNamespaceResource:
def __init__(self, k8s_client, name=None):
self.k8s_client = k8s_client
Expand Down Expand Up @@ -410,7 +328,6 @@ async def delete(self):


async def _create_user(app, user, skip_trial_bp, cleanup):
db_instance = app['db_instance']
db = app['db']
k8s_client = app['k8s_client']
identity_client = app['identity_client']
Expand Down Expand Up @@ -481,21 +398,14 @@ async def _create_user(app, user, skip_trial_bp, cleanup):
updates['hail_credentials_secret_name'] = hail_credentials_secret_name

namespace_name = user['namespace_name']
if namespace_name is None and user['is_developer'] == 1:
# auth services in test namespaces cannot/should not be creating and deleting namespaces
if namespace_name is None and user['is_developer'] == 1 and not is_test_deployment:
namespace_name = ident
namespace = K8sNamespaceResource(k8s_client)
cleanup.append(namespace.delete)
await namespace.create(namespace_name)
updates['namespace_name'] = namespace_name

db_resource = DatabaseResource(db_instance)
cleanup.append(db_resource.delete)
await db_resource.create(ident)

db_secret = K8sSecretResource(k8s_client)
cleanup.append(db_secret.delete)
await db_secret.create('database-server-config', namespace_name, db_resource.secret_data())

if not skip_trial_bp and user['is_service_account'] != 1:
trial_bp = user['trial_bp_name']
if trial_bp is None:
Expand Down Expand Up @@ -536,7 +446,6 @@ async def create_user(app, user, skip_trial_bp=False):


async def delete_user(app, user):
db_instance = app['db_instance']
db = app['db']
k8s_client = app['k8s_client']
identity_client = app['identity_client']
Expand Down Expand Up @@ -572,9 +481,6 @@ async def delete_user(app, user):
namespace = K8sNamespaceResource(k8s_client, namespace_name)
await namespace.delete()

db_resource = DatabaseResource(db_instance, user['username'])
await db_resource.delete()

trial_bp_name = user['trial_bp_name']
if trial_bp_name is not None:
batch_client = app['batch_client']
Expand Down Expand Up @@ -619,10 +525,6 @@ async def async_main():

app['client_session'] = httpx.client_session()

db_instance = Database()
await db_instance.async_init(maxsize=50, config_file='/database-server-config/sql-config.json')
app['db_instance'] = db_instance

kubernetes_asyncio.config.load_incluster_config()
app['k8s_client'] = kubernetes_asyncio.client.CoreV1Api()

Expand All @@ -647,18 +549,14 @@ async def users_changed_handler():
await app['db'].async_close()
finally:
try:
if 'db_instance_pool' in app:
await app['db_instance_pool'].async_close()
await app['client_session'].close()
finally:
try:
await app['client_session'].close()
if user_creation_loop is not None:
user_creation_loop.shutdown()
finally:
try:
if user_creation_loop is not None:
user_creation_loop.shutdown()
await app['identity_client'].close()
finally:
try:
await app['identity_client'].close()
finally:
k8s_client: kubernetes_asyncio.client.CoreV1Api = app['k8s_client']
await k8s_client.api_client.rest_client.pool_manager.close()
k8s_client: kubernetes_asyncio.client.CoreV1Api = app['k8s_client']
await k8s_client.api_client.rest_client.pool_manager.close()
21 changes: 20 additions & 1 deletion batch/batch/cloud/azure/driver/create_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
import logging
import os
from shlex import quote as shq
from typing import Any, Dict, Optional
from typing import Any, Dict, List, Optional

from gear.cloud_config import get_global_config
from hailtop.config import get_deploy_config

from ....batch_configuration import DEFAULT_NAMESPACE, DOCKER_PREFIX, DOCKER_ROOT_IMAGE, INTERNAL_GATEWAY_IP
from ....file_store import FileStore
Expand Down Expand Up @@ -80,6 +81,15 @@ def create_vm_config(

assert instance_config.is_valid_configuration(resource_rates.keys())

touch_commands: List[str] = []
for jvm_cores in (1, 2, 4, 8):
for _ in range(cores // jvm_cores):
idx = len(touch_commands)
log_path = f'/batch/jvm-container-logs/jvm-{idx}.log'
touch_commands.append(f'sudo touch {log_path}')

jvm_touch_command = '\n'.join(touch_commands)

startup_script = r'''#cloud-config
mounts:
Expand Down Expand Up @@ -139,6 +149,9 @@ def create_vm_config(
sudo mkdir -p /mnt/disks/$WORKER_DATA_DISK_NAME/batch/
sudo ln -s /mnt/disks/$WORKER_DATA_DISK_NAME/batch /batch
sudo mkdir -p /batch/jvm-container-logs/
{jvm_touch_command}
sudo mkdir -p /mnt/disks/$WORKER_DATA_DISK_NAME/logs/
sudo ln -s /mnt/disks/$WORKER_DATA_DISK_NAME/logs /logs
Expand Down Expand Up @@ -218,6 +231,11 @@ def create_vm_config(
{make_global_config_str}
mkdir /deploy-config
cat >/deploy-config/deploy-config.json <<EOF
{ json.dumps(get_deploy_config().with_location('gce').get_config()) }
EOF
# retry once
az acr login --name $DOCKER_PREFIX
docker pull $BATCH_WORKER_IMAGE || \
Expand Down Expand Up @@ -256,6 +274,7 @@ def create_vm_config(
-v /batch:/batch:shared \
-v /logs:/logs \
-v /global-config:/global-config \
-v /deploy-config:/deploy-config \
-v /cloudfuse:/cloudfuse:shared \
-v /etc/netns:/etc/netns \
-v /sys/fs/cgroup:/sys/fs/cgroup \
Expand Down
12 changes: 10 additions & 2 deletions batch/batch/cloud/azure/driver/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,17 @@ def __init__(
self.resource_group = resource_group
self.namespace = namespace
self.region_monitor = region_monitor
self.inst_coll_manager = inst_coll_manager
self.job_private_inst_manager = job_private_inst_manager
self.billing_manager = billing_manager
self._billing_manager = billing_manager
self._inst_coll_manager = inst_coll_manager

@property
def billing_manager(self) -> AzureBillingManager:
return self._billing_manager

@property
def inst_coll_manager(self) -> InstanceCollectionManager:
return self._inst_coll_manager

async def shutdown(self) -> None:
try:
Expand Down
Loading

0 comments on commit eb6b29c

Please sign in to comment.