Skip to content

Commit

Permalink
Full oplog based updates
Browse files Browse the repository at this point in the history
  • Loading branch information
lalinsky committed Mar 10, 2024
1 parent 613e0b8 commit c4638c1
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 58 deletions.
2 changes: 2 additions & 0 deletions src/fpindex/base_segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ class BaseSegment {
virtual ~BaseSegment() = default;

uint32_t id() const { return info_.id(); }
uint64_t min_oplog_id() const { return info_.min_oplog_id(); }
uint64_t max_oplog_id() const { return info_.max_oplog_id(); }

virtual bool Search(const std::vector<uint32_t> &hashes, std::vector<SearchResult> *results) = 0;
virtual std::vector<SearchResult> Search(const std::vector<uint32_t> &hashes);
Expand Down
34 changes: 32 additions & 2 deletions src/fpindex/index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,10 @@ bool Index::Open() {

// Closes the index. If an oplog is attached it is closed and the index drops
// its reference to it; without an oplog there is nothing to do.
// Runs with mutex_ held and always returns true.
bool Index::Close() {
    std::lock_guard<std::mutex> guard(mutex_);
    if (!oplog_) {
        return true;
    }
    oplog_->Close();
    oplog_.reset();
    return true;
}

Expand All @@ -172,17 +176,43 @@ bool Index::Search(const std::vector<uint32_t>& hashes, std::vector<SearchResult
// Applies a batch of changes to the index.
//
// The finished entries are written to the oplog first and are applied to the
// in-memory stage only after that write succeeded. Returns false, after
// logging the reason, when:
//   - the index data, the oplog or the stage is not available,
//   - the last oplog id cannot be read,
//   - the stage's max oplog id differs from the last id in the oplog,
//   - the oplog write (including the stage's CheckUpdate veto) fails.
bool Index::Update(IndexUpdate&& update) {
    std::lock_guard<std::mutex> lock(mutex_);

    if (!data_) {
        LOG_ERROR() << "index is not open";
        return false;
    }

    if (!oplog_ || !oplog_->IsReady()) {
        LOG_ERROR() << "oplog is not open";
        return false;
    }

    auto stage = data_->stage_;
    if (!stage) {
        LOG_ERROR() << "stage is not open";
        return false;
    }

    const auto last_oplog_id = oplog_->GetLastId();
    if (!last_oplog_id) {
        LOG_ERROR() << "failed to get last oplog id";
        return false;
    }

    // The stage must have seen everything the oplog already contains,
    // otherwise applying new entries on top of it would skip operations.
    if (stage->max_oplog_id() != *last_oplog_id) {
        LOG_ERROR() << "stage is not up-to-date";
        return false;
    }

    // Passed to the oplog as its validation callback so the stage can reject
    // the batch as part of the write.
    auto check_update = [stage](const auto& entries) {
        return stage->CheckUpdate(entries);
    };

    auto entries = update.Finish();
    // The batch is written exactly once, through the callback-based overload.
    if (!oplog_->Write(entries, check_update)) {
        LOG_ERROR() << "failed to write oplog";
        return false;
    }

    stage->Update(entries);
    return true;
}

Expand Down
2 changes: 1 addition & 1 deletion src/fpindex/logging.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#pragma once

#include <string>
#include <QDebug>
#include <string>

#define LOG_ERROR() qCritical()
#define LOG_WARNING() qWarning()
Expand Down
39 changes: 38 additions & 1 deletion src/fpindex/oplog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,14 @@ bool Oplog::Open() {
return true;
}

bool Oplog::Write(std::vector<OplogEntry> &entries) {
// Closes the oplog: drops this object's reference to the database and clears
// the ready flag. Runs with mutex_ held and always returns true.
bool Oplog::Close() {
    const std::lock_guard<std::mutex> guard(mutex_);
    db_.reset();
    ready_ = false;
    return true;
}

bool Oplog::Write(std::vector<OplogEntry> &entries, std::function<bool(std::vector<OplogEntry> &)> callback) {
std::lock_guard<std::mutex> lock(mutex_);

sqlite3 *db = db_->get();
Expand Down Expand Up @@ -96,6 +103,14 @@ bool Oplog::Write(std::vector<OplogEntry> &entries) {
}
}

if (!rollback) {
if (callback) {
if (!callback(entries)) {
rollback = true;
}
}
}

const char *end_txn_sql = rollback ? "ROLLBACK TRANSACTION" : "COMMIT TRANSACTION";
rc = sqlite3_exec(db, end_txn_sql, nullptr, nullptr, &err_msg);
if (rc != SQLITE_OK) {
Expand All @@ -107,4 +122,26 @@ bool Oplog::Write(std::vector<OplogEntry> &entries) {
return true;
};

// Returns the largest op_id stored in the oplog table.
//
// Returns std::nullopt (after logging) when the oplog has no database
// attached, when the statement cannot be prepared, or when stepping it does
// not produce a row.
std::optional<uint64_t> Oplog::GetLastId() {
    std::lock_guard<std::mutex> lock(mutex_);

    // Close() resets db_, so it may be empty here; do not dereference it.
    if (!db_) {
        LOG_ERROR() << "oplog is not open";
        return std::nullopt;
    }

    sqlite3 *db = db_->get();
    sqlite3_stmt *stmt = nullptr;
    int rc = sqlite3_prepare_v2(db, "SELECT MAX(op_id) FROM oplog", -1, &stmt, nullptr);
    if (rc != SQLITE_OK) {
        LOG_ERROR() << "failed to prepare statement: " << sqlite3_errstr(rc);
        return std::nullopt;
    }
    // Finalize the statement on every exit path below.
    auto finalize_stmt = util::MakeCleanup([stmt]() {
        sqlite3_finalize(stmt);
    });

    rc = sqlite3_step(stmt);
    if (rc != SQLITE_ROW) {
        LOG_ERROR() << "failed to get last oplog id: " << sqlite3_errstr(rc);
        return std::nullopt;
    }

    // MAX() over an empty table yields NULL, which sqlite3_column_int64
    // converts to 0, so an empty oplog reports 0 as its last id.
    return static_cast<uint64_t>(sqlite3_column_int64(stmt, 0));
}

} // namespace fpindex
5 changes: 4 additions & 1 deletion src/fpindex/oplog.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <mutex>
#include <optional>

#include "fpindex/io/database.h"
#include "fpindex/proto/internal.pb.h"
Expand All @@ -11,8 +12,10 @@ class Oplog {
public:
Oplog(std::shared_ptr<io::Database> db);
bool Open();
bool Write(std::vector<OplogEntry> &entries);
bool Close();
bool Write(std::vector<OplogEntry> &entries, std::function<bool(std::vector<OplogEntry> &)> callback = nullptr);
bool IsReady();
std::optional<uint64_t> GetLastId();

protected:
bool CreateTable();
Expand Down
70 changes: 27 additions & 43 deletions src/fpindex/segment_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,56 +11,32 @@

namespace fpindex {

bool SegmentBuilder::InsertOrUpdate(uint32_t id, const google::protobuf::RepeatedField<uint32_t>& hashes) {
DeleteInternal(id);
void SegmentBuilder::InsertOrUpdate(uint32_t id, const google::protobuf::RepeatedField<uint32_t>& hashes) {
Delete(id);
for (auto hash : hashes) {
data_.insert(std::make_pair(hash, id));
}
ids_.insert(id);
updates_[id] = DocStatus::UPDATED;
return true;
}

bool SegmentBuilder::Delete(uint32_t id) {
DeleteInternal(id);
updates_[id] = DocStatus::DELETED;
return true;
}

void SegmentBuilder::DeleteInternal(uint32_t id) {
void SegmentBuilder::Delete(uint32_t id) {
if (auto it = ids_.find(id); it != ids_.end()) {
ids_.erase(it);
std::erase_if(data_, [id](const auto& pair) {
return pair.second == id;
});
}
updates_[id] = DocStatus::DELETED;
}

// Reports whether `id` is in the set of ids held by this builder.
// ids_ is read under a shared lock on mutex_, taken unconditionally.
bool SegmentBuilder::Contains(uint32_t id) {
    std::shared_lock<std::shared_mutex> lock(mutex_);
    return ids_.contains(id);
}

bool SegmentBuilder::IsFrozen() {
std::unique_lock<std::shared_mutex> lock(mutex_);
return frozen_;
}

// Sets the frozen_ flag. If the flag is already set this returns without
// taking the lock; otherwise the flag is set while holding mutex_ exclusively.
void SegmentBuilder::Freeze() {
    if (frozen_) {
        return;
    }
    std::unique_lock<std::shared_mutex> guard(mutex_);
    frozen_ = true;
}

bool SegmentBuilder::Search(const std::vector<uint32_t>& hashes, std::vector<SearchResult>* results) {
std::shared_lock<std::shared_mutex> lock;
if (!frozen_) {
lock = std::shared_lock<std::shared_mutex>(mutex_);
}
std::shared_lock<std::shared_mutex> lock(mutex_);
std::map<uint32_t, uint32_t> scores;
for (auto hash : hashes) {
auto range = data_.equal_range(hash);
Expand All @@ -77,10 +53,7 @@ bool SegmentBuilder::Search(const std::vector<uint32_t>& hashes, std::vector<Sea
}

std::shared_ptr<Segment> SegmentBuilder::Save(const std::shared_ptr<io::File>& file) {
std::shared_lock<std::shared_mutex> lock;
if (!frozen_) {
lock = std::shared_lock<std::shared_mutex>(mutex_);
}
std::shared_lock<std::shared_mutex> lock(mutex_);

auto stream = file->GetOutputStream();
auto coded_stream = std::make_unique<google::protobuf::io::CodedOutputStream>(stream.get());
Expand Down Expand Up @@ -118,11 +91,27 @@ std::shared_ptr<Segment> SegmentBuilder::Save(const std::shared_ptr<io::File>& f
return result;
}

bool SegmentBuilder::Update(const std::vector<OplogEntry>& entries) {
std::unique_lock<std::shared_mutex> lock(mutex_);
if (frozen_) {
return false;
// Checks, without modifying the segment, whether `entries` are ordered so
// that they can follow what the segment has already recorded.
//
// Starting from the segment's recorded min/max oplog ids, each entry id must
// be strictly greater than the highest id seen so far. While the running
// minimum is still 0, an entry is accepted as-is and seeds both bounds.
// Returns false and logs an error at the first entry that breaks the order.
bool SegmentBuilder::CheckUpdate(const std::vector<OplogEntry>& entries) {
    std::shared_lock<std::shared_mutex> guard(mutex_);
    auto lowest_id = info_.min_oplog_id();
    auto highest_id = info_.max_oplog_id();
    for (const auto& entry : entries) {
        const auto entry_id = entry.id();
        if (lowest_id == 0) {
            // No lower bound yet: this entry defines the range.
            lowest_id = entry_id;
            highest_id = entry_id;
            continue;
        }
        if (entry_id <= highest_id) {
            LOG_ERROR() << "oplog entries are not in ascending order";
            return false;
        }
        highest_id = entry_id;
    }
    return true;
}

void SegmentBuilder::Update(const std::vector<OplogEntry>& entries) {
std::unique_lock<std::shared_mutex> lock(mutex_);
for (const auto& entry : entries) {
if (entry.data().has_insert_or_update()) {
auto data = entry.data().insert_or_update();
Expand All @@ -135,14 +124,9 @@ bool SegmentBuilder::Update(const std::vector<OplogEntry>& entries) {
info_.set_min_oplog_id(entry.id());
info_.set_max_oplog_id(entry.id());
} else {
if (entry.id() <= info_.max_oplog_id()) {
LOG_ERROR() << "oplog entries are not in ascending order";
return false;
}
info_.set_max_oplog_id(entry.id());
}
}
return true;
}

} // namespace fpindex
14 changes: 4 additions & 10 deletions src/fpindex/segment_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,30 +25,24 @@ class SegmentBuilder : public BaseSegment {
SegmentBuilder(const SegmentBuilder&) = delete;
SegmentBuilder& operator=(const SegmentBuilder&) = delete;

bool Update(const std::vector<OplogEntry>& update);
bool CheckUpdate(const std::vector<OplogEntry>& update);
void Update(const std::vector<OplogEntry>& update);

bool Contains(uint32_t id);

bool Search(const std::vector<uint32_t>& hashes, std::vector<SearchResult>* results) override;

// Freeze the segment so that no more data can be added to it.
void Freeze();
bool IsFrozen();

// Serialize the segment data to the output stream.
std::shared_ptr<Segment> Save(const std::shared_ptr<io::File>& file);

private:
bool InsertOrUpdate(uint32_t id, const google::protobuf::RepeatedField<uint32_t>& hashes);
bool Delete(uint32_t id);

void DeleteInternal(uint32_t id);
void InsertOrUpdate(uint32_t id, const google::protobuf::RepeatedField<uint32_t>& hashes);
void Delete(uint32_t id);

std::shared_mutex mutex_;
std::multimap<uint32_t, uint32_t> data_;
std::unordered_set<uint32_t> ids_;
std::unordered_map<uint32_t, DocStatus> updates_;
std::atomic<bool> frozen_{false};
};

} // namespace fpindex

0 comments on commit c4638c1

Please sign in to comment.