Skip to content

Commit

Permalink
Introduce mapping version to index metadata (#33147)
Browse files Browse the repository at this point in the history
This commit introduces mapping version to index metadata. This value is
monotonically increasing and is updated on mapping updates. This will be
useful in cross-cluster replication so that we can request mapping
updates from the leader only when there is a mapping update as opposed
to the strategy we employ today which is to request a mapping update any
time there is an index metadata update. As index metadata updates can
occur for many reasons other than mapping updates, this leads to some
unnecessary requests and work in cross-cluster replication.
  • Loading branch information
jasontedor committed Aug 27, 2018
1 parent 35b4bda commit 610a9c8
Show file tree
Hide file tree
Showing 12 changed files with 188 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ public String toString() {
final String TAB = " ";
for (IndexMetaData indexMetaData : metaData) {
sb.append(TAB).append(indexMetaData.getIndex());
sb.append(": v[").append(indexMetaData.getVersion()).append("]\n");
sb.append(": v[").append(indexMetaData.getVersion()).append("], mv[").append(indexMetaData.getMappingVersion()).append("]\n");
for (int shard = 0; shard < indexMetaData.getNumberOfShards(); shard++) {
sb.append(TAB).append(TAB).append(shard).append(": ");
sb.append("p_term [").append(indexMetaData.primaryTerm(shard)).append("], ");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.carrotsearch.hppc.cursors.IntObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.elasticsearch.Assertions;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.rollover.RolloverInfo;
import org.elasticsearch.action.support.ActiveShardCount;
Expand Down Expand Up @@ -291,6 +292,7 @@ public Iterator<Setting<Integer>> settings() {

public static final String KEY_IN_SYNC_ALLOCATIONS = "in_sync_allocations";
static final String KEY_VERSION = "version";
static final String KEY_MAPPING_VERSION = "mapping_version";
static final String KEY_ROUTING_NUM_SHARDS = "routing_num_shards";
static final String KEY_SETTINGS = "settings";
static final String KEY_STATE = "state";
Expand All @@ -309,6 +311,9 @@ public Iterator<Setting<Integer>> settings() {

private final Index index;
private final long version;

private final long mappingVersion;

private final long[] primaryTerms;

private final State state;
Expand Down Expand Up @@ -336,7 +341,7 @@ public Iterator<Setting<Integer>> settings() {
private final ActiveShardCount waitForActiveShards;
private final ImmutableOpenMap<String, RolloverInfo> rolloverInfos;

private IndexMetaData(Index index, long version, long[] primaryTerms, State state, int numberOfShards, int numberOfReplicas, Settings settings,
private IndexMetaData(Index index, long version, long mappingVersion, long[] primaryTerms, State state, int numberOfShards, int numberOfReplicas, Settings settings,
ImmutableOpenMap<String, MappingMetaData> mappings, ImmutableOpenMap<String, AliasMetaData> aliases,
ImmutableOpenMap<String, Custom> customs, ImmutableOpenIntMap<Set<String>> inSyncAllocationIds,
DiscoveryNodeFilters requireFilters, DiscoveryNodeFilters initialRecoveryFilters, DiscoveryNodeFilters includeFilters, DiscoveryNodeFilters excludeFilters,
Expand All @@ -345,6 +350,8 @@ private IndexMetaData(Index index, long version, long[] primaryTerms, State stat

this.index = index;
this.version = version;
assert mappingVersion >= 0 : mappingVersion;
this.mappingVersion = mappingVersion;
this.primaryTerms = primaryTerms;
assert primaryTerms.length == numberOfShards;
this.state = state;
Expand Down Expand Up @@ -394,6 +401,9 @@ public long getVersion() {
return this.version;
}

public long getMappingVersion() {
return mappingVersion;
}

/**
* The term of the current selected primary. This is a non-negative number incremented when
Expand Down Expand Up @@ -644,6 +654,7 @@ private static class IndexMetaDataDiff implements Diff<IndexMetaData> {
private final String index;
private final int routingNumShards;
private final long version;
private final long mappingVersion;
private final long[] primaryTerms;
private final State state;
private final Settings settings;
Expand All @@ -656,6 +667,7 @@ private static class IndexMetaDataDiff implements Diff<IndexMetaData> {
IndexMetaDataDiff(IndexMetaData before, IndexMetaData after) {
index = after.index.getName();
version = after.version;
mappingVersion = after.mappingVersion;
routingNumShards = after.routingNumShards;
state = after.state;
settings = after.settings;
Expand All @@ -672,6 +684,11 @@ private static class IndexMetaDataDiff implements Diff<IndexMetaData> {
index = in.readString();
routingNumShards = in.readInt();
version = in.readLong();
if (in.getVersion().onOrAfter(Version.V_6_5_0)) {
mappingVersion = in.readVLong();
} else {
mappingVersion = 1;
}
state = State.fromId(in.readByte());
settings = Settings.readSettingsFromStream(in);
primaryTerms = in.readVLongArray();
Expand Down Expand Up @@ -708,6 +725,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(index);
out.writeInt(routingNumShards);
out.writeLong(version);
if (out.getVersion().onOrAfter(Version.V_6_5_0)) {
out.writeVLong(mappingVersion);
}
out.writeByte(state.id);
Settings.writeSettingsToStream(settings, out);
out.writeVLongArray(primaryTerms);
Expand All @@ -724,6 +744,7 @@ public void writeTo(StreamOutput out) throws IOException {
public IndexMetaData apply(IndexMetaData part) {
Builder builder = builder(index);
builder.version(version);
builder.mappingVersion(mappingVersion);
builder.setRoutingNumShards(routingNumShards);
builder.state(state);
builder.settings(settings);
Expand All @@ -740,6 +761,11 @@ public IndexMetaData apply(IndexMetaData part) {
public static IndexMetaData readFrom(StreamInput in) throws IOException {
Builder builder = new Builder(in.readString());
builder.version(in.readLong());
if (in.getVersion().onOrAfter(Version.V_6_5_0)) {
builder.mappingVersion(in.readVLong());
} else {
builder.mappingVersion(1);
}
builder.setRoutingNumShards(in.readInt());
builder.state(State.fromId(in.readByte()));
builder.settings(readSettingsFromStream(in));
Expand Down Expand Up @@ -779,6 +805,9 @@ public static IndexMetaData readFrom(StreamInput in) throws IOException {
public void writeTo(StreamOutput out) throws IOException {
out.writeString(index.getName()); // uuid will come as part of settings
out.writeLong(version);
if (out.getVersion().onOrAfter(Version.V_6_5_0)) {
out.writeVLong(mappingVersion);
}
out.writeInt(routingNumShards);
out.writeByte(state.id());
writeSettingsToStream(settings, out);
Expand Down Expand Up @@ -822,6 +851,7 @@ public static class Builder {
private String index;
private State state = State.OPEN;
private long version = 1;
private long mappingVersion = 1;
private long[] primaryTerms = null;
private Settings settings = Settings.Builder.EMPTY_SETTINGS;
private final ImmutableOpenMap.Builder<String, MappingMetaData> mappings;
Expand All @@ -844,6 +874,7 @@ public Builder(IndexMetaData indexMetaData) {
this.index = indexMetaData.getIndex().getName();
this.state = indexMetaData.state;
this.version = indexMetaData.version;
this.mappingVersion = indexMetaData.mappingVersion;
this.settings = indexMetaData.getSettings();
this.primaryTerms = indexMetaData.primaryTerms.clone();
this.mappings = ImmutableOpenMap.builder(indexMetaData.mappings);
Expand Down Expand Up @@ -1010,6 +1041,15 @@ public Builder version(long version) {
return this;
}

public long mappingVersion() {
return mappingVersion;
}

public Builder mappingVersion(final long mappingVersion) {
this.mappingVersion = mappingVersion;
return this;
}

/**
* returns the primary term for the given shard.
* See {@link IndexMetaData#primaryTerm(int)} for more information.
Expand Down Expand Up @@ -1137,7 +1177,7 @@ public IndexMetaData build() {

final String uuid = settings.get(SETTING_INDEX_UUID, INDEX_UUID_NA_VALUE);

return new IndexMetaData(new Index(index, uuid), version, primaryTerms, state, numberOfShards, numberOfReplicas, tmpSettings, mappings.build(),
return new IndexMetaData(new Index(index, uuid), version, mappingVersion, primaryTerms, state, numberOfShards, numberOfReplicas, tmpSettings, mappings.build(),
tmpAliases.build(), customs.build(), filledInSyncAllocationIds.build(), requireFilters, initialRecoveryFilters, includeFilters, excludeFilters,
indexCreatedVersion, indexUpgradedVersion, getRoutingNumShards(), routingPartitionSize, waitForActiveShards, rolloverInfos.build());
}
Expand All @@ -1146,6 +1186,7 @@ public static void toXContent(IndexMetaData indexMetaData, XContentBuilder build
builder.startObject(indexMetaData.getIndex().getName());

builder.field(KEY_VERSION, indexMetaData.getVersion());
builder.field(KEY_MAPPING_VERSION, indexMetaData.getMappingVersion());
builder.field(KEY_ROUTING_NUM_SHARDS, indexMetaData.getRoutingNumShards());
builder.field(KEY_STATE, indexMetaData.getState().toString().toLowerCase(Locale.ENGLISH));

Expand Down Expand Up @@ -1219,6 +1260,7 @@ public static IndexMetaData fromXContent(XContentParser parser) throws IOExcepti
if (token != XContentParser.Token.START_OBJECT) {
throw new IllegalArgumentException("expected object but got a " + token);
}
boolean mappingVersion = false;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
Expand Down Expand Up @@ -1317,6 +1359,9 @@ public static IndexMetaData fromXContent(XContentParser parser) throws IOExcepti
builder.state(State.fromString(parser.text()));
} else if (KEY_VERSION.equals(currentFieldName)) {
builder.version(parser.longValue());
} else if (KEY_MAPPING_VERSION.equals(currentFieldName)) {
mappingVersion = true;
builder.mappingVersion(parser.longValue());
} else if (KEY_ROUTING_NUM_SHARDS.equals(currentFieldName)) {
builder.setRoutingNumShards(parser.intValue());
} else {
Expand All @@ -1326,6 +1371,9 @@ public static IndexMetaData fromXContent(XContentParser parser) throws IOExcepti
throw new IllegalArgumentException("Unexpected token " + token);
}
}
if (Assertions.ENABLED && Version.indexCreated(builder.settings).onOrAfter(Version.V_6_5_0)) {
assert mappingVersion : "mapping version should be present for indices created on or after 6.5.0";
}
return builder.build();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,12 @@

import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingClusterStateUpdateRequest;
import org.elasticsearch.cluster.AckedClusterStateTaskListener;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
Expand All @@ -38,6 +37,7 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.mapper.DocumentMapper;
Expand Down Expand Up @@ -300,6 +300,7 @@ private ClusterState applyRequest(ClusterState currentState, PutMappingClusterSt
MetaData.Builder builder = MetaData.builder(metaData);
boolean updated = false;
for (IndexMetaData indexMetaData : updateList) {
boolean updatedMapping = false;
// do the actual merge here on the master, and update the mapping source
// we use the exact same indexService and metadata we used to validate above here to actually apply the update
final Index index = indexMetaData.getIndex();
Expand All @@ -316,7 +317,7 @@ private ClusterState applyRequest(ClusterState currentState, PutMappingClusterSt
if (existingSource.equals(updatedSource)) {
// same source, no changes, ignore it
} else {
updated = true;
updatedMapping = true;
// use the merged mapping source
if (logger.isDebugEnabled()) {
logger.debug("{} update_mapping [{}] with source [{}]", index, mergedMapper.type(), updatedSource);
Expand All @@ -326,7 +327,7 @@ private ClusterState applyRequest(ClusterState currentState, PutMappingClusterSt

}
} else {
updated = true;
updatedMapping = true;
if (logger.isDebugEnabled()) {
logger.debug("{} create_mapping [{}] with source [{}]", index, mappingType, updatedSource);
} else if (logger.isInfoEnabled()) {
Expand All @@ -340,7 +341,16 @@ private ClusterState applyRequest(ClusterState currentState, PutMappingClusterSt
for (DocumentMapper mapper : mapperService.docMappers(true)) {
indexMetaDataBuilder.putMapping(new MappingMetaData(mapper.mappingSource()));
}
if (updatedMapping) {
indexMetaDataBuilder.mappingVersion(1 + indexMetaDataBuilder.mappingVersion());
}
/*
* This implicitly increments the index metadata version and builds the index metadata. This means that we need to have
* already incremented the mapping version if necessary. Therefore, the mapping version increment must remain before this
* statement.
*/
builder.put(indexMetaDataBuilder);
updated |= updatedMapping;
}
if (updated) {
return ClusterState.builder(currentState).metaData(builder).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -521,8 +521,8 @@ List<SearchOperationListener> getSearchOperationListener() { // pkg private for
}

@Override
public boolean updateMapping(IndexMetaData indexMetaData) throws IOException {
return mapperService().updateMapping(indexMetaData);
public boolean updateMapping(final IndexMetaData currentIndexMetaData, final IndexMetaData newIndexMetaData) throws IOException {
return mapperService().updateMapping(currentIndexMetaData, newIndexMetaData);
}

private class StoreCloseListener implements Store.OnClose {
Expand Down
Loading

0 comments on commit 610a9c8

Please sign in to comment.