Merge branch 'main' into one-pass
martindurant committed Jul 30, 2024
2 parents 8e1d507 + a3c201d commit d1922ab
Showing 9 changed files with 269 additions and 71 deletions.
9 changes: 5 additions & 4 deletions .pre-commit-config.yaml
@@ -24,7 +24,7 @@ repos:
- --target-version=py312

- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.4.3
rev: v0.5.4
hooks:
- id: ruff

@@ -35,7 +35,7 @@ repos:
language_version: python3

- repo: https://github.com/asottile/pyupgrade
rev: v3.15.2
rev: v3.16.0
hooks:
- id: pyupgrade
args:
@@ -52,16 +52,17 @@
- id: yesqa

- repo: https://github.com/adamchainz/blacken-docs
rev: 1.16.0
rev: 1.18.0
hooks:
- id: blacken-docs
additional_dependencies:
- black

- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.10.0
rev: v1.11.0
hooks:
- id: mypy
files: "src/"
args: [--ignore-missing-imports]
additional_dependencies:
- dask
3 changes: 2 additions & 1 deletion pyproject.toml
@@ -111,7 +111,8 @@ src_paths = ["src", "tests"]

[tool.mypy]
python_version = "3.9"
files = ["src", "tests"]
files = ["src"]
exclude = ["tests/"]
strict = false
warn_unused_configs = true
show_error_codes = true
1 change: 1 addition & 0 deletions src/dask_awkward/__init__.py
@@ -94,6 +94,7 @@
with_field,
with_name,
with_parameter,
without_field,
without_parameters,
zeros_like,
zip,
1 change: 1 addition & 0 deletions src/dask_awkward/lib/__init__.py
@@ -83,6 +83,7 @@
with_field,
with_name,
with_parameter,
without_field,
without_parameters,
zeros_like,
zip,
114 changes: 87 additions & 27 deletions src/dask_awkward/lib/core.py
@@ -508,9 +508,17 @@ def f(self, other):
if is_dask_collection(other):
task = (op, self.key, *other.__dask_keys__())
deps.append(other)
plns.append(other.name)
if inv:
plns.insert(0, other.name)
else:
plns.append(other.name)
else:
task = (op, self.key, other)
if inv:
task = (op, other, self.key)
else:
task = (op, self.key, other)
if inv:
plns.reverse()
graph = HighLevelGraph.from_collections(
name,
layer=AwkwardMaterializedLayer(
@@ -520,10 +528,16 @@
),
dependencies=tuple(deps),
)
if isinstance(other, Scalar):
meta = op(self._meta, other._meta)
if isinstance(other, (Scalar, Array)):
if inv:
meta = op(other._meta, self._meta)
else:
meta = op(self._meta, other._meta)
else:
meta = op(self._meta, other)
if inv:
meta = op(other, self._meta)
else:
meta = op(self._meta, other)
commit_to_reports(name, self.report)
return new_scalar_object(graph, name, meta=meta)
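The new inv branches above build both the task and the metadata with the operands in reflected order, so right-hand operators such as __rsub__ no longer flip their arguments. A minimal, hedged sketch of the behaviour this enables, assuming dak.sum returns a lazy Scalar and using purely illustrative values:

# Hedged sketch: reflected operators on a dask-awkward Scalar result.
import awkward as ak
import dask_awkward as dak

x = dak.from_awkward(ak.Array([[1, 2, 3], [4]]), npartitions=2)
s = dak.sum(x)            # lazy Scalar; the concrete value would be 10

print((s - 1).compute())  # __sub__  : s - 1 ->  9
print((1 - s).compute())  # __rsub__ : 1 - s -> -9 (reflected operand order preserved)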

@@ -667,7 +681,7 @@ def new_known_scalar(
Examples
--------
>>> from dask_awkward.core import new_known_scalar
>>> from dask_awkward.lib.core import new_known_scalar
>>> a = new_known_scalar(5, label="five")
>>> a
dask.awkward<five, type=Scalar, dtype=int64, known_value=5>
@@ -959,29 +973,75 @@ def repartition(
npartitions: int | None = None,
divisions: tuple[int, ...] | None = None,
rows_per_partition: int | None = None,
one_to_n: int | None = None,
n_to_one: int | None = None,
) -> Array:
"""Restructure the partitioning of the whole array

Several schemes are possible, selected by passing exactly one of the
mutually exclusive optional arguments. Of these, the first three require
knowledge of the number of rows in each existing partition, which
will be eagerly computed if not already known, and may require some
shuffling of data between partitions.
- npartitions: split all the rows as evenly as possible into this
many output partitions.
- divisions: exact row-count offsets of each output partition
- rows_per_partition: each partition will have this many rows,
except the last, which will have this number or fewer
- one_to_n: each input partition becomes n output partitions
- n_to_one: every n adjacent input partitions become one
output partition. Note that exactly one output partition
(npartitions=1) is a special case of this.
"""
from dask_awkward.layers import AwkwardMaterializedLayer
from dask_awkward.lib.structure import repartition_layer
from dask_awkward.lib.structure import (
repartition_layer,
simple_repartition_layer,
)

if sum(bool(_) for _ in [npartitions, divisions, rows_per_partition]) != 1:
if (
sum(
bool(_)
for _ in (
npartitions,
divisions,
rows_per_partition,
one_to_n,
n_to_one,
)
)
!= 1
):
raise ValueError("Please specify exactly one of the inputs")
if not self.known_divisions:
self.eager_compute_divisions()
nrows = self.defined_divisions[-1]
new_divisions: tuple[int, ...] = tuple()
if divisions:
new_divisions = divisions
elif npartitions:
rows_per_partition = math.ceil(nrows / npartitions)
if rows_per_partition:
new_divs = list(range(0, nrows, rows_per_partition))
new_divs.append(nrows)
new_divisions = tuple(new_divs)

token = tokenize(self, divisions)
key = f"repartition-{token}"

new_layer_raw = repartition_layer(self, key, new_divisions)
new_divisions: tuple[int, ...] = ()
if npartitions and npartitions == 1:
npartitions, n_to_one = None, self.npartitions
if n_to_one or one_to_n:
token = tokenize(self, n_to_one, one_to_n)
key = f"repartition-{token}"
new_layer_raw, new_divisions = simple_repartition_layer(
self, n_to_one, one_to_n, key
)
else:
if not self.known_divisions:
self.eager_compute_divisions()
nrows = self.defined_divisions[-1]
if divisions:
if divisions == self.divisions:
# noop
return self
new_divisions = divisions
elif npartitions:
rows_per_partition = math.ceil(nrows / npartitions)
if rows_per_partition:
new_divs = list(range(0, nrows, rows_per_partition))
new_divs.append(nrows)
new_divisions = tuple(new_divs)
token = tokenize(self, divisions)
key = f"repartition-{token}"
new_layer_raw = repartition_layer(self, key, new_divisions)

new_layer = AwkwardMaterializedLayer(
new_layer_raw,
previous_layer_names=[self.name],
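The docstring above lists the five mutually exclusive repartitioning options; exactly one must be supplied. A hedged usage sketch of the new method — the eight-row array and the resulting partition counts are illustrative, not taken from the commit:

# Hedged sketch: the repartition schemes described in the docstring above.
import awkward as ak
import dask_awkward as dak

x = dak.from_awkward(ak.Array([[0], [1], [2], [3], [4], [5], [6], [7]]), npartitions=4)

a = x.repartition(n_to_one=2)            # merge every 2 adjacent partitions -> 2 partitions
b = x.repartition(one_to_n=2)            # split each partition into 2       -> 8 partitions
c = x.repartition(npartitions=3)         # spread rows as evenly as possible -> 3 partitions
d = x.repartition(rows_per_partition=3)  # 3 rows each, last one smaller     -> 3 partitions

print(a.npartitions, b.npartitions, c.npartitions, d.npartitions)  # 2 8 3 3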
@@ -2445,7 +2505,7 @@ def meta_or_identity(obj: Any) -> Any:
--------
>>> import awkward as ak
>>> import dask_awkward as dak
>>> from dask_awkward.core import meta_or_identity
>>> from dask_awkward.lib.core import meta_or_identity
>>> x = ak.from_iter([[1, 2, 3], [4]])
>>> x = dak.from_awkward(x, npartitions=2)
>>> x
@@ -2641,7 +2701,7 @@ def normalize_single_outer_inner_index(
Examples
--------
>>> from dask_awkward.utils import normalize_single_outer_inner_index
>>> from dask_awkward.lib.core import normalize_single_outer_inner_index
>>> divisions = (0, 3, 6, 9)
>>> normalize_single_outer_inner_index(divisions, 0)
(0, 0)
113 changes: 98 additions & 15 deletions src/dask_awkward/lib/structure.py
@@ -71,6 +71,7 @@
"values_astype",
"where",
"with_field",
"without_field",
"with_name",
"with_parameter",
"without_parameters",
@@ -602,26 +603,20 @@ def mask(
@borrow_docstring(ak.nan_to_num)
def nan_to_num(
array: Array,
copy: bool = True,
nan: float = 0.0,
posinf: Any | None = None,
neginf: Any | None = None,
highlevel: bool = True,
behavior: Any | None = None,
attrs: Mapping[str, Any] | None = None,
) -> Array:
# return map_partitions(
# ak.nan_to_num,
# array,
# output_partitions=1,
# copy=copy,
# nan=nan,
# posinf=posinf,
# neginf=neginf,
# highlevel=highlevel,
# behavior=behavior,
# )
raise DaskAwkwardNotImplemented("TODO")
return map_partitions(
ak.nan_to_num,
array,
nan=nan,
posinf=posinf,
neginf=neginf,
highlevel=True,
behavior=behavior,
)


def _numaxis0(*integers):
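With the change above, nan_to_num is now dispatched lazily through map_partitions instead of raising DaskAwkwardNotImplemented (the copy argument is dropped from the forwarded call). A small hedged example with illustrative values, assuming nan_to_num is re-exported at the dask_awkward top level like the other structure helpers:

# Hedged sketch: nan_to_num applied partition-wise on a lazy collection.
import awkward as ak
import dask_awkward as dak

x = dak.from_awkward(ak.Array([[1.0, float("nan")], [float("inf")]]), npartitions=2)
y = dak.nan_to_num(x, nan=0.0, posinf=1e9)

print(y.compute().tolist())  # [[1.0, 0.0], [1000000000.0]]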
Expand Down Expand Up @@ -1093,6 +1088,46 @@ def with_field(
)


class _WithoutFieldFn:
def __init__(
self,
highlevel: bool,
behavior: Mapping | None = None,
attrs: Mapping[str, Any] | None = None,
) -> None:
self.highlevel = highlevel
self.behavior = behavior
self.attrs = attrs

def __call__(self, array: ak.Array, where: str) -> ak.Array:
return ak.without_field(
array, where=where, behavior=self.behavior, attrs=self.attrs
)


@borrow_docstring(ak.without_field)
def without_field(
base: Array,
where: str,
highlevel: bool = True,
behavior: Mapping | None = None,
attrs: Mapping[str, Any] | None = None,
) -> Array:
if not highlevel:
raise ValueError("Only highlevel=True is supported")

if not isinstance(base, Array):
raise ValueError("Base argument in without_field must be a dask_awkward.Array")

return map_partitions(
_WithoutFieldFn(highlevel=highlevel, behavior=behavior, attrs=attrs),
base,
where,
label="without-field",
output_divisions=1,
)


class _WithNameFn:
def __init__(
self,
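without_field above is newly exported (see the __init__.py changes earlier in this diff) and wraps ak.without_field partition-wise via map_partitions. A hedged usage sketch with illustrative records:

# Hedged sketch: dropping a field lazily with the new dak.without_field.
import awkward as ak
import dask_awkward as dak

records = ak.Array([{"x": 1, "y": 2.0}, {"x": 3, "y": 4.0}])
lazy = dak.from_awkward(records, npartitions=2)

trimmed = dak.without_field(lazy, "y")
print(trimmed.fields)              # ['x']
print(trimmed.compute().tolist())  # [{'x': 1}, {'x': 3}]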
@@ -1360,6 +1395,54 @@ def repartition_layer(arr: Array, key: str, divisions: tuple[int, ...]) -> dict:
return layer


def _subpart(data: ak.Array, parts: int, part: int) -> ak.Array:
from dask_awkward.lib.core import is_typetracer

if is_typetracer(data):
return data
rows_per = len(data) // parts
return data[
part * rows_per : None if part == (parts - 1) else (part + 1) * rows_per
]


def _subcat(*arrs: tuple[ak.Array, ...]) -> ak.Array:
return ak.concatenate(arrs)


def simple_repartition_layer(
arr: Array, n_to_one: int | None, one_to_n: int | None, key: str
) -> tuple[dict, tuple[Any, ...]]:
layer: dict[tuple[str, int], tuple[Any, ...]] = {}
new_divisions: tuple[Any, ...]
if n_to_one:
for i0, i in enumerate(range(0, arr.npartitions, n_to_one)):
layer[(key, i0)] = (_subcat,) + tuple(
(arr.name, part)
for part in range(i, min(i + n_to_one, arr.npartitions))
)
new_divisions = arr.divisions[::n_to_one]
if arr.npartitions % n_to_one:
new_divisions = new_divisions + (arr.divisions[-1],)
layer[(key, i0 + 1)] = (_subcat,) + tuple(
(arr.name, part0) for part0 in range(len(layer), arr.npartitions)
)
elif one_to_n:
for i in range(arr.npartitions):
for part in range(one_to_n):
layer[(key, (i * one_to_n + part))] = (
_subpart,
(arr.name, i),
one_to_n,
part,
)
# TODO: if arr.known_divisions:
new_divisions = (None,) * (arr.npartitions * one_to_n + 1)
else:
raise ValueError
return layer, new_divisions


@borrow_docstring(ak.enforce_type)
def enforce_type(
array: Array,
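For reference, _subpart above carves an input partition into `parts` equal row-count chunks, with the last chunk keeping any remainder, and simple_repartition_layer wires those slices (or _subcat concatenations) into the new graph layer. A standalone, hedged sketch of just the slicing rule, using plain awkward and skipping the typetracer short-circuit:

# Hedged illustration of the _subpart slicing rule (no typetracer handling).
import awkward as ak


def subpart(data: ak.Array, parts: int, part: int) -> ak.Array:
    # Same rule as _subpart: equal-sized chunks, last chunk absorbs the remainder.
    rows_per = len(data) // parts
    stop = None if part == parts - 1 else (part + 1) * rows_per
    return data[part * rows_per : stop]


data = ak.Array([[0], [1], [2], [3], [4]])
print(subpart(data, 2, 0).tolist())  # [[0], [1]]
print(subpart(data, 2, 1).tolist())  # [[2], [3], [4]] -- remainder goes to the last chunk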