Skip to content

Commit

Permalink
fix: support options in ak.merge_union_of_records (#2236)
Browse files Browse the repository at this point in the history
  • Loading branch information
agoose77 committed Feb 14, 2023
1 parent 4292cf6 commit 6b49960
Show file tree
Hide file tree
Showing 5 changed files with 357 additions and 109 deletions.
3 changes: 3 additions & 0 deletions src/awkward/_nplikes/array_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,9 @@ def regularize_index_for_length(
def nonzero(self, x: ArrayLike) -> tuple[ArrayLike, ...]:
    """Return the indices of the non-zero elements of ``x``.

    This is a thin delegation to the wrapped array module: the argument is
    forwarded unchanged to ``self._module.nonzero`` and its result is
    returned as-is.

    Args:
        x: the array to inspect.

    Returns:
        Whatever ``self._module.nonzero(x)`` returns (annotated as a tuple
        of index arrays).
    """
    return self._module.nonzero(x)

def where(self, condition: ArrayLike, x1: ArrayLike, x2: ArrayLike) -> ArrayLike:
    """Choose between ``x1`` and ``x2`` according to ``condition``.

    The three arguments are forwarded, unchanged and in this order, to the
    wrapped array module's ``where`` function; any broadcasting or dtype
    promotion is therefore whatever that module implements.

    Args:
        condition: the selector passed as the first argument.
        x1: passed as the second argument.
        x2: passed as the third argument.

    Returns:
        The result of ``self._module.where(condition, x1, x2)``.
    """
    return self._module.where(condition, x1, x2)

def unique_values(self, x: ArrayLike) -> ArrayLike:
return self._module.unique(
x,
Expand Down
4 changes: 4 additions & 0 deletions src/awkward/_nplikes/numpylike.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,10 @@ def reshape(
def nonzero(self, x: ArrayLike) -> tuple[ArrayLike, ...]:
...

@abstractmethod
def where(self, condition: ArrayLike, x1: ArrayLike, x2: ArrayLike) -> ArrayLike:
    """Abstract three-argument ``where`` that every nplike must provide.

    This declaration only fixes the signature; it has no behaviour of its
    own. Concrete nplikes supply the implementation.

    Args:
        condition: the selector array.
        x1: first value operand.
        x2: second value operand.

    Returns:
        An array-like result, as defined by the concrete implementation.
    """
    ...

@abstractmethod
def unique_values(self, x: ArrayLike) -> ArrayLike:
...
Expand Down
9 changes: 8 additions & 1 deletion src/awkward/_nplikes/typetracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -1031,9 +1031,16 @@ def nonzero(self, x: ArrayLike) -> tuple[TypeTracerArray, ...]:
try_touch_data(x)
return (TypeTracerArray._new(np.int64, (unknown_length,)),) * len(x.shape)

def where(
    self, condition: ArrayLike, x1: ArrayLike, x2: ArrayLike
) -> TypeTracerArray:
    """Type-tracing ``where``: compute only the result's dtype and shape.

    No element values are selected. The operands are broadcast against each
    other, the result dtype is taken from the two value operands, and a new
    ``TypeTracerArray`` carrying that dtype and the broadcast shape is
    returned.

    Args:
        condition: the selector array; after broadcasting, its shape is used
            as the shape of the result.
        x1: first value operand; contributes to the result dtype.
        x2: second value operand; contributes to the result dtype.

    Returns:
        A ``TypeTracerArray`` with dtype ``numpy.result_type(x1, x2)`` and
        the shape of the broadcast ``condition``.
    """
    # Broadcast first so the result shape can be read off the broadcast condition.
    condition, x1, x2 = self.broadcast_arrays(condition, x1, x2)
    # Only the value operands take part in dtype promotion; the condition
    # is not passed to result_type.
    result_dtype = numpy.result_type(x1, x2)
    return TypeTracerArray._new(result_dtype, shape=condition.shape)

def unique_values(self, x: ArrayLike) -> TypeTracerArray:
try_touch_data(x)
return TypeTracerArray._new(x.dtype, shape=(None,))
return TypeTracerArray._new(x.dtype, shape=(unknown_length,))

def concat(self, arrays, *, axis: int | None = 0) -> TypeTracerArray:
if axis is None:
Expand Down
314 changes: 206 additions & 108 deletions src/awkward/operations/ak_merge_union_of_records.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@


import awkward as ak
from awkward._nplikes.numpylike import NumpyMetadata
from awkward._nplikes.numpylike import ArrayLike, NumpyMetadata

np = NumpyMetadata.instance()
cpu = ak._backends.NumpyBackend.instance()
Expand All @@ -24,11 +24,21 @@ def merge_union_of_records(array, axis=-1, *, highlevel=True, behavior=None):
Simplifies unions of records, e.g.
>>> array = ak.concatenate(([{"a": 1}], [{"b": 2}]))
>>> array
<Array [{a: 1}, {b: 2}] type='2 * union[{a: int64}, {b: int64}]'>
into records of options, i.e.
>>> ak.merge_union_of_records(array)
<Array [{a: 1, b: None}, {a: None, ...}] type='2 * {a: ?int64, b: ?int64}'>
Missing records are preserved in the result, e.g.
>>> array = ak.concatenate(([{"a": 1}], [{"b": 2}, None]))
>>> array
<Array [{a: 1}, {b: 2}, None] type='3 * union[{a: int64}, ?{b: int64}]'>
>>> ak.merge_union_of_records(array)
<Array [{a: 1, b: None}, {...}, None] type='3 * ?{a: ?int64, b: ?int64}'>
"""
with ak._errors.OperationErrorContext(
"ak.merge_union_of_records",
Expand All @@ -41,32 +51,102 @@ def _impl(array, axis, highlevel, behavior):
behavior = ak._util.behavior_of(array, behavior=behavior)
layout = ak.to_layout(array, allow_record=False)

def apply_displace_index(layout, backend, **kwargs):
if layout.is_record:
return layout
elif layout.is_option and layout.content.is_record:
raise ak._errors.wrap_error(
TypeError(
"optional records cannot be merged by this function. First call `ak.merge_option_of_records` "
"to convert these into records of options."
def invert_record_union(
tags: ArrayLike, index: ArrayLike, contents, *, backend
) -> ak.contents.RecordArray:
index_nplike = backend.index_nplike
# First, create an ordered list containing the union of all fields
seen_fields = set()
all_fields = []
for content in contents:
# Find new fields
for field in content.fields:
if field not in seen_fields:
seen_fields.add(field)
all_fields.append(field)

# Build unions for each field
outer_field_contents = []
for field in all_fields:
field_tags = index_nplike.asarray(tags, copy=True)
field_index = index_nplike.asarray(index, copy=True)

# Build contents for union representing current field
field_contents = [c.content(field) for c in contents if c.has_field(field)]

# Find the best location for option type.
# We will potentially have fewer contents in this per-field union
# than the original outer union-of-records, because some recordarrays
# may not have the given field.
tag_for_missing = 0
for i, content in enumerate(field_contents):
if content.is_option:
tag_for_missing = i
break

# If at least one recordarray doesn't have this field, we add
# a special option
if len(field_contents) < len(contents):
# Make the tagged content an option, growing by one to ensure we
# have a known `None` value to index into
tagged_content = field_contents[tag_for_missing]
indexedoption_index = backend.index_nplike.arange(
tagged_content.length + 1, dtype=np.int64
)
indexedoption_index[
index_nplike.shape_item_as_index(tagged_content.length)
] = -1
field_contents[
tag_for_missing
] = ak.contents.IndexedOptionArray.simplified(
ak.index.Index64(indexedoption_index), tagged_content
)

# Index of None values in tagged content (content with extra None item at end)
index_missing = index_nplike.shape_item_as_index(
field_contents[tag_for_missing].length - 1
)
elif layout.is_indexed and layout.content.is_record:
record = layout.content
# Transpose index-of-record to record-of-index
return ak.contents.RecordArray(
[
ak.contents.IndexedArray.simplified(
layout.index, c, parameters=layout._parameters
)
for c in record.contents
],
record.fields,
record.length,
backend=backend,
# Now build contents for union, by looping over outermost index
# Overwrite tags to adjust for new contents length
# and use the tagged content for any missing values
k = 0
for j, content in enumerate(contents):
tag_is_j = field_tags == j

if content.has_field(field):
# Rewrite tags to account for missing fields
field_tags[tag_is_j] = k
k += 1

else:
# Rewrite tags to point to option content
field_tags[tag_is_j] = tag_for_missing
# Point each value to missing value
field_index[tag_is_j] = index_missing

outer_field_contents.append(
ak.contents.UnionArray.simplified(
ak.index.Index8(field_tags),
ak.index.Index64(field_index),
field_contents,
)
)
else:
raise ak._errors.wrap_error(TypeError(layout))
return ak.contents.RecordArray(
outer_field_contents, all_fields, backend=backend
)

def compact_option_index(index: ArrayLike, *, backend) -> ArrayLike:
    """Turn a sparse option index into a dense one.

    Every negative entry of ``index`` is treated as "missing" and becomes
    ``-1`` in the result. The remaining entries are replaced, in their
    original order, by the consecutive integers ``0, 1, 2, ...``. The
    actual values of the non-negative entries are ignored; only their
    positions matter, since any re-arranging is done by the union that the
    caller builds.

    Args:
        index: one-dimensional integer array; negative values mark missing
            items.
        backend: object whose ``index_nplike`` attribute provides
            ``count_nonzero``, ``empty`` and ``arange``.

    Returns:
        A new array with the same size and dtype as ``index``, holding
        ``-1`` at missing positions and a running count elsewhere.
    """
    index_nplike = backend.index_nplike

    is_none = index < 0
    num_none = index_nplike.count_nonzero(is_none)

    # Every slot is written by exactly one of the two masked assignments
    # below, so an uninitialised buffer is safe here.
    dense_index = index_nplike.empty(index.size, dtype=index.dtype)
    dense_index[is_none] = -1
    dense_index[~is_none] = index_nplike.arange(
        index.size - num_none,
        dtype=index.dtype,
    )
    return dense_index

def apply(layout, depth, backend, **kwargs):
posaxis = ak._util.maybe_posaxis(layout, axis, depth)
Expand All @@ -75,93 +155,111 @@ def apply(layout, depth, backend, **kwargs):
np.AxisError(f"axis={axis} exceeds the depth of this array ({depth})")
)
elif depth == posaxis + 1 and layout.is_union:
if all(x.is_record for x in layout.contents):
# First, find all ordered fields, regularising any index-of-record
# such that we have record-of-index
seen_fields = set()
all_fields = []
regularised_contents = []
for content in layout.contents:
# Ensure that we have record-of-index
regularised_content = ak._do.recursively_apply(
content, apply_displace_index
)
regularised_contents.append(regularised_content)

# Find new fields
for field in regularised_content.fields:
if field not in seen_fields:
seen_fields.add(field)
all_fields.append(field)

# Build unions for each field
outer_field_contents = []
for field in all_fields:
field_tags = backend.index_nplike.asarray(layout.tags, copy=True)
field_index = backend.index_nplike.asarray(layout.index, copy=True)

# Build contents for union representing current field
field_contents = [
c.content(field)
for c in regularised_contents
if c.has_field(field)
]

# Find the best location for option type.
# We will potentially have fewer contents in this per-field union
# than the original outer union-of-records, because some recordarrays
# may not have the given field.
tag_for_missing = 0
for i, content in enumerate(field_contents):
if not all(
x.is_record or x.is_indexed or x.is_option for x in layout.contents
):
return

# Any option types need to be re-written
if any(x.is_option for x in layout.contents):
# We'll rebuild the union to include only the non-null items.
inner_union_index_parts = []
next_contents = []
next_tags_sparse = backend.index_nplike.asarray(layout.tags, copy=True)
for tag, content in enumerate(layout.contents):
is_this_tag = backend.index_nplike.asarray(layout.tags) == tag

# Union arrays for this content
tag_index = backend.index_nplike.asarray(layout.index)[is_this_tag]

# For unmasked arrays, we can directly take the content
if isinstance(content, ak.contents.UnmaskedArray):
next_contents.append(content.content)
inner_union_index_parts.append(tag_index)
# Otherwise, we need to rebuild the index
elif content.is_option or content.is_indexed:
# Let's work with indexed option types for ease
if content.is_option:
tag_for_missing = i
break

# If at least one recordarray doesn't have this field, we add
# a special option
if len(field_contents) < len(regularised_contents):
# Make the tagged content an option, growing by one to ensure we
# have a known `None` value to index into
tagged_content = field_contents[tag_for_missing]
indexedoption_index = backend.index_nplike.arange(
tagged_content.length + 1, dtype=np.int64
)
indexedoption_index[tagged_content.length] = -1
field_contents[
tag_for_missing
] = ak.contents.IndexedOptionArray.simplified(
ak.index.Index64(indexedoption_index), tagged_content
)
content = content.to_IndexedOptionArray64()

# Now build contents for union, by looping over outermost index
# Overwrite tags to adjust for new contents length
# and use the tagged content for any missing values
k = 0
for j, content in enumerate(regularised_contents):
tag_is_j = field_tags == j

if content.has_field(field):
# Rewrite tags to account for missing fields
field_tags[tag_is_j] = k
k += 1

else:
# Rewrite tags to point to option content
field_tags[tag_is_j] = tag_for_missing
# Point each value to missing value
field_index[tag_is_j] = (
field_contents[tag_for_missing].length - 1
)

outer_field_contents.append(
ak.contents.UnionArray.simplified(
ak.index.Index8(field_tags),
ak.index.Index64(field_index),
field_contents,
# First, find the inner index that actually re-arranges the (non-null) items
content_index = backend.index_nplike.asarray(content.index)
merged_index = content_index[tag_index]
is_non_null = merged_index >= 0
inner_union_index_parts.append(merged_index[is_non_null])
# Mask out tags of items that are missing
next_tags_sparse[is_this_tag] = backend.index_nplike.where(
is_non_null, tag, -1
)
)
return ak.contents.RecordArray(
outer_field_contents, all_fields, backend=backend

# The length of this index/option content is irrelevant; the union provides the length
next_contents.append(content.content)
# Non-indexed/option types are trivially included as-is
else:
next_contents.append(content)
inner_union_index_parts.append(tag_index)

# We'll create an outermost indexed-option type, which re-instates the missing values.
# This should have the same length as the original union, and its index should be "dense"
# (contiguous, monotonic integers; or -1). Therefore, we can directly compute it from the "sparse"
# tags index, which has the same length as the original union, and has only missing items set to -1.
outer_option_dense_index = compact_option_index(
next_tags_sparse, backend=backend
)

# Ignore missing items for inner union, creating a dense array of tags
next_tags = next_tags_sparse[next_tags_sparse >= 0]
# Build dense index from parts for each tag
next_index = backend.index_nplike.empty(next_tags.size, dtype=np.int64)
for tag, content_index in enumerate(inner_union_index_parts):
next_index[next_tags == tag] = content_index

# Return option around record of unions
return ak.contents.IndexedOptionArray(
ak.index.Index64(outer_option_dense_index),
invert_record_union(
next_tags, next_index, next_contents, backend=backend
),
)

# Any index types need to be re-written
elif any(x.is_indexed for x in layout.contents):
# We'll create an outermost indexed-option type, which re-instates the missing values
current_index = backend.index_nplike.asarray(layout.index)
next_index = backend.index_nplike.empty(
current_index.size, dtype=np.int64
)

# We'll rebuild the union to include only the non-null items.
next_contents = []
for tag, content in enumerate(layout.contents):
is_this_tag = backend.index_nplike.asarray(layout.tags) == tag

# Rewrite union index of indexed types
if content.is_indexed:
content_index = backend.index_nplike.asarray(content.index)
next_index[is_this_tag] = content_index[
current_index[is_this_tag]
]
next_contents.append(content.content)

else:
next_index[is_this_tag] = current_index[is_this_tag]
next_contents.append(content)

return invert_record_union(
backend.index_nplike.asarray(layout.tags),
next_index,
next_contents,
backend=backend,
)

else:
return invert_record_union(
backend.index_nplike.asarray(layout.tags),
backend.index_nplike.asarray(layout.index),
layout.contents,
backend=backend,
)

out = ak._do.recursively_apply(layout, apply)
Expand Down
Loading

0 comments on commit 6b49960

Please sign in to comment.