Skip to content

Commit

Permalink
[rocks-java] Added support for RocksDB Java API
Browse files Browse the repository at this point in the history
  • Loading branch information
adamretter committed Oct 20, 2017
1 parent 8603c10 commit 39702ad
Show file tree
Hide file tree
Showing 5 changed files with 387 additions and 0 deletions.
1 change: 1 addition & 0 deletions bin/ycsb
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ DATABASES = {
"redis" : "com.yahoo.ycsb.db.RedisClient",
"rest" : "com.yahoo.ycsb.webservice.rest.RestClient",
"riak" : "com.yahoo.ycsb.db.riak.RiakKVClient",
"rocksdb" : "com.yahoo.ycsb.db.RocksDBClient",
"s3" : "com.yahoo.ycsb.db.S3Client",
"solr" : "com.yahoo.ycsb.db.solr.SolrClient",
"solr6" : "com.yahoo.ycsb.db.solr6.SolrClient",
Expand Down
2 changes: 2 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ LICENSE file.
<arangodb3.version>4.1.7</arangodb3.version>
<azurestorage.version>4.0.0</azurestorage.version>
<cloudspanner.version>0.24.0-beta</cloudspanner.version>
<rocksdb.version>5.8.0</rocksdb.version>
</properties>

<modules>
Expand Down Expand Up @@ -147,6 +148,7 @@ LICENSE file.
<module>redis</module>
<module>rest</module>
<module>riak</module>
<module>rocksdb</module>
<module>s3</module>
<module>solr</module>
<module>solr6</module>
Expand Down
45 changes: 45 additions & 0 deletions rocksdb/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright (c) 2012 - 2016 YCSB contributors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License"); you
may not use this file except in compliance with the License. You
may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied. See the License for the specific language governing
permissions and limitations under the License. See accompanying
LICENSE file.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.yahoo.ycsb</groupId>
<artifactId>binding-parent</artifactId>
<version>0.14.0-SNAPSHOT</version>
<relativePath>../binding-parent</relativePath>
</parent>

<artifactId>rocksdb-binding</artifactId>
<name>RocksDB Java Binding</name>
<packaging>jar</packaging>

<dependencies>
<dependency>
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
<version>${rocksdb.version}</version>
</dependency>
<dependency>
<groupId>com.yahoo.ycsb</groupId>
<artifactId>core</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
317 changes: 317 additions & 0 deletions rocksdb/src/main/java/com/yahoo/ycsb/db/RocksDBClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,317 @@
package com.yahoo.ycsb.db;

import com.yahoo.ycsb.*;
import org.rocksdb.*;

import java.io.*;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.*;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* RocksDB binding for <a href="http://rocksdb.org/">RocksDB</a>.
*
* See {@code rocksdb/README.md} for details.
*/
public class RocksDBClient extends DB {

private static final String ROCKSDB_DIR = "rocksdb.dir";

private static Path rocksDbDir = null;
private static RocksDB rocksDB = null;
private static final AtomicInteger REFERENCES = new AtomicInteger();
private static final ConcurrentMap<String, ColumnFamilyHandle> COLUMN_FAMILIES = new ConcurrentHashMap<>();
private static final ConcurrentMap<String, Lock> COLUMN_FAMILY_LOCKS = new ConcurrentHashMap<>();

@Override
public void init() throws DBException {
super.init();
if(rocksDB == null) {
synchronized (RocksDBClient.class) {
if(rocksDB == null) {
try {
RocksDB.loadLibrary();

this.rocksDbDir = Paths.get(getProperties().getProperty(ROCKSDB_DIR));
if(!Files.exists(rocksDbDir)) {
Files.createDirectories(rocksDbDir);
}

final List<String> cfNames = loadColumnFamilyNames();
final List<ColumnFamilyDescriptor> cfDescriptors = new ArrayList<>();
for(final String cfName : cfNames) {
final ColumnFamilyDescriptor cfDescriptor = new ColumnFamilyDescriptor(
cfName.getBytes(StandardCharsets.UTF_8),
new ColumnFamilyOptions().optimizeLevelStyleCompaction()
);
cfDescriptors.add(cfDescriptor);
}

final int rocksThreads = Runtime.getRuntime().availableProcessors() * 2;

if(cfDescriptors.isEmpty()) {
final Options options = new Options()
.optimizeLevelStyleCompaction()
.setCreateIfMissing(true)
.setCreateMissingColumnFamilies(true)
.setIncreaseParallelism(rocksThreads)
.setMaxBackgroundCompactions(rocksThreads)
.setInfoLogLevel(InfoLogLevel.DEBUG_LEVEL);
this.rocksDB = RocksDB.open(options, rocksDbDir.toAbsolutePath().toString());
} else {
final DBOptions options = new DBOptions()
.setCreateIfMissing(true)
.setCreateMissingColumnFamilies(true)
.setIncreaseParallelism(rocksThreads)
.setMaxBackgroundCompactions(rocksThreads)
.setInfoLogLevel(InfoLogLevel.DEBUG_LEVEL);
final List<ColumnFamilyHandle> cfHandles = new ArrayList<>();
this.rocksDB = RocksDB.open(options, rocksDbDir.toAbsolutePath().toString(), cfDescriptors, cfHandles);
for(int i = 0; i < cfNames.size(); i++) {
COLUMN_FAMILIES.put(cfNames.get(i), cfHandles.get(i));
}
}
} catch(final IOException | RocksDBException e) {
throw new DBException(e);
}
}
}
}
REFERENCES.incrementAndGet();
}

@Override
public void cleanup() throws DBException {
try {
super.cleanup();
if (REFERENCES.get() == 1) {
for (final ColumnFamilyHandle cf : COLUMN_FAMILIES.values()) {
cf.dispose();
}
rocksDB.close();
saveColumnFamilyNames();
}
} catch (final IOException e) {
throw new DBException(e);
} finally {
REFERENCES.decrementAndGet();
}
}

@Override
public Status read(final String table, final String key, final Set<String> fields,
final HashMap<String, ByteIterator> result) {
try {
if (!COLUMN_FAMILIES.containsKey(table)) {
createColumnFamily(table);
}

final ColumnFamilyHandle cf = COLUMN_FAMILIES.get(table);
final byte[] values = rocksDB.get(cf, key.getBytes(StandardCharsets.UTF_8));
if(values == null) {
return Status.NOT_FOUND;
}
deserializeValues(values, fields, result);
return Status.OK;
} catch(final RocksDBException e) {
e.printStackTrace();
return Status.ERROR;
}
}

@Override
public Status scan(final String table, final String startkey, final int recordcount, final Set<String> fields,
final Vector<HashMap<String, ByteIterator>> result) {
try {
if (!COLUMN_FAMILIES.containsKey(table)) {
createColumnFamily(table);
}

final ColumnFamilyHandle cf = COLUMN_FAMILIES.get(table);
final RocksIterator iterator = rocksDB.newIterator(cf);

int iterations = 0;
for(iterator.seek(startkey.getBytes(StandardCharsets.UTF_8)); iterator.isValid() && iterations < recordcount;
iterator.next()) {
final HashMap<String, ByteIterator> values = new HashMap<>();
deserializeValues(iterator.value(), fields, values);
result.add(values);
iterations++;
}

iterator.dispose();

return Status.OK;
} catch(final RocksDBException e) {
e.printStackTrace();
return Status.ERROR;
}
}

@Override
public Status update(final String table, final String key, final HashMap<String, ByteIterator> values) {
//TODO(AR) consider if this would be faster with merge operator

try {
if (!COLUMN_FAMILIES.containsKey(table)) {
createColumnFamily(table);
}

final ColumnFamilyHandle cf = COLUMN_FAMILIES.get(table);
final HashMap<String, ByteIterator> result = new HashMap<>();
final byte[] currentValues = rocksDB.get(cf, key.getBytes(StandardCharsets.UTF_8));
if(currentValues == null) {
return Status.NOT_FOUND;
}
deserializeValues(currentValues, null, result);

//update
result.putAll(values);

//store
rocksDB.put(cf, key.getBytes(StandardCharsets.UTF_8), serializeValues(result));

return Status.OK;

} catch(final RocksDBException | IOException e) {
e.printStackTrace();
return Status.ERROR;
}
}

@Override
public Status insert(final String table, final String key, final HashMap<String, ByteIterator> values) {
try {
if (!COLUMN_FAMILIES.containsKey(table)) {
createColumnFamily(table);
}

final ColumnFamilyHandle cf = COLUMN_FAMILIES.get(table);
rocksDB.put(cf, key.getBytes(StandardCharsets.UTF_8), serializeValues(values));

return Status.OK;
} catch(final RocksDBException | IOException e) {
e.printStackTrace();
return Status.ERROR;
}
}

@Override
public Status delete(final String table, final String key) {
try {
if (!COLUMN_FAMILIES.containsKey(table)) {
createColumnFamily(table);
}

final ColumnFamilyHandle cf = COLUMN_FAMILIES.get(table);
rocksDB.remove(cf, key.getBytes(StandardCharsets.UTF_8));

return Status.OK;
} catch(final RocksDBException e) {
e.printStackTrace();
return Status.ERROR;
}
}

private void saveColumnFamilyNames() throws IOException {
try(final PrintWriter writer = new PrintWriter(new FileWriter(rocksDbDir.resolve("cfNames").toFile()))) {
writer.println(new String(RocksDB.DEFAULT_COLUMN_FAMILY, StandardCharsets.UTF_8));
for(final String cfName : COLUMN_FAMILIES.keySet()) {
writer.println(cfName);
}
}
}

private List<String> loadColumnFamilyNames() throws IOException {
final List<String> cfNames = new ArrayList<>();
final File f = rocksDbDir.resolve("cfNames").toFile();
if(f.exists()) {
try (final LineNumberReader reader = new LineNumberReader(new FileReader(f))) {
String line = null;
while ((line = reader.readLine()) != null) {
cfNames.add(line);
}
}
}
return cfNames;
}

private HashMap<String, ByteIterator> deserializeValues(final byte[] values, final Set<String> fields,
final HashMap<String, ByteIterator> result) {
final ByteBuffer buf = ByteBuffer.allocate(4);

int offset = 0;
while(offset < values.length) {
buf.put(values, offset, 4);
buf.flip();
final int keyLen = buf.getInt();
buf.clear();
offset += 4;

final String key = new String(values, offset, keyLen);
offset += keyLen;

buf.put(values, offset, 4);
buf.flip();
final int valueLen = buf.getInt();
buf.clear();
offset += 4;

if(fields == null || fields.contains(key)) {
result.put(key, new ByteArrayByteIterator(values, offset, valueLen));
}

offset += valueLen;
}

return result;
}

private byte[] serializeValues(final Map<String, ByteIterator> values) throws IOException {
try(final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
final ByteBuffer buf = ByteBuffer.allocate(4);

for(final Map.Entry<String, ByteIterator> value : values.entrySet()) {
final byte[] keyBytes = value.getKey().getBytes(StandardCharsets.UTF_8);
final byte[] valueBytes = value.getValue().toArray();

buf.putInt(keyBytes.length);
baos.write(buf.array());
baos.write(keyBytes);

buf.clear();

buf.putInt(valueBytes.length);
baos.write(buf.array());
baos.write(valueBytes);

buf.clear();
}
return baos.toByteArray();
}
}

private void createColumnFamily(final String name) throws RocksDBException {
COLUMN_FAMILY_LOCKS.putIfAbsent(name, new ReentrantLock());

final Lock l = COLUMN_FAMILY_LOCKS.get(name);
l.lock();
try {
if(!COLUMN_FAMILIES.containsKey(name)) {
final ColumnFamilyHandle cfHandle = rocksDB.createColumnFamily(
new ColumnFamilyDescriptor(name.getBytes(StandardCharsets.UTF_8),
new ColumnFamilyOptions().optimizeLevelStyleCompaction())
);
COLUMN_FAMILIES.put(name, cfHandle);
}
} finally {
l.unlock();
}
}
}
Loading

0 comments on commit 39702ad

Please sign in to comment.