Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove cluster state initial customs #32501

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 0 additions & 17 deletions server/src/main/java/org/elasticsearch/cluster/ClusterModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -116,23 +116,6 @@ public ClusterModule(Settings settings, ClusterService clusterService, List<Clus
this.allocationService = new AllocationService(settings, allocationDeciders, shardsAllocator, clusterInfoService);
}

public static Map<String, Supplier<ClusterState.Custom>> getClusterStateCustomSuppliers(List<ClusterPlugin> clusterPlugins) {
final Map<String, Supplier<ClusterState.Custom>> customSupplier = new HashMap<>();
customSupplier.put(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress::new);
customSupplier.put(RestoreInProgress.TYPE, RestoreInProgress::new);
customSupplier.put(SnapshotsInProgress.TYPE, SnapshotsInProgress::new);
for (ClusterPlugin plugin : clusterPlugins) {
Map<String, Supplier<ClusterState.Custom>> initialCustomSupplier = plugin.getInitialClusterStateCustomSupplier();
for (String key : initialCustomSupplier.keySet()) {
if (customSupplier.containsKey(key)) {
throw new IllegalStateException("custom supplier key [" + key + "] is registered more than once");
}
}
customSupplier.putAll(initialCustomSupplier);
}
return Collections.unmodifiableMap(customSupplier);
}

public static List<Entry> getNamedWriteables() {
List<Entry> entries = new ArrayList<>();
// Cluster State
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,6 @@ public interface ClusterApplier {
*/
void onNewClusterState(String source, Supplier<ClusterState> clusterStateSupplier, ClusterApplyListener listener);

/**
* Creates a new cluster state builder that is initialized with the cluster name and all initial cluster state customs.
*/
ClusterState.Builder newClusterStateBuilder();

/**
* Listener for results of cluster state application
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,17 +96,14 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
private final AtomicReference<ClusterState> state; // last applied state

private NodeConnectionsService nodeConnectionsService;
private Supplier<ClusterState.Builder> stateBuilderSupplier;

public ClusterApplierService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool, Supplier<ClusterState
.Builder> stateBuilderSupplier) {
public ClusterApplierService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
super(settings);
this.clusterSettings = clusterSettings;
this.threadPool = threadPool;
this.state = new AtomicReference<>();
this.slowTaskLoggingThreshold = CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(settings);
this.localNodeMasterListeners = new LocalNodeMasterListeners(threadPool);
this.stateBuilderSupplier = stateBuilderSupplier;
}

public void setSlowTaskLoggingThreshold(TimeValue slowTaskLoggingThreshold) {
Expand Down Expand Up @@ -652,8 +649,4 @@ protected long currentTimeInNanos() {
return System.nanoTime();
}

@Override
public ClusterState.Builder newClusterStateBuilder() {
return stateBuilderSupplier.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@

import java.util.Collections;
import java.util.Map;
import java.util.function.Supplier;

public class ClusterService extends AbstractLifecycleComponent {

Expand All @@ -59,30 +58,16 @@ public class ClusterService extends AbstractLifecycleComponent {
private final OperationRouting operationRouting;

private final ClusterSettings clusterSettings;
private final Map<String, Supplier<ClusterState.Custom>> initialClusterStateCustoms;

public ClusterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool,
Map<String, Supplier<ClusterState.Custom>> initialClusterStateCustoms) {
public ClusterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
super(settings);
this.masterService = new MasterService(settings, threadPool);
this.operationRouting = new OperationRouting(settings, clusterSettings);
this.clusterSettings = clusterSettings;
this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
this.clusterSettings.addSettingsUpdateConsumer(CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
this::setSlowTaskLoggingThreshold);
this.initialClusterStateCustoms = initialClusterStateCustoms;
this.clusterApplierService = new ClusterApplierService(settings, clusterSettings, threadPool, this::newClusterStateBuilder);
}

/**
* Creates a new cluster state builder that is initialized with the cluster name and all initial cluster state customs.
*/
public ClusterState.Builder newClusterStateBuilder() {
ClusterState.Builder builder = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings));
for (Map.Entry<String, Supplier<ClusterState.Custom>> entry : initialClusterStateCustoms.entrySet()) {
builder.putCustom(entry.getKey(), entry.getValue().get());
}
return builder;
this.clusterApplierService = new ClusterApplierService(settings, clusterSettings, threadPool);
}

private void setSlowTaskLoggingThreshold(TimeValue slowTaskLoggingThreshold) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand Down Expand Up @@ -113,7 +114,7 @@ protected synchronized void doStart() {
}

