Skip to content

Commit

Permalink
Remove unused ThreadPool.Names#SAME (#107249)
Browse files Browse the repository at this point in the history
`SAME` is a distinguished threadpool name that callers could use to
obtain a special `ExecutorService` that runs tasks immediately, directly
on the calling thread. In fact there are no callers that use this name
any more, so we can remove it and all the associated special handling.

Relates #106279
  • Loading branch information
DaveCTurner authored Apr 9, 2024
1 parent 2588c72 commit bdc9873
Show file tree
Hide file tree
Showing 7 changed files with 14 additions and 49 deletions.
21 changes: 7 additions & 14 deletions server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.UpdateForV9;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.ReportingService;
import org.elasticsearch.telemetry.metric.Instrument;
Expand Down Expand Up @@ -64,7 +65,6 @@ public class ThreadPool implements ReportingService<ThreadPoolInfo>, Scheduler {
private static final Logger logger = LogManager.getLogger(ThreadPool.class);

public static class Names {
public static final String SAME = "same";
public static final String GENERIC = "generic";
public static final String CLUSTER_COORDINATION = "cluster_coordination";
public static final String GET = "get";
Expand Down Expand Up @@ -99,9 +99,13 @@ public static class Names {
public static final String THREAD_POOL_METRIC_NAME_REJECTED = ".threads.rejected.total";

public enum ThreadPoolType {
@Deprecated(forRemoval = true)
@UpdateForV9 // no longer used, remove in v9
DIRECT("direct"),
FIXED("fixed"),
FIXED_AUTO_QUEUE_SIZE("fixed_auto_queue_size"), // TODO: remove in 9.0
@Deprecated(forRemoval = true)
@UpdateForV9 // no longer used, remove in v9
FIXED_AUTO_QUEUE_SIZE("fixed_auto_queue_size"),
SCALING("scaling");

private final String type;
Expand All @@ -127,7 +131,6 @@ public static ThreadPoolType fromType(String type) {
}

public static final Map<String, ThreadPoolType> THREAD_POOL_TYPES = Map.ofEntries(
entry(Names.SAME, ThreadPoolType.DIRECT),
entry(Names.GENERIC, ThreadPoolType.SCALING),
entry(Names.GET, ThreadPoolType.FIXED),
entry(Names.ANALYZE, ThreadPoolType.FIXED),
Expand Down Expand Up @@ -335,16 +338,10 @@ public ThreadPool(final Settings settings, MeterRegistry meterRegistry, final Ex
executors.put(entry.getKey(), executorHolder);
}

executors.put(Names.SAME, new ExecutorHolder(EsExecutors.DIRECT_EXECUTOR_SERVICE, new Info(Names.SAME, ThreadPoolType.DIRECT)));
this.executors = Map.copyOf(executors);
this.executors.forEach((k, v) -> instruments.put(k, setupMetrics(meterRegistry, k, v)));
this.instruments = instruments;
final List<Info> infos = executors.values()
.stream()
.filter(holder -> holder.info.getName().equals("same") == false)
.map(holder -> holder.info)
.toList();
this.threadPoolInfo = new ThreadPoolInfo(infos);
this.threadPoolInfo = new ThreadPoolInfo(executors.values().stream().map(holder -> holder.info).toList());
this.scheduler = Scheduler.initScheduler(settings, "scheduler");
this.slowSchedulerWarnThresholdNanos = SLOW_SCHEDULER_TASK_WARN_THRESHOLD_SETTING.get(settings).nanos();
this.cachedTimeThread = new CachedTimeThread(
Expand Down Expand Up @@ -481,10 +478,6 @@ public ThreadPoolStats stats() {
List<ThreadPoolStats.Stats> stats = new ArrayList<>();
for (ExecutorHolder holder : executors.values()) {
final String name = holder.info.getName();
// no need to have info on "same" thread pool
if ("same".equals(name)) {
continue;
}
int threads = -1;
int queue = -1;
int active = -1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ protected final ThreadPool.Info info(final ThreadPool threadPool, final String n
return info;
}
}
assert "same".equals(name);
return null;
return fail(null, "unknown threadpool name: " + name);
}

protected final ThreadPoolStats.Stats stats(final ThreadPool threadPool, final String name) {
Expand All @@ -30,10 +29,10 @@ protected final ThreadPoolStats.Stats stats(final ThreadPool threadPool, final S
return stats;
}
}
throw new IllegalArgumentException(name);
return fail(null, "unknown threadpool name: " + name);
}

protected final void terminateThreadPoolIfNeeded(final ThreadPool threadPool) throws InterruptedException {
protected final void terminateThreadPoolIfNeeded(final ThreadPool threadPool) {
if (threadPool != null) {
terminate(threadPool);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

public class UpdateThreadPoolSettingsTests extends ESThreadPoolTestCase {

public void testCorrectThreadPoolTypePermittedInSettings() throws InterruptedException {
public void testCorrectThreadPoolTypePermittedInSettings() {
String threadPoolName = randomThreadPoolName();
ThreadPool.ThreadPoolType correctThreadPoolType = ThreadPool.THREAD_POOL_TYPES.get(threadPoolName);
ThreadPool threadPool = null;
Expand All @@ -41,13 +41,7 @@ public void testCorrectThreadPoolTypePermittedInSettings() throws InterruptedExc
.build(),
MeterRegistry.NOOP
);
ThreadPool.Info info = info(threadPool, threadPoolName);
if (ThreadPool.Names.SAME.equals(threadPoolName)) {
assertNull(info); // we don't report on the "same" thread pool
} else {
// otherwise check we have the expected type
assertEquals(info.getThreadPoolType(), correctThreadPoolType);
}
assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), correctThreadPoolType);
} finally {
terminateThreadPoolIfNeeded(threadPool);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ public ExecutorService generic() {

@Override
public ExecutorService executor(String name) {
return Names.SAME.equals(name) ? EsExecutors.DIRECT_EXECUTOR_SERVICE : forkingExecutor;
return forkingExecutor;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -443,20 +443,4 @@ public void testThreadPoolSchedulesPeriodicFutureTasks() {
assertThat(strings, contains("periodic-0", "periodic-1", "periodic-2"));
}

public void testSameExecutor() {
final DeterministicTaskQueue taskQueue = new DeterministicTaskQueue();
final ThreadPool threadPool = taskQueue.getThreadPool();
final AtomicBoolean executed = new AtomicBoolean(false);
final AtomicBoolean executedNested = new AtomicBoolean(false);
threadPool.generic().execute(() -> {
final var executor = threadPool.executor(ThreadPool.Names.SAME);
assertSame(EsExecutors.DIRECT_EXECUTOR_SERVICE, executor);
executor.execute(() -> assertTrue(executedNested.compareAndSet(false, true)));
assertThat(executedNested.get(), is(true));
assertTrue(executed.compareAndSet(false, true));
});
taskQueue.runAllRunnableTasks();
assertThat(executed.get(), is(true));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.threadpool.ThreadPool;
Expand All @@ -35,7 +34,6 @@
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class MlDailyMaintenanceServiceIT extends MlNativeAutodetectIntegTestCase {

Expand All @@ -46,7 +44,6 @@ public class MlDailyMaintenanceServiceIT extends MlNativeAutodetectIntegTestCase
public void setUpMocks() {
jobConfigProvider = new JobConfigProvider(client(), xContentRegistry());
threadPool = mock(ThreadPool.class);
when(threadPool.executor(ThreadPool.Names.SAME)).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE);
}

public void testTriggerDeleteJobsInStateDeletingWithoutDeletionTask() throws InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,11 @@

public class MlInitializationServiceIT extends MlNativeAutodetectIntegTestCase {

private ThreadPool threadPool;
private MlInitializationService mlInitializationService;

@Before
public void setUpMocks() {
threadPool = mock(ThreadPool.class);
when(threadPool.executor(ThreadPool.Names.SAME)).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE);
final var threadPool = mock(ThreadPool.class);
when(threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME)).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE);
MlDailyMaintenanceService mlDailyMaintenanceService = mock(MlDailyMaintenanceService.class);
ClusterService clusterService = mock(ClusterService.class);
Expand Down

0 comments on commit bdc9873

Please sign in to comment.