From fbe6cff67677c23aba8faec46dfdfb115197fa0a Mon Sep 17 00:00:00 2001 From: "Corey J. Nolet" Date: Tue, 28 Apr 2015 00:39:30 -0400 Subject: [PATCH] Closes #331 Fixing EventFields so it doesn't have such bad runtime --- .../iterators/AbstractEvaluatingIterator.java | 16 +- .../iterators/support/EventFields.java | 170 +++++++----------- .../iterators/support/QueryEvaluator.java | 2 +- .../qfd/KeyToAttributeStoreQueryXform.java | 28 +-- 4 files changed, 95 insertions(+), 121 deletions(-) diff --git a/commons/src/main/java/org/calrissian/accumulorecipes/commons/iterators/AbstractEvaluatingIterator.java b/commons/src/main/java/org/calrissian/accumulorecipes/commons/iterators/AbstractEvaluatingIterator.java index aaaeedad..7b0acaa3 100644 --- a/commons/src/main/java/org/calrissian/accumulorecipes/commons/iterators/AbstractEvaluatingIterator.java +++ b/commons/src/main/java/org/calrissian/accumulorecipes/commons/iterators/AbstractEvaluatingIterator.java @@ -25,6 +25,7 @@ import java.nio.ByteBuffer; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -199,13 +200,18 @@ private void findTop() throws IOException { ByteBuffer buf = ByteBuffer.wrap(serializedMap); // Serialize the EventFields object - EventFields newEventFields = new EventFields(); - for(Map.Entry entry : event.entries()) { - if((selectFields != null && selectFields.contains(entry.getKey()) || selectFields == null)) - newEventFields.put(entry.getKey(), entry.getValue()); + Set keysToRemove = new HashSet(); + if(selectFields != null) { + for(String field : event.keys()) { + if(!selectFields.contains(field)) + keysToRemove.add(field); + } } - newEventFields.write(kryo, new ByteBufferOutput(buf), newEventFields); + for(String field : keysToRemove) + event.removeAll(field); + + event.write(kryo, new ByteBufferOutput(buf), event); // Truncate array to the used size. returnValue = new Value(copyOfRange(serializedMap, 0, buf.position())); } else { diff --git a/commons/src/main/java/org/calrissian/accumulorecipes/commons/iterators/support/EventFields.java b/commons/src/main/java/org/calrissian/accumulorecipes/commons/iterators/support/EventFields.java index 2be2ca48..7a6fd224 100644 --- a/commons/src/main/java/org/calrissian/accumulorecipes/commons/iterators/support/EventFields.java +++ b/commons/src/main/java/org/calrissian/accumulorecipes/commons/iterators/support/EventFields.java @@ -16,7 +16,9 @@ */ package org.calrissian.accumulorecipes.commons.iterators.support; -import java.util.Collection; +import static java.util.Collections.unmodifiableMap; +import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Map.Entry; import java.util.Set; @@ -25,77 +27,66 @@ import com.esotericsoftware.kryo.Serializer; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; -import com.esotericsoftware.kryo.serializers.DefaultArraySerializers; +import com.esotericsoftware.kryo.serializers.DefaultArraySerializers.ByteArraySerializer; import com.esotericsoftware.kryo.serializers.DefaultSerializers; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Multimap; -import com.google.common.collect.Multiset; -import com.google.common.collect.SetMultimap; import org.apache.accumulo.core.security.ColumnVisibility; -import org.calrissian.accumulorecipes.commons.iterators.support.EventFields.FieldValue; /** * Object used to hold the fields in an event. This is a multimap because fields can be repeated. */ -public class EventFields extends Serializer implements SetMultimap { +public class EventFields extends Serializer { private static boolean kryoInitialized = false; - private static DefaultArraySerializers.ByteArraySerializer valueSerializer = new DefaultArraySerializers.ByteArraySerializer(); + private static ByteArraySerializer valueSerializer = new ByteArraySerializer(); private static DefaultSerializers.IntSerializer intSerializer = new DefaultSerializers.IntSerializer(); private static DefaultSerializers.StringSerializer stringSerializer = new DefaultSerializers.StringSerializer(); - private Multimap map = null; + private Map> map = null; + private int size = 0; public EventFields() { - map = HashMultimap.create(); - } - - public EventFields(Kryo kryo, Input input) { - - this(); - if(!kryoInitialized) - EventFields.initializeKryo(kryo); - } + map = new HashMap>(); + } + + @Override public void write(Kryo kryo, Output output, EventFields eventFields) { + // Write out the number of entries; + intSerializer.write(kryo, output, size); + for (Entry> entry : map.entrySet()) { + for(FieldValue fieldValue : entry.getValue()) { + // Write the fields in the value + stringSerializer.write(kryo, output, entry.getKey()); + valueSerializer.write(kryo, output, fieldValue.getVisibility().getExpression().length > 0 ? fieldValue.getVisibility().flatten() : fieldValue.getVisibility().getExpression()); + valueSerializer.write(kryo, output, fieldValue.getValue()); + valueSerializer.write(kryo, output, fieldValue.getMetadata()); + } + } - @Override public void write(Kryo kryo, Output output, EventFields eventFields) { - // Write out the number of entries; - intSerializer.write(kryo, output, map.size()); - for (Entry entry : map.entries()) { - // Write the key - stringSerializer.write(kryo, output, entry.getKey()); - // Write the fields in the value + output.flush(); - valueSerializer.write(kryo, output, entry.getValue().getVisibility().getExpression().length > 0 ? entry.getValue().getVisibility().flatten() : entry.getValue().getVisibility().getExpression()); - valueSerializer.write(kryo, output, entry.getValue().getValue()); - valueSerializer.write(kryo, output, entry.getValue().getMetadata()); } - output.flush(); - - } - - @Override - public EventFields read(Kryo kryo, Input input, Class eventFieldsClass) { + @Override + public EventFields read(Kryo kryo, Input input, Class eventFieldsClass) { + + // Read in the number of map entries + int entries = intSerializer.read(kryo, input, Integer.class); + for (int i = 0; i < entries; i++) { + // Read in the key + String key = stringSerializer.read(kryo, input, String.class); + // Read in the fields in the value + ColumnVisibility vis = new ColumnVisibility(valueSerializer.read(kryo, input, byte[].class)); + byte[] value = valueSerializer.read(kryo, input, byte[].class); + byte[] metadata = valueSerializer.read(kryo, input, byte[].class); + put(key, new FieldValue(vis, value, metadata)); + } - // Read in the number of map entries - int entries = intSerializer.read(kryo, input, Integer.class); - for (int i = 0; i < entries; i++) { - // Read in the key - String key = stringSerializer.read(kryo, input, String.class); - // Read in the fields in the value - ColumnVisibility vis = new ColumnVisibility(valueSerializer.read(kryo, input, byte[].class)); - byte[] value = valueSerializer.read(kryo, input, byte[].class); - byte[] metadata = valueSerializer.read(kryo, input, byte[].class); - map.put(key, new FieldValue(vis, value, metadata)); + return this; } - return this; - } - - public static synchronized void initializeKryo(Kryo kryo) { + public static synchronized void initializeKryo(Kryo kryo) { if (kryoInitialized) return; - valueSerializer = new DefaultArraySerializers.ByteArraySerializer(); + valueSerializer = new ByteArraySerializer(); kryo.register(byte[].class, valueSerializer); kryoInitialized = true; @@ -105,91 +96,64 @@ public int size() { return map.size(); } - public boolean isEmpty() { - return map.isEmpty(); + public Set>> entrySet() { + return map.entrySet(); } - public boolean containsKey(Object key) { - return map.containsKey(key); + public Map> asMap() { + return unmodifiableMap(map); } - public boolean containsValue(Object value) { - return map.containsValue(value); + public boolean isEmpty() { + return map.isEmpty(); } - public boolean containsEntry(Object key, Object value) { - return map.containsEntry(key, value); + public boolean containsKey(Object key) { + return map.containsKey(key); } public boolean put(String key, FieldValue value) { - return map.put(key, value); - } - public boolean remove(Object key, Object value) { - return map.remove(key, value); - } + Set fieldValues = map.get(key); + if(fieldValues == null) { + fieldValues = new HashSet(); + map.put(key, fieldValues); + } - public boolean putAll(String key, Iterable values) { - return map.putAll(key, values); - } + if(!fieldValues.contains(value)) + size++; - public boolean putAll(Multimap multimap) { - return map.putAll(multimap); + return fieldValues.add(value); } public void clear() { map.clear(); } - public Set keySet() { + public Set keys() { return map.keySet(); } - public Multiset keys() { - return map.keys(); - } - - public Collection values() { - return map.values(); - } - public Set get(String key) { - return (Set) map.get(key); - } - - public Set removeAll(Object key) { - return (Set) map.removeAll(key); - } - - public Set replaceValues(String key, Iterable values) { - return (Set) map.replaceValues(key, values); - } - - public Set> entries() { - return (Set>) map.entries(); + return map.get(key); } - public Map> asMap() { - return map.asMap(); + public Set removeAll(String key) { + Set values = map.remove(key); + if(values != null) + size -= values.size(); + return values; } public int getByteSize() { int count = 0; - for (Entry e : map.entries()) { - count += e.getKey().getBytes().length + e.getValue().size(); + for (Entry> e : map.entrySet()) { + for(FieldValue fieldValue : e.getValue()) + count += e.getKey().getBytes().length + fieldValue.size(); } return count; } - @Override - public String toString() { - StringBuilder buf = new StringBuilder(); - for (Entry entry : map.entries()) { - buf.append("\tkey: ").append(entry.getKey()).append(" -> ").append(entry.getValue().toString()).append("\n"); - } - return buf.toString(); - } - public static class FieldValue { diff --git a/commons/src/main/java/org/calrissian/accumulorecipes/commons/iterators/support/QueryEvaluator.java b/commons/src/main/java/org/calrissian/accumulorecipes/commons/iterators/support/QueryEvaluator.java index ead785a7..61b47f3a 100644 --- a/commons/src/main/java/org/calrissian/accumulorecipes/commons/iterators/support/QueryEvaluator.java +++ b/commons/src/main/java/org/calrissian/accumulorecipes/commons/iterators/support/QueryEvaluator.java @@ -155,7 +155,7 @@ public boolean evaluate(Key topKey, EventFields eventFields) { HashSet literalsCopy = new HashSet(literals); // Loop through the event fields and add them to the JexlContext. - for (Entry> field : eventFields.asMap().entrySet()) { + for (Entry> field : eventFields.asMap().entrySet()) { String fName = normalizeKey(topKey, field.getKey()); fName = removeInvalidChars(fName); diff --git a/commons/src/main/java/org/calrissian/accumulorecipes/commons/support/qfd/KeyToAttributeStoreQueryXform.java b/commons/src/main/java/org/calrissian/accumulorecipes/commons/support/qfd/KeyToAttributeStoreQueryXform.java index 5e164f40..92cd7b4b 100644 --- a/commons/src/main/java/org/calrissian/accumulorecipes/commons/support/qfd/KeyToAttributeStoreQueryXform.java +++ b/commons/src/main/java/org/calrissian/accumulorecipes/commons/support/qfd/KeyToAttributeStoreQueryXform.java @@ -20,6 +20,7 @@ import static org.calrissian.accumulorecipes.commons.support.attribute.Metadata.Visiblity.setVisibility; import java.util.HashMap; import java.util.Map; +import java.util.Set; import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.io.Input; @@ -62,20 +63,23 @@ public V apply(Map.Entry keyValueEntry) { EventFields eventFields = new EventFields(); eventFields.read(kryo, new Input(keyValueEntry.getValue().get()), EventFields.class); B entry = buildAttributeCollectionFromKey(keyValueEntry.getKey()); - for (Map.Entry fieldValue : eventFields.entries()) { - String[] aliasVal = splitPreserveAllTokens(new String(fieldValue.getValue().getValue()), ONE_BYTE); - Object javaVal = typeRegistry.decode(aliasVal[0], aliasVal[1]); + for (Map.Entry> fieldValue : eventFields.entrySet()) { - String vis = fieldValue.getValue().getVisibility().getExpression().length > 0 ? new String(fieldValue.getValue().getVisibility().getExpression()) : ""; + for(EventFields.FieldValue fieldValue1 : fieldValue.getValue()) { + String[] aliasVal = splitPreserveAllTokens(new String(fieldValue1.getValue()), ONE_BYTE); + Object javaVal = typeRegistry.decode(aliasVal[0], aliasVal[1]); - try { - Map meta = metadataSerDe.deserialize(fieldValue.getValue().getMetadata()); - Map metadata = (meta == null ? new HashMap() : new HashMap(meta)); - setVisibility(metadata, vis); - Attribute attribute = new Attribute(fieldValue.getKey(), javaVal, metadata); - entry.attr(attribute); - } catch(Exception e) { - log.error("There was an error deserializing the metadata for a attribute", e); + String vis = fieldValue1.getVisibility().getExpression().length > 0 ? new String(fieldValue1.getVisibility().getExpression()) : ""; + + try { + Map meta = metadataSerDe.deserialize(fieldValue1.getMetadata()); + Map metadata = (meta == null ? new HashMap() : new HashMap(meta)); + setVisibility(metadata, vis); + Attribute attribute = new Attribute(fieldValue.getKey(), javaVal, metadata); + entry.attr(attribute); + } catch(Exception e) { + log.error("There was an error deserializing the metadata for a attribute", e); + } } } return entry.build();