Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] fix: The aggs result of NestedAggregator with sub NestedAggregator may be not accurately #14496

Merged
merged 1 commit into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix the computed max shards of cluster to avoid int overflow ([#14155](https://github.com/opensearch-project/OpenSearch/pull/14155))
- Fixed rest-high-level client searchTemplate & mtermVectors endpoints to have a leading slash ([#14465](https://github.com/opensearch-project/OpenSearch/pull/14465))
- Write shard level metadata blob when snapshotting searchable snapshot indexes ([#13190](https://github.com/opensearch-project/OpenSearch/pull/13190))
- Fix aggs result of NestedAggregator with sub NestedAggregator ([#13324](https://github.com/opensearch-project/OpenSearch/pull/13324))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,10 @@
import org.apache.lucene.search.Weight;
import org.apache.lucene.search.join.BitSetProducer;
import org.apache.lucene.util.BitSet;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.lucene.search.Queries;
import org.opensearch.core.ParseField;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.mapper.ObjectMapper;
import org.opensearch.search.aggregations.Aggregator;
import org.opensearch.search.aggregations.AggregatorFactories;
Expand Down Expand Up @@ -88,12 +90,25 @@ public class NestedAggregator extends BucketsAggregator implements SingleBucketA
) throws IOException {
super(name, factories, context, parent, cardinality, metadata);

Query parentFilter = parentObjectMapper != null ? parentObjectMapper.nestedTypeFilter() : Queries.newNonNestedFilter();
Query parentFilter = isParent(parentObjectMapper, childObjectMapper, context.mapperService())
? parentObjectMapper.nestedTypeFilter()
: Queries.newNonNestedFilter();
this.parentFilter = context.bitsetFilterCache().getBitSetProducer(parentFilter);
this.childFilter = childObjectMapper.nestedTypeFilter();
this.collectsFromSingleBucket = cardinality.map(estimate -> estimate < 2);
}

private boolean isParent(ObjectMapper parentObjectMapper, ObjectMapper childObjectMapper, MapperService mapperService) {
if (parentObjectMapper == null) {
return false;
}
ObjectMapper parent;
do {
parent = childObjectMapper.getParentObjectMapper(mapperService);
} while (parent != null && parent != parentObjectMapper);
return parentObjectMapper == parent;
}

@Override
public LeafBucketCollector getLeafCollector(final LeafReaderContext ctx, final LeafBucketCollector sub) throws IOException {
IndexReaderContext topLevelContext = ReaderUtil.getTopLevelContext(ctx);
Expand All @@ -107,20 +122,17 @@ public LeafBucketCollector getLeafCollector(final LeafReaderContext ctx, final L
if (collectsFromSingleBucket) {
return new LeafBucketCollectorBase(sub, null) {
@Override
public void collect(int parentDoc, long bucket) throws IOException {
// if parentDoc is 0 then this means that this parent doesn't have child docs (b/c these appear always before the parent
// doc), so we can skip:
if (parentDoc == 0 || parentDocs == null || childDocs == null) {
public void collect(int parentAggDoc, long bucket) throws IOException {
// parentAggDoc can be 0 when aggregation:
if (parentDocs == null || childDocs == null) {
return;
}

final int prevParentDoc = parentDocs.prevSetBit(parentDoc - 1);
int childDocId = childDocs.docID();
if (childDocId <= prevParentDoc) {
childDocId = childDocs.advance(prevParentDoc + 1);
}
Tuple<Integer, Integer> res = getParentAndChildId(parentDocs, childDocs, parentAggDoc);
int currentParentDoc = res.v1();
int childDocId = res.v2();

for (; childDocId < parentDoc; childDocId = childDocs.nextDoc()) {
for (; childDocId < currentParentDoc; childDocId = childDocs.nextDoc()) {
collectBucket(sub, childDocId, bucket);
}
}
Expand All @@ -130,6 +142,43 @@ public void collect(int parentDoc, long bucket) throws IOException {
}
}

/**
* In one case, it's talking about the parent doc (from the Lucene block-join standpoint),
* while in the other case, it's talking about a child doc ID (from the block-join standpoint)
* from the parent aggregation, where we're trying to aggregate over a sibling of that child.
* So, we need to map from that document to its parent, then join to the appropriate sibling.
*
* @param parentAggDoc the parent aggregation's current doc
* (which may or may not be a block-level parent doc)
* @return a tuple consisting of the current block-level parent doc (the parent of the
* parameter doc), and the next matching child doc (hopefully under this parent)
* for the aggregation (according to the child doc iterator).
*/
static Tuple<Integer, Integer> getParentAndChildId(BitSet parentDocs, DocIdSetIterator childDocs, int parentAggDoc) throws IOException {
int currentParentAggDoc;
int prevParentDoc = parentDocs.prevSetBit(parentAggDoc);
if (prevParentDoc == -1) {
currentParentAggDoc = parentDocs.nextSetBit(0);
} else if (prevParentDoc == parentAggDoc) {
// parentAggDoc is the parent of that child, and is belongs to parentDocs
currentParentAggDoc = parentAggDoc;
if (currentParentAggDoc == 0) {
prevParentDoc = -1;
} else {
prevParentDoc = parentDocs.prevSetBit(currentParentAggDoc - 1);
}
} else {
// parentAggDoc is the sibling of that child, and it means the block-join parent
currentParentAggDoc = parentDocs.nextSetBit(prevParentDoc + 1);
}

int childDocId = childDocs.docID();
if (childDocId <= prevParentDoc) {
childDocId = childDocs.advance(prevParentDoc + 1);
}
return Tuple.tuple(currentParentAggDoc, childDocId);
}

@Override
protected void preGetSubLeafCollectors(LeafReaderContext ctx) throws IOException {
super.preGetSubLeafCollectors(ctx);
Expand Down Expand Up @@ -191,9 +240,8 @@ public void setScorer(Scorable scorer) throws IOException {

@Override
public void collect(int parentDoc, long bucket) throws IOException {
// if parentDoc is 0 then this means that this parent doesn't have child docs (b/c these appear always before the parent
// doc), so we can skip:
if (parentDoc == 0 || parentDocs == null || childDocs == null) {
// parentAggDoc can be 0 when aggregation:
if (parentDocs == null || childDocs == null) {
return;
}

Expand All @@ -214,11 +262,9 @@ void processBufferedChildBuckets() throws IOException {
return;
}

final int prevParentDoc = parentDocs.prevSetBit(currentParentDoc - 1);
int childDocId = childDocs.docID();
if (childDocId <= prevParentDoc) {
childDocId = childDocs.advance(prevParentDoc + 1);
}
Tuple<Integer, Integer> res = getParentAndChildId(parentDocs, childDocs, currentParentDoc);
int currentParentDoc = res.v1();
int childDocId = res.v2();

for (; childDocId < currentParentDoc; childDocId = childDocs.nextDoc()) {
cachedScorer.doc = childDocId;
Expand Down
Loading
Loading