Skip to content

Commit

Permalink
Proposal for multiread implementation into HyperLevelDB
Browse files Browse the repository at this point in the history
See Level/leveldown#13 for a description
See https://github.com/matonga/leveldown for a LevelDB implementation of multiread
  • Loading branch information
Matías Moreno committed Jun 19, 2019
1 parent ae8caf4 commit 97b6a25
Show file tree
Hide file tree
Showing 5 changed files with 158 additions and 0 deletions.
21 changes: 21 additions & 0 deletions leveldown-hyper.js
Original file line number Diff line number Diff line change
Expand Up @@ -115,4 +115,25 @@ LevelDOWNHyper.repair = function (location, callback) {
binding.repair(location, callback)
}

LevelDOWNHyper.prototype.map = function (keys, options, callback) {
if (typeof options == 'function') callback = options;

if (typeof callback !== 'function') {
throw new Error('map() requires a callback argument')
}

if (!Array.isArray (keys)) {
throw new Error('keys argument must be an array');
}
for (var i=0; i<keys.length; i++) {
var err = this._checkKey (keys[i]);
if (err) return process.nextTick(callback, err)
keys[i] = this._serializeKey(keys[i])
}
if (typeof options !== 'object' || options === null) options = {}
options.asBuffer = options.asBuffer !== false

this.binding.map(keys, options, callback)
};

module.exports = LevelDOWNHyper
49 changes: 49 additions & 0 deletions src/database.cc
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ void Database::Init () {
Nan::SetPrototypeMethod(tpl, "getProperty", Database::GetProperty);
Nan::SetPrototypeMethod(tpl, "iterator", Database::Iterator);
Nan::SetPrototypeMethod(tpl, "liveBackup", Database::LiveBackup);
Nan::SetPrototypeMethod(tpl, "map", Database::Map);
}

NAN_METHOD(Database::New) {
Expand Down Expand Up @@ -317,6 +318,54 @@ NAN_METHOD(Database::Get) {
Nan::AsyncQueueWorker(worker);
}

NAN_METHOD(Database::Map) {
LD_METHOD_SETUP_COMMON(map, 1, 2)

bool asBuffer = BooleanOptionValue(optionsObj, "asBuffer", true);
bool fillCache = BooleanOptionValue(optionsObj, "fillCache", true);

v8::Local<v8::Array> array = v8::Local<v8::Array>::Cast(info[0]);

leveldb::Slice *keys = new leveldb::Slice[array->Length()];
for (unsigned i=0; i<array->Length(); i++) {
//LD_STRING_OR_BUFFER_TO_COPY (keys[i], array->Get[i], keys[i]);
// no va a funcionar porque algún idiota no pensó en devolver el Slice en lugar de ser parámetro de la macro :-(
// así que transcribo la macro aquí y cambio la última línea:
v8::Local<v8::Value> from = array->Get(i);
size_t keySz_;
char* keyCh_;
if (!from->ToObject().IsEmpty()
&& node::Buffer::HasInstance(from->ToObject())) {
keySz_ = node::Buffer::Length(from->ToObject());
keyCh_ = new char[keySz_];
memcpy (keyCh_, node::Buffer::Data(from->ToObject()), keySz_);
} else {
v8::Local<v8::String> keyStr = from->ToString();
keySz_ = keyStr->Utf8Length();
keyCh_ = new char[keySz_];
keyStr->WriteUtf8(
keyCh_
, -1
, NULL, v8::String::NO_NULL_TERMINATION
);
}
keys[i] = leveldb::Slice (keyCh_, keySz_);
}
MapWorker* worker = new MapWorker (
database,
new Nan::Callback (callback),
keys,
array->Length(),
asBuffer,
fillCache
);
// persist to prevent accidental GC
v8::Local<v8::Object> _this = info.This ();
worker->SaveToPersistent ("database", _this);
Nan::AsyncQueueWorker (worker);
}


NAN_METHOD(Database::Delete) {
LD_METHOD_SETUP_COMMON(del, 1, 2)

Expand Down
1 change: 1 addition & 0 deletions src/database.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ class Database : public Nan::ObjectWrap {
static NAN_METHOD(Write);
static NAN_METHOD(Iterator);
static NAN_METHOD(LiveBackup);
static NAN_METHOD(Map);
};

} // namespace leveldown
Expand Down
65 changes: 65 additions & 0 deletions src/database_async.cc
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,71 @@ void BatchWorker::Execute () {
SetStatus(database->WriteBatchToDatabase(options, batch));
}

/** MAP WORKER **/

MapWorker::MapWorker (
Database *database,
Nan::Callback *callback,
leveldb::Slice *keys,
unsigned keys_length,
bool asBuffer,
bool fillCache
) : AsyncWorker (database, callback, "leveldown-hyper:db.map"),
keys (keys),
values (new std::string[keys_length]),
keys_length (keys_length),
asBuffer (asBuffer),
fillCache (fillCache)
{

}

MapWorker::~MapWorker () {
for (unsigned i=0; i<keys_length; i++) {
delete [] keys[i].data (); // esto no falla
}
//delete keys; // Mismatched free() / delete / delete[]
//delete values; // Invalid free() / delete / delete[] / realloc()
}

void MapWorker::Execute () {
leveldb::ReadOptions options;
//options.as_buffer = asBuffer;
options.fill_cache = fillCache;
for (unsigned i=0; i<keys_length; i++) {
leveldb::Status status = database->GetFromDatabase (&options, keys[i], values[i]);
if (status.ok ()) {
// values[i] = lo que sea que resolvió
} else
if (status.IsNotFound ()) {
values[i] = "";
} else {
SetStatus (status);
return;
}
}
SetStatus (leveldb::Status::OK ());
}

void MapWorker::HandleOKCallback () {
v8::Local<v8::Array> array = Nan::New<v8::Array>(keys_length);
for (unsigned i=0; i<keys_length; i++) {
if (!values[i].size ()) {
continue;
}
if (asBuffer) {
array->Set (Nan::New<v8::Integer>(static_cast<int>(i)), Nan::CopyBuffer ((char *)values[i].data(), values[i].size ()).ToLocalChecked ());
} else {
array->Set (Nan::New<v8::Integer>(static_cast<int>(i)), Nan::New<v8::String>((char *)values[i].data (), values[i].size ()).ToLocalChecked ());
}
}
v8::Local<v8::Value> argv[] = {
Nan::Null(),
array
};
callback->Call (2, argv, async_resource);
}

/** APPROXIMATE SIZE WORKER **/

ApproximateSizeWorker::ApproximateSizeWorker (
Expand Down
22 changes: 22 additions & 0 deletions src/database_async.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,28 @@ class BatchWorker : public AsyncWorker {
leveldb::WriteBatch* batch;
};

class MapWorker : public AsyncWorker {
public:
MapWorker (
Database *database,
Nan::Callback *callback,
leveldb::Slice *keys,
unsigned keys_length,
bool asBuffer,
bool fillCache
);

virtual ~MapWorker ();
virtual void Execute ();
virtual void HandleOKCallback ();
private:
leveldb::Slice *keys;
std::string *values;
unsigned keys_length;
bool asBuffer;
bool fillCache;
};

class ApproximateSizeWorker : public AsyncWorker {
public:
ApproximateSizeWorker (
Expand Down

0 comments on commit 97b6a25

Please sign in to comment.