Skip to content

Commit

Permalink
Inline PlainActionFuture#newFuture (#102060)
Browse files Browse the repository at this point in the history
There's no need for this static method which just calls the constructor,
the caller may as well construct the object directly.
  • Loading branch information
DaveCTurner authored Nov 13, 2023
1 parent 0c18798 commit f017fab
Show file tree
Hide file tree
Showing 104 changed files with 295 additions and 306 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ protected void updateTimestamp(String name, Metadata old) {
}

void updateTaskState() {
PlainActionFuture<PersistentTask<?>> future = PlainActionFuture.newFuture();
PlainActionFuture<PersistentTask<?>> future = new PlainActionFuture<>();
updatePersistentTaskState(state, future);
state = ((GeoIpTaskState) future.actionGet().getState());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ protected void createRepository(String repoName) {

private void ensureSasTokenPermissions() {
final BlobStoreRepository repository = getRepository();
final PlainActionFuture<Void> future = PlainActionFuture.newFuture();
final PlainActionFuture<Void> future = new PlainActionFuture<>();
repository.threadPool().generic().execute(ActionRunnable.wrap(future, l -> {
final AzureBlobStore blobStore = (AzureBlobStore) repository.blobStore();
final AzureBlobServiceClient azureBlobServiceClient = blobStore.getService().client("default", LocationMode.PRIMARY_ONLY);
Expand Down Expand Up @@ -136,7 +136,7 @@ public void testMultiBlockUpload() throws Exception {
final BlobStoreRepository repo = getRepository();
// The configured threshold for this test suite is 1mb
final int blobSize = ByteSizeUnit.MB.toIntBytes(2);
PlainActionFuture<Void> future = PlainActionFuture.newFuture();
PlainActionFuture<Void> future = new PlainActionFuture<>();
repo.threadPool().generic().execute(ActionRunnable.run(future, () -> {
final BlobContainer blobContainer = repo.blobStore().blobContainer(repo.basePath().add("large_write"));
blobContainer.writeBlob(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public TimeValue masterNodeTimeout() {
}
};

PlainActionFuture<AcknowledgedResponse> future = PlainActionFuture.newFuture();
PlainActionFuture<AcknowledgedResponse> future = new PlainActionFuture<>();
internalCluster().getAnyMasterNodeInstance(ClusterService.class)
.submitUnbatchedStateUpdateTask("get_mappings_cancellation_test", new AckedClusterStateUpdateTask(ackedRequest, future) {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ void verifyCancellationDuringQueryPhase(String searchAction, Request searchReque
List<ScriptedBlockPlugin> plugins = initBlockFactory();
indexTestData();

PlainActionFuture<Response> future = PlainActionFuture.newFuture();
PlainActionFuture<Response> future = new PlainActionFuture<>();
Cancellable cancellable = getRestClient().performRequestAsync(searchRequest, wrapAsRestResponseListener(future));

awaitForBlock(plugins);
Expand Down Expand Up @@ -141,7 +141,7 @@ void verifyCancellationDuringFetchPhase(String searchAction, Request searchReque
List<ScriptedBlockPlugin> plugins = initBlockFactory();
indexTestData();

PlainActionFuture<Response> future = PlainActionFuture.newFuture();
PlainActionFuture<Response> future = new PlainActionFuture<>();
Cancellable cancellable = getRestClient().performRequestAsync(searchRequest, wrapAsRestResponseListener(future));

awaitForBlock(plugins);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public void testSimple() throws Exception {
}
// Force merge to ensure that there are more than one numeric value to justify doc value.
client().admin().indices().prepareForceMerge(index).setMaxNumSegments(1).get();
PlainActionFuture<AnalyzeIndexDiskUsageResponse> future = PlainActionFuture.newFuture();
PlainActionFuture<AnalyzeIndexDiskUsageResponse> future = new PlainActionFuture<>();
client().execute(
AnalyzeIndexDiskUsageAction.INSTANCE,
new AnalyzeIndexDiskUsageRequest(new String[] { index }, AnalyzeIndexDiskUsageRequest.DEFAULT_INDICES_OPTIONS, true),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public void testCancellation() throws Exception {
}

final ClusterService clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);
final PlainActionFuture<DiscoveryNode> findHealthNodeFuture = PlainActionFuture.newFuture();
final PlainActionFuture<DiscoveryNode> findHealthNodeFuture = new PlainActionFuture<>();
// the health node might take a bit of time to be assigned by the persistent task framework so we wait until we have a health
// node in the cluster before proceeding with the test
// proceeding with the execution before the health node assignment would yield a non-deterministic behaviour as we
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ private void runTestAutomaticRefresh(final IntToLongFunction count) throws Inter
client().prepareIndex("test").setId("0").setSource("{\"foo\" : \"bar\"}", XContentType.JSON).get();
indexingDone.countDown(); // one doc is indexed above blocking
IndexShard shard = indexService.getShard(0);
PlainActionFuture<Boolean> future = PlainActionFuture.newFuture();
PlainActionFuture<Boolean> future = new PlainActionFuture<>();
shard.scheduledRefresh(future);
boolean hasRefreshed = future.actionGet();
if (randomTimeValue == TimeValue.ZERO) {
Expand Down Expand Up @@ -193,7 +193,7 @@ public void testPendingRefreshWithIntervalChange() throws Exception {
}

private static void scheduleRefresh(IndexShard shard, boolean expectRefresh) {
PlainActionFuture<Boolean> future = PlainActionFuture.newFuture();
PlainActionFuture<Boolean> future = new PlainActionFuture<>();
shard.scheduledRefresh(future);
assertThat(future.actionGet(), equalTo(expectRefresh));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,15 +290,15 @@ protected boolean masterSupportsFetchingLatestSnapshots() {
}
};

PlainActionFuture<Optional<ShardSnapshot>> latestSnapshots = PlainActionFuture.newFuture();
PlainActionFuture<Optional<ShardSnapshot>> latestSnapshots = new PlainActionFuture<>();
shardSnapshotsService.fetchLatestSnapshotsForShard(shardId, latestSnapshots);
assertThat(latestSnapshots.actionGet().isPresent(), is(equalTo(false)));
}

private Optional<ShardSnapshot> getLatestShardSnapshot(ShardId shardId) throws Exception {
ShardSnapshotsService shardSnapshotsService = getShardSnapshotsService();

PlainActionFuture<Optional<ShardSnapshot>> future = PlainActionFuture.newFuture();
PlainActionFuture<Optional<ShardSnapshot>> future = new PlainActionFuture<>();
shardSnapshotsService.fetchLatestSnapshotsForShard(shardId, future);
return future.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ private PlainActionFuture<GetShardSnapshotResponse> getLatestSnapshotForShardFut
boolean useAllRepositoriesRequest
) {
ShardId shardId = new ShardId(new Index(indexName, "__na__"), shard);
PlainActionFuture<GetShardSnapshotResponse> future = PlainActionFuture.newFuture();
PlainActionFuture<GetShardSnapshotResponse> future = new PlainActionFuture<>();
final GetShardSnapshotRequest request;
if (useAllRepositoriesRequest && randomBoolean()) {
request = GetShardSnapshotRequest.latestSnapshotInAllRepositories(shardId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ private ActionFuture<CleanupRepositoryResponse> startBlockedCleanup(String repoN
final BlobStoreRepository repository = getRepositoryOnMaster(repoName);

logger.info("--> creating a garbage data blob");
final PlainActionFuture<Void> garbageFuture = PlainActionFuture.newFuture();
final PlainActionFuture<Void> garbageFuture = new PlainActionFuture<>();
repository.threadPool()
.generic()
.execute(
Expand Down Expand Up @@ -137,7 +137,7 @@ public void testCleanupOldIndexN() throws ExecutionException, InterruptedExcepti
final BlobStoreRepository repository = getRepositoryOnMaster(repoName);
logger.info("--> write two outdated index-N blobs");
for (int i = 0; i < 2; ++i) {
final PlainActionFuture<Void> createOldIndexNFuture = PlainActionFuture.newFuture();
final PlainActionFuture<Void> createOldIndexNFuture = new PlainActionFuture<>();
final int generation = i;
repository.threadPool()
.generic()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -664,7 +664,7 @@ public void testCancel() throws Exception {
)
);
BlockingOnRewriteQueryBuilder.blockOnRewrite();
PlainActionFuture<Response> future = PlainActionFuture.newFuture();
PlainActionFuture<Response> future = new PlainActionFuture<>();
Request restRequest = new Request("POST", "/_field_caps?fields=*");
restRequest.setEntity(new StringEntity("""
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1166,7 +1166,7 @@ public void testDeleteIndexDuringSnapshot() throws Exception {
final int concurrentLoops = randomIntBetween(2, 5);
final List<Future<Void>> futures = new ArrayList<>(concurrentLoops);
for (int i = 0; i < concurrentLoops; i++) {
final PlainActionFuture<Void> future = PlainActionFuture.newFuture();
final PlainActionFuture<Void> future = new PlainActionFuture<>();
futures.add(future);
startSnapshotDeleteLoop(repoName, indexName, "test-snap-" + i, future);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public PlainActionFuture<BulkResponse> withBackoff(
BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer,
BulkRequest bulkRequest
) {
PlainActionFuture<BulkResponse> future = PlainActionFuture.newFuture();
PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
withBackoff(consumer, bulkRequest, future);
return future;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,6 @@

public class PlainActionFuture<T> implements ActionFuture<T>, ActionListener<T> {

public static <T> PlainActionFuture<T> newFuture() {
return new PlainActionFuture<>();
}

@Override
public void onResponse(T result) {
set(result);
Expand Down Expand Up @@ -442,13 +438,13 @@ private static RuntimeException unwrapEsException(ElasticsearchException esEx) {
}

public static <T, E extends Exception> T get(CheckedConsumer<PlainActionFuture<T>, E> e) throws E {
PlainActionFuture<T> fut = newFuture();
PlainActionFuture<T> fut = new PlainActionFuture<>();
e.accept(fut);
return fut.actionGet();
}

public static <T, E extends Exception> T get(CheckedConsumer<PlainActionFuture<T>, E> e, long timeout, TimeUnit unit) throws E {
PlainActionFuture<T> fut = newFuture();
PlainActionFuture<T> fut = new PlainActionFuture<>();
e.accept(fut);
return fut.actionGet(timeout, unit);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ public final <Request extends ActionRequest, Response extends ActionResponse> Ac
ActionType<Response> action,
Request request
) {
PlainActionFuture<Response> actionFuture = PlainActionFuture.newFuture();
PlainActionFuture<Response> actionFuture = new PlainActionFuture<>();
execute(action, request, actionFuture);
return actionFuture;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ static <C extends CloseableChannel> void closeChannels(List<C> channels, boolean
if (blocking) {
ArrayList<ActionFuture<Void>> futures = new ArrayList<>(channels.size());
for (final C channel : channels) {
PlainActionFuture<Void> closeFuture = PlainActionFuture.newFuture();
PlainActionFuture<Void> closeFuture = new PlainActionFuture<>();
channel.addCloseListener(closeFuture);
futures.add(closeFuture);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public abstract class AbstractHttpServerTransport extends AbstractLifecycleCompo
private volatile BoundTransportAddress boundAddress;
private final AtomicLong totalChannelsAccepted = new AtomicLong();
private final Map<HttpChannel, RequestTrackingHttpChannel> httpChannels = new ConcurrentHashMap<>();
private final PlainActionFuture<Void> allClientsClosedListener = PlainActionFuture.newFuture();
private final PlainActionFuture<Void> allClientsClosedListener = new PlainActionFuture<>();
private final RefCounted refCounted = AbstractRefCounted.of(() -> allClientsClosedListener.onResponse(null));
private final Set<HttpServerChannel> httpServerChannels = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final long shutdownGracePeriodMillis;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1140,7 +1140,7 @@ public void externalRefresh(String source, ActionListener<Engine.RefreshResult>
*/
// TODO: Remove or rename for increased clarity
public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
PlainActionFuture<FlushResult> future = PlainActionFuture.newFuture();
PlainActionFuture<FlushResult> future = new PlainActionFuture<>();
flush(force, waitIfOngoing, future);
future.actionGet();
}
Expand All @@ -1167,7 +1167,7 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
* a lucene commit if nothing needs to be committed.
*/
public final void flush() throws EngineException {
PlainActionFuture<FlushResult> future = PlainActionFuture.newFuture();
PlainActionFuture<FlushResult> future = new PlainActionFuture<>();
flush(false, false, future);
future.actionGet();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2469,7 +2469,7 @@ public IndexCommitRef acquireLastIndexCommit(final boolean flushFirst) throws En
if (flushFirst) {
logger.trace("start flush for snapshot");
// TODO: Split acquireLastIndexCommit into two apis one with blocking flushes one without
PlainActionFuture<FlushResult> future = PlainActionFuture.newFuture();
PlainActionFuture<FlushResult> future = new PlainActionFuture<>();
flush(false, true, future);
future.actionGet();
logger.trace("finish flush for snapshot");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1404,7 +1404,7 @@ public boolean flush(FlushRequest request) {
verifyNotClosed();
final long time = System.nanoTime();
// TODO: Transition this method to async to support async flush
PlainActionFuture<Engine.FlushResult> future = PlainActionFuture.newFuture();
PlainActionFuture<Engine.FlushResult> future = new PlainActionFuture<>();
getEngine().flush(force, waitIfOngoing, future);
Engine.FlushResult flushResult = future.actionGet();
flushMetric.inc(System.nanoTime() - time);
Expand Down Expand Up @@ -3928,7 +3928,7 @@ public final void ensureShardSearchActive(Consumer<Boolean> listener) {
// a refresh can be a costly operation, so we should fork to a refresh thread to be safe:
threadPool.executor(ThreadPool.Names.REFRESH).execute(() -> {
if (location == pendingRefreshLocation.get()) {
getEngine().maybeRefresh("ensure-shard-search-active", PlainActionFuture.newFuture());
getEngine().maybeRefresh("ensure-shard-search-active", new PlainActionFuture<>());
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ protected boolean shouldRefreshFileState(ClusterState clusterState) {
*/
@Override
protected void processFileChanges() throws ExecutionException, InterruptedException, IOException {
PlainActionFuture<Void> completion = PlainActionFuture.newFuture();
PlainActionFuture<Void> completion = new PlainActionFuture<>();
logger.info("processing path [{}] for [{}]", watchedFile(), NAMESPACE);
try (
var fis = Files.newInputStream(watchedFile());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.stream.Collectors;

import static org.elasticsearch.action.support.PlainActionFuture.newFuture;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -286,7 +285,7 @@ private ActionFuture<NodesResponse> startBlockingTestNodesAction(CountDownLatch
}

private ActionFuture<NodesResponse> startBlockingTestNodesAction(CountDownLatch checkLatch, NodesRequest request) throws Exception {
PlainActionFuture<NodesResponse> future = newFuture();
PlainActionFuture<NodesResponse> future = new PlainActionFuture<>();
startBlockingTestNodesAction(checkLatch, request, future);
return future;
}
Expand Down Expand Up @@ -677,7 +676,7 @@ protected void taskOperation(
TestTasksRequest testTasksRequest = new TestTasksRequest();
testTasksRequest.setActions("internal:testAction[n]"); // pick all test actions
testTasksRequest.setNodes(testNodes[0].getNodeId(), testNodes[1].getNodeId()); // only first two nodes
PlainActionFuture<TestTasksResponse> taskFuture = newFuture();
PlainActionFuture<TestTasksResponse> taskFuture = new PlainActionFuture<>();
CancellableTask task = (CancellableTask) testNodes[0].transportService.getTaskManager()
.registerAndExecute(
"direct",
Expand All @@ -690,7 +689,7 @@ protected void taskOperation(
taskExecutesLatch.await();
logger.info("All test tasks are now executing");

PlainActionFuture<Void> cancellationFuture = newFuture();
PlainActionFuture<Void> cancellationFuture = new PlainActionFuture<>();
logger.info("Cancelling tasks");

testNodes[0].transportService.getTaskManager().cancelTaskAndDescendants(task, "test case", false, cancellationFuture);
Expand Down Expand Up @@ -734,7 +733,7 @@ protected void taskOperation(

TestTasksRequest testTasksRequest = new TestTasksRequest();
testTasksRequest.setNodes(testNodes[0].getNodeId()); // only local node
PlainActionFuture<TestTasksResponse> taskFuture = newFuture();
PlainActionFuture<TestTasksResponse> taskFuture = new PlainActionFuture<>();
CancellableTask task = (CancellableTask) testNodes[0].transportService.getTaskManager()
.registerAndExecute(
"direct",
Expand Down Expand Up @@ -808,7 +807,7 @@ protected void taskOperation(

TestTasksRequest testTasksRequest = new TestTasksRequest();
testTasksRequest.setActions("internal:testTasksAction[n]");
PlainActionFuture<TestTasksResponse> taskFuture = newFuture();
PlainActionFuture<TestTasksResponse> taskFuture = new PlainActionFuture<>();
CancellableTask task = (CancellableTask) testNodes[0].transportService.getTaskManager()
.registerAndExecute(
"direct",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ private void executeOnPrimaryOrReplica(boolean phase1) throws Throwable {
phase1,
taskId
);
final PlainActionFuture<Void> res = PlainActionFuture.newFuture();
final PlainActionFuture<Void> res = new PlainActionFuture<>();
action.shardOperationOnPrimary(request, indexShard, res.delegateFailureAndWrap((l, r) -> {
assertNotNull(r);
l.onResponse(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ private BulkRequest createBulkRequest() {
public void testRetryBacksOff() throws Exception {
BulkRequest bulkRequest = createBulkRequest();
Retry2 retry2 = new Retry2(CALLS_TO_FAIL);
PlainActionFuture<BulkResponse> future = PlainActionFuture.newFuture();
PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
retry2.consumeRequestWithRetries(bulkClient::bulk, bulkRequest, future);
BulkResponse response = future.actionGet();

Expand All @@ -93,7 +93,7 @@ public void testRetryFailsAfterRetry() throws Exception {
BulkRequest bulkRequest = createBulkRequest();
try {
Retry2 retry2 = new Retry2(CALLS_TO_FAIL - 1);
PlainActionFuture<BulkResponse> future = PlainActionFuture.newFuture();
PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
retry2.consumeRequestWithRetries(bulkClient::bulk, bulkRequest, future);
BulkResponse response = future.actionGet();
/*
Expand Down
Loading

0 comments on commit f017fab

Please sign in to comment.