Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track deletes only in the tombstone map instead of maintaining as copy #27868

Merged
merged 21 commits into from
Feb 19, 2018
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1616,32 +1616,14 @@ public void trimTranslog() throws EngineException {
}

private void pruneDeletedTombstones() {
long timeMSec = engineConfig.getThreadPool().relativeTimeInMillis();

// TODO: not good that we reach into LiveVersionMap here; can we move this inside VersionMap instead? problem is the dirtyLock...

// we only need to prune the deletes map; the current/old version maps are cleared on refresh:
for (Map.Entry<BytesRef, DeleteVersionValue> entry : versionMap.getAllTombstones()) {
BytesRef uid = entry.getKey();
try (Releasable ignored = versionMap.acquireLock(uid)) {
// can we do it without this lock on each value? maybe batch to a set and get the lock once per set?

// Must re-get it here, vs using entry.getValue(), in case the uid was indexed/deleted since we pulled the iterator:
DeleteVersionValue versionValue = versionMap.getTombstoneUnderLock(uid);
if (versionValue != null) {
if (timeMSec - versionValue.time > getGcDeletesInMillis()) {
versionMap.removeTombstoneUnderLock(uid);
}
}
}
}

final long timeMSec = engineConfig.getThreadPool().relativeTimeInMillis();
versionMap.pruneTombstones(timeMSec, engineConfig.getIndexSettings().getGcDeletesInMillis());
lastDeleteVersionPruneTimeMSec = timeMSec;
}

