Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Structured array for manifest #39

Closed
wants to merge 10 commits into from
134 changes: 90 additions & 44 deletions virtualizarr/manifests/manifest.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import itertools
import re
from typing import Any, Iterable, Iterator, List, Mapping, Tuple, Union, cast
from typing import Any, Iterable, Iterator, List, Mapping, NewType, Tuple, Union, cast

import numpy as np
from pydantic import BaseModel, ConfigDict, field_validator
from pydantic import BaseModel, ConfigDict

from ..types import ChunkKey

Expand All @@ -14,6 +13,11 @@
_CHUNK_KEY = rf"^{_INTEGER}+({_SEPARATOR}{_INTEGER})*$" # matches 1 integer, optionally followed by more integers each separated by a separator (i.e. a period)


ChunkDict = NewType(
    "ChunkDict", dict[ChunkKey, dict[str, Union[str, int]]]
)  # maps chunk keys (e.g. "0.1.2") to entry dicts of the form {"path": str, "offset": int, "length": int}


class ChunkEntry(BaseModel):
"""
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure we really need this class anymore

Information for a single chunk in the manifest.
Expand Down Expand Up @@ -42,11 +46,20 @@ def to_kerchunk(self) -> List[Union[str, int]]:
return [self.path, self.offset, self.length]


# TODO we want the path field to contain a variable-length string, but that's not available until numpy 2.0
# See https://numpy.org/neps/nep-0055-string_dtype.html
# NOTE: offset and length are int64 rather than int32 — byte offsets into
# large archival files routinely exceed the 2 GiB range of a signed 32-bit int.
MANIFEST_STRUCTURED_ARRAY_DTYPES = np.dtype(
    [("path", "<U32"), ("offset", np.int64), ("length", np.int64)]
)

Comment on lines +51 to +55
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because file paths can be strings of any length, we really need to be using numpy's new variable-width string dtype here.

Unfortunately it's only coming out with numpy 2.0, and although there is a release candidate for numpy 2.0, it's so new that pandas doesn't support it yet. Xarray has a pandas dependency, so currently we can't actually build an environment that lets us try virtualizarr with the variable-length string dtype yet.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pandas just released 2.2.2, which is compatible with the upcoming numpy 2.0 release.

Not sure if that will break any part of xarray that we need for VirtualiZarr, but this might now be close enough to test out variable-length dtypes.


class ChunkManifest(BaseModel):
    """
    In-memory representation of a single Zarr chunk manifest.

    Stores the manifest internally as a numpy structured array.

    The manifest can be converted to or from a dictionary form looking like this

    {
        "0.0.0": {"path": "s3://bucket/foo.nc", "offset": 100, "length": 100},
        "0.0.1": {"path": "s3://bucket/foo.nc", "offset": 200, "length": 100},
        "0.1.0": {"path": "s3://bucket/foo.nc", "offset": 300, "length": 100},
        "0.1.1": {"path": "s3://bucket/foo.nc", "offset": 400, "length": 100},
    }

    using the .from_dict() and .dict() methods, so users of this class can think of the manifest as if it were a dict.

    See the chunk manifest SPEC proposal in https://github.com/zarr-developers/zarr-specs/issues/287 .

    Validation is done when this object is instantiated, and this class is immutable,
    so it's not possible to have a ChunkManifest object that does not represent a complete valid grid of chunks.
    """

    model_config = ConfigDict(
        frozen=True,
        arbitrary_types_allowed=True,  # so pydantic doesn't complain about the numpy array field
    )

    # Structured array with fields ("path", "offset", "length"); its shape is the chunk grid shape.
    # TODO how to type hint to indicate a numpy structured array with specifically-typed fields?
    entries: np.ndarray

@classmethod
def from_dict(cls, chunks: ChunkDict) -> "ChunkManifest":
    """
    Create a ChunkManifest from a dict mapping chunk keys to chunk entries.

    Each entry must be of the form dict(path=<str>, offset=<int>, length=<int>).
    Raises ValueError if any key or entry is malformed.
    """
    validate_chunk_keys(chunks.keys())

    # TODO should we actually pass shape in, in case there are not enough chunks to give correct idea of full shape?
    shape = get_chunk_grid_shape(chunks.keys())

    # Initializing to empty implies that entries with path='' are treated as missing chunks
    entries = np.empty(shape=shape, dtype=MANIFEST_STRUCTURED_ARRAY_DTYPES)

    # populate the array
    for key, entry in chunks.items():
        try:
            # look fields up by name so the caller's dict ordering doesn't matter
            entries[split(key)] = (entry["path"], entry["offset"], entry["length"])
        except (KeyError, ValueError, TypeError) as e:
            msg = (
                "Each chunk entry must be of the form dict(path=<str>, offset=<int>, length=<int>), "
                f"but got {entry}"
            )
            raise ValueError(msg) from e

    # use cls so subclasses construct instances of themselves
    return cls(entries=entries)

@property
def ndim_chunk_grid(self) -> int:
    """
    Number of dimensions in the chunk grid.

    Not the same as the dimension of an array backed by this chunk manifest.
    """
    return self.entries.ndim

@property
def shape_chunk_grid(self) -> Tuple[int, ...]:
    """
    Shape of the chunk grid implied by the chunk keys.

    Not the same as the shape of an array backed by this chunk manifest.
    """
    return self.entries.shape

def __repr__(self) -> str:
    """Concise representation showing only the shape of the chunk grid."""
    grid_shape = self.shape_chunk_grid
    return f"ChunkManifest<shape={grid_shape}>"

def __getitem__(self, key: ChunkKey) -> ChunkEntry:
    """Look up a single chunk entry by its key, e.g. manifest["0.1.2"]."""
    indices = split(key)
    # pydantic models must be constructed with keyword arguments — passing the
    # numpy record positionally (ChunkEntry(self.entries[indices])) raises TypeError
    path, offset, length = self.entries[indices]
    return ChunkEntry(path=path, offset=offset, length=length)

def __iter__(self) -> Iterator[ChunkKey]:
    """Iterate over the chunk keys of the manifest."""
    # NOTE: self.chunks no longer exists now that the manifest is stored as a
    # structured array under .entries — derive the keys from the grid instead.
    return iter(self.dict().keys())

def __len__(self) -> int:
    """Total number of chunks in the manifest (size of the chunk grid)."""
    return self.entries.size

def dict(self) -> ChunkDict:
    """
    Converts the entire manifest to a nested dictionary, of the form

    {
        "0.0.0": {"path": "s3://bucket/foo.nc", "offset": 100, "length": 100},
        "0.0.1": {"path": "s3://bucket/foo.nc", "offset": 200, "length": 100},
        "0.1.0": {"path": "s3://bucket/foo.nc", "offset": 300, "length": 100},
        "0.1.1": {"path": "s3://bucket/foo.nc", "offset": 400, "length": 100},
    }
    """

    def _entry_to_dict(entry: Tuple[str, int, int]) -> dict[str, Union[str, int]]:
        # unpack one (path, offset, length) record into the dict form
        return {
            "path": entry[0],
            "offset": entry[1],
            "length": entry[2],
        }

    # one integer coordinate array per grid dimension, used to rebuild the keys
    coord_vectors = np.mgrid[
        tuple(slice(None, length) for length in self.shape_chunk_grid)
    ]

    # TODO don't include entry if path='' ?
    return cast(
        ChunkDict,
        {
            join(inds): _entry_to_dict(entry.item())
            for *inds, entry in np.nditer([*coord_vectors, self.entries])
        },
    )

@staticmethod
def from_zarr_json(filepath: str) -> "ChunkManifest":
Expand All @@ -125,12 +192,12 @@ def from_kerchunk_chunk_dict(cls, kerchunk_chunk_dict) -> "ChunkManifest":
return ChunkManifest(entries=chunkentries)


def split(key: "ChunkKey") -> Tuple[int, ...]:
    """Split a chunk key like "1.2.3" into a tuple of integer indices (1, 2, 3).

    Returns a tuple (not a list) so the result selects a single element when
    used to index the manifest's structured array, rather than fancy-indexing.
    """
    return tuple(int(i) for i in key.split("."))


def join(inds: Iterable[Any]) -> "ChunkKey":
    """Join indices into a chunk key string, e.g. (1, 2, 3) -> "1.2.3"."""
    # no need to materialize a list — str.join consumes the generator directly
    return cast("ChunkKey", ".".join(str(i) for i in inds))


def get_ndim_from_key(key: str) -> int:
Expand All @@ -154,9 +221,6 @@ def validate_chunk_keys(chunk_keys: Iterable[ChunkKey]):
f"Inconsistent number of dimensions between chunk key {key} and {first_key}: {other_ndim} vs {ndim}"
)

# Check that the keys collectively form a complete grid
check_keys_form_grid(chunk_keys)


def get_chunk_grid_shape(chunk_keys: Iterable[ChunkKey]) -> Tuple[int, ...]:
# find max chunk index along each dimension
Expand All @@ -167,24 +231,6 @@ def get_chunk_grid_shape(chunk_keys: Iterable[ChunkKey]) -> Tuple[int, ...]:
return chunk_grid_shape


def check_keys_form_grid(chunk_keys: Iterable[ChunkKey]):
    """Check that the chunk keys collectively form a complete grid"""

    grid_shape = get_chunk_grid_shape(chunk_keys)

    # the complete grid contains one key for every combination of indices
    all_required_chunk_keys: set[ChunkKey] = {
        join(combo)
        for combo in itertools.product(*(range(length) for length in grid_shape))
    }

    # every required key must be present in the given keys, and nothing extra
    if set(chunk_keys) != all_required_chunk_keys:
        raise ValueError("Chunk keys do not form a complete grid")


def concat_manifests(manifests: List["ChunkManifest"], axis: int) -> "ChunkManifest":
"""
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lines 188-263 are what we get rid of by doing concatenation/stacking via the wrapped structured array.