protected ClusterState createInitialState(DiscoveryNode localNode) {
ClusterState.Builder builder = clusterApplier.newClusterStateBuilder();
ClusterState.Builder builder = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings));
return builder.nodes(DiscoveryNodes.builder().add(localNode)
.localNodeId(localNode.getId())
.masterNodeId(localNode.getId())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ protected void doStart() {
// set initial state
assert committedState.get() == null;
assert localNode != null;
ClusterState.Builder builder = clusterApplier.newClusterStateBuilder();
ClusterState.Builder builder = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings));
ClusterState initialState = builder
.blocks(ClusterBlocks.builder()
.addGlobalBlock(STATE_NOT_RECOVERED_BLOCK)
Expand Down
3 changes: 2 additions & 1 deletion server/src/main/java/org/elasticsearch/gateway/Gateway.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
Expand Down Expand Up @@ -147,7 +148,7 @@ public void performStateRecovery(final GatewayStateRecoveredListener listener) t
metaDataBuilder.transientSettings(),
e -> logUnknownSetting("transient", e),
(e, ex) -> logInvalidSetting("transient", e, ex)));
ClusterState.Builder builder = clusterService.newClusterStateBuilder();
ClusterState.Builder builder = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings));
builder.metaData(metaDataBuilder);
listener.onSuccess(builder.build());
}
Expand Down
3 changes: 1 addition & 2 deletions server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -347,8 +347,7 @@ protected Node(final Environment environment, Collection<Class<? extends Plugin>
getCustomNameResolvers(pluginsService.filterPlugins(DiscoveryPlugin.class)));