// testing
void clearDeletedTombstones() {
versionMap.clearTombstones();
versionMap.pruneTombstones(-1, 0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wait - this feels weird - current time is -1 and interval is 0? Shouldn't current time be Long.MAX_VALUE?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

time means we clean everything, I think that is correct?

}

@Override
Expand Down
177 changes: 103 additions & 74 deletions server/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

/** Maps _uid value to its version information. */
final class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable {
Expand All @@ -54,6 +55,7 @@ private static final class VersionLookup {
// that will prevent concurrent updates to the same document ID and therefore we can rely on the happens-before guanratee of the
// map reference itself.
private boolean unsafe;
private final AtomicLong minDeleteTimestamp = new AtomicLong(Long.MAX_VALUE);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add a comment/java doc to what this mean? (minimum timestamp of delete operations that were made while this map was active. this is used to make sure they are kept in the tombstone)


private VersionLookup(Map<BytesRef, VersionValue> map) {
this.map = map;
Expand All @@ -71,7 +73,6 @@ boolean isEmpty() {
return map.isEmpty();
}


int size() {
return map.size();
}
Expand All @@ -83,6 +84,15 @@ boolean isUnsafe() {
void markAsUnsafe() {
unsafe = true;
}

public VersionValue remove(BytesRef uid) {
return map.remove(uid);
}

public void updateMinDeletedTimestamp(DeleteVersionValue delete) {
long time = delete.time;
minDeleteTimestamp.updateAndGet(prev -> Math.min(time, prev));
}
}

private static final class Maps {
Expand All @@ -97,15 +107,20 @@ private static final class Maps {
// have the volatile read of the Maps reference to make it visible even across threads.
boolean needsSafeAccess;
final boolean previousMapsNeededSafeAccess;
/** Tracks bytes used by current map, i.e. what is freed on refresh. For deletes, which are also added to tombstones, we only account
* for the CHM entry here, and account for BytesRef/VersionValue against the tombstones, since refresh would not clear this RAM. */
final AtomicLong ramBytesUsed;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to move it here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just an idea - maybe move it to VersionLookup and the VersionLookup class for the tombstones as well? then all accounting is in one place?


Maps(VersionLookup current, VersionLookup old, boolean previousMapsNeededSafeAccess) {
Maps(AtomicLong ramBytesUsed, VersionLookup current, VersionLookup old, boolean previousMapsNeededSafeAccess) {
this.current = current;
this.old = old;
this.previousMapsNeededSafeAccess = previousMapsNeededSafeAccess;
this.ramBytesUsed = ramBytesUsed;
}

Maps() {
this(new VersionLookup(ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency()), VersionLookup.EMPTY, false);
this(new AtomicLong(),
new VersionLookup(ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency()), VersionLookup.EMPTY, false);
}

boolean isSafeAccessMode() {
Expand All @@ -123,15 +138,49 @@ boolean shouldInheritSafeAccess() {
* Builds a new map for the refresh transition this should be called in beforeRefresh()
*/
Maps buildTransitionMap() {
return new Maps(new VersionLookup(ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(current.size())),
current, shouldInheritSafeAccess());
return new Maps(new AtomicLong(),
new VersionLookup(ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(current.size())), current,
shouldInheritSafeAccess());
}

/**
* builds a new map that invalidates the old map but maintains the current. This should be called in afterRefresh()
*/
Maps invalidateOldMap() {
return new Maps(current, VersionLookup.EMPTY, previousMapsNeededSafeAccess);
return new Maps(ramBytesUsed, current, VersionLookup.EMPTY, previousMapsNeededSafeAccess);
}

void put(BytesRef uid, VersionValue version) {
long uidRAMBytesUsed = BASE_BYTES_PER_BYTESREF + uid.bytes.length;
long ramAccounting = BASE_BYTES_PER_CHM_ENTRY + version.ramBytesUsed() + uidRAMBytesUsed;
VersionValue previousValue = current.put(uid, version);
ramAccounting += previousValue == null ? 0 : -(BASE_BYTES_PER_CHM_ENTRY + previousValue.ramBytesUsed() + uidRAMBytesUsed);
adjustRam(ramAccounting);
}

void adjustRam(long value) {
if (value != 0) {
long v = ramBytesUsed.addAndGet(value);
assert v >= 0 : "bytes=" + v;
}
}

void remove(BytesRef uid, DeleteVersionValue deleted) {
VersionValue previousValue = current.remove(uid);
current.updateMinDeletedTimestamp(deleted);
if (previousValue != null) {
long uidRAMBytesUsed = BASE_BYTES_PER_BYTESREF + uid.bytes.length;
adjustRam(-(BASE_BYTES_PER_CHM_ENTRY + previousValue.ramBytesUsed() + uidRAMBytesUsed));
}
if (old != VersionLookup.EMPTY) {
// we also need to remove it from the old map here to make sure we don't read this stale value while
// we are in the middle of a refresh. Most of the time the old map is an empty map so we can skip it there.
old.remove(uid);
}
}

long getMinDeleteTimestamp() {
return Math.min(current.minDeleteTimestamp.get(), old.minDeleteTimestamp.get());
}
}

Expand Down Expand Up @@ -178,12 +227,6 @@ Maps invalidateOldMap() {
BASE_BYTES_PER_CHM_ENTRY = chmEntryShallowSize + 2 * RamUsageEstimator.NUM_BYTES_OBJECT_REF;
}

/**
* Tracks bytes used by current map, i.e. what is freed on refresh. For deletes, which are also added to tombstones, we only account
* for the CHM entry here, and account for BytesRef/VersionValue against the tombstones, since refresh would not clear this RAM.
*/
final AtomicLong ramBytesUsedCurrent = new AtomicLong();

/**
* Tracks bytes used by tombstones (deletes)
*/
Expand All @@ -199,7 +242,6 @@ public void beforeRefresh() throws IOException {
assert (unsafeKeysMap = unsafeKeysMap.buildTransitionMap()) != null;
// This is not 100% correct, since concurrent indexing ops can change these counters in between our execution of the previous
// line and this one, but that should be minor, and the error won't accumulate over time:
ramBytesUsedCurrent.set(0);
}

@Override
Expand Down Expand Up @@ -292,48 +334,31 @@ void putUnderLock(BytesRef uid, VersionValue version) {
private void putUnderLock(BytesRef uid, VersionValue version, Maps maps) {
assert keyedLock.isHeldByCurrentThread(uid);
assert uid.bytes.length == uid.length : "Oversized _uid! UID length: " + uid.length + ", bytes length: " + uid.bytes.length;
long uidRAMBytesUsed = BASE_BYTES_PER_BYTESREF + uid.bytes.length;
final VersionValue prev = maps.current.put(uid, version);
if (prev != null) {
// Deduct RAM for the version we just replaced:
long prevBytes = BASE_BYTES_PER_CHM_ENTRY;
if (prev.isDelete() == false) {
prevBytes += prev.ramBytesUsed() + uidRAMBytesUsed;
}
ramBytesUsedCurrent.addAndGet(-prevBytes);
}

// Add RAM for the new version:
long newBytes = BASE_BYTES_PER_CHM_ENTRY;
if (version.isDelete() == false) {
newBytes += version.ramBytesUsed() + uidRAMBytesUsed;
}
ramBytesUsedCurrent.addAndGet(newBytes);

final VersionValue prevTombstone;
if (version.isDelete()) {
// Also enroll the delete into tombstones, and account for its RAM too:
prevTombstone = tombstones.put(uid, (DeleteVersionValue) version);

// We initially account for BytesRef/VersionValue RAM for a delete against the tombstones, because this RAM will not be freed up
// on refresh. Later, in removeTombstoneUnderLock, if we clear the tombstone entry but the delete remains in current, we shift
// the accounting to current:
ramBytesUsedTombstones.addAndGet(BASE_BYTES_PER_CHM_ENTRY + version.ramBytesUsed() + uidRAMBytesUsed);

if (prevTombstone == null && prev != null && prev.isDelete()) {
// If prev was a delete that had already been removed from tombstones, then current was already accounting for the
// BytesRef/VersionValue RAM, so we now deduct that as well:
ramBytesUsedCurrent.addAndGet(-(prev.ramBytesUsed() + uidRAMBytesUsed));
}
maps.put(uid, version);
removeTombstoneUnderLock(uid);
} else {
// UID came back to life so we remove the tombstone:
prevTombstone = tombstones.remove(uid);
DeleteVersionValue versionValue = (DeleteVersionValue) version;
putTombstone(uid, versionValue);
maps.remove(uid, versionValue);
}
}

private void putTombstone(BytesRef uid, DeleteVersionValue version) {
long uidRAMBytesUsed = BASE_BYTES_PER_BYTESREF + uid.bytes.length;
// Also enroll the delete into tombstones, and account for its RAM too:
final VersionValue prevTombstone = tombstones.put(uid, version);
// We initially account for BytesRef/VersionValue RAM for a delete against the tombstones, because this RAM will not be freed up
// on refresh. Later, in removeTombstoneUnderLock, if we clear the tombstone entry but the delete remains in current, we shift
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this comment is wrong now.

// the accounting to current:
long accountRam = (BASE_BYTES_PER_CHM_ENTRY + version.ramBytesUsed() + uidRAMBytesUsed);
// Deduct tombstones bytes used for the version we just removed or replaced:
if (prevTombstone != null) {
long v = ramBytesUsedTombstones.addAndGet(-(BASE_BYTES_PER_CHM_ENTRY + prevTombstone.ramBytesUsed() + uidRAMBytesUsed));
assert v >= 0 : "bytes=" + v;
accountRam -= (BASE_BYTES_PER_CHM_ENTRY + prevTombstone.ramBytesUsed() + uidRAMBytesUsed);
}
if (accountRam != 0) {
long v = ramBytesUsedTombstones.addAndGet(accountRam);
assert v >= 0: "bytes=" + v;
}
}

Expand All @@ -343,19 +368,34 @@ private void putUnderLock(BytesRef uid, VersionValue version, Maps maps) {
void removeTombstoneUnderLock(BytesRef uid) {
assert keyedLock.isHeldByCurrentThread(uid);
long uidRAMBytesUsed = BASE_BYTES_PER_BYTESREF + uid.bytes.length;

final VersionValue prev = tombstones.remove(uid);
if (prev != null) {
assert prev.isDelete();
long v = ramBytesUsedTombstones.addAndGet(-(BASE_BYTES_PER_CHM_ENTRY + prev.ramBytesUsed() + uidRAMBytesUsed));
assert v >= 0 : "bytes=" + v;
}
final VersionValue curVersion = maps.current.get(uid);
if (curVersion != null && curVersion.isDelete()) {
// We now shift accounting of the BytesRef from tombstones to current, because a refresh would clear this RAM. This should be
// uncommon, because with the default refresh=1s and gc_deletes=60s, deletes should be cleared from current long before we drop
// them from tombstones:
ramBytesUsedCurrent.addAndGet(curVersion.ramBytesUsed() + uidRAMBytesUsed);
}

void pruneTombstones(long currentTime, long pruneInterval) {
for (Map.Entry<BytesRef, DeleteVersionValue> entry : tombstones.entrySet()) {
BytesRef uid = entry.getKey();
try (Releasable ignored = acquireLock(uid)) { // can we do it without this lock on each value? maybe batch to a set and get
// the lock once per set?
// Must re-get it here, vs using entry.getValue(), in case the uid was indexed/deleted since we pulled the iterator:
DeleteVersionValue versionValue = tombstones.get(uid);
if (versionValue != null) {
// check if the value is old enough to be removed
final boolean isTooOld = currentTime - versionValue.time > pruneInterval;
if (isTooOld) {
// version value can't be removed it's
// not yet flushed to lucene ie. it's part of this current maps object
final boolean isNotTrackedByCurrentMaps = versionValue.time < maps.getMinDeleteTimestamp();
if (isNotTrackedByCurrentMaps) {
removeTombstoneUnderLock(uid);
}
}
}
}
}
}

Expand All @@ -367,28 +407,12 @@ DeleteVersionValue getTombstoneUnderLock(BytesRef uid) {
return tombstones.get(uid);
}

/**
* Iterates over all deleted versions, including new ones (not yet exposed via reader) and old ones (exposed via reader but not yet GC'd).
*/
Iterable<Map.Entry<BytesRef, DeleteVersionValue>> getAllTombstones() {
return tombstones.entrySet();
}

/**
* clears all tombstones ops
*/
void clearTombstones() {
tombstones.clear();
}

/**
* Called when this index is closed.
*/
synchronized void clear() {
maps = new Maps();
tombstones.clear();
ramBytesUsedCurrent.set(0);

// NOTE: we can't zero this here, because a refresh thread could be calling InternalEngine.pruneDeletedTombstones at the same time,
// and this will lead to an assert trip. Presumably it's fine if our ramBytesUsedTombstones is non-zero after clear since the index
// is being closed:
Expand All @@ -397,15 +421,15 @@ synchronized void clear() {

@Override
public long ramBytesUsed() {
return ramBytesUsedCurrent.get() + ramBytesUsedTombstones.get();
return maps.ramBytesUsed.get() + ramBytesUsedTombstones.get();
}

/**
* Returns how much RAM would be freed up by refreshing. This is {@link #ramBytesUsed} except does not include tombstones because they
* don't clear on refresh.
*/
long ramBytesUsedForRefresh() {
return ramBytesUsedCurrent.get();
return maps.ramBytesUsed.get();
}

@Override
Expand All @@ -421,6 +445,11 @@ Map<BytesRef, VersionValue> getAllCurrent() {
return maps.current.map;
}

/** Iterates over all deleted versions, including new ones (not yet exposed via reader) and old ones (exposed via reader but not yet GC'd). */
Map<BytesRef, DeleteVersionValue> getAllTombstones() {
return tombstones;
}

/**
* Acquires a releaseable lock for the given uId. All *UnderLock methods require
* this lock to be hold by the caller otherwise the visibility guarantees of this version
Expand Down
Loading