diff --git a/.gitignore b/.gitignore index ee88a7870b..882b67e2ec 100644 --- a/.gitignore +++ b/.gitignore @@ -10,4 +10,9 @@ output* .settings .checkstyle +# ignore standard intellij +.idea/ +*.iml +*.iws + .DS_Store diff --git a/README.md b/README.md index afc698c091..71510e8e21 100644 --- a/README.md +++ b/README.md @@ -32,9 +32,9 @@ Getting Started 1. Download the latest release of YCSB: ```sh - curl -O --location https://github.com/brianfrankcooper/YCSB/releases/download/0.3.1/ycsb-0.3.1.tar.gz - tar xfvz ycsb-0.3.1.tar.gz - cd ycsb-0.3.1 + curl -O --location https://github.com/brianfrankcooper/YCSB/releases/download/0.4.0/ycsb-0.4.0.tar.gz + tar xfvz ycsb-0.4.0.tar.gz + cd ycsb-0.4.0 ``` 2. Set up a database to benchmark. There is a README file under each binding diff --git a/bin/ycsb b/bin/ycsb index d18de33c4d..8036dc71bb 100755 --- a/bin/ycsb +++ b/bin/ycsb @@ -51,6 +51,7 @@ DATABASES = { "cassandra-8" : "com.yahoo.ycsb.db.CassandraClient8", "cassandra-10" : "com.yahoo.ycsb.db.CassandraClient10", "cassandra-cql": "com.yahoo.ycsb.db.CassandraCQLClient", + "cassandra2-cql": "com.yahoo.ycsb.db.CassandraCQLClient", "couchbase" : "com.yahoo.ycsb.db.CouchbaseClient", "dynamodb" : "com.yahoo.ycsb.db.DynamoDBClient", "elasticsearch": "com.yahoo.ycsb.db.ElasticSearchClient", @@ -62,6 +63,7 @@ DATABASES = { "infinispan-cs": "com.yahoo.ycsb.db.InfinispanRemoteClient", "infinispan" : "com.yahoo.ycsb.db.InfinispanClient", "jdbc" : "com.yahoo.ycsb.db.JdbcDBClient", + "kudu" : "com.yahoo.ycsb.db.KuduYCSBClient", "mapkeeper" : "com.yahoo.ycsb.db.MapKeeperClient", "mongodb" : "com.yahoo.ycsb.db.MongoDbClient", "mongodb-async": "com.yahoo.ycsb.db.AsyncMongoDbClient", @@ -138,11 +140,12 @@ def is_distribution(): # presumes maven can run, so should only be run on source checkouts # will invoke the 'package' goal for the given binding in order to resolve intra-project deps # presumes maven properly handles system-specific path separators -def 
get_classpath_from_maven(database): +# Given module is full module name eg. 'core' or 'couchbase-binding' +def get_classpath_from_maven(module): try: - debug("Running 'mvn -pl com.yahoo.ycsb:"+database+"-binding -am package -DskipTests " + debug("Running 'mvn -pl com.yahoo.ycsb:" + module + " -am package -DskipTests " "dependency:build-classpath -DincludeScope=compile -Dmdep.outputFilterFile=true'") - mvn_output = subprocess.check_output(["mvn", "-pl", "com.yahoo.ycsb:"+database+"-binding", + mvn_output = subprocess.check_output(["mvn", "-pl", "com.yahoo.ycsb:" + module, "-am", "package", "-DskipTests", "dependency:build-classpath", "-DincludeScope=compile", @@ -200,12 +203,14 @@ def main(): warn("Running against a source checkout. In order to get our runtime " "dependencies we'll have to invoke Maven. Depending on the state " "of your system, this may take ~30-45 seconds") - db_dir = os.path.join(ycsb_home, binding) + db_location = "core" if binding == "basic" else binding + project = "core" if binding == "basic" else binding + "-binding" + db_dir = os.path.join(ycsb_home, db_location) # goes first so we can rely on side-effect of package - maven_says = get_classpath_from_maven(binding) + maven_says = get_classpath_from_maven(project) # TODO when we have a version property, skip the glob cp = find_jars(os.path.join(db_dir, "target"), - binding + "-binding*.jar") + project + "*.jar") # alredy in jar:jar:jar form cp.append(maven_says) cp.insert(0, os.path.join(db_dir, "conf")) diff --git a/cassandra/README.md b/cassandra/README.md new file mode 100644 index 0000000000..11853f97d4 --- /dev/null +++ b/cassandra/README.md @@ -0,0 +1,65 @@ + + +# Cassandra (0.7, 0.8, 1.x) drivers for YCSB + +**For Cassandra 2 CQL support, use the `cassandra2-cql` binding. The Thrift drivers below are deprecated, and the CQL driver here does not support Cassandra 2.1+.** + +There are three drivers in the Cassandra binding: + +* `cassandra-7`: Cassandra 0.7 Thrift binding. 
+* `cassandra-8`: Cassandra 0.8 Thrift binding. +* `cassandra-10`: Cassandra 1.0+ Thrift binding. +* `cassandra-cql`: Cassandra CQL binding, for Cassandra 1.x to 2.0. See `cassandra2/README.md` for details on parameters. + +# `cassandra-10` + +## Creating a table + +Using `cassandra-cli`: + + create keyspace usertable with placement_strategy = 'org.apache.cassandra.locator.SimpleStrategy' and strategy_options = {replication_factor:1}; + + create column family data with column_type = 'Standard' and comparator = 'UTF8Type'; + +**Note that `replication_factor` and consistency levels (below) will affect performance.** + +## Configuration Parameters + +- `hosts` (**required**) + - Cassandra nodes to connect to. + - No default. + +* `port` + - Thrift port for communicating with Cassandra cluster. + * Default is `9160`. + +- `cassandra.columnfamily` + - Column family name - must match the column family for the table created (see above). + - Default value is `data` + +- `cassandra.username` +- `cassandra.password` + - Optional user name and password for authentication. See http://docs.datastax.com/en/cassandra/2.0/cassandra/security/security_config_native_authenticate_t.html for details. + +* `cassandra.readconsistencylevel` +* `cassandra.scanconsistencylevel` +* `cassandra.writeconsistencylevel` + + - Default value is `ONE` + - Consistency level for reads and writes, respectively. See the [DataStax documentation](http://docs.datastax.com/en/cassandra/2.0/cassandra/dml/dml_config_consistency_c.html) for details. + - *Note that the default setting does not provide durability in the face of node failure. Changing this setting will affect observed performance.* See also `replication_factor`, above. 
diff --git a/cassandra/src/main/java/com/yahoo/ycsb/db/CassandraCQLClient.java b/cassandra/src/main/java/com/yahoo/ycsb/db/CassandraCQLClient.java index 90e27d7685..578a5a8f50 100755 --- a/cassandra/src/main/java/com/yahoo/ycsb/db/CassandraCQLClient.java +++ b/cassandra/src/main/java/com/yahoo/ycsb/db/CassandraCQLClient.java @@ -33,22 +33,9 @@ /** * Tested with Cassandra 2.0, CQL client for YCSB framework - * - * In CQLSH, create keyspace and table. Something like: - * cqlsh> create keyspace ycsb - * WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor': 1 }; - * cqlsh> create table usertable ( - * y_id varchar primary key, - * field0 varchar, - * field1 varchar, - * field2 varchar, - * field3 varchar, - * field4 varchar, - * field5 varchar, - * field6 varchar, - * field7 varchar, - * field8 varchar, - * field9 varchar); + * + * See {@code cassandra2} for a version compatible with Cassandra 2.1+. + * See {@code cassandra2/README.md} for details. * * @author cmatser */ diff --git a/cassandra2/README.md b/cassandra2/README.md new file mode 100644 index 0000000000..e3e56b90c7 --- /dev/null +++ b/cassandra2/README.md @@ -0,0 +1,73 @@ + + +# Apache Cassandra 2.x CQL binding + +Binding for [Apache Cassandra](http://cassandra.apache.org), using the CQL API +via the [DataStax +driver](http://docs.datastax.com/en/developer/java-driver/2.1/java-driver/whatsNew2.html). + +To run against the (deprecated) Cassandra Thrift API, use the `cassandra-10` binding. 
+ +## Creating a table for use with YCSB + +For keyspace `ycsb`, table `usertable`: + + cqlsh> create keyspace ycsb + WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor': 3 }; + cqlsh> USE ycsb; + cqlsh> create table usertable ( + y_id varchar primary key, + field0 varchar, + field1 varchar, + field2 varchar, + field3 varchar, + field4 varchar, + field5 varchar, + field6 varchar, + field7 varchar, + field8 varchar, + field9 varchar); + +**Note that `replication_factor` and consistency levels (below) will affect performance.** + +## Cassandra Configuration Parameters + +- `hosts` (**required**) + - Cassandra nodes to connect to. + - No default. + +* `port` + * CQL port for communicating with Cassandra cluster. + * Default is `9042`. + +- `cassandra.keyspace` + Keyspace name - must match the keyspace for the table created (see above). + See http://docs.datastax.com/en/cql/3.1/cql/cql_reference/create_keyspace_r.html for details. + + - Default value is `ycsb` + +- `cassandra.username` +- `cassandra.password` + - Optional user name and password for authentication. See http://docs.datastax.com/en/cassandra/2.0/cassandra/security/security_config_native_authenticate_t.html for details. + +* `cassandra.readconsistencylevel` +* `cassandra.writeconsistencylevel` + + * Default value is `ONE` + - Consistency level for reads and writes, respectively. See the [DataStax documentation](http://docs.datastax.com/en/cassandra/2.0/cassandra/dml/dml_config_consistency_c.html) for details. + * *Note that the default setting does not provide durability in the face of node failure. Changing this setting will affect observed performance.* See also `replication_factor`, above. 
diff --git a/cassandra2/pom.xml b/cassandra2/pom.xml new file mode 100644 index 0000000000..7ad8132d61 --- /dev/null +++ b/cassandra2/pom.xml @@ -0,0 +1,61 @@ + + + + + + 4.0.0 + + com.yahoo.ycsb + binding-parent + 0.5.0-SNAPSHOT + ../binding-parent + + + cassandra2-binding + Cassandra 2.1+ DB Binding + jar + + + + + com.datastax.cassandra + cassandra-driver-core + ${cassandra2.cql.version} + + + com.yahoo.ycsb + core + ${project.version} + provided + + + org.cassandraunit + cassandra-unit-shaded + 2.1.9.2 + test + + + junit + junit + 4.12 + test + + + + diff --git a/cassandra2/src/main/java/com/yahoo/ycsb/db/CassandraCQLClient.java b/cassandra2/src/main/java/com/yahoo/ycsb/db/CassandraCQLClient.java new file mode 100644 index 0000000000..0fa2b3014f --- /dev/null +++ b/cassandra2/src/main/java/com/yahoo/ycsb/db/CassandraCQLClient.java @@ -0,0 +1,433 @@ +/** + * Copyright (c) 2013-2015 YCSB contributors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. See accompanying LICENSE file. + * + * Submitted by Chrisjan Matser on 10/11/2010. 
+ */ +package com.yahoo.ycsb.db; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ColumnDefinitions; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.Host; +import com.datastax.driver.core.HostDistance; +import com.datastax.driver.core.Metadata; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.SimpleStatement; +import com.datastax.driver.core.Statement; +import com.datastax.driver.core.querybuilder.Insert; +import com.datastax.driver.core.querybuilder.QueryBuilder; +import com.datastax.driver.core.querybuilder.Select; +import com.yahoo.ycsb.ByteArrayByteIterator; +import com.yahoo.ycsb.ByteIterator; +import com.yahoo.ycsb.DB; +import com.yahoo.ycsb.DBException; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.Vector; +import java.util.concurrent.atomic.AtomicInteger; + + +/** + * Cassandra 2.x CQL client. + * + * See {@code cassandra2/README.md} for details. 
+ * + * @author cmatser + */ +public class CassandraCQLClient extends DB { + + protected static Cluster cluster = null; + protected static Session session = null; + + private static ConsistencyLevel readConsistencyLevel = ConsistencyLevel.ONE; + private static ConsistencyLevel writeConsistencyLevel = ConsistencyLevel.ONE; + + public static final int OK = 0; + public static final int ERR = -1; + public static final int NOT_FOUND = -3; + + public static final String YCSB_KEY = "y_id"; + public static final String KEYSPACE_PROPERTY = "cassandra.keyspace"; + public static final String KEYSPACE_PROPERTY_DEFAULT = "ycsb"; + public static final String USERNAME_PROPERTY = "cassandra.username"; + public static final String PASSWORD_PROPERTY = "cassandra.password"; + + public static final String HOSTS_PROPERTY = "hosts"; + public static final String PORT_PROPERTY = "port"; + + + public static final String READ_CONSISTENCY_LEVEL_PROPERTY = "cassandra.readconsistencylevel"; + public static final String READ_CONSISTENCY_LEVEL_PROPERTY_DEFAULT = "ONE"; + public static final String WRITE_CONSISTENCY_LEVEL_PROPERTY = "cassandra.writeconsistencylevel"; + public static final String WRITE_CONSISTENCY_LEVEL_PROPERTY_DEFAULT = "ONE"; + + /** Count the number of times initialized to teardown on the last {@link #cleanup()}. */ + private static final AtomicInteger initCount = new AtomicInteger(0); + + private static boolean _debug = false; + + /** + * Initialize any state for this DB. Called once per DB instance; there is + * one DB instance per client thread. + */ + @Override + public void init() throws DBException { + + //Keep track of number of calls to init (for later cleanup) + initCount.incrementAndGet(); + + //Synchronized so that we only have a single + // cluster/session instance for all the threads. 
+ synchronized (initCount) { + + //Check if the cluster has already been initialized + if (cluster != null) { + return; + } + + try { + + _debug = Boolean.parseBoolean(getProperties().getProperty("debug", "false")); + + String host = getProperties().getProperty(HOSTS_PROPERTY); + if (host == null) { + throw new DBException(String.format("Required property \"%s\" missing for CassandraCQLClient", HOSTS_PROPERTY)); + } + String hosts[] = host.split(","); + String port = getProperties().getProperty("port", "9042"); + if (port == null) { + throw new DBException(String.format("Required property \"%s\" missing for CassandraCQLClient", PORT_PROPERTY)); + } + + String username = getProperties().getProperty(USERNAME_PROPERTY); + String password = getProperties().getProperty(PASSWORD_PROPERTY); + + String keyspace = getProperties().getProperty(KEYSPACE_PROPERTY, KEYSPACE_PROPERTY_DEFAULT); + + readConsistencyLevel = ConsistencyLevel.valueOf(getProperties().getProperty(READ_CONSISTENCY_LEVEL_PROPERTY, READ_CONSISTENCY_LEVEL_PROPERTY_DEFAULT)); + writeConsistencyLevel = ConsistencyLevel.valueOf(getProperties().getProperty(WRITE_CONSISTENCY_LEVEL_PROPERTY, WRITE_CONSISTENCY_LEVEL_PROPERTY_DEFAULT)); + + // public void connect(String node) {} + if ((username != null) && !username.isEmpty()) { + cluster = Cluster.builder() + .withCredentials(username, password) + .withPort(Integer.valueOf(port)) + .addContactPoints(hosts).build(); + } + else { + cluster = Cluster.builder() + .withPort(Integer.valueOf(port)) + .addContactPoints(hosts).build(); + } + + //Update number of connections based on threads + int threadcount = Integer.parseInt(getProperties().getProperty("threadcount","1")); + cluster.getConfiguration().getPoolingOptions().setMaxConnectionsPerHost(HostDistance.LOCAL, threadcount); + + //Set connection timeout 3min (default is 5s) + cluster.getConfiguration().getSocketOptions().setConnectTimeoutMillis(3*60*1000); + //Set read (execute) timeout 3min (default is 12s) + 
cluster.getConfiguration().getSocketOptions().setReadTimeoutMillis(3*60*1000); + + Metadata metadata = cluster.getMetadata(); + System.err.printf("Connected to cluster: %s\n", metadata.getClusterName()); + + for (Host discoveredHost : metadata.getAllHosts()) { + System.out.printf("Datacenter: %s; Host: %s; Rack: %s\n", + discoveredHost.getDatacenter(), + discoveredHost.getAddress(), + discoveredHost.getRack()); + } + + session = cluster.connect(keyspace); + + } catch (Exception e) { + throw new DBException(e); + } + }//synchronized + } + + /** + * Cleanup any state for this DB. Called once per DB instance; there is one + * DB instance per client thread. + */ + @Override + public void cleanup() throws DBException { + synchronized(initCount) { + final int curInitCount = initCount.decrementAndGet(); + if (curInitCount <= 0) { + session.close(); + cluster.close(); + cluster = null; + session = null; + } + if (curInitCount < 0) { + // This should never happen. + throw new DBException( + String.format("initCount is negative: %d", curInitCount)); + } + } + } + + /** + * Read a record from the database. Each field/value pair from the result + * will be stored in a HashMap. + * + * @param table The name of the table + * @param key The record key of the record to read. 
+ * @param fields The list of fields to read, or null for all of them + * @param result A HashMap of field/value pairs for the result + * @return Zero on success, a non-zero error code on error + */ + @Override + public int read(String table, String key, Set fields, HashMap result) { + try { + Statement stmt; + Select.Builder selectBuilder; + + if (fields == null) { + selectBuilder = QueryBuilder.select().all(); + } + else { + selectBuilder = QueryBuilder.select(); + for (String col : fields) { + ((Select.Selection) selectBuilder).column(col); + } + } + + stmt = selectBuilder.from(table).where(QueryBuilder.eq(YCSB_KEY, key)).limit(1); + stmt.setConsistencyLevel(readConsistencyLevel); + + if (_debug) { + System.out.println(stmt.toString()); + } + + ResultSet rs = session.execute(stmt); + + if (rs.isExhausted()) { + return NOT_FOUND; + } + + //Should be only 1 row + Row row = rs.one(); + ColumnDefinitions cd = row.getColumnDefinitions(); + + for (ColumnDefinitions.Definition def : cd) { + ByteBuffer val = row.getBytesUnsafe(def.getName()); + if (val != null) { + result.put(def.getName(), + new ByteArrayByteIterator(val.array())); + } + else { + result.put(def.getName(), null); + } + } + + return OK; + + } catch (Exception e) { + e.printStackTrace(); + System.out.println("Error reading key: " + key); + return ERR; + } + + } + + /** + * Perform a range scan for a set of records in the database. Each + * field/value pair from the result will be stored in a HashMap. + * + * Cassandra CQL uses "token" method for range scan which doesn't always + * yield intuitive results. + * + * @param table The name of the table + * @param startkey The record key of the first record to read. 
+ * @param recordcount The number of records to read + * @param fields The list of fields to read, or null for all of them + * @param result A Vector of HashMaps, where each HashMap is a set + * field/value pairs for one record + * @return Zero on success, a non-zero error code on error + */ + @Override + public int scan(String table, String startkey, int recordcount, Set fields, Vector> result) { + + try { + Statement stmt; + Select.Builder selectBuilder; + + if (fields == null) { + selectBuilder = QueryBuilder.select().all(); + } + else { + selectBuilder = QueryBuilder.select(); + for (String col : fields) { + ((Select.Selection) selectBuilder).column(col); + } + } + + stmt = selectBuilder.from(table); + + //The statement builder is not setup right for tokens. + // So, we need to build it manually. + String initialStmt = stmt.toString(); + StringBuilder scanStmt = new StringBuilder(); + scanStmt.append( + initialStmt.substring(0, initialStmt.length()-1)); + scanStmt.append(" WHERE "); + scanStmt.append(QueryBuilder.token(YCSB_KEY)); + scanStmt.append(" >= "); + scanStmt.append("token('"); + scanStmt.append(startkey); + scanStmt.append("')"); + scanStmt.append(" LIMIT "); + scanStmt.append(recordcount); + + stmt = new SimpleStatement(scanStmt.toString()); + stmt.setConsistencyLevel(readConsistencyLevel); + + if (_debug) { + System.out.println(stmt.toString()); + } + + ResultSet rs = session.execute(stmt); + + HashMap tuple; + while (!rs.isExhausted()) { + Row row = rs.one(); + tuple = new HashMap (); + + ColumnDefinitions cd = row.getColumnDefinitions(); + + for (ColumnDefinitions.Definition def : cd) { + ByteBuffer val = row.getBytesUnsafe(def.getName()); + if (val != null) { + tuple.put(def.getName(), + new ByteArrayByteIterator(val.array())); + } + else { + tuple.put(def.getName(), null); + } + } + + result.add(tuple); + } + + return OK; + + } catch (Exception e) { + e.printStackTrace(); + System.out.println("Error scanning with startkey: " + startkey); + 
return ERR; + } + + } + + /** + * Update a record in the database. Any field/value pairs in the specified + * values HashMap will be written into the record with the specified record + * key, overwriting any existing values with the same field name. + * + * @param table The name of the table + * @param key The record key of the record to write. + * @param values A HashMap of field/value pairs to update in the record + * @return Zero on success, a non-zero error code on error + */ + @Override + public int update(String table, String key, HashMap values) { + //Insert and updates provide the same functionality + return insert(table, key, values); + } + + /** + * Insert a record in the database. Any field/value pairs in the specified + * values HashMap will be written into the record with the specified record + * key. + * + * @param table The name of the table + * @param key The record key of the record to insert. + * @param values A HashMap of field/value pairs to insert in the record + * @return Zero on success, a non-zero error code on error + */ + @Override + public int insert(String table, String key, HashMap values) { + + try { + Insert insertStmt = QueryBuilder.insertInto(table); + + //Add key + insertStmt.value(YCSB_KEY, key); + + //Add fields + for (Map.Entry entry : values.entrySet()) { + Object value; + ByteIterator byteIterator = entry.getValue(); + value = byteIterator.toString(); + + insertStmt.value(entry.getKey(), value); + } + + insertStmt.setConsistencyLevel(writeConsistencyLevel).enableTracing(); + + if (_debug) { + System.out.println(insertStmt.toString()); + } + + ResultSet rs = session.execute(insertStmt); + + return OK; + } catch (Exception e) { + e.printStackTrace(); + } + + return ERR; + } + + /** + * Delete a record from the database. + * + * @param table The name of the table + * @param key The record key of the record to delete. 
+ * @return Zero on success, a non-zero error code on error + */ + @Override + public int delete(String table, String key) { + + try { + Statement stmt; + + stmt = QueryBuilder.delete().from(table).where(QueryBuilder.eq(YCSB_KEY, key)); + stmt.setConsistencyLevel(writeConsistencyLevel); + + if (_debug) { + System.out.println(stmt.toString()); + } + + ResultSet rs = session.execute(stmt); + + return OK; + } catch (Exception e) { + e.printStackTrace(); + System.out.println("Error deleting key: " + key); + } + + return ERR; + } + +} diff --git a/cassandra2/src/test/java/com/yahoo/ycsb/db/CassandraCQLClientTest.java b/cassandra2/src/test/java/com/yahoo/ycsb/db/CassandraCQLClientTest.java new file mode 100644 index 0000000000..f4339d55a7 --- /dev/null +++ b/cassandra2/src/test/java/com/yahoo/ycsb/db/CassandraCQLClientTest.java @@ -0,0 +1,176 @@ +/** + * Copyright (c) 2015 YCSB contributors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. 
+ */ + +package com.yahoo.ycsb.db; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasEntry; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; + +import com.google.common.collect.Sets; + +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.Statement; +import com.datastax.driver.core.querybuilder.Insert; +import com.datastax.driver.core.querybuilder.QueryBuilder; +import com.datastax.driver.core.querybuilder.Select; +import com.yahoo.ycsb.ByteIterator; +import com.yahoo.ycsb.StringByteIterator; +import com.yahoo.ycsb.measurements.Measurements; +import com.yahoo.ycsb.workloads.CoreWorkload; + +import org.cassandraunit.CassandraCQLUnit; +import org.cassandraunit.dataset.cql.ClassPathCQLDataSet; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +/** + * Integration tests for the Cassandra client + */ +public class CassandraCQLClientTest { + private final static String TABLE = "usertable"; + private final static String HOST = "localhost"; + private final static int PORT = 9142; + private final static String DEFAULT_ROW_KEY = "user1"; + + private CassandraCQLClient client; + private Session session; + + @ClassRule + public static CassandraCQLUnit cassandraUnit = + new CassandraCQLUnit(new ClassPathCQLDataSet("ycsb.cql", "ycsb")); + + @Before + public void setUpClient() throws Exception { + Properties p = new Properties(); + p.setProperty("hosts", HOST); + p.setProperty("port", Integer.toString(PORT)); + p.setProperty("table", TABLE); + + Measurements.setProperties(p); + final CoreWorkload workload = new CoreWorkload(); + workload.init(p); + client = new CassandraCQLClient(); + 
client.setProperties(p); + client.init(); + } + + @Before + public void setSession() { + session = cassandraUnit.getSession(); + } + + @After + public void tearDownClient() throws Exception { + client.cleanup(); + client = null; + } + + @After + public void clearTable() throws Exception { + // Clear the table so that each test starts fresh. + final Statement truncate = QueryBuilder.truncate(TABLE); + cassandraUnit.getSession().execute(truncate); + } + + @Test + public void testReadMissingRow() throws Exception { + final HashMap result = new HashMap(); + final int status = client.read(TABLE, "Missing row", null, result); + assertThat(result.size(), is(0)); + assertThat(status, is(CassandraCQLClient.NOT_FOUND)); + } + + private void insertRow() { + final String rowKey = DEFAULT_ROW_KEY; + Insert insertStmt = QueryBuilder.insertInto(TABLE); + insertStmt.value(CassandraCQLClient.YCSB_KEY, rowKey); + + insertStmt.value("field0", "value1"); + insertStmt.value("field1", "value2"); + session.execute(insertStmt); + } + + @Test + public void testRead() throws Exception { + insertRow(); + + final HashMap result = new HashMap(); + final int status = client.read(CoreWorkload.table, DEFAULT_ROW_KEY, null, result); + assertThat(status, is(CassandraCQLClient.OK)); + assertThat(result.entrySet(), hasSize(11)); + assertThat(result, hasEntry("field2", null)); + + final HashMap strResult = new HashMap(); + for (final Map.Entry e : result.entrySet()) { + if (e.getValue() != null) { + strResult.put(e.getKey(), e.getValue().toString()); + } + } + assertThat(strResult, hasEntry(CassandraCQLClient.YCSB_KEY, DEFAULT_ROW_KEY)); + assertThat(strResult, hasEntry("field0", "value1")); + assertThat(strResult, hasEntry("field1", "value2")); + } + + @Test + public void testReadSingleColumn() throws Exception { + insertRow(); + final HashMap result = new HashMap(); + final Set fields = Sets.newHashSet("field1"); + final int status = client.read(CoreWorkload.table, DEFAULT_ROW_KEY, fields, result); 
+ assertThat(status, is(CassandraCQLClient.OK)); + assertThat(result.entrySet(), hasSize(1)); + final Map strResult = StringByteIterator.getStringMap(result); + assertThat(strResult, hasEntry("field1", "value2")); + } + + @Test + public void testUpdate() throws Exception { + final String key = "key"; + final HashMap input = new HashMap(); + input.put("field0", "value1"); + input.put("field1", "value2"); + + final int status = client.insert(TABLE, key, StringByteIterator.getByteIteratorMap(input)); + assertThat(status, is(CassandraCQLClient.OK)); + + // Verify result + final Select selectStmt = + QueryBuilder.select("field0", "field1") + .from(TABLE) + .where(QueryBuilder.eq(CassandraCQLClient.YCSB_KEY, key)) + .limit(1); + + final ResultSet rs = session.execute(selectStmt); + final Row row = rs.one(); + assertThat(row, notNullValue()); + assertThat(rs.isExhausted(), is(true)); + assertThat(row.getString("field0"), is("value1")); + assertThat(row.getString("field1"), is("value2")); + } +} diff --git a/cassandra2/src/test/resources/ycsb.cql b/cassandra2/src/test/resources/ycsb.cql new file mode 100644 index 0000000000..2888bdce3a --- /dev/null +++ b/cassandra2/src/test/resources/ycsb.cql @@ -0,0 +1,12 @@ +CREATE TABLE usertable ( + y_id varchar primary key, + field0 varchar, + field1 varchar, + field2 varchar, + field3 varchar, + field4 varchar, + field5 varchar, + field6 varchar, + field7 varchar, + field8 varchar, + field9 varchar); diff --git a/core/src/main/java/com/yahoo/ycsb/measurements/OneMeasurementHdrHistogram.java b/core/src/main/java/com/yahoo/ycsb/measurements/OneMeasurementHdrHistogram.java index 5373b4006d..0f20ae675d 100644 --- a/core/src/main/java/com/yahoo/ycsb/measurements/OneMeasurementHdrHistogram.java +++ b/core/src/main/java/com/yahoo/ycsb/measurements/OneMeasurementHdrHistogram.java @@ -23,8 +23,7 @@ import java.io.IOException; import java.io.PrintStream; import java.text.DecimalFormat; -import java.util.Map; -import java.util.Properties; 
+import java.util.*; import java.util.concurrent.atomic.AtomicInteger; import org.HdrHistogram.Histogram; @@ -49,8 +48,21 @@ public class OneMeasurementHdrHistogram extends OneMeasurement { final Recorder histogram; Histogram totalHistogram; + /** + * The name of the property for deciding what percentile values to output. + */ + public static final String PERCENTILES_PROPERTY = "hdrhistogram.percentiles"; + + /** + * The default value for the hdrhistogram.percentiles property. + */ + public static final String PERCENTILES_PROPERTY_DEFAULT = "95,99"; + + List percentiles; + public OneMeasurementHdrHistogram(String name, Properties props) { super(name); + percentiles = getPercentileValues(props.getProperty(PERCENTILES_PROPERTY, PERCENTILES_PROPERTY_DEFAULT)); boolean shouldLog = Boolean.parseBoolean(props.getProperty("hdrhistogram.fileoutput", "false")); if (!shouldLog) { log = null; @@ -101,8 +113,10 @@ public void exportMeasurements(MeasurementsExporter exporter) throws IOException exporter.write(getName(), "AverageLatency(us)", totalHistogram.getMean()); exporter.write(getName(), "MinLatency(us)", totalHistogram.getMinValue()); exporter.write(getName(), "MaxLatency(us)", totalHistogram.getMaxValue()); - exporter.write(getName(), "95thPercentileLatency(us)", totalHistogram.getValueAtPercentile(95)); - exporter.write(getName(), "99thPercentileLatency(us)", totalHistogram.getValueAtPercentile(99)); + + for (Integer percentile: percentiles) { + exporter.write(getName(), ordinal(percentile) + "PercentileLatency(us)", totalHistogram.getValueAtPercentile(percentile)); + } exportReturnCodes(exporter); } @@ -142,4 +156,45 @@ private Histogram getIntervalHistogramAndAccumulate() { return intervalHistogram; } + /** + * Helper method to parse the given percentile value string + * + * @param percentileString - comma delimited string of Integer values + * @return An Integer List of percentile values + */ + private List getPercentileValues(String percentileString) { + List 
percentileValues = new ArrayList(); + + try { + for (String rawPercentile: percentileString.split(",")) { + percentileValues.add(Integer.parseInt(rawPercentile)); + } + } catch(Exception e) { + // If the given hdrhistogram.percentiles value is unreadable for whatever reason, + // then calculate and return the default set. + System.err.println("[WARN] Couldn't read " + PERCENTILES_PROPERTY + " value: '" + percentileString + + "', the default of '" + PERCENTILES_PROPERTY_DEFAULT + "' will be used."); + e.printStackTrace(); + return getPercentileValues(PERCENTILES_PROPERTY_DEFAULT); + } + + return percentileValues; + } + + /** + * Helper method to find the ordinal of any number. eg 1 -> 1st + * @param i + * @return ordinal string + */ + private String ordinal(int i) { + String[] suffixes = new String[] { "th", "st", "nd", "rd", "th", "th", "th", "th", "th", "th" }; + switch (i % 100) { + case 11: + case 12: + case 13: + return i + "th"; + default: + return i + suffixes[i % 10]; + } + } } diff --git a/distribution/pom.xml b/distribution/pom.xml index 6e5c0bb8b2..aaf8c6f577 100644 --- a/distribution/pom.xml +++ b/distribution/pom.xml @@ -104,6 +104,11 @@ LICENSE file. jdbc-binding ${project.version} + + com.yahoo.ycsb + kudu-binding + ${project.version} + com.yahoo.ycsb mongodb-binding diff --git a/jdbc/README.md b/jdbc/README.md new file mode 100644 index 0000000000..a7a48d7a70 --- /dev/null +++ b/jdbc/README.md @@ -0,0 +1,105 @@ + + +# JDBC Driver for YCSB +This driver enables YCSB to work with databases accessible via the JDBC protocol. + +## Getting Started +### 1. Start your database +This driver will connect to databases that use the JDBC protocol, please refer to your databases documentation on information on how to install, configure and start your system. + +### 2. Set up YCSB +You can clone the YCSB project and compile it to stay up to date with the latest changes. Or you can just download the latest release and unpack it. 
Either way, instructions for doing so can be found here: https://github.com/brianfrankcooper/YCSB. + +### 3. Configure your database and table. +You can name your database whatever you want; you will need to provide the database name in the JDBC connection string. + +You can name your table whatever you like also, but it needs to be specified using the YCSB core properties, the default is to just use 'usertable' as the table name. + +The expected table schema will look similar to the following, syntactical differences may exist with your specific database: + +```sql +CREATE TABLE usertable ( + YCSB_KEY VARCHAR(255) PRIMARY KEY, + FIELD1 TEXT, FIELD2 TEXT, + FIELD3 TEXT, FIELD4 TEXT, + FIELD5 TEXT, FIELD6 TEXT, + FIELD7 TEXT, FIELD8 TEXT, + FIELD9 TEXT, FIELD10 TEXT +); +``` + +Key takeaways: + +* The primary key field needs to be named YCSB_KEY +* The other fields need to be prefixed with FIELD and count up starting from 1 +* Add the same number of FIELDs as you specify in the YCSB core properties, default is 10. +* The type of the fields is not so important as long as they can accept strings of the length that you specify in the YCSB core properties, default is 100. + +#### JdbcDBCreateTable Utility +YCSB has a utility to help create your SQL table. NOTE: It does not support all database flavors; if it does not work for you, you will have to create your table manually with the schema given above. An example usage of the utility: + +```sh +java -cp YCSB_HOME/jdbc-binding/lib/jdbc-binding-0.4.0.jar:mysql-connector-java-5.1.37-bin.jar com.yahoo.ycsb.db.JdbcDBCreateTable -P testworkload -P db.properties -n usertable +``` + +Hint: you need to include your Driver jar in the classpath as well as specify your loading options via a workload file, JDBC connection information, and a table name with ```-n```. + +Simply executing the JdbcDBCreateTable class without any other parameters will print out usage information. + +### 4.
Configure YCSB connection properties +You need to set the following connection configurations: + +```sh +db.driver=com.mysql.jdbc.Driver +db.url=jdbc:mysql://127.0.0.1:3306/ycsb +db.user=admin +db.passwd=admin +``` + +Be sure to use your driver class, a valid JDBC connection string, and credentials to your database. + +You can add these to your workload configuration or a separate properties file and specify it with ```-P``` or you can add the properties individually to your ycsb command with ```-p```. + +### 5. Add your JDBC Driver to the classpath +There are several ways to do this, but a couple easy methods are to put a copy of your Driver jar in ```YCSB_HOME/jdbc-binding/lib/``` or just specify the path to your Driver jar with ```-cp``` in your ycsb command. + +### 6. Running a workload +Before you can actually run the workload, you need to "load" the data first. + +```sh +bin/ycsb load jdbc -P workloads/workloada -P db.properties -cp mysql-connector-java.jar +``` + +Then, you can run the workload: + +```sh +bin/ycsb run jdbc -P workloads/workloada -P db.properties -cp mysql-connector-java.jar +``` + +## Configuration Properties + +```sh +db.driver=com.mysql.jdbc.Driver # The JDBC driver class to use. +db.url=jdbc:mysql://127.0.0.1:3306/ycsb # The Database connection URL. +db.user=admin # User name for the connection. +db.passwd=admin # Password for the connection. +jdbc.fetchsize=10 # The JDBC fetch size hinted to the driver. +jdbc.autocommit=true # The JDBC connection auto-commit property for the driver. +``` + +Please refer to https://github.com/brianfrankcooper/YCSB/wiki/Core-Properties for all other YCSB core properties. diff --git a/kudu/README.md b/kudu/README.md new file mode 100644 index 0000000000..cd5cffd638 --- /dev/null +++ b/kudu/README.md @@ -0,0 +1,44 @@ + + +# Kudu bindings for YCSB + +[Kudu](http://getkudu.io) is a storage engine that enables fast analytics on fast data. 
+ +## Benchmarking Kudu + +Use the following command line to load the initial data into an existing Kudu cluster with default +configurations. + +``` +bin/ycsb load kudu -P workloads/workloada +``` + +Additional configurations: +* `kudu_master_addresses`: The master's address. The default configuration expects a master on localhost. +* `kudu_pre_split_num_tablets`: The number of tablets (or partitions) to create for the table. The default +uses 4 tablets. A good rule of thumb is to use 5 per tablet server. +* `kudu_table_num_replicas`: The number of replicas that each tablet will have. The default is 3. Should +only be configured to use 1 instead, for single node tests. +* `kudu_sync_ops`: If the client should wait after every write operation. The default is true. +* `kudu_block_size`: The data block size used to configure columns. The default is 4096 bytes. + +Then, you can run the workload: + +``` +bin/ycsb run kudu -P workloads/workloada +``` diff --git a/kudu/pom.xml b/kudu/pom.xml new file mode 100644 index 0000000000..0cce21bee3 --- /dev/null +++ b/kudu/pom.xml @@ -0,0 +1,58 @@ + + + + + 4.0.0 + + com.yahoo.ycsb + binding-parent + 0.5.0-SNAPSHOT + ../binding-parent + + + kudu-binding + Kudu DB Binding + jar + + + + org.kududb + kudu-client + ${kudu.version} + + + com.yahoo.ycsb + core + ${project.version} + provided + + + + + + true + + + false + + cloudera-repo + Cloudera Releases + https://repository.cloudera.com/artifactory/cloudera-repos + + + diff --git a/kudu/src/main/conf/log4j.properties b/kudu/src/main/conf/log4j.properties new file mode 100644 index 0000000000..7317ad1cf7 --- /dev/null +++ b/kudu/src/main/conf/log4j.properties @@ -0,0 +1,24 @@ +# +# Copyright (c) 2015 YCSB contributors. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you +# may not use this file except in compliance with the License. 
You +# may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. See the License for the specific language governing +# permissions and limitations under the License. See accompanying +# LICENSE file. +# +# Enables getting logs from the client. + +log4j.rootLogger = INFO, out +log4j.appender.out = org.apache.log4j.ConsoleAppender +log4j.appender.out.layout = org.apache.log4j.PatternLayout +log4j.appender.out.layout.ConversionPattern = %d (%t) [%p - %l] %m%n + +log4j.logger.kudu = INFO diff --git a/kudu/src/main/java/com/yahoo/ycsb/db/KuduYCSBClient.java b/kudu/src/main/java/com/yahoo/ycsb/db/KuduYCSBClient.java new file mode 100644 index 0000000000..9e65407a82 --- /dev/null +++ b/kudu/src/main/java/com/yahoo/ycsb/db/KuduYCSBClient.java @@ -0,0 +1,320 @@ +/** + * Copyright (c) 2015 YCSB contributors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. 
+ */ + +package com.yahoo.ycsb.db; + +import com.stumbleupon.async.TimeoutException; +import com.yahoo.ycsb.ByteIterator; +import com.yahoo.ycsb.DBException; +import com.yahoo.ycsb.StringByteIterator; +import com.yahoo.ycsb.workloads.CoreWorkload; +import org.kududb.ColumnSchema; +import org.kududb.Schema; +import org.kududb.client.*; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.Vector; + +import static org.kududb.Type.STRING; + +/** + * Kudu client for YCSB framework + * Example to load: + * $ ./bin/ycsb load kudu -P workloads/workloada -threads 5 + * Example to run: + * ./bin/ycsb run kudu -P workloads/workloada -p kudu_sync_ops=true -threads 5 + * + */ +public class KuduYCSBClient extends com.yahoo.ycsb.DB { + public static final String KEY = "key"; + public static final int OK = 0; + public static final int SERVER_ERROR = -1; + public static final int NO_MATCHING_RECORD = -2; + public static final int TIMEOUT = -3; + public static final int MAX_TABLETS = 9000; + public static final long DEFAULT_SLEEP = 60000; + private static final String SYNC_OPS_OPT = "kudu_sync_ops"; + private static final String DEBUG_OPT = "kudu_debug"; + private static final String PRINT_ROW_ERRORS_OPT = "kudu_print_row_errors"; + private static final String PRE_SPLIT_NUM_TABLETS_OPT = "kudu_pre_split_num_tablets"; + private static final String TABLE_NUM_REPLICAS = "kudu_table_num_replicas"; + private static final String BLOCK_SIZE_OPT = "kudu_block_size"; + private static final String MASTER_ADDRESSES_OPT = "kudu_master_addresses"; + private static final int BLOCK_SIZE_DEFAULT = 4096; + private static final List columnNames = new ArrayList(); + private static KuduClient client; + private static Schema schema; + private static int fieldCount; + private boolean debug = false; + private boolean printErrors = false; + private String tableName; + private KuduSession session; + private 
KuduTable table; + + @Override + public void init() throws DBException { + if (getProperties().getProperty(DEBUG_OPT) != null) { + this.debug = getProperties().getProperty(DEBUG_OPT).equals("true"); + } + if (getProperties().getProperty(PRINT_ROW_ERRORS_OPT) != null) { + this.printErrors = getProperties().getProperty(PRINT_ROW_ERRORS_OPT).equals("true"); + } + if (getProperties().getProperty(PRINT_ROW_ERRORS_OPT) != null) { + this.printErrors = getProperties().getProperty(PRINT_ROW_ERRORS_OPT).equals("true"); + } + this.tableName = com.yahoo.ycsb.workloads.CoreWorkload.table; + initClient(debug, tableName, getProperties()); + this.session = client.newSession(); + if (getProperties().getProperty(SYNC_OPS_OPT) != null && + getProperties().getProperty(SYNC_OPS_OPT).equals("false")) { + this.session.setFlushMode(KuduSession.FlushMode.AUTO_FLUSH_BACKGROUND); + this.session.setMutationBufferSpace(100); + } else { + this.session.setFlushMode(KuduSession.FlushMode.AUTO_FLUSH_SYNC); + } + + try { + this.table = client.openTable(tableName); + } catch (Exception e) { + throw new DBException("Could not open a table because of:", e); + } + } + + private synchronized static void initClient(boolean debug, String tableName, Properties prop) + throws DBException { + if (client != null) return; + + String masterAddresses = prop.getProperty(MASTER_ADDRESSES_OPT); + if (masterAddresses == null) { + masterAddresses = "localhost:7051"; + } + + int numTablets = getIntFromProp(prop, PRE_SPLIT_NUM_TABLETS_OPT, 4); + if (numTablets > MAX_TABLETS) { + throw new DBException("Specified number of tablets (" + numTablets + ") must be equal " + + "or below " + MAX_TABLETS); + } + + int numReplicas = getIntFromProp(prop, TABLE_NUM_REPLICAS, 3); + + int blockSize = getIntFromProp(prop, BLOCK_SIZE_OPT, BLOCK_SIZE_DEFAULT); + + client = new KuduClient.KuduClientBuilder(masterAddresses) + .defaultSocketReadTimeoutMs(DEFAULT_SLEEP) + .defaultOperationTimeoutMs(DEFAULT_SLEEP) + .build(); + if (debug) { 
+ System.out.println("Connecting to the masters at " + masterAddresses); + } + + fieldCount = getIntFromProp(prop, CoreWorkload.FIELD_COUNT_PROPERTY, + Integer.parseInt(CoreWorkload.FIELD_COUNT_PROPERTY_DEFAULT)); + + List columns = new ArrayList(fieldCount + 1); + + ColumnSchema keyColumn = new ColumnSchema.ColumnSchemaBuilder(KEY, STRING) + .key(true) + .desiredBlockSize(blockSize) + .build(); + columns.add(keyColumn); + columnNames.add(KEY); + for (int i = 0; i < fieldCount; i++) { + String name = "field" + i; + columnNames.add(name); + columns.add(new ColumnSchema.ColumnSchemaBuilder(name, STRING) + .desiredBlockSize(blockSize) + .build()); + } + schema = new Schema(columns); + + CreateTableBuilder builder = new CreateTableBuilder(); + builder.setNumReplicas(numReplicas); + // create n-1 split keys, which will end up being n tablets master-side + for (int i = 1; i < numTablets + 0; i++) { + // We do +1000 since YCSB starts at user1. + int startKeyInt = (MAX_TABLETS / numTablets * i) + 1000; + String startKey = String.format("%04d", startKeyInt); + PartialRow splitRow = schema.newPartialRow(); + splitRow.addString(0, "user" + startKey); + builder.addSplitRow(splitRow); + } + + try { + client.createTable(tableName, schema, builder); + } catch (Exception e) { + if (!e.getMessage().contains("ALREADY_PRESENT")) { + throw new DBException("Couldn't create the table", e); + } + } + } + + private static int getIntFromProp(Properties prop, String propName, int defaultValue) + throws DBException { + String intStr = prop.getProperty(propName); + if (intStr == null) { + return defaultValue; + } else { + try { + return Integer.valueOf(intStr); + } catch (NumberFormatException ex) { + throw new DBException("Provided number for " + propName + " isn't a valid integer"); + } + } + } + + @Override + public void cleanup() throws DBException { + try { + this.session.close(); + } catch (Exception e) { + throw new DBException("Couldn't cleanup the session", e); + } + } + + @Override 
+ public int read(String table, String key, Set fields, + HashMap result) { + Vector> results = new Vector>(); + int ret = scan(table, key, 1, fields, results); + if (ret != OK) return ret; + if (results.size() != 1) return NO_MATCHING_RECORD; + result.putAll(results.firstElement()); + return OK; + } + + @Override + public int scan(String table, String startkey, int recordcount, Set fields, + Vector> result) { + try { + KuduScanner.KuduScannerBuilder scannerBuilder = client.newScannerBuilder(this.table); + List querySchema; + if (fields == null) { + querySchema = columnNames; + // No need to set the projected columns with the whole schema. + } else { + querySchema = new ArrayList(fields); + scannerBuilder.setProjectedColumnNames(querySchema); + } + + PartialRow lowerBound = schema.newPartialRow(); + lowerBound.addString(0, startkey); + scannerBuilder.lowerBound(lowerBound); + if (recordcount == 1) { + PartialRow upperBound = schema.newPartialRow(); + // Keys are fixed length, just adding something at the end is safe. 
+ upperBound.addString(0, startkey.concat(" ")); + scannerBuilder.exclusiveUpperBound(upperBound); + } + + KuduScanner scanner = scannerBuilder + .limit(recordcount) // currently noop + .build(); + + while (scanner.hasMoreRows()) { + RowResultIterator data = scanner.nextRows(); + addAllRowsToResult(data, recordcount, querySchema, result); + if (recordcount == result.size()) break; + } + RowResultIterator closer = scanner.close(); + addAllRowsToResult(closer, recordcount, querySchema, result); + } catch (TimeoutException te) { + if (printErrors) { + System.err.println("Waited too long for a scan operation with start key=" + startkey); + } + return TIMEOUT; + } catch (Exception e) { + System.err.println("Unexpected exception " + e); + e.printStackTrace(); + return SERVER_ERROR; + } + return OK; + } + + private void addAllRowsToResult(RowResultIterator it, int recordcount, + List querySchema, + Vector> result) + throws Exception { + RowResult row; + HashMap rowResult = new HashMap(querySchema.size()); + if (it == null) return; + while (it.hasNext()) { + if (result.size() == recordcount) return; + row = it.next(); + int colIdx = 0; + for (String col : querySchema) { + rowResult.put(col, new StringByteIterator(row.getString(colIdx))); + colIdx++; + } + result.add(rowResult); + } + } + + @Override + public int update(String table, String key, HashMap values) { + Update update = this.table.newUpdate(); + PartialRow row = update.getRow(); + row.addString(KEY, key); + for (int i = 1; i < schema.getColumnCount(); i++) { + String columnName = schema.getColumnByIndex(i).getName(); + if (values.containsKey(columnName)) { + String value = values.get(columnName).toString(); + row.addString(columnName, value); + } + } + apply(update); + return OK; + } + + @Override + public int insert(String table, String key, HashMap values) { + Insert insert = this.table.newInsert(); + PartialRow row = insert.getRow(); + row.addString(KEY, key); + for (int i = 1; i < schema.getColumnCount(); 
i++) { + row.addString(i, new String(values.get(schema.getColumnByIndex(i).getName()).toArray())); + } + apply(insert); + return OK; + } + + @Override + public int delete(String table, String key) { + Delete delete = this.table.newDelete(); + PartialRow row = delete.getRow(); + row.addString(KEY, key); + apply(delete); + return OK; + } + + private void apply(Operation op) { + try { + OperationResponse response = session.apply(op); + if (response != null && response.hasRowError() && printErrors) { + System.err.println("Got a row error " + response.getRowError()); + } + } catch (Exception ex) { + if (printErrors) { + System.err.println("Failed to apply an operation " + ex.toString()); + ex.printStackTrace(); + } + } + } +} diff --git a/pom.xml b/pom.xml index cf5f5bc09e..b08e1bb6fb 100644 --- a/pom.xml +++ b/pom.xml @@ -75,8 +75,10 @@ LICENSE file. 1.6.0 1.2.9 1.0.3 + 2.1.8 8.1.0 7.2.2.Final + 0.5.0 2.1.1 3.0.3 @@ -100,6 +102,7 @@ LICENSE file. accumulo aerospike cassandra + cassandra2 couchbase distribution dynamodb @@ -111,6 +114,7 @@ LICENSE file. hypertable infinispan jdbc + kudu mongodb