Track deletes only in the tombstone map instead of maintaining a copy (#27868)

Today we maintain a copy of every delete in the live version maps. This is unnecessary
and can add significant overhead if the maps grow large. This change moves delete tracking
into the tombstone map only and relies on tombstone pruning once deletes have been garbage
collected; a simplified sketch of the idea follows the commit details below.
s1monw committed Feb 19, 2018
1 parent 7157dda commit 4ad361d
Showing 4 changed files with 221 additions and 121 deletions.
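To make the new bookkeeping easier to follow, here is a minimal, self-contained sketch of the idea. It is not the actual LiveVersionMap code; the names TombstoneOnlyDeletes, recordDelete, afterRefresh and minLiveDeleteTimestamp are made up for illustration, and a single timestamp stands in for the per-map minDeleteTimestamp the real change keeps on both the current and the old map. Deletes are recorded only in a tombstone map, stamped with the time they were issued, and a tombstone is pruned only once it is both older than the GC interval and older than the oldest delete still tracked by the not-yet-refreshed live maps.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

final class TombstoneOnlyDeletes {

    static final class Tombstone {
        final long version;
        final long timeInMillis; // when the delete was issued
        Tombstone(long version, long timeInMillis) {
            this.version = version;
            this.timeInMillis = timeInMillis;
        }
    }

    final Map<String, Tombstone> tombstones = new ConcurrentHashMap<>();
    // oldest delete that is still only visible via the live (not yet refreshed) maps;
    // tombstones at or after this timestamp must be kept so readers keep seeing the delete
    private final AtomicLong minLiveDeleteTimestamp = new AtomicLong(Long.MAX_VALUE);

    void recordDelete(String uid, long version, long nowInMillis) {
        tombstones.put(uid, new Tombstone(version, nowInMillis));
        minLiveDeleteTimestamp.updateAndGet(prev -> Math.min(prev, nowInMillis));
    }

    // called after a refresh: deletes recorded so far are now visible to readers
    void afterRefresh() {
        minLiveDeleteTimestamp.set(Long.MAX_VALUE);
    }

    void pruneTombstones(long nowInMillis, long gcDeletesInMillis) {
        for (Map.Entry<String, Tombstone> entry : tombstones.entrySet()) {
            Tombstone tombstone = entry.getValue();
            boolean oldEnough = nowInMillis - tombstone.timeInMillis > gcDeletesInMillis;
            boolean notTrackedByLiveMaps = tombstone.timeInMillis < minLiveDeleteTimestamp.get();
            if (oldEnough && notTrackedByLiveMaps) {
                tombstones.remove(entry.getKey(), tombstone); // remove only if the entry is unchanged
            }
        }
    }

    public static void main(String[] args) {
        TombstoneOnlyDeletes deletes = new TombstoneOnlyDeletes();
        long now = System.currentTimeMillis();
        deletes.recordDelete("doc-1", 3, now - 120_000); // deleted two minutes ago
        deletes.pruneTombstones(now, 60_000);
        System.out.println("before refresh: " + deletes.tombstones.size()); // 1, still tracked by live maps
        deletes.afterRefresh();
        deletes.pruneTombstones(now, 60_000);
        System.out.println("after refresh: " + deletes.tombstones.size());  // 0, old enough and safe to drop
    }
}

In the actual change this bookkeeping lives inside LiveVersionMap: putUnderLock routes deletes into the tombstone map and removes them from the current map, and pruneTombstones only drops entries whose timestamp is below the minimum delete timestamp of the current and old maps.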
Engine.java
@@ -1541,4 +1541,9 @@ public interface Warmer {
public boolean isRecovering() {
return false;
}

/**
* Tries to prune buffered deletes from the version map.
*/
public abstract void maybePruneDeletes();
}
InternalEngine.java
@@ -1235,7 +1235,7 @@ public DeleteResult delete(Delete delete) throws IOException {
}
throw e;
}
maybePruneDeletedTombstones();
maybePruneDeletes();
return deleteResult;
}

@@ -1368,7 +1368,8 @@ public static DeletionStrategy processButSkipLucene(boolean currentlyDeleted, lo
}
}

private void maybePruneDeletedTombstones() {
@Override
public void maybePruneDeletes() {
// It's expensive to prune because we walk the deletes map acquiring dirtyLock for each uid so we only do it
// every 1/4 of gcDeletesInMillis:
if (engineConfig.isEnableGcDeletes() && engineConfig.getThreadPool().relativeTimeInMillis() - lastDeleteVersionPruneTimeMSec > getGcDeletesInMillis() * 0.25) {
@@ -1458,7 +1459,7 @@ final void refresh(String source, SearcherScope scope) throws EngineException {
// TODO: maybe we should just put a scheduled job in threadPool?
// We check for pruning in each delete request, but we also prune here e.g. in case a delete burst comes in and then no more deletes
// for a long time:
maybePruneDeletedTombstones();
maybePruneDeletes();
mergeScheduler.refreshConfig();
}

@@ -1678,32 +1679,15 @@ public void trimTranslog() throws EngineException {
}

private void pruneDeletedTombstones() {
long timeMSec = engineConfig.getThreadPool().relativeTimeInMillis();

// TODO: not good that we reach into LiveVersionMap here; can we move this inside VersionMap instead? problem is the dirtyLock...

// we only need to prune the deletes map; the current/old version maps are cleared on refresh:
for (Map.Entry<BytesRef, DeleteVersionValue> entry : versionMap.getAllTombstones()) {
BytesRef uid = entry.getKey();
try (Releasable ignored = versionMap.acquireLock(uid)) {
// can we do it without this lock on each value? maybe batch to a set and get the lock once per set?

// Must re-get it here, vs using entry.getValue(), in case the uid was indexed/deleted since we pulled the iterator:
DeleteVersionValue versionValue = versionMap.getTombstoneUnderLock(uid);
if (versionValue != null) {
if (timeMSec - versionValue.time > getGcDeletesInMillis()) {
versionMap.removeTombstoneUnderLock(uid);
}
}
}
}

final long timeMSec = engineConfig.getThreadPool().relativeTimeInMillis();
versionMap.pruneTombstones(timeMSec, engineConfig.getIndexSettings().getGcDeletesInMillis());
lastDeleteVersionPruneTimeMSec = timeMSec;
}

// testing
void clearDeletedTombstones() {
versionMap.clearTombstones();
// clean with current time Long.MAX_VALUE and interval 0 since we use a greater than relationship here.
versionMap.pruneTombstones(Long.MAX_VALUE, 0);
}

@Override
@@ -2253,7 +2237,7 @@ private void ensureCanFlush() {
public void onSettingsChanged() {
mergeScheduler.refreshConfig();
// config().isEnableGcDeletes() or config.getGcDeletesInMillis() may have changed:
maybePruneDeletedTombstones();
maybePruneDeletes();
if (engineConfig.isAutoGeneratedIDsOptimizationEnabled() == false) {
// this is an anti-viral setting; you can only opt out for the entire index
// only if a shard starts up again due to relocation or if the index is closed
LiveVersionMap.java
@@ -32,6 +32,7 @@
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

/** Maps _uid value to its version information. */
final class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable {
@@ -40,6 +41,10 @@ final class LiveVersionMap implements ReferenceManager.RefreshListener, Accounta

private static final class VersionLookup {

/** Tracks bytes used by current map, i.e. what is freed on refresh. For deletes, which are also added to tombstones, we only account
* for the CHM entry here, and account for BytesRef/VersionValue against the tombstones, since refresh would not clear this RAM. */
final AtomicLong ramBytesUsed = new AtomicLong();

private static final VersionLookup EMPTY = new VersionLookup(Collections.emptyMap());
private final Map<BytesRef, VersionValue> map;

@@ -55,6 +60,10 @@ private static final class VersionLookup {
// map reference itself.
private boolean unsafe;

// minimum timestamp of delete operations that were made while this map was active. This is used to make sure they are kept in
// the tombstone map
private final AtomicLong minDeleteTimestamp = new AtomicLong(Long.MAX_VALUE);

private VersionLookup(Map<BytesRef, VersionValue> map) {
this.map = map;
}
@@ -71,7 +80,6 @@ boolean isEmpty() {
return map.isEmpty();
}


int size() {
return map.size();
}
@@ -83,6 +91,16 @@ boolean isUnsafe() {
void markAsUnsafe() {
unsafe = true;
}

public VersionValue remove(BytesRef uid) {
return map.remove(uid);
}

public void updateMinDeletedTimestamp(DeleteVersionValue delete) {
long time = delete.time;
minDeleteTimestamp.updateAndGet(prev -> Math.min(time, prev));
}

}

private static final class Maps {
@@ -98,6 +116,7 @@ private static final class Maps {
boolean needsSafeAccess;
final boolean previousMapsNeededSafeAccess;


Maps(VersionLookup current, VersionLookup old, boolean previousMapsNeededSafeAccess) {
this.current = current;
this.old = old;
@@ -123,8 +142,8 @@ boolean shouldInheritSafeAccess() {
* Builds a new map for the refresh transition; this should be called in beforeRefresh()
*/
Maps buildTransitionMap() {
return new Maps(new VersionLookup(ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(current.size())),
current, shouldInheritSafeAccess());
return new Maps(new VersionLookup(ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(current.size())), current,
shouldInheritSafeAccess());
}

/**
@@ -133,6 +152,39 @@ Maps buildTransitionMap() {
Maps invalidateOldMap() {
return new Maps(current, VersionLookup.EMPTY, previousMapsNeededSafeAccess);
}

void put(BytesRef uid, VersionValue version) {
long uidRAMBytesUsed = BASE_BYTES_PER_BYTESREF + uid.bytes.length;
long ramAccounting = BASE_BYTES_PER_CHM_ENTRY + version.ramBytesUsed() + uidRAMBytesUsed;
VersionValue previousValue = current.put(uid, version);
ramAccounting += previousValue == null ? 0 : -(BASE_BYTES_PER_CHM_ENTRY + previousValue.ramBytesUsed() + uidRAMBytesUsed);
adjustRam(ramAccounting);
}

void adjustRam(long value) {
if (value != 0) {
long v = current.ramBytesUsed.addAndGet(value);
assert v >= 0 : "bytes=" + v;
}
}

void remove(BytesRef uid, DeleteVersionValue deleted) {
VersionValue previousValue = current.remove(uid);
current.updateMinDeletedTimestamp(deleted);
if (previousValue != null) {
long uidRAMBytesUsed = BASE_BYTES_PER_BYTESREF + uid.bytes.length;
adjustRam(-(BASE_BYTES_PER_CHM_ENTRY + previousValue.ramBytesUsed() + uidRAMBytesUsed));
}
if (old != VersionLookup.EMPTY) {
// we also need to remove it from the old map here to make sure we don't read this stale value while
// we are in the middle of a refresh. Most of the time the old map is an empty map so we can skip it there.
old.remove(uid);
}
}

long getMinDeleteTimestamp() {
return Math.min(current.minDeleteTimestamp.get(), old.minDeleteTimestamp.get());
}
}

// All deletes also go here, and delete "tombstones" are retained after refresh:
@@ -178,12 +230,6 @@ Maps invalidateOldMap() {
BASE_BYTES_PER_CHM_ENTRY = chmEntryShallowSize + 2 * RamUsageEstimator.NUM_BYTES_OBJECT_REF;
}

/**
* Tracks bytes used by current map, i.e. what is freed on refresh. For deletes, which are also added to tombstones, we only account
* for the CHM entry here, and account for BytesRef/VersionValue against the tombstones, since refresh would not clear this RAM.
*/
final AtomicLong ramBytesUsedCurrent = new AtomicLong();

/**
* Tracks bytes used by tombstones (deletes)
*/
@@ -199,7 +245,6 @@
assert (unsafeKeysMap = unsafeKeysMap.buildTransitionMap()) != null;
// This is not 100% correct, since concurrent indexing ops can change these counters in between our execution of the previous
// line and this one, but that should be minor, and the error won't accumulate over time:
ramBytesUsedCurrent.set(0);
}

@Override
@@ -292,48 +337,28 @@ void putUnderLock(BytesRef uid, VersionValue version) {
private void putUnderLock(BytesRef uid, VersionValue version, Maps maps) {
assert keyedLock.isHeldByCurrentThread(uid);
assert uid.bytes.length == uid.length : "Oversized _uid! UID length: " + uid.length + ", bytes length: " + uid.bytes.length;
long uidRAMBytesUsed = BASE_BYTES_PER_BYTESREF + uid.bytes.length;
final VersionValue prev = maps.current.put(uid, version);
if (prev != null) {
// Deduct RAM for the version we just replaced:
long prevBytes = BASE_BYTES_PER_CHM_ENTRY;
if (prev.isDelete() == false) {
prevBytes += prev.ramBytesUsed() + uidRAMBytesUsed;
}
ramBytesUsedCurrent.addAndGet(-prevBytes);
}

// Add RAM for the new version:
long newBytes = BASE_BYTES_PER_CHM_ENTRY;
if (version.isDelete() == false) {
newBytes += version.ramBytesUsed() + uidRAMBytesUsed;
}
ramBytesUsedCurrent.addAndGet(newBytes);

final VersionValue prevTombstone;
if (version.isDelete()) {
// Also enroll the delete into tombstones, and account for its RAM too:
prevTombstone = tombstones.put(uid, (DeleteVersionValue) version);

// We initially account for BytesRef/VersionValue RAM for a delete against the tombstones, because this RAM will not be freed up
// on refresh. Later, in removeTombstoneUnderLock, if we clear the tombstone entry but the delete remains in current, we shift
// the accounting to current:
ramBytesUsedTombstones.addAndGet(BASE_BYTES_PER_CHM_ENTRY + version.ramBytesUsed() + uidRAMBytesUsed);

if (prevTombstone == null && prev != null && prev.isDelete()) {
// If prev was a delete that had already been removed from tombstones, then current was already accounting for the
// BytesRef/VersionValue RAM, so we now deduct that as well:
ramBytesUsedCurrent.addAndGet(-(prev.ramBytesUsed() + uidRAMBytesUsed));
}
maps.put(uid, version);
removeTombstoneUnderLock(uid);
} else {
// UID came back to life so we remove the tombstone:
prevTombstone = tombstones.remove(uid);
DeleteVersionValue versionValue = (DeleteVersionValue) version;
putTombstone(uid, versionValue);
maps.remove(uid, versionValue);
}
}

private void putTombstone(BytesRef uid, DeleteVersionValue version) {
long uidRAMBytesUsed = BASE_BYTES_PER_BYTESREF + uid.bytes.length;
// Also enroll the delete into tombstones, and account for its RAM too:
final VersionValue prevTombstone = tombstones.put(uid, version);
long accountRam = (BASE_BYTES_PER_CHM_ENTRY + version.ramBytesUsed() + uidRAMBytesUsed);
// Deduct tombstones bytes used for the version we just removed or replaced:
if (prevTombstone != null) {
long v = ramBytesUsedTombstones.addAndGet(-(BASE_BYTES_PER_CHM_ENTRY + prevTombstone.ramBytesUsed() + uidRAMBytesUsed));
assert v >= 0 : "bytes=" + v;
accountRam -= (BASE_BYTES_PER_CHM_ENTRY + prevTombstone.ramBytesUsed() + uidRAMBytesUsed);
}
if (accountRam != 0) {
long v = ramBytesUsedTombstones.addAndGet(accountRam);
assert v >= 0: "bytes=" + v;
}
}

@@ -343,42 +368,35 @@ private void putUnderLock(BytesRef uid, VersionValue version, Maps maps) {
void removeTombstoneUnderLock(BytesRef uid) {
assert keyedLock.isHeldByCurrentThread(uid);
long uidRAMBytesUsed = BASE_BYTES_PER_BYTESREF + uid.bytes.length;

final VersionValue prev = tombstones.remove(uid);
if (prev != null) {
assert prev.isDelete();
long v = ramBytesUsedTombstones.addAndGet(-(BASE_BYTES_PER_CHM_ENTRY + prev.ramBytesUsed() + uidRAMBytesUsed));
assert v >= 0 : "bytes=" + v;
}
final VersionValue curVersion = maps.current.get(uid);
if (curVersion != null && curVersion.isDelete()) {
// We now shift accounting of the BytesRef from tombstones to current, because a refresh would clear this RAM. This should be
// uncommon, because with the default refresh=1s and gc_deletes=60s, deletes should be cleared from current long before we drop
// them from tombstones:
ramBytesUsedCurrent.addAndGet(curVersion.ramBytesUsed() + uidRAMBytesUsed);
}
}

/**
* Caller has a lock, so that this uid will not be concurrently added/deleted by another thread.
*/
DeleteVersionValue getTombstoneUnderLock(BytesRef uid) {
assert keyedLock.isHeldByCurrentThread(uid);
return tombstones.get(uid);
}

/**
* Iterates over all deleted versions, including new ones (not yet exposed via reader) and old ones (exposed via reader but not yet GC'd).
*/
Iterable<Map.Entry<BytesRef, DeleteVersionValue>> getAllTombstones() {
return tombstones.entrySet();
}

/**
* clears all tombstones ops
*/
void clearTombstones() {
tombstones.clear();
void pruneTombstones(long currentTime, long pruneInterval) {
for (Map.Entry<BytesRef, DeleteVersionValue> entry : tombstones.entrySet()) {
BytesRef uid = entry.getKey();
try (Releasable ignored = acquireLock(uid)) { // can we do it without this lock on each value? maybe batch to a set and get
// the lock once per set?
// Must re-get it here, vs using entry.getValue(), in case the uid was indexed/deleted since we pulled the iterator:
DeleteVersionValue versionValue = tombstones.get(uid);
if (versionValue != null) {
// check if the value is old enough to be removed
final boolean isTooOld = currentTime - versionValue.time > pruneInterval;
if (isTooOld) {
// the version value can't be removed while it's not yet flushed to lucene,
// i.e. while it's still part of the current maps object
final boolean isNotTrackedByCurrentMaps = versionValue.time < maps.getMinDeleteTimestamp();
if (isNotTrackedByCurrentMaps) {
removeTombstoneUnderLock(uid);
}
}
}
}
}
}

/**
Expand All @@ -387,8 +405,6 @@ void clearTombstones() {
synchronized void clear() {
maps = new Maps();
tombstones.clear();
ramBytesUsedCurrent.set(0);

// NOTE: we can't zero this here, because a refresh thread could be calling InternalEngine.pruneDeletedTombstones at the same time,
// and this will lead to an assert trip. Presumably it's fine if our ramBytesUsedTombstones is non-zero after clear since the index
// is being closed:
@@ -397,15 +413,15 @@ synchronized void clear() {

@Override
public long ramBytesUsed() {
return ramBytesUsedCurrent.get() + ramBytesUsedTombstones.get();
return maps.current.ramBytesUsed.get() + ramBytesUsedTombstones.get();
}

/**
* Returns how much RAM would be freed up by refreshing. This is {@link #ramBytesUsed} except does not include tombstones because they
* don't clear on refresh.
*/
long ramBytesUsedForRefresh() {
return ramBytesUsedCurrent.get();
return maps.current.ramBytesUsed.get();
}

@Override
@@ -421,6 +437,11 @@ Map<BytesRef, VersionValue> getAllCurrent() {
return maps.current.map;
}

/** Iterates over all deleted versions, including new ones (not yet exposed via reader) and old ones (exposed via reader but not yet GC'd). */
Map<BytesRef, DeleteVersionValue> getAllTombstones() {
return tombstones;
}

/**
* Acquires a releasable lock for the given uid. All *UnderLock methods require
* this lock to be held by the caller, otherwise the visibility guarantees of this version
