Skip to content

Commit

Permalink
Add draft for lazy unique index population.
Browse files Browse the repository at this point in the history
  • Loading branch information
riga committed Sep 30, 2023
1 parent 386c932 commit 4c327c4
Show file tree
Hide file tree
Showing 10 changed files with 575 additions and 222 deletions.
16 changes: 10 additions & 6 deletions order/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
"Settings",
"Lazy", "Model",
"AdapterModel", "Adapter", "Materialized", "DataProvider",
"UniqueObject", "UniqueObjectIndex",
"UniqueObject", "LazyUniqueObject", "UniqueObjectIndex",
"DuplicateObjectException", "DuplicateNameException", "DuplicateIdException",
]

Expand All @@ -23,14 +23,18 @@

# provisioning imports
from order.settings import Settings
from order.models.base import Lazy, Model
from order.types import Lazy
from order.models.base import Model
from order.models.unique import (
UniqueObject, UniqueObjectIndex, DuplicateObjectException, DuplicateNameException,
DuplicateIdException,
UniqueObject, LazyUniqueObject, UniqueObjectIndex, DuplicateObjectException,
DuplicateNameException, DuplicateIdException,
)
from order.models.campaign import Campaign
from order.models.dataset import Dataset
from order.adapters.base import AdapterModel, Adapter, Materialized, DataProvider

# import adapters to trigger their registration
import order.adapters.order
import order.adapters.dbs
import order.adapters.xsdb
import order.adapters.das
# import order.adapters.dbs
# import order.adapters.xsdb
28 changes: 16 additions & 12 deletions order/adapters/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,19 @@
import shutil
from contextlib import contextmanager
from abc import ABCMeta, abstractmethod, abstractproperty
from typing import Any, Sequence, Dict
from types import GeneratorType

from pydantic import BaseModel

from order.types import Any, Sequence, Dict, GeneratorType, NonEmptyStrictStr, StrictStr, Field
from order.settings import Settings
from order.util import create_hash


class AdapterModel(BaseModel):

adapter: str
arguments: Dict[str, Any]
key: str
adapter: NonEmptyStrictStr
key: StrictStr
arguments: Dict[NonEmptyStrictStr, Any] = Field(default_factory=lambda: {})

