Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove unused ThreadPool.Names#SAME #107249

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.UpdateForV9;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.ReportingService;
import org.elasticsearch.telemetry.metric.Instrument;
Expand Down Expand Up @@ -64,7 +65,6 @@ public class ThreadPool implements ReportingService<ThreadPoolInfo>, Scheduler {
private static final Logger logger = LogManager.getLogger(ThreadPool.class);

public static class Names {
public static final String SAME = "same";
public static final String GENERIC = "generic";
public static final String CLUSTER_COORDINATION = "cluster_coordination";
public static final String GET = "get";
Expand Down Expand Up @@ -99,9 +99,13 @@ public static class Names {
public static final String THREAD_POOL_METRIC_NAME_REJECTED = ".threads.rejected.total";

public enum ThreadPoolType {
@Deprecated(forRemoval = true)
@UpdateForV9 // no longer used, remove in v9
DIRECT("direct"),
FIXED("fixed"),
FIXED_AUTO_QUEUE_SIZE("fixed_auto_queue_size"), // TODO: remove in 9.0
@Deprecated(forRemoval = true)
@UpdateForV9 // no longer used, remove in v9
FIXED_AUTO_QUEUE_SIZE("fixed_auto_queue_size"),
SCALING("scaling");

private final String type;
Expand All @@ -127,7 +131,6 @@ public static ThreadPoolType fromType(String type) {
}

public static final Map<String, ThreadPoolType> THREAD_POOL_TYPES = Map.ofEntries(
entry(Names.SAME, ThreadPoolType.DIRECT),
entry(Names.GENERIC, ThreadPoolType.SCALING),
entry(Names.GET, ThreadPoolType.FIXED),
entry(Names.ANALYZE, ThreadPoolType.FIXED),
Expand Down Expand Up @@ -335,16 +338,10 @@ public ThreadPool(final Settings settings, MeterRegistry meterRegistry, final Ex
executors.put(entry.getKey(), executorHolder);
}

executors.put(Names.SAME, new ExecutorHolder(EsExecutors.DIRECT_EXECUTOR_SERVICE, new Info(Names.SAME, ThreadPoolType.DIRECT)));
this.executors = Map.copyOf(executors);
this.executors.forEach((k, v) -> instruments.put(k, setupMetrics(meterRegistry, k, v)));
this.instruments = instruments;
final List<Info> infos = executors.values()
.stream()
.filter(holder -> holder.info.getName().equals("same") == false)
.map(holder -> holder.info)
.toList();
this.threadPoolInfo = new ThreadPoolInfo(infos);
this.threadPoolInfo = new ThreadPoolInfo(executors.values().stream().map(holder -> holder.info).toList());
this.scheduler = Scheduler.initScheduler(settings, "scheduler");
this.slowSchedulerWarnThresholdNanos = SLOW_SCHEDULER_TASK_WARN_THRESHOLD_SETTING.get(settings).nanos();
this.cachedTimeThread = new CachedTimeThread(
Expand Down Expand Up @@ -481,10 +478,6 @@ public ThreadPoolStats stats() {
List<ThreadPoolStats.Stats> stats = new ArrayList<>();
for (ExecutorHolder holder : executors.values()) {
final String name = holder.info.getName();
// no need to have info on "same" thread pool
if ("same".equals(name)) {
continue;
}
int threads = -1;
int queue = -1;
int active = -1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ protected final ThreadPool.Info info(final ThreadPool threadPool, final String n
return info;
}
}
assert "same".equals(name);
return null;
return fail(null, "unknown threadpool name: " + name);
}

protected final ThreadPoolStats.Stats stats(final ThreadPool threadPool, final String name) {
Expand All @@ -30,10 +29,10 @@ protected final ThreadPoolStats.Stats stats(final ThreadPool threadPool, final S
return stats;
}
}
throw new IllegalArgumentException(name);
return fail(null, "unknown threadpool name: " + name);
}

protected final void terminateThreadPoolIfNeeded(final ThreadPool threadPool) throws InterruptedException {
protected final void terminateThreadPoolIfNeeded(final ThreadPool threadPool) {
if (threadPool != null) {
terminate(threadPool);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

public class UpdateThreadPoolSettingsTests extends ESThreadPoolTestCase {

public void testCorrectThreadPoolTypePermittedInSettings() throws InterruptedException {
public void testCorrectThreadPoolTypePermittedInSettings() {
String threadPoolName = randomThreadPoolName();
ThreadPool.ThreadPoolType correctThreadPoolType = ThreadPool.THREAD_POOL_TYPES.get(threadPoolName);
ThreadPool threadPool = null;
Expand All @@ -41,13 +41,7 @@ public void testCorrectThreadPoolTypePermittedInSettings() throws InterruptedExc
.build(),
MeterRegistry.NOOP
);
ThreadPool.Info info = info(threadPool, threadPoolName);
if (ThreadPool.Names.SAME.equals(threadPoolName)) {
assertNull(info); // we don't report on the "same" thread pool
} else {
// otherwise check we have the expected type
assertEquals(info.getThreadPoolType(), correctThreadPoolType);
}
assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), correctThreadPoolType);
} finally {
terminateThreadPoolIfNeeded(threadPool);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ public ExecutorService generic() {

@Override
public ExecutorService executor(String name) {
return Names.SAME.equals(name) ? EsExecutors.DIRECT_EXECUTOR_SERVICE : forkingExecutor;
return forkingExecutor;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -443,20 +443,4 @@ public void testThreadPoolSchedulesPeriodicFutureTasks() {
assertThat(strings, contains("periodic-0", "periodic-1", "periodic-2"));
}

public void testSameExecutor() {
final DeterministicTaskQueue taskQueue = new DeterministicTaskQueue();
final ThreadPool threadPool = taskQueue.getThreadPool();
final AtomicBoolean executed = new AtomicBoolean(false);
final AtomicBoolean executedNested = new AtomicBoolean(false);
threadPool.generic().execute(() -> {
final var executor = threadPool.executor(ThreadPool.Names.SAME);
assertSame(EsExecutors.DIRECT_EXECUTOR_SERVICE, executor);
executor.execute(() -> assertTrue(executedNested.compareAndSet(false, true)));
assertThat(executedNested.get(), is(true));
assertTrue(executed.compareAndSet(false, true));
});
taskQueue.runAllRunnableTasks();
assertThat(executed.get(), is(true));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.threadpool.ThreadPool;
Expand All @@ -35,7 +34,6 @@
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class MlDailyMaintenanceServiceIT extends MlNativeAutodetectIntegTestCase {

Expand All @@ -46,7 +44,6 @@ public class MlDailyMaintenanceServiceIT extends MlNativeAutodetectIntegTestCase
public void setUpMocks() {
jobConfigProvider = new JobConfigProvider(client(), xContentRegistry());
threadPool = mock(ThreadPool.class);
when(threadPool.executor(ThreadPool.Names.SAME)).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE);
}

public void testTriggerDeleteJobsInStateDeletingWithoutDeletionTask() throws InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,11 @@

public class MlInitializationServiceIT extends MlNativeAutodetectIntegTestCase {

private ThreadPool threadPool;
private MlInitializationService mlInitializationService;

@Before
public void setUpMocks() {
threadPool = mock(ThreadPool.class);
when(threadPool.executor(ThreadPool.Names.SAME)).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE);
final var threadPool = mock(ThreadPool.class);
when(threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME)).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE);
MlDailyMaintenanceService mlDailyMaintenanceService = mock(MlDailyMaintenanceService.class);
ClusterService clusterService = mock(ClusterService.class);
Expand Down