Merge branch 'master' into compile-with-jdk-9
* master:
  [Docs] Fix base directory to include for put_mapping.asciidoc
  Open engine should keep only starting commit (elastic#28228)
  [Docs] Fix Java Api index administration usage (elastic#28133)
  Fix eclipse build. (elastic#28236)
  Never return null from Strings.tokenizeToStringArray (elastic#28224)
  Fallback to TransportMasterNodeAction for cluster health retries (elastic#28195)
  [Docs] Changes to ingest.asciidoc (elastic#28212)
  TEST: Update logging for testAckedIndexing
jasontedor committed Jan 16, 2018
2 parents 9434717 + 4f5be7d commit 6860f66
Showing 17 changed files with 355 additions and 116 deletions.
57 changes: 9 additions & 48 deletions docs/java-api/admin/indices/put-mapping.asciidoc
@@ -1,54 +1,23 @@
[[java-admin-indices-put-mapping]]
:base-dir: {docdir}/../../server/src/test/java/org/elasticsearch/action/admin/indices/create

==== Put Mapping

The PUT mapping API allows you to add a new type while creating an index:

[source,java]
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
client.admin().indices().prepareCreate("twitter") <1>
.addMapping("tweet", "{\n" + <2>
" \"tweet\": {\n" +
" \"properties\": {\n" +
" \"message\": {\n" +
" \"type\": \"text\"\n" +
" }\n" +
" }\n" +
" }\n" +
" }")
.get();
include-tagged::{base-dir}/CreateIndexIT.java[addMapping-create-index-request]
--------------------------------------------------
<1> <<java-admin-indices-create-index,Creates an index>> called `twitter`
<2> It also adds a `tweet` mapping type.


The PUT mapping API also allows you to add a new type to an existing index:

[source,java]
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
client.admin().indices().preparePutMapping("twitter") <1>
.setType("user") <2>
.setSource("{\n" + <3>
" \"properties\": {\n" +
" \"name\": {\n" +
" \"type\": \"text\"\n" +
" }\n" +
" }\n" +
"}")
.get();
// You can also provide the type in the source document
client.admin().indices().preparePutMapping("twitter")
.setType("user")
.setSource("{\n" +
" \"user\":{\n" + <4>
" \"properties\": {\n" +
" \"name\": {\n" +
" \"type\": \"text\"\n" +
" }\n" +
" }\n" +
" }\n" +
"}")
.get();
include-tagged::{base-dir}/CreateIndexIT.java[putMapping-request-source]
--------------------------------------------------
<1> Puts a mapping on the existing index called `twitter`
<2> Adds a `user` mapping type.
@@ -57,20 +26,12 @@ client.admin().indices().preparePutMapping("twitter")

You can use the same API to update an existing mapping:

[source,java]
["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
client.admin().indices().preparePutMapping("twitter") <1>
.setType("user") <2>
.setSource("{\n" + <3>
" \"properties\": {\n" +
" \"user_name\": {\n" +
" \"type\": \"text\"\n" +
" }\n" +
" }\n" +
"}")
.get();
include-tagged::{base-dir}/CreateIndexIT.java[putMapping-request-source-append]
--------------------------------------------------
<1> Puts a mapping on the existing index called `twitter`
<2> Updates the `user` mapping type.
<3> The `user` mapping type now has a new field, `user_name`

:base-dir!:
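The `include-tagged::` directives above pull code out of regions of CreateIndexIT.java that are marked with asciidoctor tag comments, so the documented snippets compile as part of the test suite instead of drifting out of date. A minimal sketch of what such tagged regions might look like follows; the exact code lives in CreateIndexIT.java and may differ, and `client` is assumed to be the `Client` provided by the test base class.

// Hypothetical shape of the tagged regions referenced above; illustrative only.
// Assumes "client" is an initialized org.elasticsearch.client.Client and that
// org.elasticsearch.common.xcontent.XContentType is imported.

// tag::addMapping-create-index-request
client.admin().indices().prepareCreate("twitter")
    .addMapping("tweet", "message", "type=text")   // field "message" mapped as text
    .get();
// end::addMapping-create-index-request

// tag::putMapping-request-source
client.admin().indices().preparePutMapping("twitter")
    .setType("user")
    .setSource("{\"properties\": {\"name\": {\"type\": \"text\"}}}", XContentType.JSON)
    .get();
// end::putMapping-request-source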
25 changes: 13 additions & 12 deletions docs/reference/ingest.asciidoc
@@ -3,26 +3,27 @@

[partintro]
--
You can use ingest node to pre-process documents before the actual indexing takes place.
This pre-processing happens by an ingest node that intercepts bulk and index requests, applies the
transformations, and then passes the documents back to the index or bulk APIs.
Use an ingest node to pre-process documents before the actual document indexing happens.
The ingest node intercepts bulk and index requests, applies the transformations, and then
passes the documents back to the index or bulk APIs.

You can enable ingest on any node or even have dedicated ingest nodes. Ingest is enabled by default
on all nodes. To disable ingest on a node, configure the following setting in the `elasticsearch.yml` file:
All nodes enable ingest by default, so any node can handle ingest tasks. You can also create
dedicated ingest nodes. To disable ingest for a node, configure the following setting in the
`elasticsearch.yml` file:

[source,yaml]
--------------------------------------------------
node.ingest: false
--------------------------------------------------

To pre-process documents before indexing, you <<pipeline,define a pipeline>> that specifies
a series of <<ingest-processors,processors>>. Each processor transforms the document in some way.
For example, you may have a pipeline that consists of one processor that removes a field from
the document followed by another processor that renames a field. Configured pipelines are then stored
in the <<cluster-state,cluster state>>.
To pre-process documents before indexing, <<pipeline,define a pipeline>> that specifies a series of
<<ingest-processors,processors>>. Each processor transforms the document in some specific way. For example, a
pipeline might have one processor that removes a field from the document, followed by
another processor that renames a field. The <<cluster-state,cluster state>> then stores
the configured pipelines.

To use a pipeline, you simply specify the `pipeline` parameter on an index or bulk request to
tell the ingest node which pipeline to use. For example:
To use a pipeline, simply specify the `pipeline` parameter on an index or bulk request. This
way, the ingest node knows which pipeline to use. For example:

[source,js]
--------------------------------------------------
2 changes: 2 additions & 0 deletions libs/elasticsearch-core/src/main/eclipse-build.gradle
@@ -0,0 +1,2 @@
// this is just a shell gradle file for eclipse to have separate projects for elasticsearch-core src and tests
apply from: '../../build.gradle'
6 changes: 6 additions & 0 deletions libs/elasticsearch-core/src/test/eclipse-build.gradle
@@ -0,0 +1,6 @@
// this is just a shell gradle file for eclipse to have separate projects for elasticsearch-core src and tests
apply from: '../../build.gradle'

dependencies {
testCompile project(':libs:elasticsearch-core')
}
@@ -30,6 +30,7 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@@ -125,7 +126,8 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
@Override
public void onNoLongerMaster(String source) {
logger.trace("stopped being master while waiting for events with priority [{}]. retrying.", request.waitForEvents());
doExecute(task, request, listener);
// TransportMasterNodeAction implements the retry logic, which is triggered by passing a NotMasterException
listener.onFailure(new NotMasterException("no longer master. source: [" + source + "]"));
}

@Override
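As the comment notes, the retry machinery lives in TransportMasterNodeAction, so the health action only needs to fail its listener with a NotMasterException to get re-dispatched once a new master is known. The standalone sketch below models that contract; the class and method names are illustrative and are not the actual Elasticsearch classes.

import java.util.function.Consumer;

// Illustrative model of "fail with NotMasterException and let the base class retry".
// Names are hypothetical; real retries are bounded by the request timeout and driven
// by cluster-state observation rather than immediate recursion.
public class NotMasterRetrySketch {

    static class NotMasterException extends RuntimeException {
        NotMasterException(String message) { super(message); }
    }

    interface Listener<T> {
        void onResponse(T response);
        void onFailure(Exception e);
    }

    // Stand-in for the shared base-class behavior: wrap the caller's listener so that a
    // NotMasterException failure triggers a retry instead of surfacing to the caller.
    static <T> void executeWithRetry(Consumer<Listener<T>> action, Runnable waitForNewMaster, Listener<T> delegate) {
        action.accept(new Listener<T>() {
            @Override
            public void onResponse(T response) {
                delegate.onResponse(response);
            }

            @Override
            public void onFailure(Exception e) {
                if (e instanceof NotMasterException) {
                    waitForNewMaster.run();                               // e.g. observe cluster state for a new master
                    executeWithRetry(action, waitForNewMaster, delegate); // then re-run the whole request
                } else {
                    delegate.onFailure(e);                                // everything else propagates
                }
            }
        });
    }
}

Compared with the removed approach of calling `doExecute` again from the callback, this keeps the retry loop in one shared place instead of each action re-implementing it.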
5 changes: 4 additions & 1 deletion server/src/main/java/org/elasticsearch/common/Strings.java
@@ -474,6 +474,9 @@ public static String[] split(String toSplit, String delimiter) {
* @see #delimitedListToStringArray
*/
public static String[] tokenizeToStringArray(final String s, final String delimiters) {
if (s == null) {
return EMPTY_ARRAY;
}
return toStringArray(tokenizeToCollection(s, delimiters, ArrayList::new));
}

@@ -536,7 +539,7 @@ public static String[] delimitedListToStringArray(String str, String delimiter)
*/
public static String[] delimitedListToStringArray(String str, String delimiter, String charsToDelete) {
if (str == null) {
return new String[0];
return EMPTY_ARRAY;
}
if (delimiter == null) {
return new String[]{str};
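The effect of the null guard above: `tokenizeToStringArray` now returns the shared empty array for null input instead of null, mirroring `delimitedListToStringArray`, so callers no longer need a null check. A small standalone usage sketch (the class name is illustrative):

import org.elasticsearch.common.Strings;

// Usage sketch for the null-safety change above.
public class TokenizeToStringArrayExample {
    public static void main(String[] args) {
        // Null input now yields an empty array rather than null.
        String[] none = Strings.tokenizeToStringArray(null, ",");
        System.out.println(none.length);  // prints 0 instead of throwing a NullPointerException

        // Non-null input behaves as before.
        for (String token : Strings.tokenizeToStringArray("a,b,c", ",")) {
            System.out.println(token);    // prints each token on its own line
        }
    }
}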
@@ -45,37 +45,72 @@ public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
private final TranslogDeletionPolicy translogDeletionPolicy;
private final EngineConfig.OpenMode openMode;
private final LongSupplier globalCheckpointSupplier;
private final IndexCommit startingCommit;
private final ObjectIntHashMap<IndexCommit> snapshottedCommits; // Number of snapshots held against each commit point.
private IndexCommit safeCommit; // the most recent safe commit point - its max_seqno is at most the persisted global checkpoint.
private IndexCommit lastCommit; // the most recent commit point

CombinedDeletionPolicy(EngineConfig.OpenMode openMode, TranslogDeletionPolicy translogDeletionPolicy,
LongSupplier globalCheckpointSupplier) {
LongSupplier globalCheckpointSupplier, IndexCommit startingCommit) {
this.openMode = openMode;
this.translogDeletionPolicy = translogDeletionPolicy;
this.globalCheckpointSupplier = globalCheckpointSupplier;
this.startingCommit = startingCommit;
this.snapshottedCommits = new ObjectIntHashMap<>();
}

@Override
public void onInit(List<? extends IndexCommit> commits) throws IOException {
public synchronized void onInit(List<? extends IndexCommit> commits) throws IOException {
switch (openMode) {
case CREATE_INDEX_AND_TRANSLOG:
assert startingCommit == null : "CREATE_INDEX_AND_TRANSLOG must not have starting commit; commit [" + startingCommit + "]";
break;
case OPEN_INDEX_CREATE_TRANSLOG:
assert commits.isEmpty() == false : "index is opened, but we have no commits";
// When an engine starts with OPEN_INDEX_CREATE_TRANSLOG, a new fresh index commit will be created immediately.
// We therefore can simply skip processing here as `onCommit` will be called right after with a new commit.
break;
case OPEN_INDEX_AND_TRANSLOG:
assert commits.isEmpty() == false : "index is opened, but we have no commits";
onCommit(commits);
assert startingCommit != null && commits.contains(startingCommit) : "Starting commit not in the existing commit list; "
+ "startingCommit [" + startingCommit + "], commit list [" + commits + "]";
keepOnlyStartingCommitOnInit(commits);
// OPEN_INDEX_CREATE_TRANSLOG can open an index commit from another shard with a different translog history,
// We therefore should not use that index commit to update the translog deletion policy.
if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
updateTranslogDeletionPolicy();
}
break;
default:
throw new IllegalArgumentException("unknown openMode [" + openMode + "]");
}
}

/**
* Keeping existing unsafe commits when opening an engine can be problematic because these commits are not safe
* at recovery time but can suddenly become safe in the future.
* The following issues can happen if unsafe commits are kept on init.
* <p>
* 1. A replica can use an unsafe commit in peer recovery. This happens when a replica with a safe commit c1 (max_seqno=1)
* and an unsafe commit c2 (max_seqno=2) recovers from a primary with c1 (max_seqno=1). If a new document (seqno=2)
* is added without flushing, the global checkpoint is advanced to 2; when the replica recovers again, it will use
* the unsafe commit c2 (max_seqno=2 <= gcp=2) as the starting commit for sequence-based recovery even though
* c2 contains a stale operation, and the document with seqno=2 will not be replicated to the replica.
* <p>
* 2. The min translog gen for recovery can go backwards in peer recovery. This happens with a replica that has a safe commit
* c1 (local_checkpoint=1, recovery_translog_gen=1) and an unsafe commit c2 (local_checkpoint=2, recovery_translog_gen=2).
* The replica recovers from a primary, keeps c2 as the last commit, and sets last_translog_gen to 2. Flushing a new
* commit on the replica will then cause an exception because the new last commit c3 will have recovery_translog_gen=1.
* The recovery translog generation of a commit is calculated from the current local checkpoint: the local checkpoint
* of c3 is 1 while the local checkpoint of c2 is 2.
* <p>
* 3. A commit without translog can be used in recovery. An old index, created before multiple commits were introduced
* (v6.2), may not have a safe commit. If that index has a snapshotted commit without translog and an unsafe commit,
* the policy can consider the snapshotted commit a safe commit for recovery even though the commit does not have a translog.
*/
private void keepOnlyStartingCommitOnInit(List<? extends IndexCommit> commits) {
commits.stream().filter(commit -> startingCommit.equals(commit) == false).forEach(IndexCommit::delete);
assert startingCommit.isDeleted() == false : "Starting commit must not be deleted";
lastCommit = startingCommit;
safeCommit = startingCommit;
}

@Override
public synchronized void onCommit(List<? extends IndexCommit> commits) throws IOException {
final int keptPosition = indexOfKeptCommits(commits, globalCheckpointSupplier.getAsLong());
@@ -185,7 +185,7 @@ public InternalEngine(EngineConfig engineConfig) {
"Starting commit should be non-null; mode [" + openMode + "]; startingCommit [" + startingCommit + "]";
this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier, startingCommit);
this.combinedDeletionPolicy = new CombinedDeletionPolicy(openMode, translogDeletionPolicy,
translog::getLastSyncedGlobalCheckpoint);
translog::getLastSyncedGlobalCheckpoint, startingCommit);
writer = createWriter(openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG, startingCommit);
updateMaxUnsafeAutoIdTimestampFromWriter(writer);
assert engineConfig.getForceNewHistoryUUID() == false
@@ -411,28 +411,44 @@ public void skipTranslogRecovery() {
}

private IndexCommit getStartingCommitPoint() throws IOException {
if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
final long lastSyncedGlobalCheckpoint = translog.getLastSyncedGlobalCheckpoint();
final long minRetainedTranslogGen = translog.getMinFileGeneration();
final List<IndexCommit> existingCommits = DirectoryReader.listCommits(store.directory());
// We may not have a safe commit if an index was created before v6.2; and if there is a snapshotted commit whose full translog
// files are not retained but max_seqno is at most the global checkpoint, we may mistakenly select it as a starting commit.
// To avoid this issue, we only select index commits whose translog files are fully retained.
if (engineConfig.getIndexSettings().getIndexVersionCreated().before(Version.V_6_2_0)) {
final List<IndexCommit> recoverableCommits = new ArrayList<>();
for (IndexCommit commit : existingCommits) {
if (minRetainedTranslogGen <= Long.parseLong(commit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY))) {
recoverableCommits.add(commit);
final IndexCommit startingIndexCommit;
final List<IndexCommit> existingCommits;
switch (openMode) {
case CREATE_INDEX_AND_TRANSLOG:
startingIndexCommit = null;
break;
case OPEN_INDEX_CREATE_TRANSLOG:
// Use the last commit
existingCommits = DirectoryReader.listCommits(store.directory());
startingIndexCommit = existingCommits.get(existingCommits.size() - 1);
break;
case OPEN_INDEX_AND_TRANSLOG:
// Use the safe commit
final long lastSyncedGlobalCheckpoint = translog.getLastSyncedGlobalCheckpoint();
final long minRetainedTranslogGen = translog.getMinFileGeneration();
existingCommits = DirectoryReader.listCommits(store.directory());
// We may not have a safe commit if an index was created before v6.2; and if there is a snapshotted commit whose translog
// files are not retained but max_seqno is at most the global checkpoint, we may mistakenly select it as a starting commit.
// To avoid this issue, we only select index commits whose translog files are fully retained.
if (engineConfig.getIndexSettings().getIndexVersionCreated().before(Version.V_6_2_0)) {
final List<IndexCommit> recoverableCommits = new ArrayList<>();
for (IndexCommit commit : existingCommits) {
if (minRetainedTranslogGen <= Long.parseLong(commit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY))) {
recoverableCommits.add(commit);
}
}
assert recoverableCommits.isEmpty() == false : "No commit point with translog found; " +
"commits [" + existingCommits + "], minRetainedTranslogGen [" + minRetainedTranslogGen + "]";
startingIndexCommit = CombinedDeletionPolicy.findSafeCommitPoint(recoverableCommits, lastSyncedGlobalCheckpoint);
} else {
// TODO: Assert that the starting commit is a safe commit once peer-recovery sets the global checkpoint.
startingIndexCommit = CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, lastSyncedGlobalCheckpoint);
}
assert recoverableCommits.isEmpty() == false : "No commit point with full translog found; " +
"commits [" + existingCommits + "], minRetainedTranslogGen [" + minRetainedTranslogGen + "]";
return CombinedDeletionPolicy.findSafeCommitPoint(recoverableCommits, lastSyncedGlobalCheckpoint);
} else {
return CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, lastSyncedGlobalCheckpoint);
}
break;
default:
throw new IllegalArgumentException("unknown mode: " + openMode);
}
return null;
return startingIndexCommit;
}

private void recoverFromTranslogInternal() throws IOException {
@@ -557,9 +573,7 @@ private ExternalSearcherManager createSearcherManager(SearchFactory externalSear
final DirectoryReader directoryReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(indexWriter), shardId);
internalSearcherManager = new SearcherManager(directoryReader,
new RamAccountingSearcherFactory(engineConfig.getCircuitBreakerService()));
// The index commit from IndexWriterConfig is null if the engine is opened with modes other
// than CREATE_INDEX_AND_TRANSLOG. In those cases lastCommittedSegmentInfos will be retrieved from the last commit.
lastCommittedSegmentInfos = store.readCommittedSegmentsInfo(indexWriter.getConfig().getIndexCommit());
lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
ExternalSearcherManager externalSearcherManager = new ExternalSearcherManager(internalSearcherManager,
externalSearcherFactory);
success = true;