Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use sequence numbers to identify out of order delivery in replicas & recovery #24060

Merged
merged 7 commits into from
Apr 14, 2017
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,12 @@
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.lucene.uid.VersionsResolver.DocIdAndVersion;
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqNo;
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.UidFieldMapper;
import org.elasticsearch.index.mapper.VersionFieldMapper;
import org.elasticsearch.index.seqno.SequenceNumbersService;

import java.io.IOException;

Expand All @@ -43,15 +46,18 @@
* in more than one document! It will only return the first one it
* finds. */

final class PerThreadIDAndVersionLookup {
final class PerThreadIDAndVersionSeqNoLookup {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The "and" is in an awkward place now, maybe PerThreadIDVersionAndSeqNoLookup?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed and fixed.

// TODO: do we really need to store all this stuff? some if it might not speed up anything.
// we keep it around for now, to reduce the amount of e.g. hash lookups by field and stuff

/** terms enum for uid field */
private final TermsEnum termsEnum;
/** _version data */
private final NumericDocValues versions;

/** _seq_no data */
private final NumericDocValues seqNos;
/** _primary_term data */
private final NumericDocValues primaryTerms;
/** Reused for iteration (when the term exists) */
private PostingsEnum docsEnum;

Expand All @@ -61,7 +67,7 @@ final class PerThreadIDAndVersionLookup {
/**
* Initialize lookup for the provided segment
*/
PerThreadIDAndVersionLookup(LeafReader reader) throws IOException {
PerThreadIDAndVersionSeqNoLookup(LeafReader reader) throws IOException {
Fields fields = reader.fields();
Terms terms = fields.terms(UidFieldMapper.NAME);
termsEnum = terms.iterator();
Expand All @@ -74,6 +80,8 @@ final class PerThreadIDAndVersionLookup {
throw new IllegalArgumentException("reader misses the [" + VersionFieldMapper.NAME +
"] field");
}
seqNos = reader.getNumericDocValues(SeqNoFieldMapper.NAME);
primaryTerms = reader.getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME);
Object readerKey = null;
assert (readerKey = reader.getCoreCacheKey()) != null;
this.readerKey = readerKey;
Expand Down Expand Up @@ -113,4 +121,21 @@ private int getDocID(BytesRef id, Bits liveDocs) throws IOException {
return DocIdSetIterator.NO_MORE_DOCS;
}
}

/** Return null if id is not found. */
public DocIdAndSeqNo lookupSequenceNo(BytesRef id, Bits liveDocs, LeafReaderContext context) throws IOException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think lookupSeqNo?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that this method can be package private? And maybe the other lookup methods in this class too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed on all accounts.

assert context.reader().getCoreCacheKey().equals(readerKey) :
"context's reader is not the same as the reader class was initialized on.";
int docID = getDocID(id, liveDocs);
if (docID != DocIdSetIterator.NO_MORE_DOCS) {
return new DocIdAndSeqNo(docID, seqNos == null ? SequenceNumbersService.UNASSIGNED_SEQ_NO : seqNos.get(docID), context);
} else {
return null;
}
}

/** returns 0 if the primary term is not found */
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add link to IndexMetaData#primaryTerm as it has a note about the primary term on an operational shard always being strictly positive?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good one.

public long lookUpPrimaryTerm(int docID) throws IOException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can it be package private?

return primaryTerms == null ? 0 : primaryTerms.get(docID);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.common.lucene.uid;

import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReader.CoreClosedListener;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.Term;
import org.apache.lucene.util.CloseableThreadLocal;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.mapper.UidFieldMapper;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.ConcurrentMap;

import static org.elasticsearch.common.lucene.uid.Versions.NOT_FOUND;

/** Utility class to resolve the Lucene doc ID, version, seqNo and primaryTerms for a given uid. */
public class VersionsAndSeqNoResolver {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make this class final? and does it need to be public?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure


static final ConcurrentMap<Object, CloseableThreadLocal<PerThreadIDAndVersionSeqNoLookup>> lookupStates =
ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();

// Evict this reader from lookupStates once it's closed:
private static final CoreClosedListener removeLookupState = key -> {
CloseableThreadLocal<PerThreadIDAndVersionSeqNoLookup> ctl = lookupStates.remove(key);
if (ctl != null) {
ctl.close();
}
};

private static PerThreadIDAndVersionSeqNoLookup getLookupState(LeafReader reader) throws IOException {
Object key = reader.getCoreCacheKey();
CloseableThreadLocal<PerThreadIDAndVersionSeqNoLookup> ctl = lookupStates.get(key);
if (ctl == null) {
// First time we are seeing this reader's core; make a
// new CTL:
ctl = new CloseableThreadLocal<>();
CloseableThreadLocal<PerThreadIDAndVersionSeqNoLookup> other = lookupStates.putIfAbsent(key, ctl);
if (other == null) {
// Our CTL won, we must remove it when the
// core is closed:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that you can put this comment all on one line.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know it was like that before, but we are here now. 😄

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure

reader.addCoreClosedListener(removeLookupState);
} else {
// Another thread beat us to it: just use
// their CTL:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that you can put this comment all on one line.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know it was like that before, but we are here now. 😄

ctl = other;
}
}

PerThreadIDAndVersionSeqNoLookup lookupState = ctl.get();
if (lookupState == null) {
lookupState = new PerThreadIDAndVersionSeqNoLookup(reader);
ctl.set(lookupState);
}

return lookupState;
}

private VersionsAndSeqNoResolver() {
}

/** Wraps an {@link LeafReaderContext}, a doc ID <b>relative to the context doc base</b> and a version. */
public static class DocIdAndVersion {
public final int docId;
public final long version;
public final LeafReaderContext context;

public DocIdAndVersion(int docId, long version, LeafReaderContext context) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that this constructor can be package private?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure

this.docId = docId;
this.version = version;
this.context = context;
}
}

/** Wraps an {@link LeafReaderContext}, a doc ID <b>relative to the context doc base</b> and a seqNo. */
public static class DocIdAndSeqNo {
public final int docId;
public final long seqNo;
public final LeafReaderContext context;

public DocIdAndSeqNo(int docId, long seqNo, LeafReaderContext context) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that this constructor can be package private?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure

this.docId = docId;
this.seqNo = seqNo;
this.context = context;
}
}


/**
* Load the internal doc ID and version for the uid from the reader, returning<ul>
* <li>null if the uid wasn't found,
* <li>a doc ID and a version otherwise
* </ul>
*/
public static DocIdAndVersion loadDocIdAndVersion(IndexReader reader, Term term) throws IOException {
assert term.field().equals(UidFieldMapper.NAME);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh please give us a message here what field it is... it will save some CPU cycles if shit hits the fan

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good one.

List<LeafReaderContext> leaves = reader.leaves();
if (leaves.isEmpty()) {
return null;
}
// iterate backwards to optimize for the frequently updated documents
// which are likely to be in the last segments
for (int i = leaves.size() - 1; i >= 0; i--) {
LeafReaderContext context = leaves.get(i);
LeafReader leaf = context.reader();
PerThreadIDAndVersionSeqNoLookup lookup = getLookupState(leaf);
DocIdAndVersion result = lookup.lookupVersion(term.bytes(), leaf.getLiveDocs(), context);
if (result != null) {
return result;
}
}
return null;
}

/**
* Load the internal doc ID and sequence number for the uid from the reader, returning<ul>
* <li>null if the uid wasn't found,
* <li>a doc ID and the associated seqNo otherwise
* </ul>
*/
public static DocIdAndSeqNo loadDocIdAndSeqNo(IndexReader reader, Term term) throws IOException {
assert term.field().equals(UidFieldMapper.NAME);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here

List<LeafReaderContext> leaves = reader.leaves();
if (leaves.isEmpty()) {
return null;
}
// iterate backwards to optimize for the frequently updated documents
// which are likely to be in the last segments
for (int i = leaves.size() - 1; i >= 0; i--) {
LeafReaderContext context = leaves.get(i);
LeafReader leaf = context.reader();
PerThreadIDAndVersionSeqNoLookup lookup = getLookupState(leaf);
DocIdAndSeqNo result = lookup.lookupSequenceNo(term.bytes(), leaf.getLiveDocs(), context);
if (result != null) {
return result;
}
}
return null;
}

/**
* Load the primaryTerm associated with the given {@link DocIdAndSeqNo}
*/
public static long loadPrimaryTerm(DocIdAndSeqNo docIdAndSeqNo) throws IOException {
LeafReader leaf = docIdAndSeqNo.context.reader();
PerThreadIDAndVersionSeqNoLookup lookup = getLookupState(leaf);
long result = lookup.lookUpPrimaryTerm(docIdAndSeqNo.docId);
assert result > 0 : "should always resolve a primary term for a resolved sequence number. primary_term [" + result + "]"
+ " docId [" + docIdAndSeqNo.docId + "] seqNo [" + docIdAndSeqNo.seqNo + "]";
return result;
}

/**
* Load the version for the uid from the reader, returning<ul>
* <li>{@link Versions#NOT_FOUND} if no matching doc exists,
* <li>the version associated with the provided uid otherwise
* </ul>
*/
public static long loadVersion(IndexReader reader, Term term) throws IOException {
final DocIdAndVersion docIdAndVersion = loadDocIdAndVersion(reader, term);
return docIdAndVersion == null ? NOT_FOUND : docIdAndVersion.version;
}
}
Loading