Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[segment replication] decouple the rateLimiter of segrep and recovery #12959

Merged
merged 9 commits into from
Apr 14, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,8 @@ public void apply(Settings value, Settings current, Settings previous) {
ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_CLUSTER,
ShardLimitValidator.SETTING_CLUSTER_IGNORE_DOT_INDEXES,
RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING,
RecoverySettings.INDICES_USE_REPLICATION_INDIVIDUAL_RATE_LIMITER_SETTING,
RecoverySettings.INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING,
RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING,
RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING,
RecoverySettings.INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,13 @@ public void messageReceived(final FileChunkRequest request, TransportChannel cha
try (ReplicationRef<RecoveryTarget> recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) {
final RecoveryTarget recoveryTarget = recoveryRef.get();
final ActionListener<Void> listener = recoveryTarget.createOrFinishListener(channel, Actions.FILE_CHUNK, request);
recoveryTarget.handleFileChunk(request, recoveryTarget, bytesSinceLastPause, recoverySettings.rateLimiter(), listener);
recoveryTarget.handleFileChunk(
request,
recoveryTarget,
bytesSinceLastPause,
recoverySettings.recoveryRateLimiter(),
listener
);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,20 @@
Property.NodeScope
);

public static final Setting<Boolean> INDICES_USE_REPLICATION_INDIVIDUAL_RATE_LIMITER_SETTING = Setting.boolSetting(
"indices.replication.use_individual_rate_limiter",
false,
Property.Dynamic,
Property.NodeScope
);

Ferrari248 marked this conversation as resolved.
Show resolved Hide resolved
public static final Setting<ByteSizeValue> INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING = Setting.byteSizeSetting(
"indices.replication.max_bytes_per_sec",
new ByteSizeValue(200, ByteSizeUnit.MB),
Property.Dynamic,
Property.NodeScope
);

/**
* Controls the maximum number of file chunk requests that can be sent concurrently from the source node to the target node.
*/
Expand Down Expand Up @@ -169,11 +183,14 @@
// choose 512KB-16B to ensure that the resulting byte[] is not a humongous allocation in G1.
public static final ByteSizeValue DEFAULT_CHUNK_SIZE = new ByteSizeValue(512 * 1024 - 16, ByteSizeUnit.BYTES);

private volatile ByteSizeValue maxBytesPerSec;
private volatile ByteSizeValue recoveryMaxBytesPerSec;
private volatile boolean useReplicationIndividualRateLimiter;
private volatile ByteSizeValue replicationMaxBytesPerSec;
private volatile int maxConcurrentFileChunks;
private volatile int maxConcurrentOperations;
private volatile int maxConcurrentRemoteStoreStreams;
private volatile SimpleRateLimiter rateLimiter;
private volatile SimpleRateLimiter recoveryRateLimiter;
private volatile SimpleRateLimiter replicationRateLimiter;
private volatile TimeValue retryDelayStateSync;
private volatile TimeValue retryDelayNetwork;
private volatile TimeValue activityTimeout;
Expand All @@ -198,17 +215,25 @@
this.internalActionLongTimeout = INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT_SETTING.get(settings);

this.activityTimeout = INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING.get(settings);
this.maxBytesPerSec = INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.get(settings);
if (maxBytesPerSec.getBytes() <= 0) {
rateLimiter = null;
this.recoveryMaxBytesPerSec = INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.get(settings);
if (recoveryMaxBytesPerSec.getBytes() <= 0) {
recoveryRateLimiter = null;

Check warning on line 220 in server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java#L220

Added line #L220 was not covered by tests
} else {
rateLimiter = new SimpleRateLimiter(maxBytesPerSec.getMbFrac());
recoveryRateLimiter = new SimpleRateLimiter(recoveryMaxBytesPerSec.getMbFrac());
}
this.useReplicationIndividualRateLimiter = INDICES_USE_REPLICATION_INDIVIDUAL_RATE_LIMITER_SETTING.get(settings);
this.replicationMaxBytesPerSec = INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING.get(settings);
updateReplicationRateLimiter();

logger.debug("using max_bytes_per_sec[{}]", maxBytesPerSec);
logger.debug("using recovery max_bytes_per_sec[{}]", recoveryMaxBytesPerSec);
this.internalRemoteUploadTimeout = INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT.get(settings);

clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, this::setMaxBytesPerSec);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, this::setRecoveryMaxBytesPerSec);
clusterSettings.addSettingsUpdateConsumer(
INDICES_USE_REPLICATION_INDIVIDUAL_RATE_LIMITER_SETTING,
this::setUseReplicationIndividualRateLimiter
);
clusterSettings.addSettingsUpdateConsumer(INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING, this::setReplicationMaxBytesPerSec);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING, this::setMaxConcurrentFileChunks);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING, this::setMaxConcurrentOperations);
clusterSettings.addSettingsUpdateConsumer(
Expand All @@ -227,8 +252,12 @@

}

