Skip to content

Commit

Permalink
Part 12: fixes brianfrankcooper#2 - tidy up formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
lyubent committed Jul 15, 2014
1 parent d2e6af0 commit d06eb09
Showing 1 changed file with 66 additions and 52 deletions.
118 changes: 66 additions & 52 deletions cassandra/src/main/java/com/yahoo/ycsb/db/CassandraCQLClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@
*
* @author cmatser
*/
public class CassandraCQLClient extends DB {

public class CassandraCQLClient extends DB
{
private static Cluster cluster = null;
private static Session session = null;

Expand All @@ -79,7 +79,6 @@ public class CassandraCQLClient extends DB {

private static boolean _debug = false;
private static boolean readallfields;
private static boolean writeallfields;

private static PreparedStatement insertStatement = null;
private static PreparedStatement selectStatement = null;
Expand All @@ -93,24 +92,24 @@ public class CassandraCQLClient extends DB {
* one DB instance per client thread.
*/
@Override
public synchronized void init() throws DBException {
public synchronized void init() throws DBException
{
//Check if the cluster has already been initialized
if (cluster != null) {
if (cluster != null)
return;
}

try {

try
{
_debug = Boolean.parseBoolean(getProperties().getProperty("debug", "false"));

if (getProperties().getProperty("hosts") == null) {
if (getProperties().getProperty("hosts") == null)
throw new DBException("Required property \"hosts\" missing for CassandraClient");
}

String hosts[] = getProperties().getProperty("hosts").split(",");
String port = getProperties().getProperty("port", "9042");
if (port == null) {
if (port == null)
throw new DBException("Required property \"port\" missing for CassandraClient");
}


String username = getProperties().getProperty(USERNAME_PROPERTY);
String password = getProperties().getProperty(PASSWORD_PROPERTY);
Expand All @@ -120,16 +119,17 @@ public synchronized void init() throws DBException {
readConsistencyLevel = ConsistencyLevel.valueOf(getProperties().getProperty(READ_CONSISTENCY_LEVEL_PROPERTY, READ_CONSISTENCY_LEVEL_PROPERTY_DEFAULT));
writeConsistencyLevel = ConsistencyLevel.valueOf(getProperties().getProperty(WRITE_CONSISTENCY_LEVEL_PROPERTY, WRITE_CONSISTENCY_LEVEL_PROPERTY_DEFAULT));
readallfields = Boolean.parseBoolean(getProperties().getProperty(CoreWorkload.READ_ALL_FIELDS_PROPERTY, CoreWorkload.READ_ALL_FIELDS_PROPERTY_DEFAULT));
writeallfields = Boolean.parseBoolean(getProperties().getProperty(CoreWorkload.WRITE_ALL_FIELDS_PROPERTY, CoreWorkload.WRITE_ALL_FIELDS_PROPERTY_DEFAULT));

// public void connect(String node) {}
if ((username != null) && !username.isEmpty()) {
if ((username != null) && !username.isEmpty())
{
cluster = Cluster.builder()
.withCredentials(username, password)
.withPort(Integer.valueOf(port))
.addContactPoints(hosts).build();
}
else {
else
{
cluster = Cluster.builder()
.withPort(Integer.valueOf(port))
.addContactPoints(hosts).build();
Expand All @@ -147,7 +147,8 @@ public synchronized void init() throws DBException {
Metadata metadata = cluster.getMetadata();
System.out.printf("Connected to cluster: %s\n", metadata.getClusterName());

for (Host discoveredHost : metadata.getAllHosts()) {
for (Host discoveredHost : metadata.getAllHosts())
{
System.out.printf("Datacenter: %s; Host: %s; Rack: %s\n",
discoveredHost.getDatacenter(),
discoveredHost.getAddress(),
Expand All @@ -158,13 +159,15 @@ public synchronized void init() throws DBException {

// build prepared statements.
buildStatements();
} catch (Exception e) {
}
catch (Exception e)
{
throw new DBException(e);
}
}

private void buildStatements() {

private void buildStatements()
{
Properties p = getProperties();
int fieldCount = Integer.parseInt(p.getProperty(CoreWorkload.FIELD_COUNT_PROPERTY, CoreWorkload.FIELD_COUNT_PROPERTY_DEFAULT));
String fieldPrefix = p.getProperty(CoreWorkload.FIELD_NAME_PREFIX, CoreWorkload.FIELD_NAME_PREFIX_DEFAULT);
Expand Down Expand Up @@ -233,8 +236,7 @@ private String getScanQueryString()
* DB instance per client thread.
*/
@Override
public void cleanup() throws DBException {
}
public void cleanup() throws DBException {}

/**
* Read a record from the database. Each field/value pair from the result will
Expand Down Expand Up @@ -263,41 +265,42 @@ public int readAll(String table, String key, Map<String, ByteIterator> result)
* @return Zero on success, a non-zero error code on error
*/
@Override
public int readOne(String table, String key, String field, Map<String, ByteIterator> result) {
public int readOne(String table, String key, String field, Map<String, ByteIterator> result)
{
BoundStatement bs = selectStatements.get(field).bind(key);
return read(key, result, bs);
}

public int read(String key, Map<String, ByteIterator> result, BoundStatement bs) {

try {

public int read(String key, Map<String, ByteIterator> result, BoundStatement bs)
{
try
{
if (_debug)
System.out.println(bs.preparedStatement().getQueryString());

ResultSet rs = session.execute(bs);

//Should be only 1 row
if (!rs.isExhausted()) {
if (!rs.isExhausted())
{
Row row = rs.one();
ColumnDefinitions cd = row.getColumnDefinitions();

for (ColumnDefinitions.Definition def : cd) {
for (ColumnDefinitions.Definition def : cd)
{
ByteBuffer val = row.getBytesUnsafe(def.getName());
if (val != null) {
result.put(def.getName(),
new ByteArrayByteIterator(val.array()));
}
else {
if (val != null)
result.put(def.getName(), new ByteArrayByteIterator(val.array()));
else
result.put(def.getName(), null);
}
}

}

return OK;

} catch (Exception e) {
}
catch (Exception e)
{
e.printStackTrace();
System.out.println("Error reading key: " + key);
return ERR;
Expand Down Expand Up @@ -349,29 +352,34 @@ public int scanAll(String table, String startkey, int recordcount, List<Map<Stri
return scan(startkey, result, bs);
}

public int scan(String startkey, List<Map<String, ByteIterator>> result, BoundStatement bs) {

try {
public int scan(String startkey, List<Map<String, ByteIterator>> result, BoundStatement bs)
{

try
{
if (_debug)
System.out.println(bs.preparedStatement().getQueryString());

ResultSet rs = session.execute(bs);

HashMap<String, ByteIterator> tuple;
while (!rs.isExhausted()) {
while (!rs.isExhausted())
{
Row row = rs.one();
tuple = new HashMap<String, ByteIterator> ();

ColumnDefinitions cd = row.getColumnDefinitions();

for (ColumnDefinitions.Definition def : cd) {
for (ColumnDefinitions.Definition def : cd)
{
ByteBuffer val = row.getBytesUnsafe(def.getName());
if (val != null) {
if (val != null)
{
tuple.put(def.getName(),
new ByteArrayByteIterator(val.array()));
}
else {
else
{
tuple.put(def.getName(), null);
}
}
Expand All @@ -380,8 +388,9 @@ public int scan(String startkey, List<Map<String, ByteIterator>> result, BoundSt
}

return OK;

} catch (Exception e) {
}
catch (Exception e)
{
e.printStackTrace();
System.out.println("Error scanning with startkey: " + startkey);
return ERR;
Expand Down Expand Up @@ -435,8 +444,8 @@ public int updateAll(String table, String key, Map<String,ByteIterator> values)
* @return Zero on success, a non-zero error code on error
*/
@Override
public int insert(String table, String key, Map<String, ByteIterator> values) {

public int insert(String table, String key, Map<String, ByteIterator> values)
{
try {

List<String> vals = new ArrayList<String>(values.size() + 1);
Expand All @@ -450,7 +459,9 @@ public int insert(String table, String key, Map<String, ByteIterator> values) {
session.execute(insertStatement.bind(vals.toArray()));

return OK;
} catch (Exception e) {
}
catch (Exception e)
{
e.printStackTrace();
}

Expand All @@ -465,16 +476,19 @@ public int insert(String table, String key, Map<String, ByteIterator> values) {
* @return Zero on success, a non-zero error code on error
*/
@Override
public int delete(String table, String key) {

try {
public int delete(String table, String key)
{
try
{
if (_debug)
System.out.println(deleteStatement.getQueryString());

session.execute(deleteStatement.bind(key));

return OK;
} catch (Exception e) {
}
catch (Exception e)
{
e.printStackTrace();
System.out.println("Error deleting key: " + key);
}
Expand Down

0 comments on commit d06eb09

Please sign in to comment.