[Segment Replication] Added source-side classes for orchestrating replication events #3470

Merged 16 commits on Jun 3, 2022
22 changes: 22 additions & 0 deletions server/src/main/java/org/opensearch/index/engine/Engine.java
@@ -175,6 +175,19 @@ public final EngineConfig config() {
*/
protected abstract SegmentInfos getLatestSegmentInfos();

/**
* Fetch a snapshot of the latest SegmentInfos from the engine. Using this method
* ensures that segment files are retained in the directory until the reference is closed.
*
* @return {@link GatedCloseable} - A wrapper around a {@link SegmentInfos} instance that
* must be closed for segment files to be deleted.
* @throws EngineException - When segment infos cannot be safely retrieved
*/
public GatedCloseable<SegmentInfos> getSegmentInfosSnapshot() {
// default implementation
return new GatedCloseable<>(getLatestSegmentInfos(), () -> {});
}
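For illustration only (not part of this diff), a caller would hold the returned wrapper open for as long as it needs the files on disk. This sketch assumes GatedCloseable exposes the wrapped value via get(); engine is any Engine reference:

    // While the snapshot is open, files referenced by the returned
    // SegmentInfos are protected from deletion.
    try (GatedCloseable<SegmentInfos> snapshot = engine.getSegmentInfosSnapshot()) {
        SegmentInfos infos = snapshot.get();
        // ... read per-segment file lists, checksums, etc. ...
    } // close() releases the reference and normal deletion resumes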

public MergeStats getMergeStats() {
return new MergeStats();
}
@@ -846,6 +859,15 @@ public final CommitStats commitStats() {
*/
public abstract long getPersistedLocalCheckpoint();

/**
* @return the latest checkpoint that has been processed but not necessarily persisted.
* Also see {@link #getPersistedLocalCheckpoint()}
*/
public long getProcessedLocalCheckpoint() {
// default implementation
return 0L;
}

/**
* @return a {@link SeqNoStats} object, using local state and the supplied global checkpoint
*/
server/src/main/java/org/opensearch/index/engine/InternalEngine.java
@@ -2305,6 +2305,17 @@ public SegmentInfos getLatestSegmentInfos() {
}
}

@Override
public GatedCloseable<SegmentInfos> getSegmentInfosSnapshot() {
final SegmentInfos segmentInfos = getLatestSegmentInfos();
try {
indexWriter.incRefDeleter(segmentInfos);
} catch (IOException e) {
throw new EngineException(shardId, e.getMessage(), e);
}
return new GatedCloseable<>(segmentInfos, () -> indexWriter.decRefDeleter(segmentInfos));
}
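Here incRefDeleter asks Lucene's IndexWriter to pin every file referenced by this SegmentInfos so its deletion policy cannot remove them, and the paired decRefDeleter in the close hook releases that pin; closing the returned wrapper is therefore what makes the files eligible for deletion again.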

@Override
protected final void writerSegmentStats(SegmentsStats stats) {
stats.addVersionMapMemoryInBytes(versionMap.ramBytesUsed());
@@ -2724,6 +2735,7 @@ public long getLastSyncedGlobalCheckpoint() {
return getTranslog().getLastSyncedGlobalCheckpoint();
}

@Override
public long getProcessedLocalCheckpoint() {
return localCheckpointTracker.getProcessedCheckpoint();
}
18 changes: 18 additions & 0 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -2638,6 +2638,14 @@ public long getLocalCheckpoint() {
return getEngine().getPersistedLocalCheckpoint();
}

/**
* Fetch the latest checkpoint that has been processed but not necessarily persisted.
* Also see {@link #getLocalCheckpoint()}.
*/
public long getProcessedLocalCheckpoint() {
return getEngine().getProcessedLocalCheckpoint();
}
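As an aside (not part of this change), the processed checkpoint can run ahead of the persisted one, since an operation counts as processed once applied in memory but as persisted only after the translog syncs. A hypothetical monitoring snippet:

    // indexShard is any started IndexShard; names are illustrative.
    long processed = indexShard.getProcessedLocalCheckpoint();
    long persisted = indexShard.getLocalCheckpoint();
    // processed >= persisted in general: ops are applied before they are fsynced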

/**
* Returns the global checkpoint for the shard.
*
@@ -4005,4 +4013,14 @@ public void verifyShardBeforeIndexClosing() throws IllegalStateException {
RetentionLeaseSyncer getRetentionLeaseSyncer() {
return retentionLeaseSyncer;
}

/**
* Fetch the latest SegmentInfos held by the shard's underlying Engine, wrapped
* by a {@link GatedCloseable} to ensure files are not deleted/merged away.
*
* @throws EngineException - When segment infos cannot be safely retrieved
*/
public GatedCloseable<SegmentInfos> getSegmentInfosSnapshot() {
return getEngine().getSegmentInfosSnapshot();
}
}
91 changes: 56 additions & 35 deletions server/src/main/java/org/opensearch/index/store/Store.java
@@ -274,6 +274,13 @@ public MetadataSnapshot getMetadata(IndexCommit commit) throws IOException {
return getMetadata(commit, false);
}

/**
* Convenience wrapper around the {@link #getMetadata(IndexCommit)} method for null input.
*/
public MetadataSnapshot getMetadata() throws IOException {
return getMetadata(null, false);
}

/**
* Returns a new MetadataSnapshot for the given commit. If the given commit is <code>null</code>
* the latest commit point is used.
@@ -315,6 +322,10 @@ public MetadataSnapshot getMetadata(IndexCommit commit, boolean lockDirectory) t
}
}

public MetadataSnapshot getMetadata(SegmentInfos segmentInfos) throws IOException {
return new MetadataSnapshot(segmentInfos, directory, logger);
}
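Combined with getSegmentInfosSnapshot() above, this overload gives a point-in-time view of the store that does not require a commit point. A sketch of the intended source-side flow, with hypothetical variable names:

    // Pin the latest segments, then derive file metadata from that exact set.
    try (GatedCloseable<SegmentInfos> snapshot = indexShard.getSegmentInfosSnapshot()) {
        Store.MetadataSnapshot metadata = indexShard.store().getMetadata(snapshot.get());
        // ... diff against the replica's files and send what is missing ...
    }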

/**
* Renames all the given files from the key of the map to the
* value of the map. All successfully renamed files are removed from the map in-place.
@@ -477,7 +488,7 @@ public static MetadataSnapshot readMetadataSnapshot(
Directory dir = new NIOFSDirectory(indexLocation)
) {
failIfCorrupted(dir);
return new MetadataSnapshot(null, dir, logger);
return new MetadataSnapshot((IndexCommit) null, dir, logger);
} catch (IndexNotFoundException ex) {
// that's fine - happens all the time no need to log
} catch (FileNotFoundException | NoSuchFileException ex) {
@@ -682,7 +693,7 @@ public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetadata) thr
}
}
directory.syncMetaData();
final Store.MetadataSnapshot metadataOrEmpty = getMetadata(null);
final Store.MetadataSnapshot metadataOrEmpty = getMetadata();
verifyAfterCleanup(sourceMetadata, metadataOrEmpty);
} finally {
metadataLock.writeLock().unlock();
@@ -822,7 +833,14 @@ public MetadataSnapshot(Map<String, StoreFileMetadata> metadata, Map<String, Str
}

MetadataSnapshot(IndexCommit commit, Directory directory, Logger logger) throws IOException {
LoadedMetadata loadedMetadata = loadMetadata(commit, directory, logger);
this(loadMetadata(commit, directory, logger));
}

MetadataSnapshot(SegmentInfos segmentInfos, Directory directory, Logger logger) throws IOException {
this(loadMetadata(segmentInfos, directory, logger));
}

private MetadataSnapshot(LoadedMetadata loadedMetadata) {
metadata = loadedMetadata.fileMetadata;
commitUserData = loadedMetadata.userData;
numDocs = loadedMetadata.numDocs;
@@ -890,40 +908,9 @@ static class LoadedMetadata {
}

static LoadedMetadata loadMetadata(IndexCommit commit, Directory directory, Logger logger) throws IOException {
long numDocs;
Map<String, StoreFileMetadata> builder = new HashMap<>();
Map<String, String> commitUserDataBuilder = new HashMap<>();
try {
final SegmentInfos segmentCommitInfos = Store.readSegmentsInfo(commit, directory);
numDocs = Lucene.getNumDocs(segmentCommitInfos);
commitUserDataBuilder.putAll(segmentCommitInfos.getUserData());
// we don't know which version was used to write so we take the max version.
Version maxVersion = segmentCommitInfos.getMinSegmentLuceneVersion();
for (SegmentCommitInfo info : segmentCommitInfos) {
final Version version = info.info.getVersion();
if (version == null) {
// version is written since 3.1+: we should have already hit IndexFormatTooOld.
throw new IllegalArgumentException("expected valid version value: " + info.info.toString());
}
if (version.onOrAfter(maxVersion)) {
maxVersion = version;
}
for (String file : info.files()) {
checksumFromLuceneFile(
directory,
file,
builder,
logger,
version,
SEGMENT_INFO_EXTENSION.equals(IndexFileNames.getExtension(file))
);
}
}
if (maxVersion == null) {
maxVersion = org.opensearch.Version.CURRENT.minimumIndexCompatibilityVersion().luceneVersion;
}
final String segmentsFile = segmentCommitInfos.getSegmentsFileName();
checksumFromLuceneFile(directory, segmentsFile, builder, logger, maxVersion, true);
return loadMetadata(segmentCommitInfos, directory, logger);
} catch (CorruptIndexException | IndexNotFoundException | IndexFormatTooOldException | IndexFormatTooNewException ex) {
// we either know the index is corrupted or it's just not there
throw ex;
@@ -949,6 +936,40 @@ static LoadedMetadata loadMetadata(IndexCommit commit, Directory directory, Logg
}
throw ex;
}
}

static LoadedMetadata loadMetadata(SegmentInfos segmentInfos, Directory directory, Logger logger) throws IOException {
long numDocs = Lucene.getNumDocs(segmentInfos);
Map<String, String> commitUserDataBuilder = new HashMap<>();
commitUserDataBuilder.putAll(segmentInfos.getUserData());
Map<String, StoreFileMetadata> builder = new HashMap<>();
// we don't know which version was used to write so we take the max version.
Version maxVersion = segmentInfos.getMinSegmentLuceneVersion();
for (SegmentCommitInfo info : segmentInfos) {
final Version version = info.info.getVersion();
if (version == null) {
// version is written since 3.1+: we should have already hit IndexFormatTooOld.
throw new IllegalArgumentException("expected valid version value: " + info.info.toString());
}
if (version.onOrAfter(maxVersion)) {
maxVersion = version;
}
for (String file : info.files()) {
checksumFromLuceneFile(
directory,
file,
builder,
logger,
version,
SEGMENT_INFO_EXTENSION.equals(IndexFileNames.getExtension(file))
);
}
}
if (maxVersion == null) {
maxVersion = org.opensearch.Version.CURRENT.minimumIndexCompatibilityVersion().luceneVersion;
}
final String segmentsFile = segmentInfos.getSegmentsFileName();
checksumFromLuceneFile(directory, segmentsFile, builder, logger, maxVersion, true);
return new LoadedMetadata(unmodifiableMap(builder), unmodifiableMap(commitUserDataBuilder), numDocs);
}

54 changes: 54 additions & 0 deletions server/src/main/java/org/opensearch/indices/replication/CheckpointInfoRequest.java
@@ -0,0 +1,54 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.replication;

import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.SegmentReplicationTransportRequest;

import java.io.IOException;

/**
* Request object for fetching segment metadata for a {@link ReplicationCheckpoint} from
* a {@link SegmentReplicationSource}. This object is created by the target node and sent
* to the source node.
*
* @opensearch.internal
*/
public class CheckpointInfoRequest extends SegmentReplicationTransportRequest {

private final ReplicationCheckpoint checkpoint;

public CheckpointInfoRequest(StreamInput in) throws IOException {
super(in);
checkpoint = new ReplicationCheckpoint(in);
}

public CheckpointInfoRequest(
long replicationId,
String targetAllocationId,
DiscoveryNode targetNode,
ReplicationCheckpoint checkpoint
) {
super(replicationId, targetAllocationId, targetNode);
this.checkpoint = checkpoint;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
checkpoint.writeTo(out);
}

public ReplicationCheckpoint getCheckpoint() {
return checkpoint;
}
}
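A minimal round-trip sketch (test-style, all variable names assumed) showing that the checkpoint survives serialization, using org.opensearch.common.io.stream.BytesStreamOutput:

    CheckpointInfoRequest request = new CheckpointInfoRequest(replicationId, allocationId, targetNode, checkpoint);
    BytesStreamOutput out = new BytesStreamOutput();
    request.writeTo(out);
    CheckpointInfoRequest copy = new CheckpointInfoRequest(out.bytes().streamInput());
    // copy.getCheckpoint() now carries the same checkpoint state as the original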
60 changes: 60 additions & 0 deletions server/src/main/java/org/opensearch/indices/replication/GetSegmentFilesRequest.java
@@ -0,0 +1,60 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.replication;

import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.SegmentReplicationTransportRequest;

import java.io.IOException;
import java.util.List;

/**
* Request object for fetching a list of segment files metadata from a {@link SegmentReplicationSource}.
* This object is created by the target node and sent to the source node.
*
* @opensearch.internal
*/
public class GetSegmentFilesRequest extends SegmentReplicationTransportRequest {

private final List<StoreFileMetadata> filesToFetch;
private final ReplicationCheckpoint checkpoint;

public GetSegmentFilesRequest(StreamInput in) throws IOException {
super(in);
this.filesToFetch = in.readList(StoreFileMetadata::new);
this.checkpoint = new ReplicationCheckpoint(in);
}

public GetSegmentFilesRequest(
long replicationId,
String targetAllocationId,
DiscoveryNode targetNode,
List<StoreFileMetadata> filesToFetch,
ReplicationCheckpoint checkpoint
) {
super(replicationId, targetAllocationId, targetNode);
this.filesToFetch = filesToFetch;
this.checkpoint = checkpoint;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeList(filesToFetch);
checkpoint.writeTo(out);
}

public ReplicationCheckpoint getCheckpoint() {
return checkpoint;
}
}
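A target would typically populate filesToFetch by diffing source metadata against its local store; a sketch assuming Store.MetadataSnapshot#recoveryDiff is used for the comparison (variable names hypothetical):

    // Files present in the source snapshot but absent locally are requested.
    Store.RecoveryDiff diff = sourceMetadata.recoveryDiff(localMetadata);
    GetSegmentFilesRequest request = new GetSegmentFilesRequest(
        replicationId,
        allocationId,
        targetNode,
        diff.missing, // List<StoreFileMetadata> the target still needs
        checkpoint
    );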