diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 8760ee3c94309..a593cfd6eb419 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -289,6 +289,7 @@ public void apply(Settings value, Settings current, Settings previous) { ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_CLUSTER, ShardLimitValidator.SETTING_CLUSTER_IGNORE_DOT_INDEXES, RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, + RecoverySettings.SEGREP_MAX_BYTES_PER_SEC_SETTING, RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING, RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING, RecoverySettings.INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING, diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java index 53b42347aa30d..9bfb14ea2f685 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java @@ -65,6 +65,13 @@ public class RecoverySettings { Property.NodeScope ); + public static final Setting SEGREP_MAX_BYTES_PER_SEC_SETTING = Setting.byteSizeSetting( + "segrep.max_bytes_per_sec", + new ByteSizeValue(0), + Property.Dynamic, + Property.NodeScope + ); + /** * Controls the maximum number of file chunk requests that can be sent concurrently from the source node to the target node. */ @@ -170,10 +177,12 @@ public class RecoverySettings { public static final ByteSizeValue DEFAULT_CHUNK_SIZE = new ByteSizeValue(512 * 1024 - 16, ByteSizeUnit.BYTES); private volatile ByteSizeValue maxBytesPerSec; + private volatile ByteSizeValue segrepMaxBytesPerSec; private volatile int maxConcurrentFileChunks; private volatile int maxConcurrentOperations; private volatile int maxConcurrentRemoteStoreStreams; private volatile SimpleRateLimiter rateLimiter; + private volatile SimpleRateLimiter segrepRateLimiter; private volatile TimeValue retryDelayStateSync; private volatile TimeValue retryDelayNetwork; private volatile TimeValue activityTimeout; @@ -204,11 +213,18 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) { } else { rateLimiter = new SimpleRateLimiter(maxBytesPerSec.getMbFrac()); } + this.segrepMaxBytesPerSec = SEGREP_MAX_BYTES_PER_SEC_SETTING.get(settings); + if (segrepMaxBytesPerSec.getBytes() <= 0) { + segrepRateLimiter = null; + } else { + segrepRateLimiter = new SimpleRateLimiter(segrepMaxBytesPerSec.getMbFrac()); + } logger.debug("using max_bytes_per_sec[{}]", maxBytesPerSec); this.internalRemoteUploadTimeout = INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT.get(settings); clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, this::setMaxBytesPerSec); + clusterSettings.addSettingsUpdateConsumer(SEGREP_MAX_BYTES_PER_SEC_SETTING, this::setSegrepMaxBytesPerSec); clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING, this::setMaxConcurrentFileChunks); clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING, this::setMaxConcurrentOperations); clusterSettings.addSettingsUpdateConsumer( @@ -231,6 +247,10 @@ public RateLimiter rateLimiter() { return rateLimiter; } + public RateLimiter segrepRateLimiter() { + return segrepRateLimiter; + } + public TimeValue retryDelayNetwork() { return retryDelayNetwork; } @@ -305,6 +325,17 @@ private void setMaxBytesPerSec(ByteSizeValue maxBytesPerSec) { } } + private void setSegrepMaxBytesPerSec(ByteSizeValue segrepMaxBytesPerSec) { + this.segrepMaxBytesPerSec = segrepMaxBytesPerSec; + if (segrepMaxBytesPerSec.getBytes() <= 0) { + segrepRateLimiter = null; + } else if (segrepRateLimiter != null) { + segrepRateLimiter.setMBPerSec(segrepMaxBytesPerSec.getMbFrac()); + } else { + segrepRateLimiter = new SimpleRateLimiter(segrepMaxBytesPerSec.getMbFrac()); + } + } + public int getMaxConcurrentFileChunks() { return maxConcurrentFileChunks; } diff --git a/server/src/main/java/org/opensearch/indices/replication/RemoteSegmentFileChunkWriter.java b/server/src/main/java/org/opensearch/indices/replication/RemoteSegmentFileChunkWriter.java index b52fe66816098..dd6c33ba63728 100644 --- a/server/src/main/java/org/opensearch/indices/replication/RemoteSegmentFileChunkWriter.java +++ b/server/src/main/java/org/opensearch/indices/replication/RemoteSegmentFileChunkWriter.java @@ -78,7 +78,12 @@ public void writeFileChunk( // Pause using the rate limiter, if desired, to throttle the recovery final long throttleTimeInNanos; // always fetch the ratelimiter - it might be updated in real-time on the recovery settings - final RateLimiter rl = recoverySettings.rateLimiter(); + final RateLimiter rl; + if (SegmentReplicationTargetService.Actions.FILE_CHUNK.equals(action)) { + rl = recoverySettings.segrepRateLimiter(); + } else { + rl = recoverySettings.rateLimiter(); + } if (rl != null) { long bytes = bytesSinceLastPause.addAndGet(content.length()); if (bytes > rl.getMinPauseCheckBytes()) { diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index 4942d39cfa48a..cc2d09429ed85 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -635,7 +635,7 @@ public void messageReceived(final FileChunkRequest request, TransportChannel cha try (ReplicationRef ref = onGoingReplications.getSafe(request.recoveryId(), request.shardId())) { final SegmentReplicationTarget target = ref.get(); final ActionListener listener = target.createOrFinishListener(channel, Actions.FILE_CHUNK, request); - target.handleFileChunk(request, target, bytesSinceLastPause, recoverySettings.rateLimiter(), listener); + target.handleFileChunk(request, target, bytesSinceLastPause, recoverySettings.segrepRateLimiter(), listener); } } } diff --git a/server/src/test/java/org/opensearch/indices/recovery/RecoverySettingsDynamicUpdateTests.java b/server/src/test/java/org/opensearch/indices/recovery/RecoverySettingsDynamicUpdateTests.java index 75639661f539d..dc09ecbac79b8 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/RecoverySettingsDynamicUpdateTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/RecoverySettingsDynamicUpdateTests.java @@ -48,6 +48,10 @@ public void testZeroBytesPerSecondIsNoRateLimit() { Settings.builder().put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), 0).build() ); assertEquals(null, recoverySettings.rateLimiter()); + clusterSettings.applySettings( + Settings.builder().put(RecoverySettings.SEGREP_MAX_BYTES_PER_SEC_SETTING.getKey(), 0).build() + ); + assertEquals(null, recoverySettings.segrepRateLimiter()); } public void testRetryDelayStateSync() {