Skip to content

Commit

Permalink
TEST: Create following engines in the main thread (#33391)
Browse files Browse the repository at this point in the history
There are two races in the testUpdateAndReadChangesConcurrently if the
following engines are created in the worker threads. We fixed the
translog issue in #33352, but there is still another race with
createStore.

This commit ensures that we create all engines in the main thread.

Relates #33352
Closes #33344
  • Loading branch information
dnhatn authored Sep 5, 2018
1 parent 41839cf commit 39e3bd9
Showing 1 changed file with 12 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@

package org.elasticsearch.index.engine;

import java.nio.file.Path;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.SnapshotMatchers;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.test.IndexSettingsModule;
Expand Down Expand Up @@ -202,7 +200,7 @@ public void testUpdateAndReadChangesConcurrently() throws Exception {
CountDownLatch readyLatch = new CountDownLatch(followers.length + 1);
AtomicBoolean isDone = new AtomicBoolean();
for (int i = 0; i < followers.length; i++) {
followers[i] = new Follower(engine, isDone, readyLatch, createTempDir());
followers[i] = new Follower(engine, isDone, readyLatch);
followers[i].start();
}
boolean onPrimary = randomBoolean();
Expand All @@ -228,28 +226,30 @@ public void testUpdateAndReadChangesConcurrently() throws Exception {
operations.add(op);
}
readyLatch.countDown();
readyLatch.await();
concurrentlyApplyOps(operations, engine);
assertThat(engine.getLocalCheckpointTracker().getCheckpoint(), equalTo(operations.size() - 1L));
isDone.set(true);
for (Follower follower : followers) {
follower.join();
IOUtils.close(follower.engine, follower.engine.store);
}
}

class Follower extends Thread {
private final Engine leader;
private final InternalEngine engine;
private final TranslogHandler translogHandler;
private final AtomicBoolean isDone;
private final CountDownLatch readLatch;
private final Path translogPath;

Follower(Engine leader, AtomicBoolean isDone, CountDownLatch readLatch, Path translogPath) {
Follower(Engine leader, AtomicBoolean isDone, CountDownLatch readLatch) throws IOException {
this.leader = leader;
this.isDone = isDone;
this.readLatch = readLatch;
this.translogHandler = new TranslogHandler(xContentRegistry(), IndexSettingsModule.newIndexSettings(shardId.getIndexName(),
engine.engineConfig.getIndexSettings().getSettings()));
this.translogPath = translogPath;
leader.engineConfig.getIndexSettings().getSettings()));
this.engine = createEngine(createStore(), createTempDir());
}

void pullOperations(Engine follower) throws IOException {
Expand All @@ -267,16 +267,15 @@ void pullOperations(Engine follower) throws IOException {

@Override
public void run() {
try (Store store = createStore();
InternalEngine follower = createEngine(store, translogPath)) {
try {
readLatch.countDown();
readLatch.await();
while (isDone.get() == false ||
follower.getLocalCheckpointTracker().getCheckpoint() < leader.getLocalCheckpoint()) {
pullOperations(follower);
engine.getLocalCheckpointTracker().getCheckpoint() < leader.getLocalCheckpoint()) {
pullOperations(engine);
}
assertConsistentHistoryBetweenTranslogAndLuceneIndex(follower, mapperService);
assertThat(getDocIds(follower, true), equalTo(getDocIds(leader, true)));
assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, mapperService);
assertThat(getDocIds(engine, true), equalTo(getDocIds(leader, true)));
} catch (Exception ex) {
throw new AssertionError(ex);
}
Expand Down

0 comments on commit 39e3bd9

Please sign in to comment.