
[FEATURE] [Build-TimeReduction V1] Merge Optimization: Streaming vectors from Java Layer to JNI layer to avoid OOM/Circuit Breaker in native engines #1506

Closed · 2 of 4 tasks
navneet1v opened this issue Mar 4, 2024 · 2 comments
Labels: enhancement, indexing-improvements, v2.14.0

navneet1v (Collaborator) commented Mar 4, 2024
Problem Statement

While testing with the setup below, I was seeing circuit breaker (CB) exceptions.

  1. Dataset: 1536 dimensions, 5M records
  2. 1 data node with 128 GB RAM and 16 vCPUs
  3. OpenSearch heap size: 32 GB, version 2.11.0
  4. Shards: 1
  5. Force merge to 1 segment

Logs:

If CB is enabled

2024-02-24 03:36:09,149 | INFO: Start optimize (opensearch.py:169) (372349)
2024-02-24 03:42:42,914 | WARNING: VectorDB optimize error: TransportError(429, 'circuit_breaking_exception', '[parent] Data too large, data for [<http_request>] would be [32224869544/30gb], which is larger than the limit of [31111669350/28.9gb], real usage: [32224869544/30gb], new bytes reserved: [0/0b], usages [request=0/0b, fielddata=0/0b, in_flight_requests=378/378b]') (task_runner.py:258) (371437)
2024-02-24 03:42:42,992 | WARNING: Failed to run performance case, reason = TransportError(429, 'circuit_breaking_exception', '[parent] Data too large, data for [<http_request>] would be [32224869544/30gb], which is larger than the limit of [31111669350/28.9gb], real usage: [32224869544/30gb], new bytes reserved: [0/0b], usages [request=0/0b, fielddata=0/0b, in_flight_requests=378/378b]') (task_runner.py:186) (371437)

If CB is not enabled

» WARN ][o.o.m.j.JvmGcMonitorService] [integTest-0] [gc][1433] overhead, spent [1.5s] collecting in the last [1.5s]
» WARN ][o.o.m.j.JvmGcMonitorService] [integTest-0] [gc][1434] overhead, spent [1s] collecting in the last [1s]
» ERROR][o.o.b.OpenSearchUncaughtExceptionHandler] [integTest-0] fatal error in thread [opensearch[integTest-0][refresh][T#1]], exiting
»  java.lang.OutOfMemoryError: Java heap space
»       at org.opensearch.common.util.concurrent.ThreadContext.propagateTransients(ThreadContext.java:574) ~[opensearch-2.11.1-SNAPSHOT.jar:2.11.1-SNAPSHOT]
»       at org.opensearch.common.util.concurrent.ThreadContext.stashContext(ThreadContext.java:162) ~[opensearch-2.11.1-SNAPSHOT.jar:2.11.1-SNAPSHOT]
»       at org.opensearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:847) ~[opensearch-2.11.1-SNAPSHOT.jar:2.11.1-SNAPSHOT]
»       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
»       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
»       at java.lang.Thread.run(Thread.java:829) [?:?]
» ERROR][o.o.b.OpenSearchUncaughtExceptionHandler] [integTest-0] fatal error in thread [opensearch[integTest-0][write][T#13]], exiting
»  java.lang.OutOfMemoryError: Java heap space
»       at org.apache.lucene.util.ArrayUtil.copyOfSubArray(ArrayUtil.java:613) ~[lucene-core-9.7.0.jar:9.7.0 ccf4b198ec328095d45d2746189dc8ca633e8bcf - 2023-06-21 11:48:16]
»       at org.apache.lucene.util.BytesRef.deepCopyOf(BytesRef.java:175) ~[lucene-core-9.7.0.jar:9.7.0 ccf4b198ec328095d45d2746189dc8ca633e8bcf - 2023-06-21 11:48:16]
»       at org.opensearch.core.common.bytes.BytesArray.<init>(BytesArray.java:64) ~[opensearch-core-2.11.1-SNAPSHOT.jar:2.11.1-SNAPSHOT]
»       at org.opensearch.index.translog.TranslogWriter.assertNoSeqNumberConflict(TranslogWriter.java:345) ~[opensearch-2.11.1-SNAPSHOT.jar:2.11.1-SNAPSHOT]
»       at org.opensearch.index.translog.TranslogWriter.add(TranslogWriter.java:286) ~[opensearch-2.11.1-SNAPSHOT.jar:2.11.1-SNAPSHOT]
»       at org.opensearch.index.translog.Translog.add(Translog.java:571) ~[opensearch-2.11.1-SNAPSHOT.jar:2.11.1-SNAPSHOT]
»       at org.opensearch.index.translog.InternalTranslogManager.add(InternalTranslogManager.java:327) ~[opensearch-2.11.1-SNAPSHOT.jar:2.11.1-SNAPSHOT]
»       at org.opensearch.index.engine.InternalEngine.index(InternalEngine.java:1025) ~[opensearch-2.11.1-SNAPSHOT.jar:2.11.1-SNAPSHOT]
»       at org.opensearch.index.shard.IndexShard.index(IndexShard.java:1123) ~[opensearch-2.11.1-SNAPSHOT.jar:2.11.1-SNAPSHOT]
»       at org.opensearch.index.shard.IndexShard.applyIndexOperation(IndexShard.java:1068) ~[opensearch-2.11.1-SNAPSHOT.jar:2.11.1-SNAPSHOT]
»       at org.opensearch.index.shard.IndexShard.applyIndexOperationOnPrimary(IndexShard.java:959) ~[opensearch-2.11.1-SNAPSHOT.jar:2.11.1-SNAPSHOT]
»       at org.opensearch.action.bulk.TransportShardBulkAction.executeBulkItemRequest(TransportShardBulkAction.java:619) ~[opensearch-2.11.1-SNAPSHOT.jar:2.11.1-SNAPSHOT]
»       at org.opensearch.action.bulk.TransportShardBulkAction$2.doRun(TransportShardBulkAction.java:466) ~[opensearch-2.11.1-SNAPSHOT.jar:2.11.1-SNAPSHOT]
»       at org.opensearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:52) ~[opensearch-2.11.1-SNAPSHOT.jar:2.11.1-SNAPSHOT]
»       at org.opensearch.action.bulk.TransportShardBulkAction.performOnPrimary(TransportShardBulkAction.java:530) ~[opensearch-2.11.1-SNAPSHOT.jar:2.11.1-SNAPSHOT]
»       at org.opensearch.action.bulk.TransportShardBulkAction.dispatchedShardOperationOnPrimary(TransportShardBulkAction.java:411) ~[opensearch-2.11.1-SNAPSHOT.jar:2.11.1-SNAPSHOT]
»       at org.opensearch.action.bulk.TransportShardBulkAction.dispatchedShardOperationOnPrimary(TransportShardBulkAction.java:123) ~[opensearch-2.11.1-SNAPSHOT.jar:2.11.1-SNAPSHOT]
»       at org.opensearch.action.support.replication.TransportWriteAction$1.doRun(TransportWriteAction.java:223) ~[opensearch-2.11.1-SNAPSHOT.jar:2.11.1-SNAPSHOT]
»       at org.opensearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:908) ~[opensearch-2.11.1-SNAPSHOT.jar:2.11.1-SNAPSHOT]
»       at org.opensearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:52) ~[opensearch-2.11.1-SNAPSHOT.jar:2.11.1-SNAPSHOT]
»       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
»       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
»       at java.lang.Thread.run(Thread.java:829) [?:?]
» WARN ][o.o.k.i.c.K.KNN80DocValuesConsumer] [integTest-0] Refresh operation complete in 31371 ms
»   ↓ last 40 non error or warning messages from /workplace/workspace/k-NN/build/testclusters/integTest-0/logs/opensearch.stdout.log ↓
» [2024-03-01T08:51:09,072][INFO ][o.o.m.j.JvmGcMonitorService] [integTest-0] [gc][1366] overhead, spent [313ms] collecting in the last [1s]
» [2024-03-01T08:51:52,157][INFO ][o.o.m.j.JvmGcMonitorService] [integTest-0] [gc][1409] overhead, spent [457ms] collecting in the last [1s]
» [2024-03-01T08:52:22.500264Z] [BUILD] Stopping node

Root Cause

This OOM/CB trips because, while creating the Faiss/nmslib index at the segment level, we first load all the vectors (floats) into the JVM heap. Since vectors are 4-byte floats, this produces an array of ~28.6 GB ((4 × 1536 × 5,000,000)/2^30), and then the OOM. Ref: https://github.com/opensearch-project/k-NN/blob/main/src/main/java/org/opensearch/knn/index/codec/util/KNNCodecUtil.java#L42-L59
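
For concreteness, the heap math above as a quick back-of-the-envelope in Java (illustration only):

```java
// Heap needed to buffer every vector for 5M docs at 1536 dimensions:
// 4-byte floats * dimensions * documents, before any object overhead.
long bytes = 4L * 1536 * 5_000_000;
System.out.printf("%.1f GB%n", bytes / Math.pow(2, 30)); // ~28.6 GB
```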

Solution

The solution I am proposing: while reading the vectors from doc values, stream/transfer them to an address in native memory (RAM), and pass that address to the JNI layer when creating indices for the native libraries (Faiss and nmslib), rather than accumulating the vectors in a heap list and passing that list to JNI. I have done a POC implementation of this here. This resolves the issue described at the start because only a bounded number of vectors is kept in heap at any time, so no OOM or CB trips.
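
A minimal sketch of the proposed flow (the method names readVector, transferVectors, and createIndexFromAddress are hypothetical stand-ins, not the actual plugin API):

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.lucene.index.BinaryDocValues;
import org.apache.lucene.search.DocIdSetIterator;

// Sketch: stream vectors to native memory in bounded batches instead of
// accumulating them all in one heap list.
void buildIndex(BinaryDocValues values, int dimension, int vectorsPerTransfer) throws IOException {
    long nativeAddress = 0L;                       // handle to the native buffer
    long totalVectors = 0L;
    List<float[]> batch = new ArrayList<>(vectorsPerTransfer);
    for (int docId = values.nextDoc(); docId != DocIdSetIterator.NO_MORE_DOCS; docId = values.nextDoc()) {
        batch.add(readVector(values));             // hypothetical: decode BytesRef -> float[]
        totalVectors++;
        if (batch.size() == vectorsPerTransfer) {
            nativeAddress = transferVectors(nativeAddress, batch); // JNI: append off-heap
            batch.clear();                         // heap holds at most one batch
        }
    }
    if (!batch.isEmpty()) {
        nativeAddress = transferVectors(nativeAddress, batch);
    }
    createIndexFromAddress(nativeAddress, totalVectors, dimension); // Faiss/nmslib build off-heap
}
```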

Critical Design Choices

How many vectors should we stream at once from Java to the JNI layer?

This is an interesting trade-off: we don't want to stream too many vectors to the JNI layer at once, because that can trigger GC when heap memory is under stress; we also don't want to stream too few, which leads to more context switching and more fragmentation in native memory.

Approach 1

So what I am proposing is that, instead of a number of vectors, we budget the amount of data streamed per trip, because that is what is actually being sent. Considering a typical production JVM heap of 32 GB, streaming 100 MB per trip is ~0.3% of the whole heap. Table 1 works through the resulting batch sizes; a short calculation follows it.

| Dimensions | Stream size (MB) | Vectors per transfer | Vectors in segment | Trips to JNI | Approx. segment size with graphs + doc values (GB) | Total floats | Total size of floats (GB) |
|---|---|---|---|---|---|---|---|
| 128 | 100 | 204800 | 5000000 | 25 | 5.66244 | 640,000,000 | 2 |
| 256 | 100 | 102400 | 5000000 | 49 | 10.66923 | 1,280,000,000 | 5 |
| 512 | 100 | 51200 | 5000000 | 98 | 20.68281 | 2,560,000,000 | 10 |
| 768 | 100 | 34133 | 5000000 | 147 | 30.69639 | 3,840,000,000 | 14 |
| 1024 | 100 | 25600 | 5000000 | 196 | 40.70997 | 5,120,000,000 | 19 |
| 1536 | 100 | 17066 | 5000000 | 293 | 60.73713 | 7,680,000,000 | 29 |

Table 1: Segment size and vector-transfer details for a 100 MB stream budget.

The table shows that, for a constant amount of data per trip, the number of trips to the JNI layer grows with the dimension.
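
The batch sizes in Table 1 follow from a simple calculation (sketch, assuming 4-byte floats):

```java
// Deriving Table 1's "vectors per transfer" and "trips" from a memory budget.
int dimension = 1536;
long streamBytes = 100L * 1024 * 1024;                    // 100 MB per trip
long vectorsPerTransfer = streamBytes / (4L * dimension); // = 17,066
long trips = (5_000_000L + vectorsPerTransfer - 1) / vectorsPerTransfer; // = 293
```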

The concern is not the number of trips we make to JNI; the problem is that on every trip we append the floats to a C++ STL vector. If there is not enough memory to expand the vector in place, C++ copies the whole vector to a new memory location and then appends the data to it (ref). This adds latency to the overall system. The benchmarks below show how much extra latency is added when the data is sent in batches versus all at once.

| Benchmark | Total vectors | Dimension | Vectors per transfer | Mode | Cnt | Score | Units | Heap used (MB) | Trips to JNI |
|---|---|---|---|---|---|---|---|---|---|
| TransferVectorsBenchmarks.transferVectors | 1M | 128 | 100000 | ss | 3 | 1.917 | s/op | 48.82813 | 10 |
| TransferVectorsBenchmarks.transferVectors | 1M | 128 | 500000 | ss | 3 | 1.596 | s/op | 244.14063 | 2 |
| TransferVectorsBenchmarks.transferVectors | 1M | 128 | 1000000 | ss | 3 | 1.443 | s/op | 488.28125 | 1 |
| TransferVectorsBenchmarks.transferVectors | 1M | 256 | 100000 | ss | 3 | 3.89 | s/op | 97.65625 | 10 |
| TransferVectorsBenchmarks.transferVectors | 1M | 256 | 500000 | ss | 3 | 3.143 | s/op | 488.28125 | 2 |
| TransferVectorsBenchmarks.transferVectors | 1M | 256 | 1000000 | ss | 3 | 2.814 | s/op | 976.5625 | 1 |
| TransferVectorsBenchmarks.transferVectors | 1M | 384 | 100000 | ss | 3 | 6.446 | s/op | 146.48438 | 10 |
| TransferVectorsBenchmarks.transferVectors | 1M | 384 | 500000 | ss | 3 | 5.119 | s/op | 732.42188 | 2 |
| TransferVectorsBenchmarks.transferVectors | 1M | 384 | 1000000 | ss | 3 | 4.568 | s/op | 1464.84375 | 1 |
| TransferVectorsBenchmarks.transferVectors | 1M | 512 | 100000 | ss | 3 | 7.752 | s/op | 195.3125 | 10 |
| TransferVectorsBenchmarks.transferVectors | 1M | 512 | 500000 | ss | 3 | 6.073 | s/op | 976.5625 | 2 |
| TransferVectorsBenchmarks.transferVectors | 1M | 512 | 1000000 | ss | 3 | 5.297 | s/op | 1953.125 | 1 |
| TransferVectorsBenchmarks.transferVectors | 1M | 768 | 100000 | ss | 3 | 12.577 | s/op | 292.96875 | 10 |
| TransferVectorsBenchmarks.transferVectors | 1M | 768 | 500000 | ss | 3 | 9.723 | s/op | 1464.84375 | 2 |
| TransferVectorsBenchmarks.transferVectors | 1M | 768 | 1000000 | ss | 3 | 8.663 | s/op | 2929.6875 | 1 |
| TransferVectorsBenchmarks.transferVectors | 1M | 1024 | 100000 | ss | 3 | 15.406 | s/op | 390.625 | 10 |
| TransferVectorsBenchmarks.transferVectors | 1M | 1024 | 500000 | ss | 3 | 11.838 | s/op | 1953.125 | 2 |
| TransferVectorsBenchmarks.transferVectors | 1M | 1024 | 1000000 | ss | 3 | 10.306 | s/op | 3906.25 | 1 |
| TransferVectorsBenchmarks.transferVectors | 1M | 1536 | 100000 | ss | 3 | 24.882 | s/op | 585.9375 | 10 |
| TransferVectorsBenchmarks.transferVectors | 1M | 1536 | 500000 | ss | 3 | 19.091 | s/op | 2929.6875 | 2 |
| TransferVectorsBenchmarks.transferVectors | 1M | 1536 | 1000000 | ss | 3 | 16.978 | s/op | 5859.375 | 1 |

Table 2: Benchmarking results when a fixed number of vectors is transferred without an initial capacity set.
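
For reference, a minimal JMH skeleton with the same shape as the benchmark above (Mode.SingleShotTime is the "ss" mode in the tables; the transfer body is stubbed out, since in the real benchmark it crosses the JNI boundary into a C++ STL vector):

```java
import java.util.ArrayList;
import java.util.List;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;

@BenchmarkMode(Mode.SingleShotTime)
@State(Scope.Benchmark)
public class TransferVectorsBenchmarks {
    @Param({"128", "256", "384", "512", "768", "1024", "1536"})
    public int dimension;

    @Param({"100000", "500000", "1000000"})
    public int vectorsPerTransfer;

    private static final int TOTAL_VECTORS = 1_000_000;

    @Benchmark
    public void transferVectors() {
        List<float[]> batch = new ArrayList<>(vectorsPerTransfer);
        for (int i = 0; i < TOTAL_VECTORS; i++) {
            batch.add(new float[dimension]);
            if (batch.size() == vectorsPerTransfer) {
                transferToNative(batch); // stub for the JNI append
                batch.clear();
            }
        }
        if (!batch.isEmpty()) {
            transferToNative(batch);
        }
    }

    private void transferToNative(List<float[]> batch) {
        // Real benchmark: JNI call appending into a C++ std::vector. Stubbed here.
    }
}
```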

Approach 2 (Recommended)

To ensure that we are not adding extra latency from the sizing and re-sizing of the stl::vector, we can set an initial capacity on the vector. With the capacity set, no re-sizing happens and the extra latency is avoided. The benchmarks below repeat the same experiment with stl::vector expansion eliminated.
| Benchmark | Total vectors | Dimension | Vectors per transfer | Mode | Cnt | Score | Units | Heap used (MB) | Trips to JNI |
|---|---|---|---|---|---|---|---|---|---|
| TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 128 | 100000 | ss | 3 | 0.688 | s/op | 48.82813 | 10 |
| TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 128 | 500000 | ss | 3 | 0.693 | s/op | 244.14063 | 2 |
| TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 128 | 1000000 | ss | 3 | 0.705 | s/op | 488.28125 | 1 |
| TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 256 | 100000 | ss | 3 | 1.195 | s/op | 97.65625 | 10 |
| TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 256 | 500000 | ss | 3 | 1.194 | s/op | 488.28125 | 2 |
| TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 256 | 1000000 | ss | 3 | 1.198 | s/op | 976.5625 | 1 |
| TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 384 | 100000 | ss | 3 | 1.727 | s/op | 146.48438 | 10 |
| TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 384 | 500000 | ss | 3 | 1.721 | s/op | 732.42188 | 2 |
| TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 384 | 1000000 | ss | 3 | 1.747 | s/op | 1464.84375 | 1 |
| TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 512 | 100000 | ss | 3 | 2.355 | s/op | 195.3125 | 10 |
| TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 512 | 500000 | ss | 3 | 2.336 | s/op | 976.5625 | 2 |
| TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 512 | 1000000 | ss | 3 | 2.364 | s/op | 1953.125 | 1 |
| TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 960 | 100000 | ss | 3 | 4.142 | s/op | 366.21094 | 10 |
| TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 960 | 500000 | ss | 3 | 4.154 | s/op | 1831.05469 | 2 |
| TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 960 | 1000000 | ss | 3 | 4.162 | s/op | 3662.10938 | 1 |
| TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 1024 | 100000 | ss | 3 | 4.354 | s/op | 390.625 | 10 |
| TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 1024 | 500000 | ss | 3 | 4.357 | s/op | 1953.125 | 2 |
| TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 1024 | 1000000 | ss | 3 | 4.405 | s/op | 3906.25 | 1 |
| TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 1536 | 100000 | ss | 3 | 6.423 | s/op | 585.9375 | 10 |
| TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 1536 | 500000 | ss | 3 | 6.434 | s/op | 2929.6875 | 2 |
| TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 1536 | 1000000 | ss | 3 | 6.457 | s/op | 5859.375 | 1 |

Table 3: Benchmarking results when a fixed number of vectors is transferred with an initial capacity set.

As we can see, by taking expansion out of the picture and applying a few more optimizations that are only possible once the initial capacity is set (directly converting the Java float array to the vector rather than going through intermediate storage), the time taken to move vectors from Java to the JNI layer drops by more than 50%.
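
For readers less familiar with the C++ side, the grow-and-copy behavior and the initial-capacity fix have a direct Java analogy (illustration only; the real optimization happens on the C++ stl::vector):

```java
import java.util.ArrayList;
import java.util.List;

// Java analogy for the C++ optimization above: appending element-by-element
// re-allocates and copies the backing array each time capacity overflows,
// while pre-sizing allocates once. Setting an initial capacity on
// std::vector (or calling reserve) is the C++ equivalent of the pre-sized
// constructor below.
int totalFloats = 1_000_000 * 128;                   // 1M vectors x 128 dims
List<Float> grown = new ArrayList<>();               // ~log2(n) grow-and-copy cycles
List<Float> reserved = new ArrayList<>(totalFloats); // one allocation, no copies
```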

How do we accurately determine the number of vectors to stream to the JNI layer without reading all the vectors in the segment?

On closer examination, the DocValues interface provides a cost() function that returns the maximum number of documents present, but this count includes deleted documents too.

Segment creation/graph creation happens in two scenarios:

  1. When an OpenSearch refresh happens
  2. When segments are merged (automatic or force merge)

For #1, the number of vectors won't be high enough to cause OOM issues, so we can stream all the vectors to the JNI layer directly without depending on cost() to determine the vector count upfront.

For #2, since merges involve large segments that can contain deleted docs, cost() cannot be used directly: it would allocate a large chunk of memory that we never use for graph creation. To accurately count the live documents in BinaryDocValues, we can use the liveDocs bits. Please refer to the POC code ref. Sample pseudocode below:

```java
import java.io.IOException;
import org.apache.lucene.index.BinaryDocValues;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.MergeState;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.util.Bits;

class KNN80DocValuesReader {
    private MergeState mergeState;

    public BinaryDocValues getBinary(FieldInfo field) throws IOException {
        long liveDocsCount = 0;
        // Iterate over all the doc-values producers of the segments being merged
        for (int i = 0; i < mergeState.docValuesProducers.length; i++) {
            BinaryDocValues values = mergeState.docValuesProducers[i].getBinary(field);
            Bits liveDocs = mergeState.liveDocs[i];
            // A non-null liveDocs indicates the presence of deleted docs
            if (liveDocs != null) {
                log.info("There are some deleted docs present");
                // Count only the live docs
                for (int docId = values.nextDoc(); docId != DocIdSetIterator.NO_MORE_DOCS; docId = values.nextDoc()) {
                    if (liveDocs.get(docId)) {
                        liveDocsCount++;
                    }
                }
            } else {
                // No deleted docs: every doc in this producer is live
                liveDocsCount += values.cost();
            }
        }
        // Finally, set the live docs and cost on KNN80BinaryDocValues and use
        // them later while streaming the vectors (construction elided here).
        return knn80BinaryDocValues;
    }
}
```

So, with Approach 2 and an initial capacity set on the stl::vector, we no longer need to pick an exact number of vectors per transfer and can make the transfers memory-oriented instead. The benchmarks below show the latency impact of different memory budgets (100 MB, 200 MB, ..., 500 MB). We can start with a default of 1% of heap size, exposed as a cluster setting, which gives users the flexibility to change it later.

The benefit of a memory-based limit on vector transfer over a fixed count-based limit is that the system stays stable even as the number of shards on a node grows. For example, with a fixed limit of 100K vectors at 1536 dimensions (~586 MB per transfer, per Table 4), 50 shards on a node transferring at once could require ~29 GB of heap ((586 × 50)/1024) and trip heap CBs.
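
A minimal sketch of deriving the per-transfer budget from such a cluster setting (the setting plumbing itself is omitted and hypothetical):

```java
// Default transfer budget: 1% of the JVM heap, overridable via cluster setting.
int dimension = 1536;
long heapBytes = Runtime.getRuntime().maxMemory();
long transferLimitBytes = heapBytes / 100;           // 32 GB heap -> ~327 MB
int vectorsPerTransfer = (int) Math.max(1, transferLimitBytes / (4L * dimension));
```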

| Dimension | Vectors per transfer | Heap used (MB) |
|---|---|---|
| 128 | 100000 | 48.82813 |
| 128 | 500000 | 244.14063 |
| 128 | 1000000 | 488.28125 |
| 256 | 100000 | 97.65625 |
| 256 | 500000 | 488.28125 |
| 256 | 1000000 | 976.5625 |
| 384 | 100000 | 146.48438 |
| 384 | 500000 | 732.42188 |
| 384 | 1000000 | 1464.84375 |
| 512 | 100000 | 195.3125 |
| 512 | 500000 | 976.5625 |
| 512 | 1000000 | 1953.125 |
| 960 | 100000 | 366.21094 |
| 960 | 500000 | 1831.05469 |
| 960 | 1000000 | 3662.10938 |
| 1024 | 100000 | 390.625 |
| 1024 | 500000 | 1953.125 |
| 1024 | 1000000 | 3906.25 |
| 1536 | 100000 | 585.9375 |
| 1536 | 500000 | 2929.6875 |
| 1536 | 1000000 | 5859.375 |

Table 4: How heap usage changes based on dimension for a fixed number of vectors.

Benchmarks with different data-transfer sizes

| Benchmark | Total vectors | Dimension | Transfer size (MB) | Mode | Cnt | Score | Units | Records per trip | Trips to JNI |
|---|---|---|---|---|---|---|---|---|---|
| TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 128 | 100 | ss | 3 | 0.694 | s/op | 204800 | 5 |
| TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 128 | 200 | ss | 3 | 0.685 | s/op | 409600 | 3 |
| TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 128 | 500 | ss | 3 | 0.702 | s/op | 1024000 | 1 |
| TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 256 | 100 | ss | 3 | 1.19 | s/op | 102400 | 10 |
| TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 256 | 200 | ss | 3 | 1.198 | s/op | 204800 | 5 |
| TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 256 | 500 | ss | 3 | 1.205 | s/op | 512000 | 2 |
| TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 384 | 100 | ss | 3 | 1.721 | s/op | 68266 | 15 |
| TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 384 | 200 | ss | 3 | 1.727 | s/op | 136533 | 8 |
| TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 384 | 500 | ss | 3 | 1.72 | s/op | 341333 | 3 |
| TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 512 | 100 | ss | 3 | 2.358 | s/op | 51200 | 20 |
| TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 512 | 200 | ss | 3 | 2.348 | s/op | 102400 | 10 |
| TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 512 | 500 | ss | 3 | 2.348 | s/op | 256000 | 4 |
| TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 960 | 100 | ss | 3 | 4.16 | s/op | 27306 | 37 |
| TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 960 | 200 | ss | 3 | 4.15 | s/op | 54613 | 19 |
| TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 960 | 500 | ss | 3 | 4.166 | s/op | 136533 | 8 |
| TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 1024 | 100 | ss | 3 | 4.344 | s/op | 25600 | 40 |
| TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 1024 | 200 | ss | 3 | 4.342 | s/op | 51200 | 20 |
| TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 1024 | 500 | ss | 3 | 4.358 | s/op | 128000 | 8 |
| TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 1536 | 100 | ss | 3 | 6.407 | s/op | 17066 | 59 |
| TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 1536 | 200 | ss | 3 | 6.408 | s/op | 34133 | 30 |
| TransferVectorsBenchmarks.transferVectors_withCapacity | 1M | 1536 | 500 | ss | 3 | 6.31 | s/op | 85333 | 12 |

Table 5: Benchmarking results when a fixed amount of memory per transfer is used with an initial capacity set.

If we compare Table 5 with Table 3, the difference in latency is minimal (< 100 ms). This is strong evidence that we should use a fixed amount of memory for the data transfer, which is more flexible and robust than transferring a fixed number of vectors.

Apart from vectors, should we also stream the docIds?

I think we should not stream docIds. Some stats: a Lucene index can hold at most 2^31 − 1 docIds, and an Integer takes 4 bytes, so even the theoretical worst case is (4 × (2^31 − 1))/2^30 ≈ 8 GB, while realistic segments need far less (the 5M-document setup above needs only ~19 MB for docIds, a tiny fraction of its ~28.6 GB of vector data). Streaming docIds would be over-engineering the solution.

In native memory, should we store the vectors as a 1-D array or a 2-D array?

If we look at the index-creation interfaces of Faiss and nmslib, both take the vectors as a 1-D array. Hence it makes sense to use a 1-D array; otherwise we would need to construct the 1-D array from the 2-D array, which costs extra computation on larger datasets.
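
For illustration, the 1-D layout is simply the vectors written contiguously, matching the row-major `n × d` float array that Faiss's `add(n, x)` expects:

```java
// Flatten n vectors of d floats into one contiguous 1-D array (row-major):
// vector i occupies positions [i * d, (i + 1) * d).
// Assumes vectors is a float[n][d] already in memory.
int numVectors = vectors.length;
float[] flat = new float[numVectors * dimension];
for (int i = 0; i < numVectors; i++) {
    System.arraycopy(vectors[i], 0, flat, i * dimension, dimension);
}
```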

Test Plan

Below is the list of tests/benchmarks that will be performed, apart from unit tests and integration tests.

Correctness Testing

The tests below ensure that we can merge large datasets even when the heap cannot hold all the vectors; each of these configurations errors out on OpenSearch version 2.13. They were chosen because we have seen errors with them recently.
| Data set | Dimension | Data set size | Data nodes | Node type | Heap (GB) | Shards | Replicas | Max segments | Vectors size (GB) | Engine | Data type | Algorithm |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| Cohere | 768 | 1000000 | 1 | r6g.2xlarge | 2 | 1 | 0 | 1 | 2.86102 | Nmslib | float32 | HNSW |
| Open AI | 1536 | 5000000 | 1 | r6g.4xlarge | 32 | 1 | 0 | 1 | 28.61023 | Nmslib | float32 | HNSW |
| SIFT | 128 | 1000000000 | 8 | r6g.12xlarge | 32 | 64 | 1 | 1 | 476.83716 | Nmslib | float32 | HNSW |
| Cohere | 768 | 1000000 | 1 | r6g.2xlarge | 2 | 1 | 0 | 1 | 2.86102 | Faiss | float32 | HNSW |
| Open AI | 1536 | 5000000 | 1 | r6g.4xlarge | 32 | 1 | 0 | 1 | 28.61023 | Faiss | float32 | HNSW |
| SIFT | 128 | 1000000000 | 8 | r6g.12xlarge | 32 | 64 | 1 | 1 | 476.83716 | Faiss | float16 | HNSW |

A/B Testing with Opensearch version 2.13

We will use nightly benchmarks to validate that this change introduces no regressions. The main parameters we will watch are indexing time, force merge time, and refresh time; recall should also remain intact.

The nightly benchmarks currently don't run training-related workloads, so we will run those separately. Details below.

| Data set | Dimension | Data set size | Data nodes | Node type | Heap (GB) | Shards | Max segments | Vectors size (GB) | Engine | Data type | Algorithm |
|---|---|---|---|---|---|---|---|---|---|---|---|
| SIFT | 128 | 1000000 | 3 | r5.4xlarge | 32 | 24 | 1 | 0.47684 | Faiss | float32 | HNSW-PQ |
| SIFT | 128 | 1000000 | 3 | r5.4xlarge | 32 | 24 | 1 | 0.47684 | Faiss | float32 | IVF |
| SIFT | 128 | 1000000 | 3 | r5.4xlarge | 32 | 24 | 1 | 0.47684 | Faiss | float32 | IVF-PQ |

The experiment results will be compared with results here: #1473

Tasks

  • Complete the RFC for the feature
  • Add benchmark code to k-NN plugin for all the benchmarks used in the RFC.
  • Raise PR for the feature including UTs and ITs
  • Benchmark with nightly runs to validate no regression
@navneet1v navneet1v self-assigned this Mar 4, 2024
@vamshin vamshin changed the title [FEATURE] Streaming vectors from Java Layer to JNI layer to avoid OOM/http request CBs in Native engines [FEATURE] Merge Optimization: Streaming vectors from Java Layer to JNI layer to avoid OOM/Circuit Breaker in native engines Mar 13, 2024
navneet1v (Collaborator, Author) commented:

The issue is updated with all the details on the solution and benchmarking

navneet1v (Collaborator, Author) commented:

Closing the issue as the changes are merged and will be released with version 2.14 of OpenSearch.

@navneet1v navneet1v changed the title [FEATURE] Merge Optimization: Streaming vectors from Java Layer to JNI layer to avoid OOM/Circuit Breaker in native engines [FEATURE] [Build-TimeReduction V1] Merge Optimization: Streaming vectors from Java Layer to JNI layer to avoid OOM/Circuit Breaker in native engines Jun 11, 2024