public RateLimiter rateLimiter() {
return rateLimiter;
public RateLimiter recoveryRateLimiter() {
return recoveryRateLimiter;
}

public RateLimiter replicaitonRateLimiter() {
Ferrari248 marked this conversation as resolved.
Show resolved Hide resolved
Ferrari248 marked this conversation as resolved.
Show resolved Hide resolved
return replicationRateLimiter;
}

public TimeValue retryDelayNetwork() {
Expand Down Expand Up @@ -294,14 +323,45 @@
this.internalRemoteUploadTimeout = internalRemoteUploadTimeout;
}

private void setMaxBytesPerSec(ByteSizeValue maxBytesPerSec) {
this.maxBytesPerSec = maxBytesPerSec;
if (maxBytesPerSec.getBytes() <= 0) {
rateLimiter = null;
} else if (rateLimiter != null) {
rateLimiter.setMBPerSec(maxBytesPerSec.getMbFrac());
private void setRecoveryMaxBytesPerSec(ByteSizeValue recoveryMaxBytesPerSec) {
this.recoveryMaxBytesPerSec = recoveryMaxBytesPerSec;
if (recoveryMaxBytesPerSec.getBytes() <= 0) {
recoveryRateLimiter = null;
} else if (recoveryRateLimiter != null) {
recoveryRateLimiter.setMBPerSec(recoveryMaxBytesPerSec.getMbFrac());

Check warning on line 331 in server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java#L331

Added line #L331 was not covered by tests
} else {
recoveryRateLimiter = new SimpleRateLimiter(recoveryMaxBytesPerSec.getMbFrac());
}
if (useReplicationIndividualRateLimiter == false) updateReplicationRateLimiter();
}

private void setUseReplicationIndividualRateLimiter(boolean useReplicationIndividualRateLimiter) {
this.useReplicationIndividualRateLimiter = useReplicationIndividualRateLimiter;
updateReplicationRateLimiter();
}

private void setReplicationMaxBytesPerSec(ByteSizeValue replicationMaxBytesPerSec) {
this.replicationMaxBytesPerSec = replicationMaxBytesPerSec;
updateReplicationRateLimiter();
}

private void updateReplicationRateLimiter() {
if (useReplicationIndividualRateLimiter == true) {
if (replicationMaxBytesPerSec.getBytes() <= 0) {
replicationRateLimiter = null;
} else if (replicationRateLimiter != null) {
replicationRateLimiter.setMBPerSec(replicationMaxBytesPerSec.getMbFrac());
} else {
replicationRateLimiter = new SimpleRateLimiter(replicationMaxBytesPerSec.getMbFrac());

Check warning on line 355 in server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java#L355

Added line #L355 was not covered by tests
}
} else {
rateLimiter = new SimpleRateLimiter(maxBytesPerSec.getMbFrac());
if (recoveryMaxBytesPerSec.getBytes() <= 0) {
replicationRateLimiter = null;
} else if (replicationRateLimiter != null) {
replicationRateLimiter.setMBPerSec(recoveryMaxBytesPerSec.getMbFrac());
} else {
replicationRateLimiter = new SimpleRateLimiter(recoveryMaxBytesPerSec.getMbFrac());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,12 @@
// Pause using the rate limiter, if desired, to throttle the recovery
final long throttleTimeInNanos;
// always fetch the ratelimiter - it might be updated in real-time on the recovery settings
final RateLimiter rl = recoverySettings.rateLimiter();
final RateLimiter rl;
if (SegmentReplicationTargetService.Actions.FILE_CHUNK.equals(action)) {
rl = recoverySettings.replicaitonRateLimiter();

Check warning on line 83 in server/src/main/java/org/opensearch/indices/replication/RemoteSegmentFileChunkWriter.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/replication/RemoteSegmentFileChunkWriter.java#L83

Added line #L83 was not covered by tests
} else {
rl = recoverySettings.recoveryRateLimiter();
}
Ferrari248 marked this conversation as resolved.
Show resolved Hide resolved
if (rl != null) {
long bytes = bytesSinceLastPause.addAndGet(content.length());
if (bytes > rl.getMinPauseCheckBytes()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,7 @@
try (ReplicationRef<SegmentReplicationTarget> ref = onGoingReplications.getSafe(request.recoveryId(), request.shardId())) {
final SegmentReplicationTarget target = ref.get();
final ActionListener<Void> listener = target.createOrFinishListener(channel, Actions.FILE_CHUNK, request);
target.handleFileChunk(request, target, bytesSinceLastPause, recoverySettings.rateLimiter(), listener);
target.handleFileChunk(request, target, bytesSinceLastPause, recoverySettings.replicaitonRateLimiter(), listener);

Check warning on line 638 in server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java#L638

Added line #L638 was not covered by tests
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3163,7 +3163,7 @@
public InputStream maybeRateLimitRestores(InputStream stream) {
return maybeRateLimit(
maybeRateLimit(stream, () -> restoreRateLimiter, restoreRateLimitingTimeInNanos, BlobStoreTransferContext.SNAPSHOT_RESTORE),
recoverySettings::rateLimiter,
recoverySettings::recoveryRateLimiter,
restoreRateLimitingTimeInNanos,
BlobStoreTransferContext.SNAPSHOT_RESTORE
);
Expand All @@ -3186,7 +3186,7 @@
remoteDownloadRateLimitingTimeInNanos,
BlobStoreTransferContext.REMOTE_DOWNLOAD
),
recoverySettings::rateLimiter,
recoverySettings::recoveryRateLimiter,

Check warning on line 3189 in server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java#L3189

Added line #L3189 was not covered by tests
remoteDownloadRateLimitingTimeInNanos,
BlobStoreTransferContext.REMOTE_DOWNLOAD
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.test.OpenSearchTestCase;

import java.util.concurrent.TimeUnit;
Expand All @@ -47,7 +49,36 @@ public void testZeroBytesPerSecondIsNoRateLimit() {
clusterSettings.applySettings(
Settings.builder().put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), 0).build()
);
assertEquals(null, recoverySettings.rateLimiter());
assertEquals(null, recoverySettings.recoveryRateLimiter());
clusterSettings.applySettings(
Settings.builder()
.put(RecoverySettings.INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING.getKey(), 0)
.put(RecoverySettings.INDICES_USE_REPLICATION_INDIVIDUAL_RATE_LIMITER_SETTING.getKey(), true)
.build()
);
assertEquals(null, recoverySettings.replicaitonRateLimiter());
}

public void testSetReplicationMaxBytesPerSec() {
clusterSettings.applySettings(
Settings.builder()
.put(RecoverySettings.INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING.getKey(), new ByteSizeValue(60, ByteSizeUnit.MB))
.put(RecoverySettings.INDICES_USE_REPLICATION_INDIVIDUAL_RATE_LIMITER_SETTING.getKey(), false)
.build()
);
assertEquals(40, (int) recoverySettings.replicaitonRateLimiter().getMBPerSec());
clusterSettings.applySettings(
Settings.builder().put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), 0).build()
);
assertNull(recoverySettings.recoveryRateLimiter());
assertNull(recoverySettings.replicaitonRateLimiter());
clusterSettings.applySettings(
Settings.builder()
.put(RecoverySettings.INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING.getKey(), new ByteSizeValue(60, ByteSizeUnit.MB))
.put(RecoverySettings.INDICES_USE_REPLICATION_INDIVIDUAL_RATE_LIMITER_SETTING.getKey(), true)
.build()
);
assertEquals(60, (int) recoverySettings.replicaitonRateLimiter().getMBPerSec());
}

public void testRetryDelayStateSync() {
Expand Down
Loading