Skip to content

Commit

Permalink
[Data stream lifecycle]Add warning when the user's retention is not t…
Browse files Browse the repository at this point in the history
…he effective retention (elastic#107781)
  • Loading branch information
gmarouli authored Apr 26, 2024
1 parent 0b747ac commit a6a29d0
Show file tree
Hide file tree
Showing 10 changed files with 424 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.ComponentTemplate;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.DataStreamFactoryRetention;
import org.elasticsearch.cluster.metadata.DataStreamGlobalRetentionResolver;
import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
import org.elasticsearch.cluster.metadata.MetadataCreateIndexService;
import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService;
Expand Down Expand Up @@ -213,7 +215,8 @@ private MetadataIndexTemplateService getMetadataIndexTemplateService() {
new IndexScopedSettings(Settings.EMPTY, IndexScopedSettings.BUILT_IN_INDEX_SETTINGS),
xContentRegistry(),
EmptySystemIndices.INSTANCE,
indexSettingProviders
indexSettingProviders,
new DataStreamGlobalRetentionResolver(DataStreamFactoryRetention.emptyFactoryRetention())
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.logging.HeaderWarning;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Nullable;
Expand Down Expand Up @@ -186,6 +187,41 @@ public TimeValue getDataStreamRetention() {
return dataRetention == null ? null : dataRetention.value;
}

/**
* This method checks if the effective retention is matching what the user has configured; if the effective retention
* does not match then it adds a warning informing the user about the effective retention and the source.
*/
public void addWarningHeaderIfDataRetentionNotEffective(@Nullable DataStreamGlobalRetention globalRetention) {
if (globalRetention == null) {
return;
}
Tuple<TimeValue, DataStreamLifecycle.RetentionSource> effectiveDataRetentionWithSource = getEffectiveDataRetentionWithSource(
globalRetention
);
String effectiveRetentionStringRep = effectiveDataRetentionWithSource.v1().getStringRep();
switch (effectiveDataRetentionWithSource.v2()) {
case DEFAULT_GLOBAL_RETENTION -> HeaderWarning.addWarning(
"Not providing a retention is not allowed for this project. The default retention of ["
+ effectiveRetentionStringRep
+ "] will be applied."
);
case MAX_GLOBAL_RETENTION -> {
String retentionProvidedPart = getDataStreamRetention() == null
? "Not providing a retention is not allowed for this project."
: "The retention provided ["
+ (getDataStreamRetention() == null ? "infinite" : getDataStreamRetention().getStringRep())
+ "] is exceeding the max allowed data retention of this project ["
+ effectiveRetentionStringRep
+ "].";
HeaderWarning.addWarning(
retentionProvidedPart + " The max retention of [" + effectiveRetentionStringRep + "] will be applied"
);
}
case DATA_STREAM_CONFIGURATION -> {
}
}
}

/**
* The configuration as provided by the user about the least amount of time data should be kept by elasticsearch.
* This method differentiates between a missing retention and a nullified retention and this is useful for template
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,18 @@ public class MetadataDataStreamsService {

private final ClusterService clusterService;
private final IndicesService indicesService;
private final DataStreamGlobalRetentionResolver globalRetentionResolver;
private final MasterServiceTaskQueue<UpdateLifecycleTask> updateLifecycleTaskQueue;
private final MasterServiceTaskQueue<SetRolloverOnWriteTask> setRolloverOnWriteTaskQueue;

public MetadataDataStreamsService(ClusterService clusterService, IndicesService indicesService) {
public MetadataDataStreamsService(
ClusterService clusterService,
IndicesService indicesService,
DataStreamGlobalRetentionResolver globalRetentionResolver
) {
this.clusterService = clusterService;
this.indicesService = indicesService;
this.globalRetentionResolver = globalRetentionResolver;
ClusterStateTaskExecutor<UpdateLifecycleTask> updateLifecycleExecutor = new SimpleBatchedAckListenerTaskExecutor<>() {

@Override
Expand Down Expand Up @@ -199,17 +205,16 @@ static ClusterState modifyDataStream(
* Creates an updated cluster state in which the requested data streams have the data stream lifecycle provided.
* Visible for testing.
*/
static ClusterState updateDataLifecycle(
ClusterState currentState,
List<String> dataStreamNames,
@Nullable DataStreamLifecycle lifecycle
) {
ClusterState updateDataLifecycle(ClusterState currentState, List<String> dataStreamNames, @Nullable DataStreamLifecycle lifecycle) {
Metadata metadata = currentState.metadata();
Metadata.Builder builder = Metadata.builder(metadata);
for (var dataStreamName : dataStreamNames) {
var dataStream = validateDataStream(metadata, dataStreamName);
builder.put(dataStream.copy().setLifecycle(lifecycle).build());
}
if (lifecycle != null) {
lifecycle.addWarningHeaderIfDataRetentionNotEffective(globalRetentionResolver.resolve(currentState));
}
return ClusterState.builder(currentState).metadata(builder.build()).build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ public class MetadataIndexTemplateService {
private final NamedXContentRegistry xContentRegistry;
private final SystemIndices systemIndices;
private final Set<IndexSettingProvider> indexSettingProviders;
private final DataStreamGlobalRetentionResolver globalRetentionResolver;

/**
* This is the cluster state task executor for all template-based actions.
Expand Down Expand Up @@ -180,7 +181,8 @@ public MetadataIndexTemplateService(
IndexScopedSettings indexScopedSettings,
NamedXContentRegistry xContentRegistry,
SystemIndices systemIndices,
IndexSettingProviders indexSettingProviders
IndexSettingProviders indexSettingProviders,
DataStreamGlobalRetentionResolver globalRetentionResolver
) {
this.clusterService = clusterService;
this.taskQueue = clusterService.createTaskQueue("index-templates", Priority.URGENT, TEMPLATE_TASK_EXECUTOR);
Expand All @@ -190,6 +192,7 @@ public MetadataIndexTemplateService(
this.xContentRegistry = xContentRegistry;
this.systemIndices = systemIndices;
this.indexSettingProviders = indexSettingProviders.getIndexSettingProviders();
this.globalRetentionResolver = globalRetentionResolver;
}

public void removeTemplates(final RemoveRequest request, final ActionListener<AcknowledgedResponse> listener) {
Expand Down Expand Up @@ -333,10 +336,11 @@ public ClusterState addComponentTemplate(
final String composableTemplateName = entry.getKey();
final ComposableIndexTemplate composableTemplate = entry.getValue();
try {
validateLifecycleIsOnlyAppliedOnDataStreams(
validateLifecycle(
tempStateWithComponentTemplateAdded.metadata(),
composableTemplateName,
composableTemplate
composableTemplate,
globalRetentionResolver.resolve(currentState)
);
validateIndexTemplateV2(composableTemplateName, composableTemplate, tempStateWithComponentTemplateAdded);
} catch (Exception e) {
Expand All @@ -359,6 +363,12 @@ public ClusterState addComponentTemplate(
}
}

if (finalComponentTemplate.template().lifecycle() != null) {
finalComponentTemplate.template()
.lifecycle()
.addWarningHeaderIfDataRetentionNotEffective(globalRetentionResolver.resolve(currentState));
}

logger.info("{} component template [{}]", existing == null ? "adding" : "updating", name);
return ClusterState.builder(currentState)
.metadata(Metadata.builder(currentState.metadata()).put(name, finalComponentTemplate))
Expand Down Expand Up @@ -715,7 +725,7 @@ private void validateIndexTemplateV2(String name, ComposableIndexTemplate indexT

validate(name, templateToValidate);
validateDataStreamsStillReferenced(currentState, name, templateToValidate);
validateLifecycleIsOnlyAppliedOnDataStreams(currentState.metadata(), name, templateToValidate);
validateLifecycle(currentState.metadata(), name, templateToValidate, globalRetentionResolver.resolve(currentState));

if (templateToValidate.isDeprecated() == false) {
validateUseOfDeprecatedComponentTemplates(name, templateToValidate, currentState.metadata().componentTemplates());
Expand Down Expand Up @@ -784,19 +794,25 @@ private void emitWarningIfPipelineIsDeprecated(String name, Map<String, Pipeline
);
}

private static void validateLifecycleIsOnlyAppliedOnDataStreams(
// Visible for testing
static void validateLifecycle(
Metadata metadata,
String indexTemplateName,
ComposableIndexTemplate template
ComposableIndexTemplate template,
@Nullable DataStreamGlobalRetention globalRetention
) {
boolean hasLifecycle = (template.template() != null && template.template().lifecycle() != null)
|| resolveLifecycle(template, metadata.componentTemplates()) != null;
if (hasLifecycle && template.getDataStreamTemplate() == null) {
throw new IllegalArgumentException(
"index template ["
+ indexTemplateName
+ "] specifies lifecycle configuration that can only be used in combination with a data stream"
);
DataStreamLifecycle lifecycle = template.template() != null && template.template().lifecycle() != null
? template.template().lifecycle()
: resolveLifecycle(template, metadata.componentTemplates());
if (lifecycle != null) {
if (template.getDataStreamTemplate() == null) {
throw new IllegalArgumentException(
"index template ["
+ indexTemplateName
+ "] specifies lifecycle configuration that can only be used in combination with a data stream"
);
}
lifecycle.addWarningHeaderIfDataRetentionNotEffective(globalRetention);
}
}

Expand Down
45 changes: 33 additions & 12 deletions server/src/main/java/org/elasticsearch/node/NodeConstruction.java
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,27 @@ private ScriptService createScriptService(SettingsModule settingsModule, ThreadP
return scriptService;
}

private DataStreamGlobalRetentionResolver createDataStreamServicesAndGlobalRetentionResolver(
ThreadPool threadPool,
ClusterService clusterService,
IndicesService indicesService,
MetadataCreateIndexService metadataCreateIndexService
) {
DataStreamGlobalRetentionResolver dataStreamGlobalRetentionResolver = new DataStreamGlobalRetentionResolver(
DataStreamFactoryRetention.load(pluginsService, clusterService.getClusterSettings())
);
modules.bindToInstance(DataStreamGlobalRetentionResolver.class, dataStreamGlobalRetentionResolver);
modules.bindToInstance(
MetadataCreateDataStreamService.class,
new MetadataCreateDataStreamService(threadPool, clusterService, metadataCreateIndexService)
);
modules.bindToInstance(
MetadataDataStreamsService.class,
new MetadataDataStreamsService(clusterService, indicesService, dataStreamGlobalRetentionResolver)
);
return dataStreamGlobalRetentionResolver;
}

private UpdateHelper createUpdateHelper(DocumentParsingProvider documentParsingProvider, ScriptService scriptService) {
UpdateHelper updateHelper = new UpdateHelper(scriptService, documentParsingProvider);

Expand Down Expand Up @@ -746,6 +767,7 @@ private void construct(
);

final ShardLimitValidator shardLimitValidator = new ShardLimitValidator(settings, clusterService);

final MetadataCreateIndexService metadataCreateIndexService = new MetadataCreateIndexService(
settings,
clusterService,
Expand All @@ -761,12 +783,6 @@ private void construct(
indexSettingProviders
);

modules.bindToInstance(
MetadataCreateDataStreamService.class,
new MetadataCreateDataStreamService(threadPool, clusterService, metadataCreateIndexService)
);
modules.bindToInstance(MetadataDataStreamsService.class, new MetadataDataStreamsService(clusterService, indicesService));

final MetadataUpdateSettingsService metadataUpdateSettingsService = new MetadataUpdateSettingsService(
clusterService,
clusterModule.getAllocationService(),
Expand All @@ -776,10 +792,12 @@ private void construct(
threadPool
);

DataStreamGlobalRetentionResolver dataStreamGlobalRetentionResolver = new DataStreamGlobalRetentionResolver(
DataStreamFactoryRetention.load(pluginsService, clusterService.getClusterSettings())
final DataStreamGlobalRetentionResolver dataStreamGlobalRetentionResolver = createDataStreamServicesAndGlobalRetentionResolver(
threadPool,
clusterService,
indicesService,
metadataCreateIndexService
);
modules.bindToInstance(DataStreamGlobalRetentionResolver.class, dataStreamGlobalRetentionResolver);

record PluginServiceInstances(
Client client,
Expand Down Expand Up @@ -851,7 +869,8 @@ record PluginServiceInstances(
indicesService,
systemIndices,
indexSettingProviders,
metadataCreateIndexService
metadataCreateIndexService,
dataStreamGlobalRetentionResolver
),
pluginsService.loadSingletonServiceProvider(RestExtension.class, RestExtension::allowAll)
);
Expand Down Expand Up @@ -1426,7 +1445,8 @@ private List<ReservedClusterStateHandler<?>> buildReservedStateHandlers(
IndicesService indicesService,
SystemIndices systemIndices,
IndexSettingProviders indexSettingProviders,
MetadataCreateIndexService metadataCreateIndexService
MetadataCreateIndexService metadataCreateIndexService,
DataStreamGlobalRetentionResolver globalRetentionResolver
) {
List<ReservedClusterStateHandler<?>> reservedStateHandlers = new ArrayList<>();

Expand All @@ -1440,7 +1460,8 @@ private List<ReservedClusterStateHandler<?>> buildReservedStateHandlers(
settingsModule.getIndexScopedSettings(),
xContentRegistry,
systemIndices,
indexSettingProviders
indexSettingProviders,
globalRetentionResolver
);
reservedStateHandlers.add(new ReservedComposableIndexTemplateAction(templateService, settingsModule.getIndexScopedSettings()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.DataStreamFactoryRetention;
import org.elasticsearch.cluster.metadata.DataStreamGlobalRetentionResolver;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.MetadataCreateIndexService;
Expand Down Expand Up @@ -73,6 +75,7 @@ public class ReservedComposableIndexTemplateActionTests extends ESTestCase {
ClusterService clusterService;
IndexScopedSettings indexScopedSettings;
IndicesService indicesService;
private DataStreamGlobalRetentionResolver globalRetentionResolver;

@Before
public void setup() throws IOException {
Expand All @@ -89,14 +92,16 @@ public void setup() throws IOException {
doReturn(mapperService).when(indexService).mapperService();
doReturn(indexService).when(indicesService).createIndex(any(), any(), anyBoolean());

globalRetentionResolver = new DataStreamGlobalRetentionResolver(DataStreamFactoryRetention.emptyFactoryRetention());
templateService = new MetadataIndexTemplateService(
clusterService,
mock(MetadataCreateIndexService.class),
indicesService,
indexScopedSettings,
mock(NamedXContentRegistry.class),
mock(SystemIndices.class),
new IndexSettingProviders(Set.of())
new IndexSettingProviders(Set.of()),
globalRetentionResolver
);
}

Expand Down Expand Up @@ -890,7 +895,8 @@ public void testTemplatesWithReservedPrefix() throws Exception {
indexScopedSettings,
mock(NamedXContentRegistry.class),
mock(SystemIndices.class),
new IndexSettingProviders(Set.of())
new IndexSettingProviders(Set.of()),
globalRetentionResolver
);

ClusterState state = ClusterState.builder(new ClusterName("elasticsearch")).metadata(metadata).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import static org.elasticsearch.cluster.metadata.DataStreamLifecycle.RetentionSource.DATA_STREAM_CONFIGURATION;
Expand Down Expand Up @@ -356,20 +357,20 @@ private static DataStreamLifecycle.Retention randomRetention() {
return switch (randomInt(2)) {
case 0 -> null;
case 1 -> DataStreamLifecycle.Retention.NULL;
default -> new DataStreamLifecycle.Retention(TimeValue.timeValueMillis(randomMillisUpToYear9999()));
default -> new DataStreamLifecycle.Retention(randomTimeValue(1, 365, TimeUnit.DAYS));
};
}

@Nullable
private static DataStreamLifecycle.Downsampling randomDownsampling() {
static DataStreamLifecycle.Downsampling randomDownsampling() {
return switch (randomInt(2)) {
case 0 -> null;
case 1 -> DataStreamLifecycle.Downsampling.NULL;
default -> {
var count = randomIntBetween(0, 9);
List<DataStreamLifecycle.Downsampling.Round> rounds = new ArrayList<>();
var previous = new DataStreamLifecycle.Downsampling.Round(
TimeValue.timeValueDays(randomIntBetween(1, 365)),
randomTimeValue(1, 365, TimeUnit.DAYS),
new DownsampleConfig(new DateHistogramInterval(randomIntBetween(1, 24) + "h"))
);
rounds.add(previous);
Expand Down
Loading

0 comments on commit a6a29d0

Please sign in to comment.