Skip to content

Commit

Permalink
Fix to bug in TableMgr::Iterator::gotoEnd() (#166)
Browse files Browse the repository at this point in the history
* As in the other layer, the table iterator should have a
`moveToLastValid()` function that finds the last key with the first
(i.e., the greatest/newest) sequence number. Otherwise, stale
data may be returned if different tables hold different versions of
the same key.
  • Loading branch information
greensky00 committed Mar 6, 2024
1 parent eca5ca0 commit 48b0835
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 9 deletions.
44 changes: 35 additions & 9 deletions src/table_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ Status TableMgr::Iterator::next() {
cur_key.free();
if (!windowCursor) {
// Reached the end.
windowCursor = avl_last(&curWindow);
moveToLastValid();
return Status::OUT_OF_RANGE;
}
return Status();
Expand All @@ -320,6 +320,39 @@ Status TableMgr::Iterator::gotoEnd() {
return seekInternal(empty_key, 0, SMALLER, true);
}

// Positions `windowCursor` for a "go to the end" style request.
//
// The cursor starts at the last node of `curWindow`. For `BY_KEY`
// iterators it then walks backwards for as long as the preceding node
// carries the very same key (per `cmpSizedBuf`), so that among several
// entries of one key the earliest-ordered one in the window is selected.
//
// e.g.)
//   ... Del K9 (seq 100), Ins K9 (seq 99)
//   => the cursor ends up on `Del K9`.
//
// For any other iterator type the cursor simply stays on the last node.
// This function does not inspect `ItrItem::flags`.
//
// Always returns a default-constructed `Status`; `windowCursor` may be
// null afterwards, which callers are expected to check.
Status TableMgr::Iterator::moveToLastValid() {
    windowCursor = avl_last(&curWindow);

    // Only key-ordered iteration needs the duplicate-key rewind below.
    if (type != BY_KEY) {
        return Status();
    }

    while (windowCursor) {
        avl_node* before = avl_prev(windowCursor);
        if (!before) {
            // Nothing precedes the cursor; it is already final.
            break;
        }

        ItrItem* here = _get_entry(windowCursor, ItrItem, an);
        ItrItem* there = _get_entry(before, ItrItem, an);
        const bool same_key =
            cmpSizedBuf(here->lastRec.kv.key, there->lastRec.kv.key) == 0;
        if (!same_key) {
            break;
        }

        // Same key, should take the previous one.
        windowCursor = before;
    }
    return Status();
}

Status TableMgr::Iterator::seekInternal
( const SizedBuf& key,
Expand Down Expand Up @@ -405,14 +438,7 @@ Status TableMgr::Iterator::seekInternal
else windowCursor = avl_next(windowCursor);
}
} else { // SMALLER
windowCursor = avl_last(&curWindow);
while (windowCursor) {
// Find *LAST* valid item (only for BY_KEY).
ItrItem* item = _get_entry(windowCursor, ItrItem, an);

if (item->flags == ItrItem::none) break;
else windowCursor = avl_prev(windowCursor);
}
moveToLastValid();
}

if (!windowCursor) {
Expand Down
1 change: 1 addition & 0 deletions src/table_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,7 @@ class TableMgr {
const uint64_t seqnum,
SeekOption opt,
bool goto_end = false);
Status moveToLastValid();
inline int cmpSizedBuf(const SizedBuf& l, const SizedBuf& r);
bool checkValidBySeq(ItrItem* item,
const uint64_t cur_seq,
Expand Down
101 changes: 101 additions & 0 deletions tests/jungle/key_itr_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
#include "jungle_test_common.h"

#include "libjungle/jungle.h"
#include "table_helper.h"

#include <vector>

Expand Down Expand Up @@ -881,6 +882,103 @@ int itr_key_flush_and_delete_half_even(bool recreate = false) {
return 0;
}

// Regression test for reverse iteration when two versions of the same key
// have been flushed at different times (the first one is additionally
// pushed through `compactL0()`), and a greater key sits only in the log.
//
// Steps:
//   1. Write `kkk` -> `v0`, sync, flush logs, then compact every L0
//      partition.
//   2. Write `kkk` -> `v1` (or delete `kkk` when `deletion` is true),
//      sync and flush logs, without compaction.
//   3. Write `kkkk` -> `v` and sync only.
//   4. Iterate over [`k`, `l`] starting from the end, going backwards.
//
// Expected records, in visiting order:
//   deletion == false: (`kkkk`, `v`), (`kkk`, `v1`)
//   deletion == true : (`kkkk`, `v`) only; the deleted key must not show up.
//
// @param deletion If true, the second operation on `kkk` is a delete
//                 instead of an update.
// @return 0 on success, non-zero on failure (via `CHK_*` or explicit -1).
int different_version_and_level_test(bool deletion) {
    std::string filename;
    TEST_SUITE_PREPARE_PATH(filename);

    jungle::Status s;
    jungle::DBConfig config;
    TEST_CUSTOM_DB_CONFIG(config);
    jungle::DB* db = nullptr;

    config.maxL0TableSize = 1024 * 1024;
    config.maxL1TableSize = 1024 * 1024;
    CHK_Z(jungle::DB::open(&db, filename, config));

    const size_t NUM = 2;

    // Put older version to L1, and newer version to L0.
    // Key: `kkk`, value: `v0` and `v1`.
    // If `deletion == true`, issue a delete operation.
    for (size_t ii = 0; ii < NUM; ++ii) {
        std::string key_str = "kkk";
        std::string val_str = "v" + TestSuite::lzStr(1, ii);
        if (ii == 1 && deletion) {
            CHK_Z(db->del(key_str));
        } else {
            CHK_Z(db->set(jungle::KV(key_str, val_str)));
        }
        CHK_Z(db->sync());
        CHK_Z(db->flushLogs());

        if (ii == 0) {
            // Only the first (older) version goes through L0 compaction.
            jungle::CompactOptions c_opt;
            for (size_t jj = 0; jj < config.numL0Partitions; ++jj) {
                CHK_Z(db->compactL0(c_opt, jj));
            }
        }
    }

    // Put a greater key to log.
    // Key: `kkkk`, value: `v`.
    std::string key_str = "kkkk";
    std::string val_str = "v";
    CHK_Z(db->set(jungle::KV(key_str, val_str)));
    CHK_Z(db->sync());

    // Reverse scan.
    // Iterator setup is checked as well, so that a failure here is
    // reported directly rather than as a record-count mismatch below.
    jungle::Iterator itr;
    std::string s_key_str = "k";
    std::string e_key_str = "l";
    CHK_Z(itr.init(db, s_key_str, e_key_str));
    CHK_Z(itr.gotoEnd());

    size_t count = 0;
    do {
        jungle::Record rec_out;
        jungle::Record::Holder h(rec_out);
        s = itr.get(rec_out);
        if (!s) break;

        TestSuite::_msg("key: %s, value: %s\n",
                        rec_out.kv.key.toString().c_str(),
                        rec_out.kv.value.toString().c_str());

        if (count == 0) {
            // The greatest key, present only in the log, comes first.
            CHK_EQ(jungle::SizedBuf("kkkk"), rec_out.kv.key);
            CHK_EQ(jungle::SizedBuf("v"), rec_out.kv.value);

        } else if (count == 1) {
            if (deletion) {
                // `kkk` was deleted: seeing any second record means the
                // stale version leaked through.
                return -1;
            } else {
                // Must be the newer version, not the stale `v0`.
                CHK_EQ(jungle::SizedBuf("kkk"), rec_out.kv.key);
                CHK_EQ(jungle::SizedBuf("v1"), rec_out.kv.value);
            }

        } else {
            // No more than two records are expected in any case.
            return -1;
        }

        rec_out.free();
        count++;
    } while (itr.prev().ok());
    CHK_Z(itr.close());

    if (deletion) {
        CHK_EQ(1, count);
    } else {
        CHK_EQ(2, count);
    }

    CHK_Z(jungle::DB::close(db));

    jungle::shutdown();

    TEST_SUITE_CLEANUP_PATH();
    return 0;
}

int main(int argc, char** argv) {
TestSuite ts(argc, argv);

Expand All @@ -900,6 +998,9 @@ int main(int argc, char** argv) {
ts.doTest("key itr flush and delete half even test",
itr_key_flush_and_delete_half_even,
TestRange<bool>({false, true}));
ts.doTest("key itr different version and level test",
different_version_and_level_test,
TestRange<bool>({false, true}));

return 0;
}

0 comments on commit 48b0835

Please sign in to comment.