Skip to content

Commit

Permalink
Merge branch 'main' into reproducible-builds
Browse files Browse the repository at this point in the history
  • Loading branch information
dblock committed Oct 14, 2022
2 parents 20f3dfa + d708860 commit 885ff84
Show file tree
Hide file tree
Showing 28 changed files with 1,895 additions and 6 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Add a new node role 'search' which is dedicated to provide search capability ([#4689](https://github.com/opensearch-project/OpenSearch/pull/4689))
- Introduce experimental searchable snapshot API ([#4680](https://github.com/opensearch-project/OpenSearch/pull/4680))
- Recommissioning of zone. REST layer support. ([#4624](https://github.com/opensearch-project/OpenSearch/pull/4604))
- Added in-flight cancellation of SearchShardTask based on resource consumption ([#4565](https://github.com/opensearch-project/OpenSearch/pull/4565))
- Apply reproducible builds configuration for OpenSearch plugins through gradle plugin ([#4746](https://github.com/opensearch-project/OpenSearch/pull/4746))

### Dependencies
- Bumps `log4j-core` from 2.18.0 to 2.19.0
- Bumps `reactor-netty-http` from 1.0.18 to 1.0.23
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@
import org.opensearch.script.ScriptMetadata;
import org.opensearch.snapshots.SnapshotsInfoService;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskResourceTrackingService;
import org.opensearch.tasks.TaskResultsService;

import java.util.ArrayList;
Expand Down Expand Up @@ -420,7 +419,6 @@ protected void configure() {
bind(NodeMappingRefreshAction.class).asEagerSingleton();
bind(MappingUpdatedAction.class).asEagerSingleton();
bind(TaskResultsService.class).asEagerSingleton();
bind(TaskResourceTrackingService.class).asEagerSingleton();
bind(AllocationDeciders.class).toInstance(allocationDeciders);
bind(ShardsAllocator.class).toInstance(shardsAllocator);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@
import org.opensearch.index.ShardIndexingPressureMemoryManager;
import org.opensearch.index.ShardIndexingPressureSettings;
import org.opensearch.index.ShardIndexingPressureStore;
import org.opensearch.search.backpressure.settings.NodeDuressSettings;
import org.opensearch.search.backpressure.settings.SearchBackpressureSettings;
import org.opensearch.search.backpressure.settings.SearchShardTaskSettings;
import org.opensearch.tasks.TaskManager;
import org.opensearch.tasks.TaskResourceTrackingService;
import org.opensearch.watcher.ResourceWatcherService;
Expand Down Expand Up @@ -581,7 +584,22 @@ public void apply(Settings value, Settings current, Settings previous) {
ShardIndexingPressureMemoryManager.MAX_OUTSTANDING_REQUESTS,
IndexingPressure.MAX_INDEXING_BYTES,
TaskResourceTrackingService.TASK_RESOURCE_TRACKING_ENABLED,
TaskManager.TASK_RESOURCE_CONSUMERS_ENABLED
TaskManager.TASK_RESOURCE_CONSUMERS_ENABLED,

// Settings related to search backpressure
SearchBackpressureSettings.SETTING_ENABLED,
SearchBackpressureSettings.SETTING_ENFORCED,
SearchBackpressureSettings.SETTING_CANCELLATION_RATIO,
SearchBackpressureSettings.SETTING_CANCELLATION_RATE,
SearchBackpressureSettings.SETTING_CANCELLATION_BURST,
NodeDuressSettings.SETTING_NUM_SUCCESSIVE_BREACHES,
NodeDuressSettings.SETTING_CPU_THRESHOLD,
NodeDuressSettings.SETTING_HEAP_THRESHOLD,
SearchShardTaskSettings.SETTING_TOTAL_HEAP_THRESHOLD,
SearchShardTaskSettings.SETTING_HEAP_THRESHOLD,
SearchShardTaskSettings.SETTING_HEAP_VARIANCE_THRESHOLD,
SearchShardTaskSettings.SETTING_CPU_TIME_THRESHOLD,
SearchShardTaskSettings.SETTING_ELAPSED_TIME_THRESHOLD
)
)
);
Expand Down
57 changes: 57 additions & 0 deletions server/src/main/java/org/opensearch/common/util/MovingAverage.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.util;

/**
* MovingAverage is used to calculate the moving average of last 'n' observations.
*
* @opensearch.internal
*/
public class MovingAverage {
private final int windowSize;
private final long[] observations;

private long count = 0;
private long sum = 0;
private double average = 0;

public MovingAverage(int windowSize) {
if (windowSize <= 0) {
throw new IllegalArgumentException("window size must be greater than zero");
}

this.windowSize = windowSize;
this.observations = new long[windowSize];
}

/**
* Records a new observation and evicts the n-th last observation.
*/
public synchronized double record(long value) {
long delta = value - observations[(int) (count % observations.length)];
observations[(int) (count % observations.length)] = value;

count++;
sum += delta;
average = (double) sum / Math.min(count, observations.length);
return average;
}

public double getAverage() {
return average;
}

public long getCount() {
return count;
}

public boolean isReady() {
return count >= windowSize;
}
}
33 changes: 33 additions & 0 deletions server/src/main/java/org/opensearch/common/util/Streak.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.util;

import java.util.concurrent.atomic.AtomicInteger;

/**
* Streak is a data structure that keeps track of the number of successive successful events.
*
* @opensearch.internal
*/
public class Streak {
private final AtomicInteger successiveSuccessfulEvents = new AtomicInteger();

public int record(boolean isSuccessful) {
if (isSuccessful) {
return successiveSuccessfulEvents.incrementAndGet();
} else {
successiveSuccessfulEvents.set(0);
return 0;
}
}

public int length() {
return successiveSuccessfulEvents.get();
}
}
124 changes: 124 additions & 0 deletions server/src/main/java/org/opensearch/common/util/TokenBucket.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.util;

import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;

/**
* TokenBucket is used to limit the number of operations at a constant rate while allowing for short bursts.
*
* @opensearch.internal
*/
public class TokenBucket {
/**
* Defines a monotonically increasing counter.
*
* Usage examples:
* 1. clock = System::nanoTime can be used to perform rate-limiting per unit time
* 2. clock = AtomicLong::get can be used to perform rate-limiting per unit number of operations
*/
private final LongSupplier clock;

/**
* Defines the number of tokens added to the bucket per clock cycle.
*/
private final double rate;

/**
* Defines the capacity and the maximum number of operations that can be performed per clock cycle before
* the bucket runs out of tokens.
*/
private final double burst;

/**
* Defines the current state of the token bucket.
*/
private final AtomicReference<State> state;

public TokenBucket(LongSupplier clock, double rate, double burst) {
this(clock, rate, burst, burst);
}

public TokenBucket(LongSupplier clock, double rate, double burst, double initialTokens) {
if (rate <= 0.0) {
throw new IllegalArgumentException("rate must be greater than zero");
}

if (burst <= 0.0) {
throw new IllegalArgumentException("burst must be greater than zero");
}

this.clock = clock;
this.rate = rate;
this.burst = burst;
this.state = new AtomicReference<>(new State(Math.min(initialTokens, burst), clock.getAsLong()));
}

/**
* If there are enough tokens in the bucket, it requests/deducts 'n' tokens and returns true.
* Otherwise, returns false and leaves the bucket untouched.
*/
public boolean request(double n) {
if (n <= 0) {
throw new IllegalArgumentException("requested tokens must be greater than zero");
}

// Refill tokens
State currentState, updatedState;
do {
currentState = state.get();
long now = clock.getAsLong();
double incr = (now - currentState.lastRefilledAt) * rate;
updatedState = new State(Math.min(currentState.tokens + incr, burst), now);
} while (state.compareAndSet(currentState, updatedState) == false);

// Deduct tokens
do {
currentState = state.get();
if (currentState.tokens < n) {
return false;
}
updatedState = new State(currentState.tokens - n, currentState.lastRefilledAt);
} while (state.compareAndSet(currentState, updatedState) == false);

return true;
}

public boolean request() {
return request(1.0);
}

/**
* Represents an immutable token bucket state.
*/
private static class State {
final double tokens;
final double lastRefilledAt;

public State(double tokens, double lastRefilledAt) {
this.tokens = tokens;
this.lastRefilledAt = lastRefilledAt;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
State state = (State) o;
return Double.compare(state.tokens, tokens) == 0 && Double.compare(state.lastRefilledAt, lastRefilledAt) == 0;
}

@Override
public int hashCode() {
return Objects.hash(tokens, lastRefilledAt);
}
}
}
27 changes: 26 additions & 1 deletion server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import org.opensearch.indices.replication.SegmentReplicationSourceFactory;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
import org.opensearch.indices.replication.SegmentReplicationSourceService;
import org.opensearch.search.backpressure.SearchBackpressureService;
import org.opensearch.search.backpressure.settings.SearchBackpressureSettings;
import org.opensearch.tasks.TaskResourceTrackingService;
import org.opensearch.threadpool.RunnableTaskExecutionListener;
import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory;
Expand Down Expand Up @@ -796,6 +798,23 @@ protected Node(
// development. Then we can deprecate Getter and Setter for IndexingPressureService in ClusterService (#478).
clusterService.setIndexingPressureService(indexingPressureService);

final TaskResourceTrackingService taskResourceTrackingService = new TaskResourceTrackingService(
settings,
clusterService.getClusterSettings(),
threadPool
);

final SearchBackpressureSettings searchBackpressureSettings = new SearchBackpressureSettings(
settings,
clusterService.getClusterSettings()
);

final SearchBackpressureService searchBackpressureService = new SearchBackpressureService(
searchBackpressureSettings,
taskResourceTrackingService,
threadPool
);

final RecoverySettings recoverySettings = new RecoverySettings(settings, settingsModule.getClusterSettings());
RepositoriesModule repositoriesModule = new RepositoriesModule(
this.environment,
Expand Down Expand Up @@ -883,7 +902,8 @@ protected Node(
responseCollectorService,
searchTransportService,
indexingPressureService,
searchModule.getValuesSourceRegistry().getUsageService()
searchModule.getValuesSourceRegistry().getUsageService(),
searchBackpressureService
);

final SearchService searchService = newSearchService(
Expand Down Expand Up @@ -941,6 +961,8 @@ protected Node(
b.bind(AnalysisRegistry.class).toInstance(analysisModule.getAnalysisRegistry());
b.bind(IngestService.class).toInstance(ingestService);
b.bind(IndexingPressureService.class).toInstance(indexingPressureService);
b.bind(TaskResourceTrackingService.class).toInstance(taskResourceTrackingService);
b.bind(SearchBackpressureService.class).toInstance(searchBackpressureService);
b.bind(UsageService.class).toInstance(usageService);
b.bind(AggregationUsageService.class).toInstance(searchModule.getValuesSourceRegistry().getUsageService());
b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry);
Expand Down Expand Up @@ -1104,6 +1126,7 @@ public Node start() throws NodeValidationException {
injector.getInstance(SearchService.class).start();
injector.getInstance(FsHealthService.class).start();
nodeService.getMonitorService().start();
nodeService.getSearchBackpressureService().start();

final ClusterService clusterService = injector.getInstance(ClusterService.class);

Expand Down Expand Up @@ -1259,6 +1282,7 @@ private Node stop() {
injector.getInstance(NodeConnectionsService.class).stop();
injector.getInstance(FsHealthService.class).stop();
nodeService.getMonitorService().stop();
nodeService.getSearchBackpressureService().stop();
injector.getInstance(GatewayService.class).stop();
injector.getInstance(SearchService.class).stop();
injector.getInstance(TransportService.class).stop();
Expand Down Expand Up @@ -1318,6 +1342,7 @@ public synchronized void close() throws IOException {
toClose.add(injector.getInstance(Discovery.class));
toClose.add(() -> stopWatch.stop().start("monitor"));
toClose.add(nodeService.getMonitorService());
toClose.add(nodeService.getSearchBackpressureService());
toClose.add(() -> stopWatch.stop().start("fsHealth"));
toClose.add(injector.getInstance(FsHealthService.class));
toClose.add(() -> stopWatch.stop().start("gateway"));
Expand Down
10 changes: 9 additions & 1 deletion server/src/main/java/org/opensearch/node/NodeService.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.opensearch.plugins.PluginsService;
import org.opensearch.script.ScriptService;
import org.opensearch.search.aggregations.support.AggregationUsageService;
import org.opensearch.search.backpressure.SearchBackpressureService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

Expand Down Expand Up @@ -81,6 +82,7 @@ public class NodeService implements Closeable {
private final SearchTransportService searchTransportService;
private final IndexingPressureService indexingPressureService;
private final AggregationUsageService aggregationUsageService;
private final SearchBackpressureService searchBackpressureService;

private final Discovery discovery;

Expand All @@ -101,7 +103,8 @@ public class NodeService implements Closeable {
ResponseCollectorService responseCollectorService,
SearchTransportService searchTransportService,
IndexingPressureService indexingPressureService,
AggregationUsageService aggregationUsageService
AggregationUsageService aggregationUsageService,
SearchBackpressureService searchBackpressureService
) {
this.settings = settings;
this.threadPool = threadPool;
Expand All @@ -119,6 +122,7 @@ public class NodeService implements Closeable {
this.searchTransportService = searchTransportService;
this.indexingPressureService = indexingPressureService;
this.aggregationUsageService = aggregationUsageService;
this.searchBackpressureService = searchBackpressureService;
clusterService.addStateApplier(ingestService);
}

Expand Down Expand Up @@ -203,6 +207,10 @@ public MonitorService getMonitorService() {
return monitorService;
}

public SearchBackpressureService getSearchBackpressureService() {
return searchBackpressureService;
}

@Override
public void close() throws IOException {
IOUtils.close(indicesService);
Expand Down
Loading

0 comments on commit 885ff84

Please sign in to comment.