Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] Datafeed config CRUD operations #32854

Merged
merged 5 commits into from
Aug 16, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ public final class MlMetaIndex {
*/
public static final String INDEX_NAME = ".ml-meta";

public static final String INCLUDE_TYPE_KEY = "include_type";

public static final String TYPE = "doc";

private MlMetaIndex() {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ private static <T extends Writeable> void writeMap(Map<String, T> map, StreamOut
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
DelegatingMapParams extendedParams =
new DelegatingMapParams(Collections.singletonMap(ToXContentParams.FOR_CLUSTER_STATE, "true"), params);
new DelegatingMapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true"), params);
mapValuesToXContent(JOBS_FIELD, jobs, builder, extendedParams);
mapValuesToXContent(DATAFEEDS_FIELD, datafeeds, builder, extendedParams);
return builder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,7 @@ public void writeTo(StreamOutput out) throws IOException {

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
datafeed.doXContentBody(builder, params);
builder.endObject();
datafeed.toXContent(builder, params);
return builder;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.ml.MlMetaIndex;
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;

import java.io.IOException;
import java.util.Arrays;
Expand Down Expand Up @@ -111,7 +111,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (description != null) {
builder.field(DESCRIPTION.getPreferredName(), description);
}
if (params.paramAsBoolean(MlMetaIndex.INCLUDE_TYPE_KEY, false)) {
if (params.paramAsBoolean(ToXContentParams.INCLUDE_TYPE, false)) {
builder.field(TYPE.getPreferredName(), CALENDAR_TYPE);
}
builder.endObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.core.ml.MlMetaIndex;
import org.elasticsearch.xpack.core.ml.job.config.DetectionRule;
import org.elasticsearch.xpack.core.ml.job.config.Operator;
import org.elasticsearch.xpack.core.ml.job.config.RuleAction;
import org.elasticsearch.xpack.core.ml.job.config.RuleCondition;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.Intervals;
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
import org.elasticsearch.xpack.core.ml.utils.time.TimeUtils;

import java.io.IOException;
Expand Down Expand Up @@ -170,7 +170,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (eventId != null) {
builder.field(EVENT_ID.getPreferredName(), eventId);
}
if (params.paramAsBoolean(MlMetaIndex.INCLUDE_TYPE_KEY, false)) {
if (params.paramAsBoolean(ToXContentParams.INCLUDE_TYPE, false)) {
builder.field(TYPE.getPreferredName(), SCHEDULED_EVENT_TYPE);
}
builder.endObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
public static final String DOC_COUNT = "doc_count";

public static final ParseField ID = new ParseField("datafeed_id");
public static final ParseField CONFIG_TYPE = new ParseField("config_type");
public static final ParseField QUERY_DELAY = new ParseField("query_delay");
public static final ParseField FREQUENCY = new ParseField("frequency");
public static final ParseField INDEXES = new ParseField("indexes");
Expand All @@ -93,6 +94,7 @@ private static ObjectParser<Builder, Void> createParser(boolean ignoreUnknownFie
ObjectParser<Builder, Void> parser = new ObjectParser<>("datafeed_config", ignoreUnknownFields, Builder::new);

parser.declareString(Builder::setId, ID);
parser.declareString((c, s) -> {}, CONFIG_TYPE);
parser.declareString(Builder::setJobId, Job.ID);
parser.declareStringArray(Builder::setIndices, INDEXES);
parser.declareStringArray(Builder::setIndices, INDICES);
Expand Down Expand Up @@ -202,6 +204,16 @@ public DatafeedConfig(StreamInput in) throws IOException {
}
}

/**
* The name of datafeed configuration document name from the datafeed ID.
*
* @param datafeedId The datafeed ID
* @return The ID of document the datafeed config is persisted in
*/
public static String documentId(String datafeedId) {
return "datafeed-" + datafeedId;
}

public String getId() {
return id;
}
Expand All @@ -210,6 +222,10 @@ public String getJobId() {
return jobId;
}

public String getConfigType() {
return TYPE;
}

public TimeValue getQueryDelay() {
return queryDelay;
}
Expand Down Expand Up @@ -304,14 +320,11 @@ public void writeTo(StreamOutput out) throws IOException {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
doXContentBody(builder, params);
builder.endObject();
return builder;
}

public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
builder.field(ID.getPreferredName(), id);
builder.field(Job.ID.getPreferredName(), jobId);
if (params.paramAsBoolean(ToXContentParams.INCLUDE_TYPE, false) == true) {
builder.field(CONFIG_TYPE.getPreferredName(), TYPE);
}
builder.field(QUERY_DELAY.getPreferredName(), queryDelay.getStringRep());
if (frequency != null) {
builder.field(FREQUENCY.getPreferredName(), frequency.getStringRep());
Expand All @@ -333,9 +346,10 @@ public XContentBuilder doXContentBody(XContentBuilder builder, Params params) th
if (chunkingConfig != null) {
builder.field(CHUNKING_CONFIG.getPreferredName(), chunkingConfig);
}
if (headers.isEmpty() == false && params.paramAsBoolean(ToXContentParams.FOR_CLUSTER_STATE, false) == true) {
if (headers.isEmpty() == false && params.paramAsBoolean(ToXContentParams.FOR_INTERNAL_STORAGE, false) == true) {
builder.field(HEADERS.getPreferredName(), headers);
}
builder.endObject();
return builder;
}

Expand Down Expand Up @@ -475,6 +489,10 @@ public void setId(String datafeedId) {
id = ExceptionsHelper.requireNonNull(datafeedId, ID.getPreferredName());
}

public String getId() {
return id;
}

public void setJobId(String jobId) {
this.jobId = ExceptionsHelper.requireNonNull(jobId, Job.ID.getPreferredName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
// negative means "unknown", which should only happen for a 5.4 job
if (detectorIndex >= 0
// no point writing this to cluster state, as the indexes will get reassigned on reload anyway
&& params.paramAsBoolean(ToXContentParams.FOR_CLUSTER_STATE, false) == false) {
&& params.paramAsBoolean(ToXContentParams.FOR_INTERNAL_STORAGE, false) == false) {
builder.field(DETECTOR_INDEX.getPreferredName(), detectorIndex);
}
builder.endObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.ml.MlMetaIndex;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.MlStrings;
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;

import java.io.IOException;
import java.util.Arrays;
Expand Down Expand Up @@ -101,7 +101,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(DESCRIPTION.getPreferredName(), description);
}
builder.field(ITEMS.getPreferredName(), items);
if (params.paramAsBoolean(MlMetaIndex.INCLUDE_TYPE_KEY, false)) {
if (params.paramAsBoolean(ToXContentParams.INCLUDE_TYPE, false)) {
builder.field(TYPE.getPreferredName(), FILTER_TYPE);
}
builder.endObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,17 @@
public final class ToXContentParams {

/**
* Parameter to indicate whether we are serialising to X Content for cluster state output.
* Parameter to indicate whether we are serialising to X Content for
* internal storage. Certain fields need to be persisted but should
* not be visible everywhere.
*/
public static final String FOR_CLUSTER_STATE = "for_cluster_state";
public static final String FOR_INTERNAL_STORAGE = "for_internal_storage";

/**
* When serialising POJOs to X Content this indicates whether the type field
* should be included or not
*/
public static final String INCLUDE_TYPE = "include_type";

private ToXContentParams() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,17 @@
import com.carrotsearch.randomizedtesting.generators.CodepointSetGenerator;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParseException;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
Expand All @@ -36,17 +40,22 @@
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.ml.datafeed.ChunkingConfig.Mode;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
import org.joda.time.DateTimeZone;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.not;
Expand All @@ -63,6 +72,10 @@ public static DatafeedConfig createRandomizedDatafeedConfig(String jobId) {
}

public static DatafeedConfig createRandomizedDatafeedConfig(String jobId, long bucketSpanMillis) {
return createRandomizedDatafeedConfigBuilder(jobId, bucketSpanMillis).build();
}

private static DatafeedConfig.Builder createRandomizedDatafeedConfigBuilder(String jobId, long bucketSpanMillis) {
DatafeedConfig.Builder builder = new DatafeedConfig.Builder(randomValidDatafeedId(), jobId);
builder.setIndices(randomStringList(1, 10));
builder.setTypes(randomStringList(0, 10));
Expand Down Expand Up @@ -100,7 +113,7 @@ public static DatafeedConfig createRandomizedDatafeedConfig(String jobId, long b
if (aggHistogramInterval == null) {
builder.setFrequency(TimeValue.timeValueSeconds(randomIntBetween(1, 1_000_000)));
} else {
builder.setFrequency(TimeValue.timeValueMillis(randomIntBetween(1, 5) * aggHistogramInterval));
builder.setFrequency(TimeValue.timeValueSeconds(randomIntBetween(1, 5) * aggHistogramInterval));
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test failed with the error:

java.lang.IllegalArgumentException: frequency has to be a multiple of 1s; actual was '900ms'
...
   >    at org.elasticsearch.xpack.core.ml.utils.time.TimeUtils.checkMultiple(TimeUtils.java:101)
   >    at org.elasticsearch.xpack.core.ml.utils.time.TimeUtils.checkPositiveMultiple(TimeUtils.java:74)

It's clear that TimeValue.timeValueMillis(randomIntBetween(1, 5) * aggHistogramInterval) is unlikely to return a whole number of seconds but I'm amazed it has not happened before.

Reproduce command was

/gradlew :x-pack:plugin:core:test -Dtests.seed=7D58D672CDD2EF69 -Dtests.class=org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfigTests -Dtests.method="testToXContentForInternalStorage" -Dtests.security.manager=true -Dtests.locale=ln -Dtests.timezone=America/Rankin_Inlet -Dcompiler.java=10 -Druntime.java=10

I'll make the same change in ...protocol...DatafeedConfigTests

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually there is no validation in protocol..DatafeedConfigTests so not an issue

}
}
if (randomBoolean()) {
Expand All @@ -109,7 +122,7 @@ public static DatafeedConfig createRandomizedDatafeedConfig(String jobId, long b
if (randomBoolean()) {
builder.setChunkingConfig(ChunkingConfigTests.createRandomizedChunk());
}
return builder.build();
return builder;
}

@Override
Expand Down Expand Up @@ -167,6 +180,33 @@ public void testFutureMetadataParse() throws IOException {
assertNotNull(DatafeedConfig.LENIENT_PARSER.apply(parser, null).build());
}

public void testToXContentForInternalStorage() throws IOException {
DatafeedConfig.Builder builder = createRandomizedDatafeedConfigBuilder("foo", 300);

// headers are only persisted to cluster state
Map<String, String> headers = new HashMap<>();
headers.put("header-name", "header-value");
builder.setHeaders(headers);
DatafeedConfig config = builder.build();

ToXContent.MapParams params = new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true"));

BytesReference forClusterstateXContent = XContentHelper.toXContent(config, XContentType.JSON, params, false);
XContentParser parser = XContentFactory.xContent(XContentType.JSON)
.createParser(xContentRegistry(), LoggingDeprecationHandler.INSTANCE, forClusterstateXContent.streamInput());

DatafeedConfig parsedConfig = DatafeedConfig.LENIENT_PARSER.apply(parser, null).build();
assertThat(parsedConfig.getHeaders(), hasEntry("header-name", "header-value"));

// headers are not written without the FOR_INTERNAL_STORAGE param
BytesReference nonClusterstateXContent = XContentHelper.toXContent(config, XContentType.JSON, ToXContent.EMPTY_PARAMS, false);
parser = XContentFactory.xContent(XContentType.JSON)
.createParser(xContentRegistry(), LoggingDeprecationHandler.INSTANCE, nonClusterstateXContent.streamInput());

parsedConfig = DatafeedConfig.LENIENT_PARSER.apply(parser, null).build();
assertThat(parsedConfig.getHeaders().entrySet(), hasSize(0));
}

public void testCopyConstructor() {
for (int i = 0; i < NUMBER_OF_TEST_RUNS; i++) {
DatafeedConfig datafeedConfig = createTestInstance();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.xpack.core.ml.calendars.Calendar;
import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;

Expand Down Expand Up @@ -69,7 +70,7 @@ protected void doExecute(PostCalendarEventsAction.Request request,
IndexRequest indexRequest = new IndexRequest(MlMetaIndex.INDEX_NAME, MlMetaIndex.TYPE);
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
indexRequest.source(event.toXContent(builder,
new ToXContent.MapParams(Collections.singletonMap(MlMetaIndex.INCLUDE_TYPE_KEY,
new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.INCLUDE_TYPE,
"true"))));
} catch (IOException e) {
throw new IllegalStateException("Failed to serialise event", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.xpack.core.ml.MlMetaIndex;
import org.elasticsearch.xpack.core.ml.calendars.Calendar;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;

import java.io.IOException;
import java.util.Collections;
Expand Down Expand Up @@ -55,7 +56,7 @@ protected void doExecute(PutCalendarAction.Request request, ActionListener<PutCa
IndexRequest indexRequest = new IndexRequest(MlMetaIndex.INDEX_NAME, MlMetaIndex.TYPE, calendar.documentId());
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
indexRequest.source(calendar.toXContent(builder,
new ToXContent.MapParams(Collections.singletonMap(MlMetaIndex.INCLUDE_TYPE_KEY, "true"))));
new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.INCLUDE_TYPE, "true"))));
} catch (IOException e) {
throw new IllegalStateException("Failed to serialise calendar with id [" + calendar.getId() + "]", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.xpack.core.ml.action.PutFilterAction;
import org.elasticsearch.xpack.core.ml.job.config.MlFilter;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;

import java.io.IOException;
import java.util.Collections;
Expand Down Expand Up @@ -55,7 +56,7 @@ protected void doExecute(PutFilterAction.Request request, ActionListener<PutFilt
indexRequest.opType(DocWriteRequest.OpType.CREATE);
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
ToXContent.MapParams params = new ToXContent.MapParams(Collections.singletonMap(MlMetaIndex.INCLUDE_TYPE_KEY, "true"));
ToXContent.MapParams params = new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.INCLUDE_TYPE, "true"));
indexRequest.source(filter.toXContent(builder, params));
} catch (IOException e) {
throw new IllegalStateException("Failed to serialise filter with id [" + filter.getId() + "]", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.elasticsearch.xpack.core.ml.job.config.MlFilter;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
import org.elasticsearch.xpack.ml.job.JobManager;

import java.io.IOException;
Expand Down Expand Up @@ -106,7 +107,7 @@ private void indexUpdatedFilter(MlFilter filter, long version, UpdateFilterActio
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);

try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
ToXContent.MapParams params = new ToXContent.MapParams(Collections.singletonMap(MlMetaIndex.INCLUDE_TYPE_KEY, "true"));
ToXContent.MapParams params = new ToXContent.MapParams(Collections.singletonMap(ToXContentParams.INCLUDE_TYPE, "true"));
indexRequest.source(filter.toXContent(builder, params));
} catch (IOException e) {
throw new IllegalStateException("Failed to serialise filter with id [" + filter.getId() + "]", e);
Expand Down
Loading