Concatenate manifests along an existing dimension.
Expand Down
65 changes: 25 additions & 40 deletions virtualizarr/tests/test_manifests/test_manifest.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import pytest
from pydantic import ValidationError

from virtualizarr.manifests import ChunkManifest, concat_manifests, stack_manifests

Expand All @@ -9,7 +8,7 @@ def test_create_manifest(self):
chunks = {
"0.0.0": {"path": "s3://bucket/foo.nc", "offset": 100, "length": 100},
}
manifest = ChunkManifest(entries=chunks)
manifest = ChunkManifest.from_dict(chunks)
assert manifest.dict() == chunks

chunks = {
Expand All @@ -18,15 +17,15 @@ def test_create_manifest(self):
"0.1.0": {"path": "s3://bucket/foo.nc", "offset": 300, "length": 100},
"0.1.1": {"path": "s3://bucket/foo.nc", "offset": 400, "length": 100},
}
manifest = ChunkManifest(entries=chunks)
manifest = ChunkManifest.from_dict(chunks)
assert manifest.dict() == chunks

def test_invalid_chunk_entries(self):
chunks = {
"0.0.0": {"path": "s3://bucket/foo.nc"},
}
with pytest.raises(ValidationError, match="missing"):
ChunkManifest(entries=chunks)
with pytest.raises(ValueError, match="must be of the form"):
ChunkManifest.from_dict(chunks)

