diff --git a/core/src/main/java/com/yahoo/ycsb/workloads/CoreWorkload.java b/core/src/main/java/com/yahoo/ycsb/workloads/CoreWorkload.java index dae0047393..52c0e50de8 100644 --- a/core/src/main/java/com/yahoo/ycsb/workloads/CoreWorkload.java +++ b/core/src/main/java/com/yahoo/ycsb/workloads/CoreWorkload.java @@ -1,11 +1,11 @@ /** - * Copyright (c) 2010 Yahoo! Inc. All rights reserved. - * + * Copyright (c) 2010 Yahoo! Inc. All rights reserved. + * * Licensed under the Apache License, Version 2.0 (the "License"); you * may not use this file except in compliance with the License. You * may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -44,115 +44,127 @@ import java.util.ArrayList; /** - * The core benchmark scenario. Represents a set of clients doing simple CRUD operations. The relative - * proportion of different kinds of operations, and other properties of the workload, are controlled - * by parameters specified at runtime. - * + * The core benchmark scenario. Represents a set of clients doing simple CRUD operations. The + * relative proportion of different kinds of operations, and other properties of the workload, + * are controlled by parameters specified at runtime. + * * Properties to control the client: * + *
• scanlengthdistribution: for scans, what distribution should be used to choose the
+ * number of records to scan, for each scan, between 1 and maxscanlength (default: uniform)
+ *
  • insertorder: should records be inserted in order by key ("ordered"), or in hashed + * order ("hashed") (default: hashed) + * */ -public class CoreWorkload extends Workload -{ - - /** - * The name of the database table to run queries against. - */ - public static final String TABLENAME_PROPERTY="table"; - - /** - * The default name of the database table to run queries against. - */ - public static final String TABLENAME_PROPERTY_DEFAULT="usertable"; - - public static String table; - - - /** - * The name of the property for the number of fields in a record. - */ - public static final String FIELD_COUNT_PROPERTY="fieldcount"; - - /** - * Default number of fields in a record. - */ - public static final String FIELD_COUNT_PROPERTY_DEFAULT="10"; - - int fieldcount; - - private List fieldnames; - - /** - * The name of the property for the field length distribution. Options are "uniform", "zipfian" (favoring short records), "constant", and "histogram". - * - * If "uniform", "zipfian" or "constant", the maximum field length will be that specified by the fieldlength property. If "histogram", then the - * histogram will be read from the filename specified in the "fieldlengthhistogram" property. - */ - public static final String FIELD_LENGTH_DISTRIBUTION_PROPERTY="fieldlengthdistribution"; - /** - * The default field length distribution. - */ - public static final String FIELD_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT = "constant"; - - /** - * The name of the property for the length of a field in bytes. - */ - public static final String FIELD_LENGTH_PROPERTY="fieldlength"; - /** - * The default maximum length of a field in bytes. - */ - public static final String FIELD_LENGTH_PROPERTY_DEFAULT="100"; - - /** - * The name of a property that specifies the filename containing the field length histogram (only used if fieldlengthdistribution is "histogram"). - */ - public static final String FIELD_LENGTH_HISTOGRAM_FILE_PROPERTY = "fieldlengthhistogram"; - /** - * The default filename containing a field length histogram. - */ - public static final String FIELD_LENGTH_HISTOGRAM_FILE_PROPERTY_DEFAULT = "hist.txt"; - - /** - * Generator object that produces field lengths. The value of this depends on the properties that start with "FIELD_LENGTH_". - */ - IntegerGenerator fieldlengthgenerator; - - /** - * The name of the property for deciding whether to read one field (false) or all fields (true) of a record. - */ - public static final String READ_ALL_FIELDS_PROPERTY="readallfields"; - - /** - * The default value for the readallfields property. - */ - public static final String READ_ALL_FIELDS_PROPERTY_DEFAULT="true"; - - boolean readallfields; - - /** - * The name of the property for deciding whether to write one field (false) or all fields (true) of a record. - */ - public static final String WRITE_ALL_FIELDS_PROPERTY="writeallfields"; - - /** - * The default value for the writeallfields property. - */ - public static final String WRITE_ALL_FIELDS_PROPERTY_DEFAULT="false"; - - boolean writeallfields; +public class CoreWorkload extends Workload { + /** + * The name of the database table to run queries against. + */ + public static final String TABLENAME_PROPERTY = "table"; + + /** + * The default name of the database table to run queries against. + */ + public static final String TABLENAME_PROPERTY_DEFAULT = "usertable"; + + public static String table; + + + /** + * The name of the property for the number of fields in a record. 
+ */ + public static final String FIELD_COUNT_PROPERTY = "fieldcount"; + + /** + * Default number of fields in a record. + */ + public static final String FIELD_COUNT_PROPERTY_DEFAULT = "10"; + + int fieldcount; + + private List fieldnames; + + /** + * The name of the property for the field length distribution. Options are "uniform", "zipfian" + * (favoring short records), "constant", and "histogram". + * + * If "uniform", "zipfian" or "constant", the maximum field length will be that specified by the + * fieldlength property. If "histogram", then the + * histogram will be read from the filename specified in the "fieldlengthhistogram" property. + */ + public static final String FIELD_LENGTH_DISTRIBUTION_PROPERTY = "fieldlengthdistribution"; + + /** + * The default field length distribution. + */ + public static final String FIELD_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT = "constant"; + + /** + * The name of the property for the length of a field in bytes. + */ + public static final String FIELD_LENGTH_PROPERTY = "fieldlength"; + + /** + * The default maximum length of a field in bytes. + */ + public static final String FIELD_LENGTH_PROPERTY_DEFAULT = "100"; + + /** + * The name of a property that specifies the filename containing the field length histogram (only + * used if fieldlengthdistribution is "histogram"). + */ + public static final String FIELD_LENGTH_HISTOGRAM_FILE_PROPERTY = "fieldlengthhistogram"; + + /** + * The default filename containing a field length histogram. + */ + public static final String FIELD_LENGTH_HISTOGRAM_FILE_PROPERTY_DEFAULT = "hist.txt"; + + /** + * Generator object that produces field lengths. The value of this depends on the properties that + * start with "FIELD_LENGTH_". + */ + IntegerGenerator fieldlengthgenerator; + + /** + * The name of the property for deciding whether to read one field (false) or all fields (true) of + * a record. + */ + public static final String READ_ALL_FIELDS_PROPERTY = "readallfields"; + + /** + * The default value for the readallfields property. + */ + public static final String READ_ALL_FIELDS_PROPERTY_DEFAULT = "true"; + + boolean readallfields; + + /** + * The name of the property for deciding whether to write one field (false) or all fields (true) + * of a record. + */ + public static final String WRITE_ALL_FIELDS_PROPERTY = "writeallfields"; + + /** + * The default value for the writeallfields property. + */ + public static final String WRITE_ALL_FIELDS_PROPERTY_DEFAULT = "false"; + + boolean writeallfields; /** @@ -160,7 +172,7 @@ public class CoreWorkload extends Workload * data against the formation template to ensure data integrity. */ public static final String DATA_INTEGRITY_PROPERTY = "dataintegrity"; - + /** * The default value for the dataintegrity property. */ @@ -172,337 +184,361 @@ public class CoreWorkload extends Workload */ private boolean dataintegrity; - /** - * The name of the property for the proportion of transactions that are reads. - */ - public static final String READ_PROPORTION_PROPERTY="readproportion"; - - /** - * The default proportion of transactions that are reads. - */ - public static final String READ_PROPORTION_PROPERTY_DEFAULT="0.95"; - - /** - * The name of the property for the proportion of transactions that are updates. - */ - public static final String UPDATE_PROPORTION_PROPERTY="updateproportion"; - - /** - * The default proportion of transactions that are updates. 
- */ - public static final String UPDATE_PROPORTION_PROPERTY_DEFAULT="0.05"; - - /** - * The name of the property for the proportion of transactions that are inserts. - */ - public static final String INSERT_PROPORTION_PROPERTY="insertproportion"; - - /** - * The default proportion of transactions that are inserts. - */ - public static final String INSERT_PROPORTION_PROPERTY_DEFAULT="0.0"; - - /** - * The name of the property for the proportion of transactions that are scans. - */ - public static final String SCAN_PROPORTION_PROPERTY="scanproportion"; - - /** - * The default proportion of transactions that are scans. - */ - public static final String SCAN_PROPORTION_PROPERTY_DEFAULT="0.0"; - - /** - * The name of the property for the proportion of transactions that are read-modify-write. - */ - public static final String READMODIFYWRITE_PROPORTION_PROPERTY="readmodifywriteproportion"; - - /** - * The default proportion of transactions that are scans. - */ - public static final String READMODIFYWRITE_PROPORTION_PROPERTY_DEFAULT="0.0"; - - /** - * The name of the property for the the distribution of requests across the keyspace. Options are "uniform", "zipfian" and "latest" - */ - public static final String REQUEST_DISTRIBUTION_PROPERTY="requestdistribution"; - - /** - * The default distribution of requests across the keyspace - */ - public static final String REQUEST_DISTRIBUTION_PROPERTY_DEFAULT="uniform"; - - /** - * The name of the property for the max scan length (number of records) - */ - public static final String MAX_SCAN_LENGTH_PROPERTY="maxscanlength"; - - /** - * The default max scan length. - */ - public static final String MAX_SCAN_LENGTH_PROPERTY_DEFAULT="1000"; - - /** - * The name of the property for the scan length distribution. Options are "uniform" and "zipfian" (favoring short scans) - */ - public static final String SCAN_LENGTH_DISTRIBUTION_PROPERTY="scanlengthdistribution"; - - /** - * The default max scan length. - */ - public static final String SCAN_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT="uniform"; - - /** - * The name of the property for the order to insert records. Options are "ordered" or "hashed" - */ - public static final String INSERT_ORDER_PROPERTY="insertorder"; - - /** - * Default insert order. - */ - public static final String INSERT_ORDER_PROPERTY_DEFAULT="hashed"; - - /** + /** + * The name of the property for the proportion of transactions that are reads. + */ + public static final String READ_PROPORTION_PROPERTY = "readproportion"; + + /** + * The default proportion of transactions that are reads. + */ + public static final String READ_PROPORTION_PROPERTY_DEFAULT = "0.95"; + + /** + * The name of the property for the proportion of transactions that are updates. + */ + public static final String UPDATE_PROPORTION_PROPERTY = "updateproportion"; + + /** + * The default proportion of transactions that are updates. + */ + public static final String UPDATE_PROPORTION_PROPERTY_DEFAULT = "0.05"; + + /** + * The name of the property for the proportion of transactions that are inserts. + */ + public static final String INSERT_PROPORTION_PROPERTY = "insertproportion"; + + /** + * The default proportion of transactions that are inserts. + */ + public static final String INSERT_PROPORTION_PROPERTY_DEFAULT = "0.0"; + + /** + * The name of the property for the proportion of transactions that are scans. + */ + public static final String SCAN_PROPORTION_PROPERTY = "scanproportion"; + + /** + * The default proportion of transactions that are scans. 
+   */
+  public static final String SCAN_PROPORTION_PROPERTY_DEFAULT = "0.0";
+
+  /**
+   * The name of the property for the proportion of transactions that are read-modify-write.
+   */
+  public static final String READMODIFYWRITE_PROPORTION_PROPERTY = "readmodifywriteproportion";
+
+  /**
+   * The default proportion of transactions that are read-modify-write.
+   */
+  public static final String READMODIFYWRITE_PROPORTION_PROPERTY_DEFAULT = "0.0";
+
+  /**
+   * The name of the property for the distribution of requests across the keyspace. Options are
+   * "uniform", "zipfian" and "latest".
+   */
+  public static final String REQUEST_DISTRIBUTION_PROPERTY = "requestdistribution";
+
+  /**
+   * The default distribution of requests across the keyspace.
+   */
+  public static final String REQUEST_DISTRIBUTION_PROPERTY_DEFAULT = "uniform";
+
+  /**
+   * The name of the property for the max scan length (number of records).
+   */
+  public static final String MAX_SCAN_LENGTH_PROPERTY = "maxscanlength";
+
+  /**
+   * The default max scan length.
+   */
+  public static final String MAX_SCAN_LENGTH_PROPERTY_DEFAULT = "1000";
+
+  /**
+   * The name of the property for the scan length distribution. Options are "uniform" and "zipfian"
+   * (favoring short scans).
+   */
+  public static final String SCAN_LENGTH_DISTRIBUTION_PROPERTY = "scanlengthdistribution";
+
+  /**
+   * The default scan length distribution.
+   */
+  public static final String SCAN_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT = "uniform";
+
+  /**
+   * The name of the property for the order to insert records. Options are "ordered" or "hashed".
+   */
+  public static final String INSERT_ORDER_PROPERTY = "insertorder";
+
+  /**
+   * Default insert order.
+   */
+  public static final String INSERT_ORDER_PROPERTY_DEFAULT = "hashed";
+
+  /**
    * Percentage data items that constitute the hot set.
    */
   public static final String HOTSPOT_DATA_FRACTION = "hotspotdatafraction";
-
+
   /**
    * Default value of the size of the hot set.
    */
   public static final String HOTSPOT_DATA_FRACTION_DEFAULT = "0.2";
-
+
   /**
    * Percentage operations that access the hot set.
    */
   public static final String HOTSPOT_OPN_FRACTION = "hotspotopnfraction";
-
+
   /**
    * Default value of the percentage operations accessing the hot set.
*/ public static final String HOTSPOT_OPN_FRACTION_DEFAULT = "0.8"; - - IntegerGenerator keysequence; - - DiscreteGenerator operationchooser; - - IntegerGenerator keychooser; - - Generator fieldchooser; - - AcknowledgedCounterGenerator transactioninsertkeysequence; - - IntegerGenerator scanlength; - - boolean orderedinserts; - - int recordcount; - - private Measurements _measurements = Measurements.getMeasurements(); - - protected static IntegerGenerator getFieldLengthGenerator(Properties p) throws WorkloadException{ - IntegerGenerator fieldlengthgenerator; - String fieldlengthdistribution = p.getProperty(FIELD_LENGTH_DISTRIBUTION_PROPERTY, FIELD_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT); - int fieldlength=Integer.parseInt(p.getProperty(FIELD_LENGTH_PROPERTY,FIELD_LENGTH_PROPERTY_DEFAULT)); - String fieldlengthhistogram = p.getProperty(FIELD_LENGTH_HISTOGRAM_FILE_PROPERTY, FIELD_LENGTH_HISTOGRAM_FILE_PROPERTY_DEFAULT); - if(fieldlengthdistribution.compareTo("constant") == 0) { - fieldlengthgenerator = new ConstantIntegerGenerator(fieldlength); - } else if(fieldlengthdistribution.compareTo("uniform") == 0) { - fieldlengthgenerator = new UniformIntegerGenerator(1, fieldlength); - } else if(fieldlengthdistribution.compareTo("zipfian") == 0) { - fieldlengthgenerator = new ZipfianGenerator(1, fieldlength); - } else if(fieldlengthdistribution.compareTo("histogram") == 0) { - try { - fieldlengthgenerator = new HistogramGenerator(fieldlengthhistogram); - } catch(IOException e) { - throw new WorkloadException("Couldn't read field length histogram file: "+fieldlengthhistogram, e); - } - } else { - throw new WorkloadException("Unknown field length distribution \""+fieldlengthdistribution+"\""); - } - return fieldlengthgenerator; - } - - /** - * Initialize the scenario. - * Called once, in the main client thread, before any operations are started. - */ - public void init(Properties p) throws WorkloadException - { - table = p.getProperty(TABLENAME_PROPERTY,TABLENAME_PROPERTY_DEFAULT); - - fieldcount=Integer.parseInt(p.getProperty(FIELD_COUNT_PROPERTY,FIELD_COUNT_PROPERTY_DEFAULT)); + + /** + * How many times to retry when insertion of a single item to a DB fails. + */ + public static final String INSERTION_RETRY_LIMIT = "core_workload_insertion_retry_limit"; + public static final String INSERTION_RETRY_LIMIT_DEFAULT = "0"; + + /** + * On average, how long to wait between the retries, in seconds. 
+ */ + public static final String INSERTION_RETRY_INTERVAL = "core_workload_insertion_retry_interval"; + public static final String INSERTION_RETRY_INTERVAL_DEFAULT = "3"; + + IntegerGenerator keysequence; + + DiscreteGenerator operationchooser; + + IntegerGenerator keychooser; + + Generator fieldchooser; + + AcknowledgedCounterGenerator transactioninsertkeysequence; + + IntegerGenerator scanlength; + + boolean orderedinserts; + + int recordcount; + + int insertionRetryLimit; + int insertionRetryInterval; + + private Measurements _measurements = Measurements.getMeasurements(); + + protected static IntegerGenerator getFieldLengthGenerator(Properties p) throws WorkloadException { + IntegerGenerator fieldlengthgenerator; + String fieldlengthdistribution = p.getProperty( + FIELD_LENGTH_DISTRIBUTION_PROPERTY, FIELD_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT); + int fieldlength = + Integer.parseInt(p.getProperty(FIELD_LENGTH_PROPERTY, FIELD_LENGTH_PROPERTY_DEFAULT)); + String fieldlengthhistogram = p.getProperty( + FIELD_LENGTH_HISTOGRAM_FILE_PROPERTY, FIELD_LENGTH_HISTOGRAM_FILE_PROPERTY_DEFAULT); + if (fieldlengthdistribution.compareTo("constant") == 0) { + fieldlengthgenerator = new ConstantIntegerGenerator(fieldlength); + } else if (fieldlengthdistribution.compareTo("uniform") == 0) { + fieldlengthgenerator = new UniformIntegerGenerator(1, fieldlength); + } else if (fieldlengthdistribution.compareTo("zipfian") == 0) { + fieldlengthgenerator = new ZipfianGenerator(1, fieldlength); + } else if (fieldlengthdistribution.compareTo("histogram") == 0) { + try { + fieldlengthgenerator = new HistogramGenerator(fieldlengthhistogram); + } catch (IOException e) { + throw new WorkloadException( + "Couldn't read field length histogram file: " + fieldlengthhistogram, e); + } + } else { + throw new WorkloadException( + "Unknown field length distribution \"" + fieldlengthdistribution + "\""); + } + return fieldlengthgenerator; + } + + /** + * Initialize the scenario. + * Called once, in the main client thread, before any operations are started. 
+ */ + public void init(Properties p) throws WorkloadException { + table = p.getProperty(TABLENAME_PROPERTY, TABLENAME_PROPERTY_DEFAULT); + + fieldcount = + Integer.parseInt(p.getProperty(FIELD_COUNT_PROPERTY, FIELD_COUNT_PROPERTY_DEFAULT)); fieldnames = new ArrayList(); for (int i = 0; i < fieldcount; i++) { - fieldnames.add("field" + i); + fieldnames.add("field" + i); } - fieldlengthgenerator = CoreWorkload.getFieldLengthGenerator(p); - - double readproportion=Double.parseDouble(p.getProperty(READ_PROPORTION_PROPERTY,READ_PROPORTION_PROPERTY_DEFAULT)); - double updateproportion=Double.parseDouble(p.getProperty(UPDATE_PROPORTION_PROPERTY,UPDATE_PROPORTION_PROPERTY_DEFAULT)); - double insertproportion=Double.parseDouble(p.getProperty(INSERT_PROPORTION_PROPERTY,INSERT_PROPORTION_PROPERTY_DEFAULT)); - double scanproportion=Double.parseDouble(p.getProperty(SCAN_PROPORTION_PROPERTY,SCAN_PROPORTION_PROPERTY_DEFAULT)); - double readmodifywriteproportion=Double.parseDouble(p.getProperty(READMODIFYWRITE_PROPORTION_PROPERTY,READMODIFYWRITE_PROPORTION_PROPERTY_DEFAULT)); - recordcount=Integer.parseInt(p.getProperty(Client.RECORD_COUNT_PROPERTY, Client.DEFAULT_RECORD_COUNT)); - if(recordcount == 0) - recordcount = Integer.MAX_VALUE; - String requestdistrib=p.getProperty(REQUEST_DISTRIBUTION_PROPERTY,REQUEST_DISTRIBUTION_PROPERTY_DEFAULT); - int maxscanlength=Integer.parseInt(p.getProperty(MAX_SCAN_LENGTH_PROPERTY,MAX_SCAN_LENGTH_PROPERTY_DEFAULT)); - String scanlengthdistrib=p.getProperty(SCAN_LENGTH_DISTRIBUTION_PROPERTY,SCAN_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT); - - int insertstart=Integer.parseInt(p.getProperty(INSERT_START_PROPERTY,INSERT_START_PROPERTY_DEFAULT)); - - readallfields=Boolean.parseBoolean(p.getProperty(READ_ALL_FIELDS_PROPERTY,READ_ALL_FIELDS_PROPERTY_DEFAULT)); - writeallfields=Boolean.parseBoolean(p.getProperty(WRITE_ALL_FIELDS_PROPERTY,WRITE_ALL_FIELDS_PROPERTY_DEFAULT)); - - dataintegrity = Boolean.parseBoolean(p.getProperty(DATA_INTEGRITY_PROPERTY, DATA_INTEGRITY_PROPERTY_DEFAULT)); - //Confirm that fieldlengthgenerator returns a constant if data - //integrity check requested. 
- if (dataintegrity && !(p.getProperty(FIELD_LENGTH_DISTRIBUTION_PROPERTY, FIELD_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT)).equals("constant")) - { + fieldlengthgenerator = CoreWorkload.getFieldLengthGenerator(p); + + double readproportion = Double.parseDouble( + p.getProperty(READ_PROPORTION_PROPERTY, READ_PROPORTION_PROPERTY_DEFAULT)); + double updateproportion = Double.parseDouble( + p.getProperty(UPDATE_PROPORTION_PROPERTY, UPDATE_PROPORTION_PROPERTY_DEFAULT)); + double insertproportion = Double.parseDouble( + p.getProperty(INSERT_PROPORTION_PROPERTY, INSERT_PROPORTION_PROPERTY_DEFAULT)); + double scanproportion = Double.parseDouble( + p.getProperty(SCAN_PROPORTION_PROPERTY, SCAN_PROPORTION_PROPERTY_DEFAULT)); + double readmodifywriteproportion = Double.parseDouble(p.getProperty( + READMODIFYWRITE_PROPORTION_PROPERTY, READMODIFYWRITE_PROPORTION_PROPERTY_DEFAULT)); + recordcount = + Integer.parseInt(p.getProperty(Client.RECORD_COUNT_PROPERTY, Client.DEFAULT_RECORD_COUNT)); + if (recordcount == 0) + recordcount = Integer.MAX_VALUE; + String requestdistrib = + p.getProperty(REQUEST_DISTRIBUTION_PROPERTY, REQUEST_DISTRIBUTION_PROPERTY_DEFAULT); + int maxscanlength = + Integer.parseInt(p.getProperty(MAX_SCAN_LENGTH_PROPERTY, MAX_SCAN_LENGTH_PROPERTY_DEFAULT)); + String scanlengthdistrib = + p.getProperty(SCAN_LENGTH_DISTRIBUTION_PROPERTY, SCAN_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT); + + int insertstart = + Integer.parseInt(p.getProperty(INSERT_START_PROPERTY, INSERT_START_PROPERTY_DEFAULT)); + + readallfields = Boolean.parseBoolean( + p.getProperty(READ_ALL_FIELDS_PROPERTY, READ_ALL_FIELDS_PROPERTY_DEFAULT)); + writeallfields = Boolean.parseBoolean( + p.getProperty(WRITE_ALL_FIELDS_PROPERTY, WRITE_ALL_FIELDS_PROPERTY_DEFAULT)); + + dataintegrity = Boolean.parseBoolean( + p.getProperty(DATA_INTEGRITY_PROPERTY, DATA_INTEGRITY_PROPERTY_DEFAULT)); + // Confirm that fieldlengthgenerator returns a constant if data + // integrity check requested. 
+ if (dataintegrity + && !(p.getProperty( + FIELD_LENGTH_DISTRIBUTION_PROPERTY, + FIELD_LENGTH_DISTRIBUTION_PROPERTY_DEFAULT)).equals("constant")) { System.err.println("Must have constant field size to check data integrity."); System.exit(-1); } - if (p.getProperty(INSERT_ORDER_PROPERTY,INSERT_ORDER_PROPERTY_DEFAULT).compareTo("hashed")==0) - { - orderedinserts=false; - } - else if (requestdistrib.compareTo("exponential")==0) - { - double percentile = Double.parseDouble(p.getProperty(ExponentialGenerator.EXPONENTIAL_PERCENTILE_PROPERTY, - ExponentialGenerator.EXPONENTIAL_PERCENTILE_DEFAULT)); - double frac = Double.parseDouble(p.getProperty(ExponentialGenerator.EXPONENTIAL_FRAC_PROPERTY, - ExponentialGenerator.EXPONENTIAL_FRAC_DEFAULT)); - keychooser = new ExponentialGenerator(percentile, recordcount*frac); - } - else - { - orderedinserts=true; - } - - keysequence=new CounterGenerator(insertstart); - operationchooser=new DiscreteGenerator(); - if (readproportion>0) - { - operationchooser.addValue(readproportion,"READ"); - } - - if (updateproportion>0) - { - operationchooser.addValue(updateproportion,"UPDATE"); - } - - if (insertproportion>0) - { - operationchooser.addValue(insertproportion,"INSERT"); - } - - if (scanproportion>0) - { - operationchooser.addValue(scanproportion,"SCAN"); - } - - if (readmodifywriteproportion>0) - { - operationchooser.addValue(readmodifywriteproportion,"READMODIFYWRITE"); - } - - transactioninsertkeysequence=new AcknowledgedCounterGenerator(recordcount); - if (requestdistrib.compareTo("uniform")==0) - { - keychooser=new UniformIntegerGenerator(0,recordcount-1); - } - else if (requestdistrib.compareTo("zipfian")==0) - { - //it does this by generating a random "next key" in part by taking the modulus over the number of keys - //if the number of keys changes, this would shift the modulus, and we don't want that to change which keys are popular - //so we'll actually construct the scrambled zipfian generator with a keyspace that is larger than exists at the beginning - //of the test. that is, we'll predict the number of inserts, and tell the scrambled zipfian generator the number of existing keys - //plus the number of predicted keys as the total keyspace. then, if the generator picks a key that hasn't been inserted yet, will - //just ignore it and pick another key. 
this way, the size of the keyspace doesn't change from the perspective of the scrambled zipfian generator - - int opcount=Integer.parseInt(p.getProperty(Client.OPERATION_COUNT_PROPERTY)); - int expectednewkeys=(int)(((double)opcount)*insertproportion*2.0); //2 is fudge factor - - keychooser=new ScrambledZipfianGenerator(recordcount+expectednewkeys); - } - else if (requestdistrib.compareTo("latest")==0) - { - keychooser=new SkewedLatestGenerator(transactioninsertkeysequence); - } - else if (requestdistrib.equals("hotspot")) - { - double hotsetfraction = Double.parseDouble(p.getProperty( - HOTSPOT_DATA_FRACTION, HOTSPOT_DATA_FRACTION_DEFAULT)); - double hotopnfraction = Double.parseDouble(p.getProperty( - HOTSPOT_OPN_FRACTION, HOTSPOT_OPN_FRACTION_DEFAULT)); - keychooser = new HotspotIntegerGenerator(0, recordcount - 1, - hotsetfraction, hotopnfraction); + if (p.getProperty(INSERT_ORDER_PROPERTY, INSERT_ORDER_PROPERTY_DEFAULT).compareTo("hashed") + == 0) { + orderedinserts = false; + } else if (requestdistrib.compareTo("exponential") == 0) { + double percentile = Double.parseDouble(p.getProperty( + ExponentialGenerator.EXPONENTIAL_PERCENTILE_PROPERTY, + ExponentialGenerator.EXPONENTIAL_PERCENTILE_DEFAULT)); + double frac = Double.parseDouble(p.getProperty( + ExponentialGenerator.EXPONENTIAL_FRAC_PROPERTY, + ExponentialGenerator.EXPONENTIAL_FRAC_DEFAULT)); + keychooser = new ExponentialGenerator(percentile, recordcount * frac); + } else { + orderedinserts = true; + } + + keysequence = new CounterGenerator(insertstart); + operationchooser = new DiscreteGenerator(); + if (readproportion > 0) { + operationchooser.addValue(readproportion, "READ"); } - else - { - throw new WorkloadException("Unknown request distribution \""+requestdistrib+"\""); - } - - fieldchooser=new UniformIntegerGenerator(0,fieldcount-1); - - if (scanlengthdistrib.compareTo("uniform")==0) - { - scanlength=new UniformIntegerGenerator(1,maxscanlength); - } - else if (scanlengthdistrib.compareTo("zipfian")==0) - { - scanlength=new ZipfianGenerator(1,maxscanlength); - } - else - { - throw new WorkloadException("Distribution \""+scanlengthdistrib+"\" not allowed for scan length"); - } - } - - public String buildKeyName(long keynum) { - if (!orderedinserts) - { - keynum=Utils.hash(keynum); - } - return "user"+keynum; - } - + + if (updateproportion > 0) { + operationchooser.addValue(updateproportion, "UPDATE"); + } + + if (insertproportion > 0) { + operationchooser.addValue(insertproportion, "INSERT"); + } + + if (scanproportion > 0) { + operationchooser.addValue(scanproportion, "SCAN"); + } + + if (readmodifywriteproportion > 0) { + operationchooser.addValue(readmodifywriteproportion, "READMODIFYWRITE"); + } + + transactioninsertkeysequence = new AcknowledgedCounterGenerator(recordcount); + if (requestdistrib.compareTo("uniform") == 0) { + keychooser = new UniformIntegerGenerator(0, recordcount - 1); + } else if (requestdistrib.compareTo("zipfian") == 0) { + // it does this by generating a random "next key" in part by taking the modulus over the + // number of keys. + // If the number of keys changes, this would shift the modulus, and we don't want that to + // change which keys are popular so we'll actually construct the scrambled zipfian generator + // with a keyspace that is larger than exists at the beginning of the test. that is, we'll predict + // the number of inserts, and tell the scrambled zipfian generator the number of existing keys + // plus the number of predicted keys as the total keyspace. 
then, if the generator picks a key + // that hasn't been inserted yet, will just ignore it and pick another key. this way, the size of + // the keyspace doesn't change from the perspective of the scrambled zipfian generator + + int opcount = Integer.parseInt(p.getProperty(Client.OPERATION_COUNT_PROPERTY)); + int expectednewkeys = (int) ((opcount) * insertproportion * 2.0); // 2 is fudge factor + + keychooser = new ScrambledZipfianGenerator(recordcount + expectednewkeys); + } else if (requestdistrib.compareTo("latest") == 0) { + keychooser = new SkewedLatestGenerator(transactioninsertkeysequence); + } else if (requestdistrib.equals("hotspot")) { + double hotsetfraction = + Double.parseDouble(p.getProperty(HOTSPOT_DATA_FRACTION, HOTSPOT_DATA_FRACTION_DEFAULT)); + double hotopnfraction = + Double.parseDouble(p.getProperty(HOTSPOT_OPN_FRACTION, HOTSPOT_OPN_FRACTION_DEFAULT)); + keychooser = new HotspotIntegerGenerator(0, recordcount - 1, hotsetfraction, hotopnfraction); + } else { + throw new WorkloadException("Unknown request distribution \"" + requestdistrib + "\""); + } + + fieldchooser = new UniformIntegerGenerator(0, fieldcount - 1); + + if (scanlengthdistrib.compareTo("uniform") == 0) { + scanlength = new UniformIntegerGenerator(1, maxscanlength); + } else if (scanlengthdistrib.compareTo("zipfian") == 0) { + scanlength = new ZipfianGenerator(1, maxscanlength); + } else { + throw new WorkloadException( + "Distribution \"" + scanlengthdistrib + "\" not allowed for scan length"); + } + + insertionRetryLimit = Integer.parseInt(p.getProperty( + INSERTION_RETRY_LIMIT, INSERTION_RETRY_LIMIT_DEFAULT)); + + insertionRetryInterval = Integer.parseInt(p.getProperty( + INSERTION_RETRY_INTERVAL, INSERTION_RETRY_INTERVAL_DEFAULT)); + } + + public String buildKeyName(long keynum) { + if (!orderedinserts) { + keynum = Utils.hash(keynum); + } + return "user" + keynum; + } + /** * Builds a value for a randomly chosen field. */ private HashMap buildSingleValue(String key) { - HashMap value = new HashMap(); + HashMap value = new HashMap(); String fieldkey = fieldnames.get(Integer.parseInt(fieldchooser.nextString())); ByteIterator data; if (dataintegrity) { data = new StringByteIterator(buildDeterministicValue(key, fieldkey)); } else { - //fill with random data + // fill with random data data = new RandomByteIterator(fieldlengthgenerator.nextInt()); } - value.put(fieldkey,data); + value.put(fieldkey, data); - return value; + return value; } /** * Builds values for all fields. */ - private HashMap buildValues(String key) { - HashMap values = new HashMap(); + private HashMap buildValues(String key) { + HashMap values = new HashMap(); for (String fieldkey : fieldnames) { ByteIterator data; if (dataintegrity) { data = new StringByteIterator(buildDeterministicValue(key, fieldkey)); } else { - //fill with random data + // fill with random data data = new RandomByteIterator(fieldlengthgenerator.nextInt()); } - values.put(fieldkey,data); + values.put(fieldkey, data); } return values; } @@ -525,244 +561,238 @@ private String buildDeterministicValue(String key, String fieldkey) { return sb.toString(); } - /** - * Do one insert operation. Because it will be called concurrently from multiple client threads, this - * function must be thread safe. However, avoid synchronized, or the threads will block waiting for each - * other, and it will be difficult to reach the target throughput. Ideally, this function would have no side - * effects other than DB operations. 
-   */
-  public boolean doInsert(DB db, Object threadstate)
-  {
-    int keynum=keysequence.nextInt();
-    String dbkey = buildKeyName(keynum);
-    HashMap<String, ByteIterator> values = buildValues(dbkey);
-    if (db.insert(table,dbkey,values).equals(Status.OK))
-      return true;
-    else
-      return false;
-  }
-
-  /**
-   * Do one transaction operation. Because it will be called concurrently from multiple client threads, this
-   * function must be thread safe. However, avoid synchronized, or the threads will block waiting for each
-   * other, and it will be difficult to reach the target throughput. Ideally, this function would have no side
-   * effects other than DB operations.
-   */
-  public boolean doTransaction(DB db, Object threadstate)
-  {
-    String op=operationchooser.nextString();
-
-    if (op.compareTo("READ")==0)
-    {
-      doTransactionRead(db);
-    }
-    else if (op.compareTo("UPDATE")==0)
-    {
-      doTransactionUpdate(db);
-    }
-    else if (op.compareTo("INSERT")==0)
-    {
-      doTransactionInsert(db);
-    }
-    else if (op.compareTo("SCAN")==0)
-    {
-      doTransactionScan(db);
-    }
-    else
-    {
-      doTransactionReadModifyWrite(db);
-    }
-
-    return true;
-  }
+  /**
+   * Do one insert operation. Because it will be called concurrently from multiple client threads,
+   * this function must be thread safe. However, avoid synchronized, or the threads will block
+   * waiting for each other, and it will be difficult to reach the target throughput. Ideally, this
+   * function would have no side effects other than DB operations.
+   */
+  public boolean doInsert(DB db, Object threadstate) {
+    int keynum = keysequence.nextInt();
+    String dbkey = buildKeyName(keynum);
+    HashMap<String, ByteIterator> values = buildValues(dbkey);
+
+    Status status;
+    int numOfRetries = 0;
+    do {
+      status = db.insert(table, dbkey, values);
+      if (status == Status.OK) {
+        break;
+      }
+      // Retry if configured. Without retrying, the load process will fail
+      // even if one single insertion fails. User can optionally configure
+      // an insertion retry limit (default is 0) to enable retry.
+      if (++numOfRetries <= insertionRetryLimit) {
+        System.err.println("Retrying insertion, retry count: " + numOfRetries);
+        try {
+          // Sleep for a random time in [0.8, 1.2) * insertionRetryInterval seconds.
+          int sleepTime = (int) (1000 * insertionRetryInterval * (0.8 + 0.4 * Math.random()));
+          Thread.sleep(sleepTime);
+        } catch (InterruptedException e) {
+          break;
+        }
+      } else {
+        System.err.println("Error inserting, not retrying any more. Number of attempts: "
+            + numOfRetries + ", insertion retry limit: " + insertionRetryLimit);
+        break;
+      }
+    } while (true);
+
+    return (status == Status.OK);
+  }
+
+  /**
+   * Do one transaction operation. Because it will be called concurrently from multiple client
+   * threads, this function must be thread safe. However, avoid synchronized, or the threads will
+   * block waiting for each other, and it will be difficult to reach the target throughput.
+   * Ideally, this function would have no side effects other than DB operations.
+   */
+  public boolean doTransaction(DB db, Object threadstate) {
+    String op = operationchooser.nextString();
+
+    if (op.compareTo("READ") == 0) {
+      doTransactionRead(db);
+    } else if (op.compareTo("UPDATE") == 0) {
+      doTransactionUpdate(db);
+    } else if (op.compareTo("INSERT") == 0) {
+      doTransactionInsert(db);
+    } else if (op.compareTo("SCAN") == 0) {
+      doTransactionScan(db);
+    } else {
+      doTransactionReadModifyWrite(db);
+    }
+
+    return true;
+  }

   /**
    * Results are reported in the first three buckets of the histogram under
-   * the label "VERIFY".
+   * the label "VERIFY".
* Bucket 0 means the expected data was returned. * Bucket 1 means incorrect data was returned. - * Bucket 2 means null data was returned when some data was expected. + * Bucket 2 means null data was returned when some data was expected. */ - protected void verifyRow(String key, HashMap cells) { + protected void verifyRow(String key, HashMap cells) { Status verifyStatus = Status.OK; long startTime = System.nanoTime(); if (!cells.isEmpty()) { for (Map.Entry entry : cells.entrySet()) { - if (!entry.getValue().toString().equals( - buildDeterministicValue(key, entry.getKey()))) { + if (!entry.getValue().toString().equals(buildDeterministicValue(key, entry.getKey()))) { verifyStatus = Status.UNEXPECTED_STATE; break; } } } else { - //This assumes that null data is never valid + // This assumes that null data is never valid verifyStatus = Status.ERROR; } long endTime = System.nanoTime(); _measurements.measure("VERIFY", (int) (endTime - startTime) / 1000); - _measurements.reportStatus("VERIFY",verifyStatus); + _measurements.reportStatus("VERIFY", verifyStatus); } - int nextKeynum() { - int keynum; - if(keychooser instanceof ExponentialGenerator) { - do - { - keynum=transactioninsertkeysequence.lastInt() - keychooser.nextInt(); - } - while(keynum < 0); - } else { - do - { - keynum=keychooser.nextInt(); - } - while (keynum > transactioninsertkeysequence.lastInt()); - } - return keynum; + int nextKeynum() { + int keynum; + if (keychooser instanceof ExponentialGenerator) { + do { + keynum = transactioninsertkeysequence.lastInt() - keychooser.nextInt(); + } while (keynum < 0); + } else { + do { + keynum = keychooser.nextInt(); + } while (keynum > transactioninsertkeysequence.lastInt()); } + return keynum; + } + + public void doTransactionRead(DB db) { + // choose a random key + int keynum = nextKeynum(); + + String keyname = buildKeyName(keynum); - public void doTransactionRead(DB db) - { - //choose a random key - int keynum = nextKeynum(); - - String keyname = buildKeyName(keynum); - - HashSet fields=null; - - if (!readallfields) - { - //read a random field - String fieldname=fieldnames.get(Integer.parseInt(fieldchooser.nextString())); - - fields=new HashSet(); - fields.add(fieldname); - } else if (dataintegrity) { + HashSet fields = null; + + if (!readallfields) { + // read a random field + String fieldname = fieldnames.get(Integer.parseInt(fieldchooser.nextString())); + + fields = new HashSet(); + fields.add(fieldname); + } else if (dataintegrity) { // pass the full field list if dataintegrity is on for verification fields = new HashSet(fieldnames); } - HashMap cells = - new HashMap(); - db.read(table,keyname,fields,cells); + HashMap cells = new HashMap(); + db.read(table, keyname, fields, cells); if (dataintegrity) { verifyRow(keyname, cells); } - } - - public void doTransactionReadModifyWrite(DB db) - { - //choose a random key - int keynum = nextKeynum(); - - String keyname = buildKeyName(keynum); - - HashSet fields=null; - - if (!readallfields) - { - //read a random field - String fieldname=fieldnames.get(Integer.parseInt(fieldchooser.nextString())); - - fields=new HashSet(); - fields.add(fieldname); - } - - HashMap values; - - if (writeallfields) - { - //new data for all the fields - values = buildValues(keyname); - } - else - { - //update a random field - values = buildSingleValue(keyname); - } - - //do the transaction - - HashMap cells = - new HashMap(); - - - long ist=_measurements.getIntendedtartTimeNs(); - long st = System.nanoTime(); - db.read(table,keyname,fields,cells); - - 
db.update(table,keyname,values); - - long en=System.nanoTime(); + } + + public void doTransactionReadModifyWrite(DB db) { + // choose a random key + int keynum = nextKeynum(); + + String keyname = buildKeyName(keynum); + + HashSet fields = null; + + if (!readallfields) { + // read a random field + String fieldname = fieldnames.get(Integer.parseInt(fieldchooser.nextString())); + + fields = new HashSet(); + fields.add(fieldname); + } + + HashMap values; + + if (writeallfields) { + // new data for all the fields + values = buildValues(keyname); + } else { + // update a random field + values = buildSingleValue(keyname); + } + + // do the transaction + + HashMap cells = new HashMap(); + + + long ist = _measurements.getIntendedtartTimeNs(); + long st = System.nanoTime(); + db.read(table, keyname, fields, cells); + + db.update(table, keyname, values); + + long en = System.nanoTime(); if (dataintegrity) { verifyRow(keyname, cells); } - _measurements .measure("READ-MODIFY-WRITE", (int)((en-st)/1000)); - _measurements .measureIntended("READ-MODIFY-WRITE", (int)((en-ist)/1000)); - } - - public void doTransactionScan(DB db) - { - //choose a random key - int keynum = nextKeynum(); - - String startkeyname = buildKeyName(keynum); - - //choose a random scan length - int len=scanlength.nextInt(); - - HashSet fields=null; - - if (!readallfields) - { - //read a random field - String fieldname=fieldnames.get(Integer.parseInt(fieldchooser.nextString())); - - fields=new HashSet(); - fields.add(fieldname); - } - - db.scan(table,startkeyname,len,fields,new Vector>()); - } - - public void doTransactionUpdate(DB db) - { - //choose a random key - int keynum = nextKeynum(); - - String keyname=buildKeyName(keynum); - - HashMap values; - - if (writeallfields) - { - //new data for all the fields - values = buildValues(keyname); - } - else - { - //update a random field - values = buildSingleValue(keyname); - } - - db.update(table,keyname,values); - } - - public void doTransactionInsert(DB db) - { - //choose the next key - int keynum=transactioninsertkeysequence.nextInt(); - - try { - String dbkey = buildKeyName(keynum); - - HashMap values = buildValues(dbkey); - db.insert(table,dbkey,values); - } finally { - transactioninsertkeysequence.acknowledge(keynum); - } - } + _measurements.measure("READ-MODIFY-WRITE", (int) ((en - st) / 1000)); + _measurements.measureIntended("READ-MODIFY-WRITE", (int) ((en - ist) / 1000)); + } + + public void doTransactionScan(DB db) { + // choose a random key + int keynum = nextKeynum(); + + String startkeyname = buildKeyName(keynum); + + // choose a random scan length + int len = scanlength.nextInt(); + + HashSet fields = null; + + if (!readallfields) { + // read a random field + String fieldname = fieldnames.get(Integer.parseInt(fieldchooser.nextString())); + + fields = new HashSet(); + fields.add(fieldname); + } + + db.scan(table, startkeyname, len, fields, new Vector>()); + } + + public void doTransactionUpdate(DB db) { + // choose a random key + int keynum = nextKeynum(); + + String keyname = buildKeyName(keynum); + + HashMap values; + + if (writeallfields) { + // new data for all the fields + values = buildValues(keyname); + } else { + // update a random field + values = buildSingleValue(keyname); + } + + db.update(table, keyname, values); + } + + public void doTransactionInsert(DB db) { + // choose the next key + int keynum = transactioninsertkeysequence.nextInt(); + + try { + String dbkey = buildKeyName(keynum); + + HashMap values = buildValues(dbkey); + db.insert(table, dbkey, values); 
+    } finally {
+      transactioninsertkeysequence.acknowledge(keynum);
+    }
+  }
 }
diff --git a/workloads/workload_template b/workloads/workload_template
index de0bbae24a..f5e80c8899 100644
--- a/workloads/workload_template
+++ b/workloads/workload_template
@@ -156,3 +156,16 @@ timeseries.granularity=1000
 # property.
 # reportlatencyforeacherror=false
 # latencytrackederrors=""
+
+# Insertion error retry for the core workload.
+#
+# By default, the YCSB core workload does not retry any operations.
+# During the load process, this means that a single failed insertion
+# terminates the entire load.
+# Users who want more robust behavior during this phase can enable retries
+# for insertions by setting the following property to a positive number.
+# core_workload_insertion_retry_limit = 0
+#
+# The following property controls the interval between retries, in seconds:
+# core_workload_insertion_retry_interval = 3
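
Reviewer note: to exercise the new retry path end to end, the two properties can be set like any other core workload property, either in the workload file or on the command line. A minimal sketch, assuming the stock "basic" binding and the workloads/workloada file that ship with YCSB:

    # Retry each failed insert up to 3 times, waiting roughly 1 second
    # (jittered by +/-20%) between attempts.
    core_workload_insertion_retry_limit=3
    core_workload_insertion_retry_interval=1

or, equivalently, on the command line:

    bin/ycsb load basic -P workloads/workloada \
        -p core_workload_insertion_retry_limit=3 \
        -p core_workload_insertion_retry_interval=1

With the default limit of 0 the old behavior is preserved: a single failed insert aborts the load phase.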
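
A second note, on the sleep inside the new doInsert() retry loop: the jitter is easy to misread because it is written inline. A standalone sketch of the same computation (the method name is mine, not part of this patch):

    // Jittered retry delay: uniformly distributed in [0.8, 1.2) times the
    // configured interval, converted to milliseconds for Thread.sleep().
    // With the default interval of 3 seconds this sleeps between
    // 2400 ms (inclusive) and 3600 ms (exclusive).
    static long jitteredDelayMillis(int intervalSeconds) {
      return (long) (1000 * intervalSeconds * (0.8 + 0.4 * Math.random()));
    }

The randomization keeps loader threads that failed at the same moment from retrying in lockstep against a recovering store.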
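
Finally, on the reflowed comment in the "zipfian" branch of init(): the keyspace padding it describes works out to opcount * insertproportion * 2, the 2 being the stated fudge factor. For example, with operationcount=1,000,000 and insertproportion=0.05, the ScrambledZipfianGenerator is sized over recordcount + 100,000 keys, so in-flight inserts never shift the modulus that decides which keys are popular; nextKeynum() simply redraws any key that has not been inserted yet.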