Skip to content

Commit

Permalink
Squash Commit
Browse files Browse the repository at this point in the history
Signed-off-by: luyuncheng <luyuncheng@bytedance.com>
  • Loading branch information
luyuncheng committed Jul 7, 2024
1 parent 5139b16 commit 2a61fcd
Show file tree
Hide file tree
Showing 9 changed files with 1,121 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [Unreleased 3.0](https://github.com/opensearch-project/k-NN/compare/2.x...HEAD)
### Features
* Reuse KNNVectorFieldData to reduce disk usage [#1571](https://github.com/opensearch-project/k-NN/pull/1571)
### Enhancements
### Bug Fixes
### Infrastructure
Expand Down
24 changes: 23 additions & 1 deletion src/main/java/org/opensearch/knn/index/KNNSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexSettings;
import org.opensearch.knn.index.memory.NativeMemoryCacheManager;
import org.opensearch.knn.index.memory.NativeMemoryCacheManagerDto;
import org.opensearch.knn.index.util.IndexHyperParametersUtil;
Expand Down Expand Up @@ -80,6 +81,7 @@ public class KNNSettings {
public static final String MODEL_CACHE_SIZE_LIMIT = "knn.model.cache.size.limit";
public static final String ADVANCED_FILTERED_EXACT_SEARCH_THRESHOLD = "index.knn.advanced.filtered_exact_search_threshold";
public static final String KNN_FAISS_AVX2_DISABLED = "knn.faiss.avx2.disabled";
public static final String KNN_SYNTHETIC_SOURCE_ENABLED = "index.knn.synthetic_source.enabled";

/**
* Default setting values
Expand Down Expand Up @@ -252,6 +254,13 @@ public class KNNSettings {
NodeScope
);

/**
 * Boolean setting registered under the {@link #KNN_SYNTHETIC_SOURCE_ENABLED} key.
 * It is declared with the IndexScope and Dynamic properties and defaults to {@code false}.
 * Its value is read through {@link #isKNNSyntheticSourceEnabled(IndexSettings)}.
 */
public static final Setting<Boolean> KNN_SYNTHETIC_SOURCE_ENABLED_SETTING = Setting.boolSetting(
KNN_SYNTHETIC_SOURCE_ENABLED,
false,
IndexScope,
Dynamic
);

/**
* Dynamic settings
*/
Expand Down Expand Up @@ -369,6 +378,10 @@ private Setting<?> getSetting(String key) {
return KNN_VECTOR_STREAMING_MEMORY_LIMIT_PCT_SETTING;
}

if (KNN_SYNTHETIC_SOURCE_ENABLED.equals(key)) {
return KNN_SYNTHETIC_SOURCE_ENABLED_SETTING;
}

throw new IllegalArgumentException("Cannot find setting by key [" + key + "]");
}

Expand All @@ -387,7 +400,8 @@ public List<Setting<?>> getSettings() {
MODEL_CACHE_SIZE_LIMIT_SETTING,
ADVANCED_FILTERED_EXACT_SEARCH_THRESHOLD_SETTING,
KNN_FAISS_AVX2_DISABLED_SETTING,
KNN_VECTOR_STREAMING_MEMORY_LIMIT_PCT_SETTING
KNN_VECTOR_STREAMING_MEMORY_LIMIT_PCT_SETTING,
KNN_SYNTHETIC_SOURCE_ENABLED_SETTING
);
return Stream.concat(settings.stream(), dynamicCacheSettings.values().stream()).collect(Collectors.toList());
}
Expand Down Expand Up @@ -432,6 +446,14 @@ public static Integer getFilteredExactSearchThreshold(final String indexName) {
.getAsInt(ADVANCED_FILTERED_EXACT_SEARCH_THRESHOLD, ADVANCED_FILTERED_EXACT_SEARCH_THRESHOLD_DEFAULT_VALUE);
}

/**
 * Reports whether synthetic source is switched on for an index.
 *
 * @param indexSettings settings of the index to inspect
 * @return the value of {@link #KNN_SYNTHETIC_SOURCE_ENABLED_SETTING} resolved against {@code indexSettings}
 */
public static boolean isKNNSyntheticSourceEnabled(IndexSettings indexSettings) {
    final Boolean syntheticSourceEnabled = indexSettings.getValue(KNN_SYNTHETIC_SOURCE_ENABLED_SETTING);
    return syntheticSourceEnabled;
}

public void initialize(Client client, ClusterService clusterService) {
this.client = client;
this.clusterService = clusterService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@

package org.opensearch.knn.index;

import org.apache.lucene.index.BinaryDocValues;
import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.search.DocIdSetIterator;
import org.opensearch.index.fielddata.LeafFieldData;
import org.opensearch.index.fielddata.ScriptDocValues;
import org.opensearch.index.fielddata.SortedBinaryDocValues;
import org.opensearch.index.mapper.DocValueFetcher;
import org.opensearch.search.DocValueFormat;

import java.io.IOException;

Expand Down Expand Up @@ -70,4 +73,41 @@ public ScriptDocValues<float[]> getScriptValues() {
public SortedBinaryDocValues getBytesValues() {
// Always throws: as the message states, knn vector fields do not support sorting.
throw new UnsupportedOperationException("knn vector field '" + fieldName + "' doesn't support sorting");
}

/**
 * Builds a leaf-level fetcher that decodes this field's binary doc values into a float vector.
 *
 * @param format doc value format requested by the caller; not consulted by this implementation
 * @return a fetcher exposing one value per document that has a doc value for this field
 * @throws IllegalStateException if the binary doc values cannot be opened
 */
@Override
public DocValueFetcher.Leaf getLeafValueFetcher(DocValueFormat format) {
    final BinaryDocValues vectorDocValues;
    try {
        vectorDocValues = DocValues.getBinary(reader, fieldName);
    } catch (IOException e) {
        throw new IllegalStateException("Cannot load KNNDocValues from lucene", e);
    }

    return new DocValueFetcher.Leaf() {
        // Vector decoded by the most recent successful advanceExact call.
        private float[] currentVector;

        @Override
        public boolean advanceExact(int docId) throws IOException {
            final boolean found = vectorDocValues.advanceExact(docId);
            if (found) {
                currentVector = vectorDataType.getVectorFromBytesRef(vectorDocValues.binaryValue());
            }
            return found;
        }

        @Override
        public int docValueCount() throws IOException {
            // Each document contributes the whole vector as a single value.
            return 1;
        }

        @Override
        public Object nextValue() throws IOException {
            return currentVector;
        }
    };
}
}
259 changes: 259 additions & 0 deletions src/main/java/org/opensearch/knn/index/fetch/KNNFetchSubPhase.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.knn.index.fetch;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.extern.log4j.Log4j2;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.BitSet;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.lucene.search.Queries;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.mapper.DocValueFetcher;
import org.opensearch.index.mapper.DocumentMapper;
import org.opensearch.index.mapper.MappedFieldType;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.mapper.ObjectMapper;
import org.opensearch.index.mapper.ValueFetcher;
import org.opensearch.knn.index.KNNSettings;
import org.opensearch.knn.index.mapper.KNNVectorFieldMapper;
import org.opensearch.search.SearchHit;
import org.opensearch.search.fetch.FetchContext;
import org.opensearch.search.fetch.FetchSubPhase;
import org.opensearch.search.fetch.FetchSubPhaseProcessor;
import org.opensearch.search.internal.ContextIndexSearcher;
import org.opensearch.search.lookup.SourceLookup;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.opensearch.knn.common.KNNConstants.BYTES_PER_KILOBYTES;

/**
* Fetch sub-phase that pulls k-NN vector values from doc values
* and fills them into the source map of each hit.
*/
@Log4j2
public class KNNFetchSubPhase implements FetchSubPhase {

/**
 * Creates the per-request processor, or returns {@code null} when synthetic source is
 * disabled for the index.
 *
 * @param fetchContext context of the running fetch phase
 * @return a processor holding one doc-value fetcher per k-NN vector field type of the mapping,
 *         or {@code null} if the index setting is off
 */
@Override
public FetchSubPhaseProcessor getProcessor(FetchContext fetchContext) throws IOException {
    final IndexSettings settingsOfIndex = fetchContext.getIndexSettings();
    final boolean syntheticSourceOn = KNNSettings.isKNNSyntheticSourceEnabled(settingsOfIndex);
    if (!syntheticSourceOn) {
        log.debug("Synthetic is disabled for index: {}", fetchContext.getIndexName());
        return null;
    }

    final MapperService mappers = fetchContext.mapperService();
    final List<DocValueField> vectorFields = new ArrayList<>();
    for (MappedFieldType candidate : mappers.fieldTypes()) {
        // instanceof is already false for null, so no separate null check is required
        if (!(candidate instanceof KNNVectorFieldMapper.KNNVectorFieldType)) {
            continue;
        }
        final ValueFetcher docValueFetcher = new DocValueFetcher(
            candidate.docValueFormat(null, null),
            fetchContext.searchLookup().doc().getForField(candidate)
        );
        vectorFields.add(new DocValueField(candidate.name(), docValueFetcher));
    }
    return new KNNFetchSubPhaseProcessor(fetchContext, vectorFields);
}

@AllArgsConstructor
@Getter
class KNNFetchSubPhaseProcessor implements FetchSubPhaseProcessor {

private final FetchContext fetchContext;
private final List<DocValueField> fields;

/**
 * Forwards the new segment to the fetcher of every tracked vector field.
 *
 * @param readerContext the segment whose hits are processed next
 */
@Override
public void setNextReader(LeafReaderContext readerContext) throws IOException {
    final int fieldCount = fields.size();
    for (int i = 0; i < fieldCount; i++) {
        fields.get(i).fetcher.setNextReader(readerContext);
    }
}

/**
 * Adds vector fields that are missing from the hit's source map using their doc values,
 * then re-encodes the map as JSON and installs it as the hit's source.
 * Nested paths are handled first when the mapping contains nested objects.
 *
 * @param hitContext the hit being fetched
 */
@Override
public void process(HitContext hitContext) throws IOException {
    final MapperService mappers = fetchContext.mapperService();
    final boolean nestedMappingPresent = mappers.hasNested();
    final SearchHit searchHit = hitContext.hit();
    final Map<String, Object> sourceMap = searchHit.getSourceAsMap();
    if (sourceMap == null) {
        // when source is disabled, return
        return;
    }

    if (nestedMappingPresent) {
        syntheticNestedFieldWithDocValues(mappers, hitContext, sourceMap);
    }

    for (DocValueField vectorField : fields) {
        // Only fill in fields the source map does not already carry.
        if (!sourceMap.containsKey(vectorField.field)) {
            final List<Object> values = vectorField.fetcher.fetchValues(hitContext.sourceLookup());
            if (!values.isEmpty()) {
                sourceMap.put(vectorField.field, values.get(0));
            }
        }
    }

    final BytesStreamOutput out = new BytesStreamOutput(BYTES_PER_KILOBYTES);
    final XContentBuilder jsonBuilder = new XContentBuilder(XContentType.JSON.xContent(), out);
    jsonBuilder.value(sourceMap);
    hitContext.hit().sourceRef(BytesReference.bytes(jsonBuilder));
}

/**
 * Walks every nested object mapper and, for each tracked vector field that passes
 * {@code checkNestedField} for that mapper's path, fills the vector values into the
 * nested entries of the source map.
 *
 * @param mapperService mapper service used to look up the object mappers
 * @param hitContext    the hit being fetched
 * @param sourceMaps    source map of the hit; its nested lists are updated in place
 */
protected void syntheticNestedFieldWithDocValues(MapperService mapperService, HitContext hitContext, Map<String, Object> sourceMaps)
    throws IOException {
    DocumentMapper documentMapper = mapperService.documentMapper();
    Map<String, ObjectMapper> mapperMap = documentMapper.objectMappers();

    for (ObjectMapper objectMapper : mapperMap.values()) {
        if (objectMapper == null || !objectMapper.nested().isNested()) {
            continue;
        }
        String path = objectMapper.fullPath();
        for (DocValueField f : fields) {
            if (!checkNestedField(path, f, sourceMaps)) {
                continue;
            }
            // checkNestedField has verified that the value stored under path is an ArrayList
            ArrayList nestedDocList = (ArrayList) sourceMaps.get(path);

            // Parameterized form: the message is only assembled when debug logging is enabled.
            log.debug("object mapper: nested:{} Value:{} field:{}", objectMapper.nested().isNested(), path, f.field);

            innerProcessOneNestedField(objectMapper, hitContext, nestedDocList, f, path);
        }
    }
}

/**
 * Fills the values of one vector field into the entries of one nested path of the hit.
 * Child documents are visited in doc id order between the previous parent and the hit itself,
 * and the n-th visited child is written into the n-th entry of {@code nestedDocList}.
 *
 * @param objectMapper  mapper of the nested path, used to locate its child documents
 * @param hitContext    the (parent) hit being fetched
 * @param nestedDocList list stored in the source map under {@code path}; updated in place
 * @param f             the vector field to fill in; its name is expected to be {@code path + "." + suffix}
 * @param path          full path of the nested object
 * @throws UnsupportedOperationException if a child has a vector but no matching entry exists in the list
 */
@SuppressWarnings({ "unchecked", "rawtypes" }) // nestedDocList comes untyped out of the parsed source map
private void innerProcessOneNestedField(
    ObjectMapper objectMapper,
    HitContext hitContext,
    ArrayList nestedDocList,
    DocValueField f,
    String path
) throws IOException {

    LeafReaderContext subReaderContext = hitContext.readerContext();
    SearchHit hit = hitContext.hit();
    int currentParent = hit.docId() - subReaderContext.docBase;
    if (currentParent == 0) {
        // Children are looked up at doc ids below the parent, so the first document of a segment
        // has none. Returning here also avoids calling prevSetBit with an index of -1.
        return;
    }

    BitSet parentBits = getParentDocBitSet(hitContext);
    if (parentBits == null) {
        // defensive: without a parent bitset for this segment the child range cannot be determined
        return;
    }
    DocIdSetIterator childIter = getChildDocIdSetIterator(objectMapper, hitContext);

    int previousParent = parentBits.prevSetBit(currentParent - 1);
    int childDocId = childIter.advance(previousParent + 1);
    SourceLookup nestedVecSourceLookup = new SourceLookup();

    // when nested field only have vector field and exclude source, list is empty
    boolean isEmpty = nestedDocList.isEmpty();

    for (int offset = 0; childDocId < currentParent && childDocId != DocIdSetIterator.NO_MORE_DOCS; childDocId = childIter
        .nextDoc(), offset++) {
        nestedVecSourceLookup.setSegmentAndDocument(subReaderContext, childDocId);
        List<Object> nestedVecDocValuesSource = f.fetcher.fetchValues(nestedVecSourceLookup);
        if (nestedVecDocValuesSource == null || nestedVecDocValuesSource.isEmpty()) {
            continue;
        }
        if (isEmpty) {
            nestedDocList.add(new HashMap<String, Object>());
        }
        if (offset < nestedDocList.size()) {
            Object sourceObj = nestedDocList.get(offset);
            if (sourceObj instanceof Map) {
                Map<String, Object> sourceMap = (Map<String, Object>) sourceObj;
                // strip "<path>." to get the field name relative to the nested object
                String suffix = f.field.substring(path.length() + 1);
                sourceMap.put(suffix, nestedVecDocValuesSource.get(0));
            }
        } else {
            /*
             * TODO nested field partial doc only have vector and source exclude
             * this source map nestedDocList would out-of-order, can not fill the vector into right offset
             * "nested_field" : [
             *   {"nested_vector": [2.6, 2.6]},
             *   {"nested_numeric": 2, "nested_vector": [3.1, 2.3]}
             * ]
             */
            throw new UnsupportedOperationException(
                String.format("\"Nested Path \"%s\" in Field \"%s\" with _ID \"%s\" can not be empty\"", path, f.field, hit.getId())
            );
        }
    }
}

/**
 * Looks up the bit set produced by the non-nested filter for the segment that contains the hit.
 *
 * @param hitContext the hit whose segment is inspected
 * @return the bit set obtained from the query shard context's bitset filter for that segment
 */
private BitSet getParentDocBitSet(HitContext hitContext) throws IOException {
    final Query nonNestedFilter = Queries.newNonNestedFilter();
    final LeafReaderContext segment = hitContext.readerContext();
    return fetchContext.getQueryShardContext().bitsetFilter(nonNestedFilter).getBitSet(segment);
}

/**
 * Builds an iterator over the documents of the hit's segment that match the nested type
 * filter of {@code objectMapper}.
 *
 * @param objectMapper mapper of the nested path whose documents are wanted
 * @param hitContext   the hit whose segment is searched
 * @return an iterator over the matching documents; an empty iterator when the segment yields no scorer
 */
private DocIdSetIterator getChildDocIdSetIterator(ObjectMapper objectMapper, HitContext hitContext) throws IOException {
    Query childFilter = objectMapper.nestedTypeFilter();
    ContextIndexSearcher searcher = fetchContext.searcher();
    LeafReaderContext subReaderContext = hitContext.readerContext();
    final Weight childWeight = searcher.createWeight(searcher.rewrite(childFilter), ScoreMode.COMPLETE_NO_SCORES, 1f);
    Scorer childScorer = childWeight.scorer(subReaderContext);
    if (childScorer == null) {
        // Weight.scorer may return null when no document in the segment matches;
        // hand back an empty iterator instead of dereferencing null.
        return DocIdSetIterator.empty();
    }
    return childScorer.iterator();
}

/**
 * Decides whether field {@code f} lives under the nested {@code path} and whether the source map
 * holds a list for that path.
 *
 * @param path       full path of a nested object
 * @param f          candidate vector field
 * @param sourceMaps source map of the hit
 * @return {@code true} only if the field name starts with {@code path + "."} and the value stored
 *         under {@code path} is an {@link ArrayList}
 */
private boolean checkNestedField(String path, DocValueField f, Map<String, Object> sourceMaps) {
    // Require the "." separator: a bare prefix match would also accept sibling paths such as
    // "ab.vec" for path "a", and the caller strips path.length() + 1 characters from the field name.
    if (!f.field.startsWith(path + ".")) {
        return false;
    }

    // path to nested field: a missing key yields null, which is not an ArrayList either
    return sourceMaps.get(path) instanceof ArrayList;
}
}

/**
 * Pairs a field name with the {@link ValueFetcher} used to read that field's values.
 */
@Getter
public static class DocValueField {
    private final String field;
    private final ValueFetcher fetcher;

    DocValueField(String fieldName, ValueFetcher valueFetcher) {
        field = fieldName;
        fetcher = valueFetcher;
    }
}
}
Loading

0 comments on commit 2a61fcd

Please sign in to comment.