Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove license state listeners on closables #36308

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.license;

import org.elasticsearch.Version;

/**
* Marker interface for callbacks that are invoked when the license state changes.
*/
@FunctionalInterface
public interface LicenseStateListener {

/**
* Callback when the license state changes. See {@link XPackLicenseState#update(License.OperationMode, boolean, Version)}.
*/
void licenseStateChanged();

}
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ private static class Status {
}
}

private final List<Runnable> listeners;
private final List<LicenseStateListener> listeners;
private final boolean isSecurityEnabled;
private final boolean isSecurityExplicitlyEnabled;

Expand Down Expand Up @@ -315,17 +315,17 @@ void update(OperationMode mode, boolean active, @Nullable Version mostRecentTria
}
}
}
listeners.forEach(Runnable::run);
listeners.forEach(LicenseStateListener::licenseStateChanged);
}

/** Add a listener to be notified on license change */
public void addListener(Runnable runnable) {
listeners.add(Objects.requireNonNull(runnable));
public void addListener(final LicenseStateListener listener) {
listeners.add(Objects.requireNonNull(listener));
}

/** Remove a listener */
public void removeListener(Runnable runnable) {
listeners.remove(runnable);
public void removeListener(final LicenseStateListener listener) {
listeners.remove(Objects.requireNonNull(listener));
}

/** Return the current license type. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.ingest.IngestMetadata;
import org.elasticsearch.ingest.PipelineConfiguration;
import org.elasticsearch.license.LicenseStateListener;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.protocol.xpack.watcher.DeleteWatchRequest;
import org.elasticsearch.protocol.xpack.watcher.PutWatchRequest;
Expand Down Expand Up @@ -78,7 +79,7 @@
import static org.elasticsearch.xpack.core.monitoring.exporter.MonitoringTemplateUtils.pipelineName;
import static org.elasticsearch.xpack.monitoring.Monitoring.CLEAN_WATCHER_HISTORY;

public class LocalExporter extends Exporter implements ClusterStateListener, CleanerService.Listener {
public class LocalExporter extends Exporter implements ClusterStateListener, CleanerService.Listener, LicenseStateListener {

private static final Logger logger = LogManager.getLogger(LocalExporter.class);

Expand Down Expand Up @@ -106,9 +107,10 @@ public LocalExporter(Exporter.Config config, Client client, CleanerService clean
this.clusterAlertBlacklist = ClusterAlertsUtil.getClusterAlertsBlacklist(config);
this.cleanerService = cleanerService;
this.dateTimeFormatter = dateTimeFormatter(config);
// if additional listeners are added here, adjust LocalExporterTests#testLocalExporterRemovesListenersOnClose accordingly
clusterService.addListener(this);
cleanerService.add(this);
licenseState.addListener(this::licenseChanged);
licenseState.addListener(this);
}

@Override
Expand All @@ -121,7 +123,8 @@ public void clusterChanged(ClusterChangedEvent event) {
/**
* When the license changes, we need to ensure that Watcher is setup properly.
*/
private void licenseChanged() {
@Override
public void licenseStateChanged() {
watcherSetup.set(false);
}

Expand Down Expand Up @@ -153,7 +156,7 @@ public void doClose() {
// we also remove the listener in resolveBulk after we get to RUNNING, but it's okay to double-remove
clusterService.removeListener(this);
cleanerService.remove(this);
licenseState.removeListener(this::licenseChanged);
licenseState.removeListener(this);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.monitoring.exporter.local;

import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.monitoring.cleaner.CleanerService;
import org.elasticsearch.xpack.monitoring.exporter.Exporter;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

public class LocalExporterTests extends ESTestCase {

public void testLocalExporterRemovesListenersOnClose() {
final ClusterService clusterService = mock(ClusterService.class);
final XPackLicenseState licenseState = mock(XPackLicenseState.class);
final Exporter.Config config = new Exporter.Config("name", "type", Settings.EMPTY, clusterService, licenseState);
final CleanerService cleanerService = mock(CleanerService.class);
final LocalExporter exporter = new LocalExporter(config, mock(Client.class), cleanerService);
verify(clusterService).addListener(exporter);
verify(cleanerService).add(exporter);
verify(licenseState).addListener(exporter);
exporter.close();
verify(clusterService).removeListener(exporter);
verify(cleanerService).remove(exporter);
verify(licenseState).removeListener(exporter);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.cache.query.QueryCache;
import org.elasticsearch.indices.IndicesQueryCache;
import org.elasticsearch.license.LicenseStateListener;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.xpack.core.security.authz.AuthorizationServiceField;
import org.elasticsearch.xpack.core.security.authz.accesscontrol.IndicesAccessControl;
Expand All @@ -26,7 +27,7 @@
* Opts out of the query cache if field level security is active for the current request,
* and its unsafe to cache.
*/
public final class OptOutQueryCache extends AbstractIndexComponent implements QueryCache {
public final class OptOutQueryCache extends AbstractIndexComponent implements LicenseStateListener, QueryCache {

private final IndicesQueryCache indicesQueryCache;
private final ThreadContext context;
Expand All @@ -43,14 +44,20 @@ public OptOutQueryCache(
this.context = Objects.requireNonNull(context, "threadContext must not be null");
this.indexName = indexSettings.getIndex().getName();
this.licenseState = Objects.requireNonNull(licenseState, "licenseState");
licenseState.addListener(() -> this.clear("license state changed"));
licenseState.addListener(this);
}

@Override
public void close() throws ElasticsearchException {
licenseState.removeListener(this);
clear("close");
}

@Override
public void licenseStateChanged() {
clear("license state changed");
}

@Override
public void clear(String reason) {
logger.debug("full cache clear, reason [{}]", reason);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.QueryCachingPolicy;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.Weight;
import org.apache.lucene.store.Directory;
Expand Down Expand Up @@ -184,6 +184,22 @@ public void testOptOutQueryCacheIndexDoesNotHaveFieldLevelSecurity() {
verify(indicesQueryCache).doCache(same(weight), same(policy));
}

public void testOptOutQueryCacheRemovesLicenseStateListenerOnClose() {
final Settings.Builder settings = Settings.builder()
.put("index.version.created", Version.CURRENT)
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0);
final IndexMetaData indexMetaData = IndexMetaData.builder("index").settings(settings).build();
final IndexSettings indexSettings = new IndexSettings(indexMetaData, Settings.EMPTY);
final IndicesQueryCache indicesQueryCache = mock(IndicesQueryCache.class);
final ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
final XPackLicenseState licenseState = mock(XPackLicenseState.class);
final OptOutQueryCache cache = new OptOutQueryCache(indexSettings, indicesQueryCache, threadContext, licenseState);
verify(licenseState).addListener(cache);
cache.close();
verify(licenseState).removeListener(cache);
}

private static FieldPermissionsDefinition fieldPermissionDef(String[] granted, String[] denied) {
return new FieldPermissionsDefinition(granted, denied);
}
Expand Down