List<ClusterPlugin> clusterPlugins = pluginsService.filterPlugins(ClusterPlugin.class);
final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool,
ClusterModule.getClusterStateCustomSuppliers(clusterPlugins));
final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool);
clusterService.addStateApplier(scriptModule.getScriptService());
resourcesToClose.add(clusterService);
final IngestService ingestService = new IngestService(settings, threadPool, this.environment,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.Map;
import java.util.function.Supplier;

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
import org.elasticsearch.common.settings.ClusterSettings;
Expand Down Expand Up @@ -66,12 +65,4 @@ default Map<String, Supplier<ShardsAllocator>> getShardsAllocators(Settings sett
default void onNodeStarted() {
}

/**
* Returns a map of {@link ClusterState.Custom} supplier that should be invoked to initialize the initial clusterstate.
* This allows custom clusterstate extensions to be always present and prevents invariants where clusterstates are published
* but customs are not initialized.
*
* TODO: Remove this whole concept of InitialClusterStateCustomSupplier, it's not used anymore
*/
default Map<String, Supplier<ClusterState.Custom>> getInitialClusterStateCustomSupplier() { return Collections.emptyMap(); }
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
public class ClusterModuleTests extends ModuleTestCase {
private ClusterInfoService clusterInfoService = EmptyClusterInfoService.INSTANCE;
private ClusterService clusterService = new ClusterService(Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, Collections.emptyMap());
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null);
static class FakeAllocationDecider extends AllocationDecider {
protected FakeAllocationDecider(Settings settings) {
super(settings);
Expand Down Expand Up @@ -202,57 +202,6 @@ public void testAllocationDeciderOrder() {
}
}

public void testCustomSuppliers() {
Map<String, Supplier<ClusterState.Custom>> customSuppliers = ClusterModule.getClusterStateCustomSuppliers(Collections.emptyList());
assertEquals(3, customSuppliers.size());
assertTrue(customSuppliers.containsKey(SnapshotsInProgress.TYPE));
assertTrue(customSuppliers.containsKey(SnapshotDeletionsInProgress.TYPE));
assertTrue(customSuppliers.containsKey(RestoreInProgress.TYPE));

customSuppliers = ClusterModule.getClusterStateCustomSuppliers(Collections.singletonList(new ClusterPlugin() {
@Override
public Map<String, Supplier<ClusterState.Custom>> getInitialClusterStateCustomSupplier() {
return Collections.singletonMap("foo", () -> null);
}
}));
assertEquals(4, customSuppliers.size());
assertTrue(customSuppliers.containsKey(SnapshotsInProgress.TYPE));
assertTrue(customSuppliers.containsKey(SnapshotDeletionsInProgress.TYPE));
assertTrue(customSuppliers.containsKey(RestoreInProgress.TYPE));
assertTrue(customSuppliers.containsKey("foo"));

{
// Eclipse Neon 2 didn't compile the plugins definition inside the lambda expression,
// probably due to https://bugs.eclipse.org/bugs/show_bug.cgi?id=511750, which is
// fixed in Eclipse Oxygon. Pulled out the plugins definition to make it work in older versions
List<ClusterPlugin> plugins = Collections.singletonList(new ClusterPlugin() {
@Override
public Map<String, Supplier<ClusterState.Custom>> getInitialClusterStateCustomSupplier() {
return Collections.singletonMap(SnapshotsInProgress.TYPE, () -> null);
}
});
IllegalStateException ise = expectThrows(IllegalStateException.class,
() -> ClusterModule.getClusterStateCustomSuppliers(plugins));
assertEquals(ise.getMessage(), "custom supplier key [snapshots] is registered more than once");
}
{
List<ClusterPlugin> plugins = Arrays.asList(new ClusterPlugin() {
@Override
public Map<String, Supplier<ClusterState.Custom>> getInitialClusterStateCustomSupplier() {
return Collections.singletonMap("foo", () -> null);
}
}, new ClusterPlugin() {
@Override
public Map<String, Supplier<ClusterState.Custom>> getInitialClusterStateCustomSupplier() {
return Collections.singletonMap("foo", () -> null);
}
});
IllegalStateException ise = expectThrows(IllegalStateException.class,
() -> ClusterModule.getClusterStateCustomSuppliers(plugins));
assertEquals(ise.getMessage(), "custom supplier key [foo] is registered more than once");
}
}

public void testPre63CustomsFiltering() {
final String whiteListedClusterCustom = randomFrom(ClusterModule.PRE_6_3_CLUSTER_CUSTOMS_WHITE_LIST);
final String whiteListedMetaDataCustom = randomFrom(ClusterModule.PRE_6_3_METADATA_CUSTOMS_WHITE_LIST);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.collect.ImmutableOpenMap;
Expand All @@ -38,24 +39,30 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.plugins.ClusterPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.hamcrest.CollectionAssertions;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.junit.Before;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertIndexTemplateExists;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -279,13 +286,11 @@ public void testIndicesIgnoreUnavailableFalse() throws Exception {
}
}

public void testPrivateCustomsAreExcluded() {
public void testPrivateCustomsAreExcluded() throws Exception {
// ensure that the custom is injected into the cluster state
assertBusy(() -> assertTrue(clusterService().state().customs().containsKey("test")));
ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().setCustoms(true).get();
assertFalse(clusterStateResponse.getState().customs().containsKey("test"));
// just to make sure there is something
assertTrue(clusterStateResponse.getState().customs().containsKey(SnapshotDeletionsInProgress.TYPE));
ClusterState state = internalCluster().getInstance(ClusterService.class).state();
assertTrue(state.customs().containsKey("test"));
}

private static class TestCustom extends AbstractNamedDiffable<ClusterState.Custom> implements ClusterState.Custom {
Expand Down Expand Up @@ -333,17 +338,61 @@ public static class PrivateCustomPlugin extends Plugin implements ClusterPlugin

public PrivateCustomPlugin() {}

@Override
public Map<String, Supplier<ClusterState.Custom>> getInitialClusterStateCustomSupplier() {
return Collections.singletonMap("test", () -> new TestCustom(1));
}

@Override
public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
List<NamedWriteableRegistry.Entry> entries = new ArrayList<>();
entries.add(new NamedWriteableRegistry.Entry(ClusterState.Custom.class, "test", TestCustom::new));
entries.add(new NamedWriteableRegistry.Entry(NamedDiff.class, "test", TestCustom::readDiffFrom));
return entries;
}

private final AtomicBoolean installed = new AtomicBoolean();

@Override
public Collection<Object> createComponents(
final Client client,
final ClusterService clusterService,
final ThreadPool threadPool,
final ResourceWatcherService resourceWatcherService,
final ScriptService scriptService,
final NamedXContentRegistry xContentRegistry,
final Environment environment,
final NodeEnvironment nodeEnvironment,
final NamedWriteableRegistry namedWriteableRegistry) {
clusterService.addListener(event -> {
final ClusterState state = event.state();
if (state.getBlocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK)) {
return;
}

if (state.nodes().isLocalNodeElectedMaster()) {
if (state.custom("test") == null) {
if (installed.compareAndSet(false, true)) {
clusterService.submitStateUpdateTask("install-metadata-custom", new ClusterStateUpdateTask(Priority.URGENT) {

@Override
public ClusterState execute(ClusterState currentState) {
if (currentState.custom("test") == null) {
final ClusterState.Builder builder = ClusterState.builder(currentState);
builder.putCustom("test", new TestCustom(42));
return builder.build();
} else {
return currentState;
}
}

@Override
public void onFailure(String source, Exception e) {
throw new AssertionError(e);
}

});
}
}
}

});
return Collections.emptyList();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ static class TimedClusterApplierService extends ClusterApplierService {
public volatile Long currentTimeOverride = null;

TimedClusterApplierService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
super(settings, clusterSettings, threadPool, () -> ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings)));
super(settings, clusterSettings, threadPool);
}

@Override
Expand Down
Loading