Skip to content

Commit

Permalink
Refactoring
Browse files Browse the repository at this point in the history
Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
  • Loading branch information
sarthakaggarwal97 committed Jun 11, 2024
1 parent feab228 commit 19b3924
Show file tree
Hide file tree
Showing 14 changed files with 339 additions and 118 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,12 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.codecs.DocValuesConsumer;
import org.apache.lucene.codecs.DocValuesProducer;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.ByteBlockPool;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.Counter;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.index.compositeindex.CompositeField;
import org.opensearch.index.compositeindex.DateDimension;
import org.opensearch.index.compositeindex.Dimension;
Expand All @@ -27,9 +24,10 @@
import org.opensearch.index.compositeindex.startree.aggregators.MetricTypeFieldPair;
import org.opensearch.index.compositeindex.startree.aggregators.ValueAggregator;
import org.opensearch.index.compositeindex.startree.aggregators.ValueAggregatorFactory;
import org.opensearch.index.compositeindex.startree.builder.CompositeFieldWriter;
import org.opensearch.index.compositeindex.startree.builder.StarTreeDocValues;
import org.opensearch.index.compositeindex.startree.builder.SingleTreeBuilder;
import org.opensearch.index.compositeindex.startree.builder.StarTreeDocValuesIteratorFactory;
import org.opensearch.index.compositeindex.startree.data.StarTreeDocValues;
import org.opensearch.index.compositeindex.startree.data.StarTreeDocument;
import org.opensearch.index.compositeindex.startree.node.StarTreeNode;
import org.opensearch.index.compositeindex.startree.utils.StarTreeBuilderUtils;
import org.opensearch.index.mapper.NumberFieldMapper;
Expand All @@ -47,35 +45,36 @@
/**
* Base class for star tree builder
*/
public abstract class BaseCompositeFieldStarTreeBuilder implements CompositeFieldWriter {
public abstract class BaseSingleStarTreeBuilder implements SingleTreeBuilder {

// TODO: STAR_TREE_CODEC will be moved to CodecService once the Star Tree Codec is defined
public static final String STAR_TREE_CODEC = "startreecodec";

private static final Logger logger = LogManager.getLogger(BaseCompositeFieldStarTreeBuilder.class);
private static final Logger logger = LogManager.getLogger(BaseSingleStarTreeBuilder.class);

public static final int STAR_IN_DOC_VALUES_INDEX = 0;

public final String[] dimensionsSplitOrder;
public final Set<Integer> skipStarNodeCreationForDimensions;
public final String[] metrics;
protected final String[] dimensionsSplitOrder;
protected final Set<Integer> skipStarNodeCreationForDimensions;
protected final String[] metrics;

public final int numMetrics;
public final int numDimensions;
public int numDocs;
public int totalDocs;
public int numNodes;
public final int maxLeafDocuments;
protected final int numMetrics;
protected final int numDimensions;
protected int numDocs;
protected int totalDocs;
protected int numNodes;
protected final int maxLeafDocuments;

public final StarTreeBuilderUtils.TreeNode rootNode = getNewNode();
protected final StarTreeBuilderUtils.TreeNode rootNode = getNewNode();

public IndexOutput indexOutput;
public DocIdSetIterator[] dimensionReaders;
public DocIdSetIterator[] metricReaders;
// TODO: This will be initialized with OnHeap / OffHeap Implementations (Commented out its occurrences for now)
// private IndexOutput indexOutput;
protected DocIdSetIterator[] dimensionReaders;
protected DocIdSetIterator[] metricReaders;

public ValueAggregator[] valueAggregators;
public DocValuesConsumer docValuesConsumer;
public DocValuesProducer docValuesProducer;
protected ValueAggregator[] valueAggregators;
protected DocValuesConsumer docValuesConsumer;
protected DocValuesProducer docValuesProducer;

private final StarTreeDocValuesIteratorFactory starTreeDocValuesIteratorFactory;
private final CompositeField compositeField;
Expand All @@ -90,7 +89,7 @@ public abstract class BaseCompositeFieldStarTreeBuilder implements CompositeFiel
* @param docValuesConsumer to consume the new aggregated metrics during flush
* @param state stores the segment state
*/
protected BaseCompositeFieldStarTreeBuilder(
protected BaseSingleStarTreeBuilder(
CompositeField compositeField,
DocValuesProducer docValuesProducer,
DocValuesConsumer docValuesConsumer,
Expand All @@ -99,11 +98,11 @@ protected BaseCompositeFieldStarTreeBuilder(

logger.info("Building in base star tree builder");

String docFileName = IndexFileNames.segmentFileName(state.segmentInfo.name, state.segmentSuffix, "stttree");
logger.info("Star tree file name : {}", docFileName);
// String docFileName = IndexFileNames.segmentFileName(state.segmentInfo.name, state.segmentSuffix, "stttree");
// logger.info("Star tree file name : {}", docFileName);

indexOutput = state.directory.createOutput(docFileName, state.context);
CodecUtil.writeIndexHeader(indexOutput, STAR_TREE_CODEC, 0, state.segmentInfo.getId(), state.segmentSuffix);
// indexOutput = state.directory.createOutput(docFileName, state.context);
// CodecUtil.writeIndexHeader(indexOutput, STAR_TREE_CODEC, 0, state.segmentInfo.getId(), state.segmentSuffix);

starTreeDocValuesIteratorFactory = new StarTreeDocValuesIteratorFactory();
this.compositeField = compositeField;
Expand Down Expand Up @@ -145,7 +144,7 @@ protected BaseCompositeFieldStarTreeBuilder(
int index = 0;
for (MetricTypeFieldPair metricTypeFieldPair : metricTypeFieldPairs) {
metrics[index] = metricTypeFieldPair.toFieldName();
valueAggregators[index] = ValueAggregatorFactory.getValueAggregator(metricTypeFieldPair.getFunctionType());
valueAggregators[index] = ValueAggregatorFactory.getValueAggregator(metricTypeFieldPair.getMetricType());
// Ignore the column for COUNT aggregation function
if (valueAggregators[index].getAggregationType() != MetricType.COUNT) {
String metricName = metricTypeFieldPair.getField();
Expand All @@ -165,7 +164,7 @@ protected BaseCompositeFieldStarTreeBuilder(
/**
* Generates the AggregationFunctionColumnPairs for all the metrics on a field
*/
private List<MetricTypeFieldPair> generateAggregationFunctionColumnPairs() {
List<MetricTypeFieldPair> generateAggregationFunctionColumnPairs() {
List<MetricTypeFieldPair> metricTypeFieldPairs = new ArrayList<>();
for (Metric metric : this.compositeField.getMetrics()) {
for (MetricType metricType : metric.getMetrics()) {
Expand Down Expand Up @@ -293,17 +292,17 @@ protected StarTreeDocument mergeSegmentStarTreeDocument(
) {
// TODO: HANDLE KEYWORDS LATER!
if (aggregatedStarTreeDocument == null) {
long[] dimensions = Arrays.copyOf(segmentStarTreeDocument._dimensions, numDimensions);
long[] dimensions = Arrays.copyOf(segmentStarTreeDocument.dimensions, numDimensions);
Object[] metrics = new Object[numMetrics];
for (int i = 0; i < numMetrics; i++) {
metrics[i] = valueAggregators[i].getInitialAggregatedValue(segmentStarTreeDocument._metrics[i]);
metrics[i] = valueAggregators[i].getInitialAggregatedValue(segmentStarTreeDocument.metrics[i]);
}
return new StarTreeDocument(dimensions, metrics);
} else {
for (int i = 0; i < numMetrics; i++) {
aggregatedStarTreeDocument._metrics[i] = valueAggregators[i].applyRawValue(
aggregatedStarTreeDocument._metrics[i],
segmentStarTreeDocument._metrics[i]
aggregatedStarTreeDocument.metrics[i] = valueAggregators[i].applyRawValue(
aggregatedStarTreeDocument.metrics[i],
segmentStarTreeDocument.metrics[i]
);
}
return aggregatedStarTreeDocument;
Expand All @@ -324,17 +323,17 @@ protected StarTreeDocument mergeStarTreeDocument(
StarTreeDocument starTreeStarTreeDocument
) {
if (aggregatedStarTreeDocument == null) {
long[] dimensions = Arrays.copyOf(starTreeStarTreeDocument._dimensions, numDimensions);
long[] dimensions = Arrays.copyOf(starTreeStarTreeDocument.dimensions, numDimensions);
Object[] metrics = new Object[numMetrics];
for (int i = 0; i < numMetrics; i++) {
metrics[i] = valueAggregators[i].cloneAggregatedValue((Long) starTreeStarTreeDocument._metrics[i]);
metrics[i] = valueAggregators[i].cloneAggregatedValue(starTreeStarTreeDocument.metrics[i]);
}
return new StarTreeDocument(dimensions, metrics);
} else {
for (int i = 0; i < numMetrics; i++) {
aggregatedStarTreeDocument._metrics[i] = valueAggregators[i].applyAggregatedValue(
(Long) starTreeStarTreeDocument._metrics[i],
(Long) aggregatedStarTreeDocument._metrics[i]
aggregatedStarTreeDocument.metrics[i] = valueAggregators[i].applyAggregatedValue(
starTreeStarTreeDocument.metrics[i],
aggregatedStarTreeDocument.metrics[i]
);
}
return aggregatedStarTreeDocument;
Expand Down Expand Up @@ -366,7 +365,7 @@ public void build(Iterator<StarTreeDocument> starTreeDocumentIterator) throws IO
logger.info("Generated star tree docs : [{}] from segment docs : [{}]", numStarTreeDocument, numSegmentStarTreeDocument);

if (numDocs == 0) {
StarTreeBuilderUtils.serializeTree(indexOutput, rootNode, dimensionsSplitOrder, numNodes);
// StarTreeBuilderUtils.serializeTree(indexOutput, rootNode, dimensionsSplitOrder, numNodes);
return;
}

Expand All @@ -386,7 +385,7 @@ public void build(Iterator<StarTreeDocument> starTreeDocumentIterator) throws IO
createSortedDocValuesIndices(docValuesConsumer);

// Serialize and save to disk
StarTreeBuilderUtils.serializeTree(indexOutput, rootNode, dimensionsSplitOrder, numNodes);
// StarTreeBuilderUtils.serializeTree(indexOutput, rootNode, dimensionsSplitOrder, numNodes);

// TODO: Write star tree metadata for off heap implementation

Expand Down Expand Up @@ -502,7 +501,7 @@ private StarTreeDocument createAggregatedDocs(StarTreeBuilderUtils.TreeNode node
}
assert aggregatedStarTreeDocument != null;
for (int i = node.dimensionId + 1; i < numDimensions; i++) {
aggregatedStarTreeDocument._dimensions[i] = STAR_IN_DOC_VALUES_INDEX;
aggregatedStarTreeDocument.dimensions[i] = STAR_IN_DOC_VALUES_INDEX;
}
node.aggregatedDocId = numDocs;
appendToStarTree(aggregatedStarTreeDocument);
Expand All @@ -526,7 +525,7 @@ private StarTreeDocument createAggregatedDocs(StarTreeBuilderUtils.TreeNode node
}
assert aggregatedStarTreeDocument != null;
for (int i = node.dimensionId + 1; i < numDimensions; i++) {
aggregatedStarTreeDocument._dimensions[i] = STAR_IN_DOC_VALUES_INDEX;
aggregatedStarTreeDocument.dimensions[i] = STAR_IN_DOC_VALUES_INDEX;
}
node.aggregatedDocId = numDocs;
appendToStarTree(aggregatedStarTreeDocument);
Expand Down Expand Up @@ -608,8 +607,8 @@ private void createSortedDocValuesIndices(DocValuesConsumer docValuesConsumer) t
Map<String, Map<Long, BytesRef>> ordinalToSortedSetDocValueMap = new HashMap<>();
for (int docId = 0; docId < numDocs; docId++) {
StarTreeDocument starTreeDocument = getStarTreeDocument(docId);
for (int i = 0; i < starTreeDocument._dimensions.length; i++) {
long val = starTreeDocument._dimensions[i];
for (int i = 0; i < starTreeDocument.dimensions.length; i++) {
long val = starTreeDocument.dimensions[i];
StarTreeDocValuesWriter starTreeDocValuesWriter = dimensionWriters.get(i);
switch (starTreeDocValuesWriter.getDocValuesType()) {
case SORTED_SET:
Expand All @@ -626,9 +625,9 @@ private void createSortedDocValuesIndices(DocValuesConsumer docValuesConsumer) t
throw new IllegalStateException("Unsupported doc values type");
}
}
for (int i = 0; i < starTreeDocument._metrics.length; i++) {
for (int i = 0; i < starTreeDocument.metrics.length; i++) {
try {
Number parse = NumberFieldMapper.NumberType.LONG.parse(starTreeDocument._metrics[i], true);
Number parse = NumberFieldMapper.NumberType.LONG.parse(starTreeDocument.metrics[i], true);
StarTreeDocValuesWriter starTreeDocValuesWriter = metricWriters.get(i);
((SortedNumericDocValuesWriter) starTreeDocValuesWriter.getDocValuesWriter()).addValue(docId, parse.longValue());
} catch (IllegalArgumentException e) {
Expand Down Expand Up @@ -686,7 +685,7 @@ public SortedSetDocValues getSortedSet(FieldInfo field) {
/**
* Returns the respective doc values writer based on doc values type
*/
private DocValuesWriter<?> getDocValuesWriter(DocValuesType docValuesType, FieldInfo fi, Counter counter) {
DocValuesWriter<?> getDocValuesWriter(DocValuesType docValuesType, FieldInfo fi, Counter counter) {
final ByteBlockPool.Allocator byteBlockAllocator = new ByteBlockPool.DirectTrackingAllocator(counter);
final ByteBlockPool docValuesBytePool = new ByteBlockPool(byteBlockAllocator);
switch (docValuesType) {
Expand All @@ -708,40 +707,23 @@ private long handleDateDimension(final String fieldName, final long val) {
}

/**
 * Finishes the star tree output and releases it.
 *
 * If {@code indexOutput} is non-null, an int {@code -1} and then the codec footer are written to it.
 * The output is closed in the {@code finally} block and the field is reset to {@code null}.
 *
 * NOTE(review): the statements below are followed by a commented-out copy of the same logic;
 * confirm which of the two variants is meant to be live.
 *
 * @throws IOException declared by the signature; exceptions raised inside the {@code try} block
 *                     are rethrown wrapped in a {@link RuntimeException} instead
 */
public void close() throws IOException {
// Tracks whether the trailer was written without an exception; selects the close strategy below.
boolean success = false;
try {
if (indexOutput != null) {
// NOTE(review): -1 is presumably an end-of-data marker — confirm against the reader side.
indexOutput.writeInt(-1);
CodecUtil.writeFooter(indexOutput); // write checksum
}
success = true;
} catch (Exception e) {
// Any failure while writing the trailer is surfaced as an unchecked exception with the cause kept.
throw new RuntimeException(e);
} finally {
if (success) {
IOUtils.close(indexOutput);
} else {
// Failure path: presumably closes without letting a secondary close error mask the
// original exception — verify against IOUtils.
IOUtils.closeWhileHandlingException(indexOutput);
}
// Drop the reference so the null check above skips the write if close() runs again.
indexOutput = null;
}
// boolean success = false;
// try {
// if (indexOutput != null) {
// indexOutput.writeInt(-1);
// CodecUtil.writeFooter(indexOutput); // write checksum
// }
// success = true;
// } catch (Exception e) {
// throw new RuntimeException(e);
// } finally {
// if (success) {
// IOUtils.close(indexOutput);
// } else {
// IOUtils.closeWhileHandlingException(indexOutput);
// }
// indexOutput = null;
// }
}

/**
 * Star tree document: a holder pairing an array of dimension values with an array of metric values.
 *
 * Both arrays are stored by reference (no defensive copy), so changes made to the arrays
 * through either reference are visible through the other.
 */
public static class StarTreeDocument {
    /** Dimension values of this document. */
    public final long[] _dimensions;
    /** Metric values of this document. */
    public final Object[] _metrics;

    /**
     * Creates a document that wraps the given arrays.
     *
     * @param dimensions dimension values, stored as-is
     * @param metrics    metric values, stored as-is
     */
    public StarTreeDocument(long[] dimensions, Object[] metrics) {
        this._dimensions = dimensions;
        this._metrics = metrics;
    }

    /**
     * Renders the document as the dimensions array, then {@code " | "}, then the metrics array,
     * each formatted by {@link Arrays#toString}.
     */
    @Override
    public String toString() {
        final String dimensionText = Arrays.toString(_dimensions);
        final String metricText = Arrays.toString(_metrics);
        return String.join(" | ", dimensionText, metricText);
    }
}
}
Loading

0 comments on commit 19b3924

Please sign in to comment.