@property
def name(self) -> str:
Expand Down Expand Up @@ -194,17 +193,24 @@ def __init__(
shutil.rmtree(self.cache_directory)

@contextmanager
def materialize(self, adapter_model: AdapterModel | dict[str, Any]) -> GeneratorType:
def materialize(
self,
adapter_model: AdapterModel | dict[str, Any],
adapter_kwargs: dict[str, Any] | None = None,
) -> GeneratorType:
if not isinstance(adapter_model, AdapterModel):
adapter_model = AdapterModel(**adapter_model)

# get the adapter class and instantiate it
adapter = AdapterMeta.get_cls(adapter_model.name)()

# merge kwargs
adapter_kwargs = {**adapter_model.arguments, **(adapter_kwargs or {})}

# determine the basename of the cache file (if existing)
h = (
os.path.realpath(self.data_location),
adapter.get_cache_key(**adapter_model.arguments),
adapter.get_cache_key(**adapter_kwargs),
)
cache_name = f"{create_hash(h)}.json"

Expand All @@ -220,7 +226,7 @@ def materialize(self, adapter_model: AdapterModel | dict[str, Any]) -> Generator

# invoke the adapter
args = (self.data_location,) if adapter.needs_data_location else ()
materialized = adapter.retrieve_data(*args, **adapter_model.arguments)
materialized = adapter.retrieve_data(*args, **adapter_kwargs)

# complain when the return value is not a materialized container
if not isinstance(materialized, Materialized):
Expand All @@ -232,10 +238,8 @@ def materialize(self, adapter_model: AdapterModel | dict[str, Any]) -> Generator
# yield the materialized data and cache it if the receiving context did not raise
try:
yield materialized
except Exception as e:
if isinstance(e, self.SkipCaching):
return
raise e
except self.SkipCaching:
return

# cache it
if writable_path:
Expand Down
29 changes: 29 additions & 0 deletions order/adapters/das.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# coding: utf-8

from __future__ import annotations


__all__ = ["DASDatasetAdapter"]


from order.adapters.base import Adapter, Materialized


class DASDatasetAdapter(Adapter):

name = "das_dataset"

def retrieve_data(self, *, keys: list[str]) -> Materialized:
if keys[0].startswith("/SCALE"):
return Materialized(n_events=1, n_files=1)
return Materialized(n_events=5_000_000, n_files=12)


class DASLFNsAdapter(Adapter):

name = "das_lfns"

def retrieve_data(self, *, keys: list[str]) -> Materialized:
if keys[0].startswith("/SCALE"):
return Materialized(lfns=["/SCALE/b/NANOAODSIM"])
return Materialized(lfns=["/a/b/NANOAODSIM"])
70 changes: 65 additions & 5 deletions order/adapters/order.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,73 @@ def retrieve_data(
dataset_dir = os.path.join(self.remove_scheme(data_location), "datasets", campaign_name)

# read yaml files in the datasets directory
datasets = {}
datasets = []
for path in glob.glob(os.path.join(dataset_dir, "*.yaml")):
with open(path, "r") as f:
# allow multiple documents per file
for data in yaml.load_all(f, Loader=yaml.SafeLoader):
if "name" not in data:
raise KeyError(f"no field 'name' defined in dataset yaml file {path}")
datasets[data["name"]] = data
stream = yaml.load_all(f, Loader=yaml.SafeLoader)
for i, entry in enumerate(stream):
if "name" not in entry:
raise AttributeError(
f"no field 'name' defined in enty {i} of dataset yaml file {path}",
)
if "id" not in entry:
raise AttributeError(
f"no field 'id' defined in enty {i} of dataset yaml file {path}",
)
datasets.append(
self.create_lazy_dataset_dict(campaign_name, entry["name"], entry["id"]),
)

return Materialized(datasets=datasets)

@classmethod
def create_lazy_dataset_dict(cls, campaign_name: str, name: str, id: int) -> dict:
return {
"name": name,
"id": id,
"class_name": "Dataset",
"adapter": {
"adapter": "order_dataset",
"arguments": {
"campaign_name": campaign_name,
"dataset_name": name,
},
"key": "dataset",
},
}


class DatasetAdapter(OrderAdapter):

name = "order_dataset"

def retrieve_data(
self,
data_location: str,
*,
campaign_name: str,
dataset_name: str,
) -> Materialized:
# only supporting local evaluation for now
if not self.location_is_local(data_location):
raise NotImplementedError(f"non-local location {data_location} not handled by {self}")

# build the yaml file path
path = os.path.join(
self.remove_scheme(data_location),
"datasets",
campaign_name,
f"{dataset_name}.yaml",
)
if not os.path.exists(path):
raise Exception(f"dataset file {path} not existing")

# open the file and look for the dataset
with open(path, "r") as f:
stream = yaml.load_all(f, Loader=yaml.SafeLoader)
for entry in stream:
if entry.get("name") == dataset_name:
return Materialized(dataset=entry)

raise Exception(f"no dataset entry with name '{dataset_name}' found in {path}")
98 changes: 35 additions & 63 deletions order/models/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,76 +7,39 @@
from __future__ import annotations


__all__ = ["Lazy", "Model"]
__all__ = ["Model"]


import re
from typing import Union, Any
from types import GeneratorType

from typing_extensions import Annotated, _AnnotatedAlias as AnnotatedType
from pydantic import BaseModel, Field, Strict, ConfigDict
from pydantic.fields import FieldInfo
from pydantic import BaseModel, ConfigDict

from order.types import Any, GeneratorType, Field, FieldInfo, Lazy
from order.adapters.base import AdapterModel, DataProvider
from order.util import no_value


class Lazy(object):

@classmethod
def __class_getitem__(cls, types):
if not isinstance(types, tuple):
types = (types,)
return Union[tuple(map(cls.make_strict, types)) + (AdapterModel,)]

@classmethod
def make_strict(cls, type_: type) -> AnnotatedType:
# some types cannot be strict
if not cls.can_make_strict(type_):
return type_

# when not decorated with strict meta data, just create a new strict tyoe
if (
not isinstance(type_, AnnotatedType) or
not any(isinstance(m, Strict) for m in getattr(type_, "__metadata__", []))
):
return Annotated[type_, Strict()]

# when already strict, return as is
metadata = type_.__metadata__
if all(m.strict for m in metadata if isinstance(m, Strict)):
return type_

# at this point, strict metadata exists but it is actually disabled,
# so replace it in metadata and return a new annotated type
metadata = [
(Strict() if isinstance(m, Strict) else m)
for m in metadata
]
return Annotated[(*type_.__args__, *metadata)]

@classmethod
def can_make_strict(cls, type_: type) -> bool:
if type_.__dict__.get("_name") in ("Dict", "List"):
return False

return True


class ModelMeta(type(BaseModel)):

def __new__(meta_cls, class_name: str, bases: tuple, class_dict: dict[str, Any]) -> "ModelMeta":
# convert "Lazy" annotations to proper fields and add access properties
lazy_attrs = []
for attr, type_str in list(class_dict.get("__annotations__", {}).items()):
type_names = meta_cls.parse_lazy_annotation(type_str)
type_names = Lazy.parse_annotation(type_str)
if type_names:
meta_cls.register_lazy_attr(attr, type_names, class_name, class_dict)
meta_cls.register_lazy_attr(attr, type_names, class_name, bases, class_dict)
lazy_attrs.append(attr)

# store names of lazy attributes
class_dict["_lazy_attrs"] = [(attr, meta_cls.get_lazy_attr(attr)) for attr in lazy_attrs]
# store names of lazy attributes, considering also bases
lazy_attrs_dict = {}
for base in reversed(bases):
if getattr(base, "_lazy_attrs", None) is None:
continue
lazy_attrs_dict.update({
attr: lazy_attr
for attr, lazy_attr in base._lazy_attrs.default.items()
if lazy_attr in base.__fields__
})
lazy_attrs_dict.update({attr: meta_cls.get_lazy_attr(attr) for attr in lazy_attrs})
class_dict["_lazy_attrs"] = lazy_attrs_dict

# check the model_config
class_dict["model_config"] = model_config = class_dict.get("model_config") or ConfigDict()
Expand All @@ -95,12 +58,12 @@ def __new__(meta_cls, class_name: str, bases: tuple, class_dict: dict[str, Any])
# create the class
cls = super().__new__(meta_cls, class_name, bases, class_dict)

return cls
# remove non-existing lazy attributes from above added dict after class was created
for attr, lazy_attr in list(cls._lazy_attrs.default.items()):
if lazy_attr not in cls.__fields__:
del cls._lazy_attrs.default[attr]

@classmethod
def parse_lazy_annotation(meta_cls, type_str: str) -> list[str] | None:
m = re.match(r"^Lazy\[(.+)\]$", type_str)
return m and [s.strip() for s in m.group(1).split(",")]
return cls

@classmethod
def get_lazy_attr(meta_cls, attr: str) -> str:
Expand All @@ -112,6 +75,7 @@ def register_lazy_attr(
attr: str,
type_names: list[str],
class_name: str,
bases: tuple,
class_dict: dict[str, Any],
) -> None:
# if a field already exist, get it
Expand All @@ -122,12 +86,15 @@ def register_lazy_attr(
)
class_dict.pop(attr, None)

# store existing fields
class_dict.setdefault("__orig_fields__", {})[attr] = field

# exchange the annotation with the lazy one
lazy_attr = meta_cls.get_lazy_attr(attr)
class_dict["__annotations__"][lazy_attr] = class_dict["__annotations__"].pop(attr)

# add a field for the lazy attribute with aliases
_field = Field(alias=attr, serialization_alias=attr, repr=False)
# make sure the field has an alias set and is skipped in repr
_field = Field(alias=attr, repr=False)
field = FieldInfo.merge_field_infos(field, _field) if field else _field
class_dict[lazy_attr] = field

Expand All @@ -146,7 +113,7 @@ def fget(self):
with DataProvider.instance().materialize(adapter_model) as materialized:
# loop through known lazy attributes and check which of them is assigned a
# materialized value
for attr_, lazy_attr_ in self._lazy_attrs:
for attr_, lazy_attr_ in self._lazy_attrs.items():
# the adapter model must be compatible that the called one
adapter_model_ = getattr(self, lazy_attr_)
if not adapter_model.compare_signature(adapter_model_):
Expand Down Expand Up @@ -199,6 +166,11 @@ def __repr_args__(self) -> GeneratorType:
"""
yield from super().__repr_args__()

for attr, lazy_attr in self._lazy_attrs:
for attr, lazy_attr in self._lazy_attrs.items():
# skip when field was originally skipped
orig_field = self.__orig_fields__.get(attr)
if orig_field and not orig_field.repr:
continue

value = getattr(self, lazy_attr)
yield attr, f"lazy({value.name})" if isinstance(value, AdapterModel) else value
9 changes: 4 additions & 5 deletions order/models/campaign.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@
__all__ = ["GT", "Campaign"]


from typing import Dict

from order.models.base import Model, Lazy
from order.models.dataset import Dataset
from order.types import Lazy, Field
from order.models.base import Model
from order.models.dataset import DatasetIndex


class GT(Model):
Expand All @@ -28,4 +27,4 @@ class Campaign(Model):
tier: Lazy[str]
ecm: Lazy[float]
recommended_gt: GT
datasets: Lazy[Dict[str, Dataset]]
datasets: DatasetIndex = Field(default_factory=DatasetIndex)
Loading

0 comments on commit 4c327c4

Please sign in to comment.