chunks = {
"0.0.0": {
Expand All @@ -35,36 +34,22 @@ def test_invalid_chunk_entries(self):
"length": 100,
},
}
with pytest.raises(ValidationError, match="should be a valid integer"):
ChunkManifest(entries=chunks)
with pytest.raises(ValueError, match="must be of the form"):
ChunkManifest.from_dict(chunks)

def test_invalid_chunk_keys(self):
    """Malformed or inconsistent chunk keys should raise ValueError."""
    # a key with a trailing separator is rejected
    chunks = {
        "0.0.": {"path": "s3://bucket/foo.nc", "offset": 100, "length": 100},
    }
    with pytest.raises(ValueError, match="Invalid format for chunk key: '0.0.'"):
        ChunkManifest.from_dict(chunks)

    # keys with differing numbers of dimensions are rejected
    chunks = {
        "0.0": {"path": "s3://bucket/foo.nc", "offset": 100, "length": 100},
        "0": {"path": "s3://bucket/foo.nc", "offset": 200, "length": 100},
    }
    with pytest.raises(ValueError, match="Inconsistent number of dimensions"):
        ChunkManifest.from_dict(chunks)


class TestProperties:
Expand All @@ -75,21 +60,21 @@ def test_chunk_grid_info(self):
"0.1.0": {"path": "s3://bucket/foo.nc", "offset": 300, "length": 100},
"0.1.1": {"path": "s3://bucket/foo.nc", "offset": 400, "length": 100},
}
manifest = ChunkManifest(entries=chunks)
manifest = ChunkManifest.from_dict(chunks)
assert manifest.ndim_chunk_grid == 3
assert manifest.shape_chunk_grid == (1, 2, 2)


