diff --git a/Makefile b/Makefile index 5938e4c886..c24b973903 100644 --- a/Makefile +++ b/Makefile @@ -10,6 +10,9 @@ CASSANDRA_7_DIR=db/cassandra-0.7/lib CASSANDRA_7_FILE=apache-cassandra-0.7.9-bin.tar.gz CASSANDRA_8_DIR=db/cassandra-0.8/lib CASSANDRA_8_FILE=apache-cassandra-0.8.7-bin.tar.gz +CASSANDRA_1X_VERSION=1.0.6 +CASSANDRA_1X_DIR=db/cassandra-$(CASSANDRA_1X_VERSION)/lib +CASSANDRA_1X_FILE=apache-cassandra-$(CASSANDRA_1X_VERSION)-bin.tar.gz HBASE_DIR=db/hbase/lib HBASE_VERSION=0.90.5 HBASE_FILE=hbase-$(HBASE_VERSION).tar.gz @@ -35,6 +38,7 @@ download-database-deps: $(CASSANDRA_5_DIR)/$(CASSANDRA_5_FILE) \ $(CASSANDRA_6_DIR)/$(CASSANDRA_6_FILE) \ $(CASSANDRA_7_DIR)/$(CASSANDRA_7_FILE) \ $(CASSANDRA_8_DIR)/$(CASSANDRA_8_FILE) \ + $(CASSANDRA_1X_DIR)/$(CASSANDRA_1X_FILE) \ $(HBASE_DIR)/$(HBASE_FILE) \ $(INFINISPAN_DIR)/$(INFINISPAN_FILE) \ $(MONGODB_DIR)/$(MONGODB_FILE) \ @@ -63,6 +67,11 @@ $(CASSANDRA_8_DIR)/$(CASSANDRA_8_FILE) : -O $(CASSANDRA_8_DIR)/$(CASSANDRA_8_FILE) tar -C $(CASSANDRA_8_DIR) -zxf $(CASSANDRA_8_DIR)/$(CASSANDRA_8_FILE) +$(CASSANDRA_1X_DIR)/$(CASSANDRA_1X_FILE) : + wget http://archive.apache.org/dist/cassandra/$(CASSANDRA_1X_VERSION)/$(CASSANDRA_1X_FILE)\ + -O $(CASSANDRA_1X_DIR)/$(CASSANDRA_1X_FILE) + tar -C $(CASSANDRA_1X_DIR) -zxf $(CASSANDRA_1X_DIR)/$(CASSANDRA_1X_FILE) + $(HBASE_DIR)/$(HBASE_FILE) : wget http://archive.apache.org/dist/hbase/hbase-$(HBASE_VERSION)/$(HBASE_FILE)\ -O $(HBASE_DIR)/$(HBASE_FILE) diff --git a/build.xml b/build.xml index 85002ac0a1..e5bb6cc3e3 100644 --- a/build.xml +++ b/build.xml @@ -1,6 +1,6 @@ - + @@ -26,56 +26,61 @@ - - + + - - - - - - - - - - + + + + + + + + + + - - - - - - - - - - - - - - - - - - - - + + + + + + + + + + + + + + + + + + + + + + + + + - - + + - + - + @@ -84,22 +89,22 @@ - - - - - - - + + + + + + + - - + + + + + + - - - - @@ -107,7 +112,7 @@ - + diff --git a/changes b/changes index ac874a2955..e9ea80ad9d 100644 --- a/changes +++ b/changes @@ -26,6 +26,7 @@ * gh-57 voldemort - enable nio connector (akkumar) * gh-58 benchmarking with hbase 0.90.5 (akkumar) * gh-55 VMware vFabric GemFire (sbawaska) + * gh-59 Cassandra 1.0.6 (akkumar) 0.1.3 * Voldemort binding (rsumbaly) diff --git a/db/cassandra-1.0.6/src/com/yahoo/ycsb/db/CassandraClient10.java b/db/cassandra-1.0.6/src/com/yahoo/ycsb/db/CassandraClient10.java new file mode 100644 index 0000000000..a374f82cf3 --- /dev/null +++ b/db/cassandra-1.0.6/src/com/yahoo/ycsb/db/CassandraClient10.java @@ -0,0 +1,619 @@ +/** + * Copyright (c) 2010 Yahoo! Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ + +package com.yahoo.ycsb.db; + +import com.yahoo.ycsb.*; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Vector; +import java.util.Random; +import java.util.Properties; +import java.nio.ByteBuffer; + +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TFramedTransport; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.cassandra.thrift.*; + + +//XXXX if we do replication, fix the consistency levels +/** + * Cassandra 1.0.6 client for YCSB framework + */ +public class CassandraClient10 extends DB +{ + static Random random = new Random(); + public static final int Ok = 0; + public static final int Error = -1; + public static final ByteBuffer emptyByteBuffer = ByteBuffer.wrap(new byte[0]); + + public int ConnectionRetries; + public int OperationRetries; + public String column_family; + + public static final String CONNECTION_RETRY_PROPERTY = "cassandra.connectionretries"; + public static final String CONNECTION_RETRY_PROPERTY_DEFAULT = "300"; + + public static final String OPERATION_RETRY_PROPERTY = "cassandra.operationretries"; + public static final String OPERATION_RETRY_PROPERTY_DEFAULT = "300"; + + public static final String USERNAME_PROPERTY = "cassandra.username"; + public static final String PASSWORD_PROPERTY = "cassandra.password"; + + public static final String COLUMN_FAMILY_PROPERTY = "cassandra.columnfamily"; + public static final String COLUMN_FAMILY_PROPERTY_DEFAULT = "data"; + + TTransport tr; + Cassandra.Client client; + + boolean _debug = false; + + String _table = ""; + Exception errorexception = null; + + List mutations = new ArrayList(); + Map> mutationMap = new HashMap>(); + Map>> record = new HashMap>>(); + + ColumnParent parent; + + /** + * Initialize any state for this DB. Called once per DB instance; there is one + * DB instance per client thread. + */ + public void init() throws DBException + { + String hosts = getProperties().getProperty("hosts"); + if (hosts == null) + { + throw new DBException("Required property \"hosts\" missing for CassandraClient"); + } + + column_family = getProperties().getProperty(COLUMN_FAMILY_PROPERTY, COLUMN_FAMILY_PROPERTY_DEFAULT); + parent = new ColumnParent(column_family); + + ConnectionRetries = Integer.parseInt(getProperties().getProperty(CONNECTION_RETRY_PROPERTY, + CONNECTION_RETRY_PROPERTY_DEFAULT)); + OperationRetries = Integer.parseInt(getProperties().getProperty(OPERATION_RETRY_PROPERTY, + OPERATION_RETRY_PROPERTY_DEFAULT)); + + String username = getProperties().getProperty(USERNAME_PROPERTY); + String password = getProperties().getProperty(PASSWORD_PROPERTY); + + _debug = Boolean.parseBoolean(getProperties().getProperty("debug", "false")); + + String[] allhosts = hosts.split(","); + String myhost = allhosts[random.nextInt(allhosts.length)]; + + Exception connectexception = null; + + for (int retry = 0; retry < ConnectionRetries; retry++) + { + tr = new TFramedTransport(new TSocket(myhost, 9160)); + TProtocol proto = new TBinaryProtocol(tr); + client = new Cassandra.Client(proto); + try + { + tr.open(); + connectexception = null; + break; + } catch (Exception e) + { + connectexception = e; + } + try + { + Thread.sleep(1000); + } catch (InterruptedException e) + { + } + } + if (connectexception != null) + { + System.err.println("Unable to connect to " + myhost + " after " + ConnectionRetries + + " tries"); + System.out.println("Unable to connect to " + myhost + " after " + ConnectionRetries + + " tries"); + throw new DBException(connectexception); + } + + if (username != null && password != null) + { + Map cred = new HashMap(); + cred.put("username", username); + cred.put("password", password); + AuthenticationRequest req = new AuthenticationRequest(cred); + try + { + client.login(req); + } + catch (Exception e) + { + throw new DBException(e); + } + } + } + + /** + * Cleanup any state for this DB. Called once per DB instance; there is one DB + * instance per client thread. + */ + public void cleanup() throws DBException + { + tr.close(); + } + + /** + * Read a record from the database. Each field/value pair from the result will + * be stored in a HashMap. + * + * @param table + * The name of the table + * @param key + * The record key of the record to read. + * @param fields + * The list of fields to read, or null for all of them + * @param result + * A HashMap of field/value pairs for the result + * @return Zero on success, a non-zero error code on error + */ + public int read(String table, String key, Set fields, HashMap result) + { + if (!_table.equals(table)) { + try + { + client.set_keyspace(table); + _table = table; + } + catch (Exception e) + { + e.printStackTrace(); + e.printStackTrace(System.out); + return Error; + } + } + + for (int i = 0; i < OperationRetries; i++) + { + + try + { + SlicePredicate predicate; + if (fields == null) + { + predicate = new SlicePredicate().setSlice_range(new SliceRange(emptyByteBuffer, emptyByteBuffer, false, 1000000)); + + } else { + ArrayList fieldlist = new ArrayList(fields.size()); + for (String s : fields) + { + fieldlist.add(ByteBuffer.wrap(s.getBytes("UTF-8"))); + } + + predicate = new SlicePredicate().setColumn_names(fieldlist); + } + + List results = client.get_slice(ByteBuffer.wrap(key.getBytes("UTF-8")), parent, predicate, ConsistencyLevel.ONE); + + if (_debug) + { + System.out.print("Reading key: " + key); + } + + Column column; + String name; + ByteIterator value; + for (ColumnOrSuperColumn oneresult : results) + { + + column = oneresult.column; + name = new String(column.name.array(), column.name.position()+column.name.arrayOffset(), column.name.remaining()); + value = new ByteArrayByteIterator(column.value.array(), column.value.position()+column.value.arrayOffset(), column.value.remaining()); + + result.put(name,value); + + if (_debug) + { + System.out.print("(" + name + "=" + value + ")"); + } + } + + if (_debug) + { + System.out.println(); + } + + return Ok; + } catch (Exception e) + { + errorexception = e; + } + + try + { + Thread.sleep(500); + } catch (InterruptedException e) + { + } + } + errorexception.printStackTrace(); + errorexception.printStackTrace(System.out); + return Error; + + } + + /** + * Perform a range scan for a set of records in the database. Each field/value + * pair from the result will be stored in a HashMap. + * + * @param table + * The name of the table + * @param startkey + * The record key of the first record to read. + * @param recordcount + * The number of records to read + * @param fields + * The list of fields to read, or null for all of them + * @param result + * A Vector of HashMaps, where each HashMap is a set field/value + * pairs for one record + * @return Zero on success, a non-zero error code on error + */ + public int scan(String table, String startkey, int recordcount, Set fields, + Vector> result) + { + if (!_table.equals(table)) { + try + { + client.set_keyspace(table); + _table = table; + } + catch (Exception e) + { + e.printStackTrace(); + e.printStackTrace(System.out); + return Error; + } + } + + for (int i = 0; i < OperationRetries; i++) + { + + try + { + SlicePredicate predicate; + if (fields == null) + { + predicate = new SlicePredicate().setSlice_range(new SliceRange(emptyByteBuffer, emptyByteBuffer, false, 1000000)); + + } else { + ArrayList fieldlist = new ArrayList(fields.size()); + for (String s : fields) + { + fieldlist.add(ByteBuffer.wrap(s.getBytes("UTF-8"))); + } + + predicate = new SlicePredicate().setColumn_names(fieldlist); + } + + KeyRange kr = new KeyRange().setStart_key(startkey.getBytes("UTF-8")).setEnd_key(new byte[] {}).setCount(recordcount); + + List results = client.get_range_slices(parent, predicate, kr, ConsistencyLevel.ONE); + + if (_debug) + { + System.out.println("Scanning startkey: " + startkey); + } + + HashMap tuple; + for (KeySlice oneresult : results) + { + tuple = new HashMap(); + + Column column; + String name; + ByteIterator value; + for (ColumnOrSuperColumn onecol : oneresult.columns) + { + column = onecol.column; + name = new String(column.name.array(), column.name.position()+column.name.arrayOffset(), column.name.remaining()); + value = new ByteArrayByteIterator(column.value.array(), column.value.position()+column.value.arrayOffset(), column.value.remaining()); + + tuple.put(name, value); + + if (_debug) + { + System.out.print("(" + name + "=" + value + ")"); + } + } + + result.add(tuple); + if (_debug) + { + System.out.println(); + } + } + + return Ok; + } catch (Exception e) + { + errorexception = e; + } + try + { + Thread.sleep(500); + } catch (InterruptedException e) + { + } + } + errorexception.printStackTrace(); + errorexception.printStackTrace(System.out); + return Error; + } + + /** + * Update a record in the database. Any field/value pairs in the specified + * values HashMap will be written into the record with the specified record + * key, overwriting any existing values with the same field name. + * + * @param table + * The name of the table + * @param key + * The record key of the record to write. + * @param values + * A HashMap of field/value pairs to update in the record + * @return Zero on success, a non-zero error code on error + */ + public int update(String table, String key, HashMap values) + { + return insert(table, key, values); + } + + /** + * Insert a record in the database. Any field/value pairs in the specified + * values HashMap will be written into the record with the specified record + * key. + * + * @param table + * The name of the table + * @param key + * The record key of the record to insert. + * @param values + * A HashMap of field/value pairs to insert in the record + * @return Zero on success, a non-zero error code on error + */ + public int insert(String table, String key, HashMap values) + { + if (!_table.equals(table)) { + try + { + client.set_keyspace(table); + _table = table; + } + catch (Exception e) + { + e.printStackTrace(); + e.printStackTrace(System.out); + return Error; + } + } + + for (int i = 0; i < OperationRetries; i++) + { + if (_debug) + { + System.out.println("Inserting key: " + key); + } + + try + { + ByteBuffer wrappedKey = ByteBuffer.wrap(key.getBytes("UTF-8")); + + Column col; + ColumnOrSuperColumn column; + for (Map.Entry entry : values.entrySet()) + { + col = new Column(); + col.setName(ByteBuffer.wrap(entry.getKey().getBytes("UTF-8"))); + col.setValue(ByteBuffer.wrap(entry.getValue().toArray())); + col.setTimestamp(System.currentTimeMillis()); + + column = new ColumnOrSuperColumn(); + column.setColumn(col); + + mutations.add(new Mutation().setColumn_or_supercolumn(column)); + } + + mutationMap.put(column_family, mutations); + record.put(wrappedKey, mutationMap); + + client.batch_mutate(record, ConsistencyLevel.ONE); + + mutations.clear(); + mutationMap.clear(); + record.clear(); + + return Ok; + } catch (Exception e) + { + errorexception = e; + } + try + { + Thread.sleep(500); + } catch (InterruptedException e) + { + } + } + + errorexception.printStackTrace(); + errorexception.printStackTrace(System.out); + return Error; + } + + /** + * Delete a record from the database. + * + * @param table + * The name of the table + * @param key + * The record key of the record to delete. + * @return Zero on success, a non-zero error code on error + */ + public int delete(String table, String key) + { + if (!_table.equals(table)) { + try + { + client.set_keyspace(table); + _table = table; + } + catch (Exception e) + { + e.printStackTrace(); + e.printStackTrace(System.out); + return Error; + } + } + + for (int i = 0; i < OperationRetries; i++) + { + try + { + client.remove(ByteBuffer.wrap(key.getBytes("UTF-8")), + new ColumnPath(column_family), + System.currentTimeMillis(), + ConsistencyLevel.ONE); + + if (_debug) + { + System.out.println("Delete key: " + key); + } + + return Ok; + } catch (Exception e) + { + errorexception = e; + } + try + { + Thread.sleep(500); + } catch (InterruptedException e) + { + } + } + errorexception.printStackTrace(); + errorexception.printStackTrace(System.out); + return Error; + } + + public static void main(String[] args) + { + CassandraClient10 cli = new CassandraClient10(); + + Properties props = new Properties(); + + props.setProperty("hosts", args[0]); + cli.setProperties(props); + + try + { + cli.init(); + } catch (Exception e) + { + e.printStackTrace(); + System.exit(0); + } + + HashMap vals = new HashMap(); + vals.put("age", new StringByteIterator("57")); + vals.put("middlename", new StringByteIterator("bradley")); + vals.put("favoritecolor", new StringByteIterator("blue")); + int res = cli.insert("usertable", "BrianFrankCooper", vals); + System.out.println("Result of insert: " + res); + + HashMap result = new HashMap(); + HashSet fields = new HashSet(); + fields.add("middlename"); + fields.add("age"); + fields.add("favoritecolor"); + res = cli.read("usertable", "BrianFrankCooper", null, result); + System.out.println("Result of read: " + res); + for (String s : result.keySet()) + { + System.out.println("[" + s + "]=[" + result.get(s) + "]"); + } + + res = cli.delete("usertable", "BrianFrankCooper"); + System.out.println("Result of delete: " + res); + } + + /* + * public static void main(String[] args) throws TException, + * InvalidRequestException, UnavailableException, + * UnsupportedEncodingException, NotFoundException { + * + * + * + * String key_user_id = "1"; + * + * + * + * + * client.insert("Keyspace1", key_user_id, new ColumnPath("Standard1", null, + * "age".getBytes("UTF-8")), "24".getBytes("UTF-8"), timestamp, + * ConsistencyLevel.ONE); + * + * + * // read single column ColumnPath path = new ColumnPath("Standard1", null, + * "name".getBytes("UTF-8")); + * + * System.out.println(client.get("Keyspace1", key_user_id, path, + * ConsistencyLevel.ONE)); + * + * + * // read entire row SlicePredicate predicate = new SlicePredicate(null, new + * SliceRange(new byte[0], new byte[0], false, 10)); + * + * ColumnParent parent = new ColumnParent("Standard1", null); + * + * List results = client.get_slice("Keyspace1", + * key_user_id, parent, predicate, ConsistencyLevel.ONE); + * + * for (ColumnOrSuperColumn result : results) { + * + * Column column = result.column; + * + * System.out.println(new String(column.name, "UTF-8") + " -> " + new + * String(column.value, "UTF-8")); + * + * } + * + * + * + * + * } + */ +}