Skip to content

Commit

Permalink
Closes #331 Fixing EventFields so it doesn't have such bad runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
cjnolet committed Apr 28, 2015
1 parent 78207fb commit fbe6cff
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 121 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

Expand Down Expand Up @@ -199,13 +200,18 @@ private void findTop() throws IOException {
ByteBuffer buf = ByteBuffer.wrap(serializedMap);
// Serialize the EventFields object

EventFields newEventFields = new EventFields();
for(Map.Entry<String,EventFields.FieldValue> entry : event.entries()) {
if((selectFields != null && selectFields.contains(entry.getKey()) || selectFields == null))
newEventFields.put(entry.getKey(), entry.getValue());
Set<String> keysToRemove = new HashSet<String>();
if(selectFields != null) {
for(String field : event.keys()) {
if(!selectFields.contains(field))
keysToRemove.add(field);
}
}

newEventFields.write(kryo, new ByteBufferOutput(buf), newEventFields);
for(String field : keysToRemove)
event.removeAll(field);

event.write(kryo, new ByteBufferOutput(buf), event);
// Truncate array to the used size.
returnValue = new Value(copyOfRange(serializedMap, 0, buf.position()));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
*/
package org.calrissian.accumulorecipes.commons.iterators.support;

import java.util.Collection;
import static java.util.Collections.unmodifiableMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
Expand All @@ -25,77 +27,66 @@
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.DefaultArraySerializers;
import com.esotericsoftware.kryo.serializers.DefaultArraySerializers.ByteArraySerializer;
import com.esotericsoftware.kryo.serializers.DefaultSerializers;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multiset;
import com.google.common.collect.SetMultimap;
import org.apache.accumulo.core.security.ColumnVisibility;
import org.calrissian.accumulorecipes.commons.iterators.support.EventFields.FieldValue;

/**
* Object used to hold the fields in an event. This is a multimap because fields can be repeated.
*/
public class EventFields extends Serializer<EventFields> implements SetMultimap<String, FieldValue> {
public class EventFields extends Serializer<EventFields> {

private static boolean kryoInitialized = false;
private static DefaultArraySerializers.ByteArraySerializer valueSerializer = new DefaultArraySerializers.ByteArraySerializer();
private static ByteArraySerializer valueSerializer = new ByteArraySerializer();
private static DefaultSerializers.IntSerializer intSerializer = new DefaultSerializers.IntSerializer();
private static DefaultSerializers.StringSerializer stringSerializer = new DefaultSerializers.StringSerializer();

private Multimap<String, FieldValue> map = null;
private Map<String, Set<FieldValue>> map = null;

private int size = 0;
public EventFields() {
map = HashMultimap.create();
}

public EventFields(Kryo kryo, Input input) {

this();
if(!kryoInitialized)
EventFields.initializeKryo(kryo);
}
map = new HashMap<String,Set<FieldValue>>();
}

@Override public void write(Kryo kryo, Output output, EventFields eventFields) {
// Write out the number of entries;
intSerializer.write(kryo, output, size);
for (Entry<String, Set<FieldValue>> entry : map.entrySet()) {
for(FieldValue fieldValue : entry.getValue()) {
// Write the fields in the value
stringSerializer.write(kryo, output, entry.getKey());
valueSerializer.write(kryo, output, fieldValue.getVisibility().getExpression().length > 0 ? fieldValue.getVisibility().flatten() : fieldValue.getVisibility().getExpression());
valueSerializer.write(kryo, output, fieldValue.getValue());
valueSerializer.write(kryo, output, fieldValue.getMetadata());
}
}

@Override public void write(Kryo kryo, Output output, EventFields eventFields) {
// Write out the number of entries;
intSerializer.write(kryo, output, map.size());
for (Entry<String, FieldValue> entry : map.entries()) {
// Write the key
stringSerializer.write(kryo, output, entry.getKey());
// Write the fields in the value
output.flush();

valueSerializer.write(kryo, output, entry.getValue().getVisibility().getExpression().length > 0 ? entry.getValue().getVisibility().flatten() : entry.getValue().getVisibility().getExpression());
valueSerializer.write(kryo, output, entry.getValue().getValue());
valueSerializer.write(kryo, output, entry.getValue().getMetadata());
}

output.flush();

}

@Override
public EventFields read(Kryo kryo, Input input, Class<EventFields> eventFieldsClass) {
@Override
public EventFields read(Kryo kryo, Input input, Class<EventFields> eventFieldsClass) {

// Read in the number of map entries
int entries = intSerializer.read(kryo, input, Integer.class);
for (int i = 0; i < entries; i++) {
// Read in the key
String key = stringSerializer.read(kryo, input, String.class);
// Read in the fields in the value
ColumnVisibility vis = new ColumnVisibility(valueSerializer.read(kryo, input, byte[].class));
byte[] value = valueSerializer.read(kryo, input, byte[].class);
byte[] metadata = valueSerializer.read(kryo, input, byte[].class);
put(key, new FieldValue(vis, value, metadata));
}

// Read in the number of map entries
int entries = intSerializer.read(kryo, input, Integer.class);
for (int i = 0; i < entries; i++) {
// Read in the key
String key = stringSerializer.read(kryo, input, String.class);
// Read in the fields in the value
ColumnVisibility vis = new ColumnVisibility(valueSerializer.read(kryo, input, byte[].class));
byte[] value = valueSerializer.read(kryo, input, byte[].class);
byte[] metadata = valueSerializer.read(kryo, input, byte[].class);
map.put(key, new FieldValue(vis, value, metadata));
return this;
}

return this;
}

public static synchronized void initializeKryo(Kryo kryo) {
public static synchronized void initializeKryo(Kryo kryo) {
if (kryoInitialized)
return;
valueSerializer = new DefaultArraySerializers.ByteArraySerializer();
valueSerializer = new ByteArraySerializer();
kryo.register(byte[].class, valueSerializer);

kryoInitialized = true;
Expand All @@ -105,91 +96,64 @@ public int size() {
return map.size();
}

public boolean isEmpty() {
return map.isEmpty();
public Set<Map.Entry<String, Set<FieldValue>>> entrySet() {
return map.entrySet();
}

public boolean containsKey(Object key) {
return map.containsKey(key);
public Map<String, Set<FieldValue>> asMap() {
return unmodifiableMap(map);
}

public boolean containsValue(Object value) {
return map.containsValue(value);
public boolean isEmpty() {
return map.isEmpty();
}

public boolean containsEntry(Object key, Object value) {
return map.containsEntry(key, value);
public boolean containsKey(Object key) {
return map.containsKey(key);
}

public boolean put(String key, FieldValue value) {
return map.put(key, value);
}

public boolean remove(Object key, Object value) {
return map.remove(key, value);
}
Set<FieldValue> fieldValues = map.get(key);
if(fieldValues == null) {
fieldValues = new HashSet<FieldValue>();
map.put(key, fieldValues);
}

public boolean putAll(String key, Iterable<? extends FieldValue> values) {
return map.putAll(key, values);
}
if(!fieldValues.contains(value))
size++;

public boolean putAll(Multimap<? extends String, ? extends FieldValue> multimap) {
return map.putAll(multimap);
return fieldValues.add(value);
}

public void clear() {
map.clear();
}

public Set<String> keySet() {
public Set<String> keys() {
return map.keySet();
}

public Multiset<String> keys() {
return map.keys();
}

public Collection<FieldValue> values() {
return map.values();
}

public Set<FieldValue> get(String key) {
return (Set<FieldValue>) map.get(key);
}

public Set<FieldValue> removeAll(Object key) {
return (Set<FieldValue>) map.removeAll(key);
}

public Set<FieldValue> replaceValues(String key, Iterable<? extends FieldValue> values) {
return (Set<FieldValue>) map.replaceValues(key, values);
}

public Set<Entry<String, FieldValue>> entries() {
return (Set<Entry<String, FieldValue>>) map.entries();
return map.get(key);
}

public Map<String, Collection<FieldValue>> asMap() {
return map.asMap();
public Set<FieldValue> removeAll(String key) {
Set<FieldValue> values = map.remove(key);
if(values != null)
size -= values.size();
return values;
}

public int getByteSize() {
int count = 0;
for (Entry<String, FieldValue> e : map.entries()) {
count += e.getKey().getBytes().length + e.getValue().size();
for (Entry<String, Set<FieldValue>> e : map.entrySet()) {
for(FieldValue fieldValue : e.getValue())
count += e.getKey().getBytes().length + fieldValue.size();
}
return count;
}

@Override
public String toString() {
StringBuilder buf = new StringBuilder();
for (Entry<String, FieldValue> entry : map.entries()) {
buf.append("\tkey: ").append(entry.getKey()).append(" -> ").append(entry.getValue().toString()).append("\n");
}
return buf.toString();
}



public static class FieldValue {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ public boolean evaluate(Key topKey, EventFields eventFields) {
HashSet<String> literalsCopy = new HashSet<String>(literals);

// Loop through the event fields and add them to the JexlContext.
for (Entry<String, Collection<FieldValue>> field : eventFields.asMap().entrySet()) {
for (Entry<String, Set<FieldValue>> field : eventFields.asMap().entrySet()) {
String fName = normalizeKey(topKey, field.getKey());
fName = removeInvalidChars(fName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.calrissian.accumulorecipes.commons.support.attribute.Metadata.Visiblity.setVisibility;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
Expand Down Expand Up @@ -62,20 +63,23 @@ public V apply(Map.Entry<Key, Value> keyValueEntry) {
EventFields eventFields = new EventFields();
eventFields.read(kryo, new Input(keyValueEntry.getValue().get()), EventFields.class);
B entry = buildAttributeCollectionFromKey(keyValueEntry.getKey());
for (Map.Entry<String, EventFields.FieldValue> fieldValue : eventFields.entries()) {
String[] aliasVal = splitPreserveAllTokens(new String(fieldValue.getValue().getValue()), ONE_BYTE);
Object javaVal = typeRegistry.decode(aliasVal[0], aliasVal[1]);
for (Map.Entry<String,Set<EventFields.FieldValue>> fieldValue : eventFields.entrySet()) {

String vis = fieldValue.getValue().getVisibility().getExpression().length > 0 ? new String(fieldValue.getValue().getVisibility().getExpression()) : "";
for(EventFields.FieldValue fieldValue1 : fieldValue.getValue()) {
String[] aliasVal = splitPreserveAllTokens(new String(fieldValue1.getValue()), ONE_BYTE);
Object javaVal = typeRegistry.decode(aliasVal[0], aliasVal[1]);

try {
Map<String,String> meta = metadataSerDe.deserialize(fieldValue.getValue().getMetadata());
Map<String,String> metadata = (meta == null ? new HashMap<String,String>() : new HashMap<String,String>(meta));
setVisibility(metadata, vis);
Attribute attribute = new Attribute(fieldValue.getKey(), javaVal, metadata);
entry.attr(attribute);
} catch(Exception e) {
log.error("There was an error deserializing the metadata for a attribute", e);
String vis = fieldValue1.getVisibility().getExpression().length > 0 ? new String(fieldValue1.getVisibility().getExpression()) : "";

try {
Map<String,String> meta = metadataSerDe.deserialize(fieldValue1.getMetadata());
Map<String,String> metadata = (meta == null ? new HashMap<String,String>() : new HashMap<String,String>(meta));
setVisibility(metadata, vis);
Attribute attribute = new Attribute(fieldValue.getKey(), javaVal, metadata);
entry.attr(attribute);
} catch(Exception e) {
log.error("There was an error deserializing the metadata for a attribute", e);
}
}
}
return entry.build();
Expand Down

0 comments on commit fbe6cff

Please sign in to comment.