class TestEquals:
def test_equals(self):
manifest1 = ChunkManifest(
entries={
manifest1 = ChunkManifest.from_dict(
{
"0.0.0": {"path": "foo.nc", "offset": 100, "length": 100},
"0.0.1": {"path": "foo.nc", "offset": 200, "length": 100},
}
)
manifest2 = ChunkManifest(
entries={
manifest2 = ChunkManifest.from_dict(
{
"0.0.0": {"path": "foo.nc", "offset": 300, "length": 100},
"0.0.1": {"path": "foo.nc", "offset": 400, "length": 100},
}
Expand All @@ -102,21 +87,21 @@ def test_equals(self):
# Perhaps by testing the property that splitting along a dimension then concatenating the pieces along that dimension should recreate the original manifest?
class TestCombineManifests:
def test_concat(self):
manifest1 = ChunkManifest(
entries={
manifest1 = ChunkManifest.from_dict(
{
"0.0.0": {"path": "foo.nc", "offset": 100, "length": 100},
"0.0.1": {"path": "foo.nc", "offset": 200, "length": 100},
}
)
manifest2 = ChunkManifest(
entries={
manifest2 = ChunkManifest.from_dict(
{
"0.0.0": {"path": "foo.nc", "offset": 300, "length": 100},
"0.0.1": {"path": "foo.nc", "offset": 400, "length": 100},
}
)
axis = 1
expected = ChunkManifest(
entries={
expected = ChunkManifest.from_dict(
{
"0.0.0": {"path": "foo.nc", "offset": 100, "length": 100},
"0.0.1": {"path": "foo.nc", "offset": 200, "length": 100},
"0.1.0": {"path": "foo.nc", "offset": 300, "length": 100},
Expand All @@ -128,21 +113,21 @@ def test_concat(self):
assert result.dict() == expected.dict()

def test_stack(self):
manifest1 = ChunkManifest(
entries={
manifest1 = ChunkManifest.from_dict(
{
"0.0": {"path": "foo.nc", "offset": 100, "length": 100},
"0.1": {"path": "foo.nc", "offset": 200, "length": 100},
}
)
manifest2 = ChunkManifest(
entries={
manifest2 = ChunkManifest.from_dict(
{
"0.0": {"path": "foo.nc", "offset": 300, "length": 100},
"0.1": {"path": "foo.nc", "offset": 400, "length": 100},
}
)
axis = 1
expected = ChunkManifest(
entries={
expected = ChunkManifest.from_dict(
{
"0.0.0": {"path": "foo.nc", "offset": 100, "length": 100},
"0.0.1": {"path": "foo.nc", "offset": 200, "length": 100},
"0.1.0": {"path": "foo.nc", "offset": 300, "length": 100},
Expand Down
Loading