Skip to content

Commit

Permalink
For #330 Initial implementation of module for json event store
Browse files Browse the repository at this point in the history
  • Loading branch information
cjnolet committed Apr 28, 2015
1 parent fbe6cff commit 4b3acdb
Show file tree
Hide file tree
Showing 7 changed files with 363 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,6 @@ public CloseableIterable<T> query(BatchScanner scanner, GlobalIndexVisitor globa
checkNotNull(query);
checkNotNull(auths);


QueryPlanner queryPlan = new QueryPlanner(query, globalIndexVisitor, typeRegistry);

if (isEmpty(queryPlan.getOptimizedQuery()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,32 @@ public void testQuery_AndQuery() throws Exception {
}
}


@Test
public void testQuery_AndQuery1() throws Exception {

Event event = EventBuilder.create("", UUID.randomUUID().toString(), currentTimeMillis())
.attr(new Attribute("key1", 500, meta))
.attr(new Attribute("key1", 501, meta))
.build();

Event event2 = EventBuilder.create("", UUID.randomUUID().toString(), currentTimeMillis())
.attr(new Attribute("`key`.`1`", "val1", meta))
.attr(new Attribute("key2", "val2", meta))
.build();

store.save(asList(event, event2));

Node query = QueryBuilder.create().and().greaterThan("key1", 400).end().build();

Iterator<Event> itr = store.query(new Date(currentTimeMillis() - 5000),
new Date(), query, null, DEFAULT_AUTHS).iterator();

while(itr.hasNext()) {
System.out.println(itr.next());
}
}

@Test
public void testQuery_OrQuery() throws Exception {

Expand Down
99 changes: 99 additions & 0 deletions store/json-event-store/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright (C) 2013 The Calrissian Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.calrissian.accumulorecipes</groupId>
<artifactId>store</artifactId>
<version>2.1.0-SNAPSHOT</version>
</parent>
<artifactId>json-event-store</artifactId>
<packaging>jar</packaging>
<name>Calrissian Accumulo Recipes Event Store</name>
<description>The event store is a document store for time-based events.</description>
<dependencies>

<!-- Accumulo Recipes -->
<dependency>
<groupId>org.calrissian.accumulorecipes</groupId>
<artifactId>commons</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.calrissian.mango</groupId>
<artifactId>mango-core</artifactId>
</dependency>
<dependency>
<groupId>org.calrissian.mango</groupId>
<artifactId>mango-json</artifactId>
<version>${mango.version}</version>
</dependency>
<dependency>
<groupId>org.calrissian.accumulorecipes</groupId>
<artifactId>event-store</artifactId>
<version>${project.version}</version>
</dependency>



<!-- Hadoop / Accumulo -->
<dependency>
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</dependency>

<!-- JUNIT -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>


<!-- Joda-Time -->
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
</dependency>

<!-- Test -->
<dependency>
<groupId>org.calrissian.mango</groupId>
<artifactId>mango-json</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-minicluster</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.calrissian.accumulorecipes</groupId>
<artifactId>test</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package org.calrissian.store.json.event;

import java.util.Map;

public class JsonEvent {

private String type;
private String id;
private long timestamp;

private Map<String, Object> document;

public JsonEvent(String type, String id, long timestamp, Map<String,Object> document) {
this.type = type;
this.id = id;
this.timestamp = timestamp;
this.document = document;
}

public String getType() {
return type;
}

public String getId() {
return id;
}

public Long getTimestamp() {
return timestamp;
}

public Map<String,Object> getDocument() {
return document;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package org.calrissian.store.json.event;

import java.util.Collection;
import java.util.Date;
import java.util.Set;

import org.calrissian.accumulorecipes.commons.domain.Auths;
import org.calrissian.mango.collect.CloseableIterable;
import org.calrissian.mango.criteria.domain.Node;
import org.calrissian.mango.domain.Pair;
import org.calrissian.mango.domain.event.EventIdentifier;

public interface JsonEventStore {

void save(Iterable<JsonEvent> jsonEvents);

/**
* Force persistence of all events currently in-memory to the backing persistence
* implementation.
* @throws Exception
*/
void flush() throws Exception;

/**
* Query the store using criteria specified with the specified filter fields
*
* @param start
* @param end
* @param node
* @param auths
* @return
*/
@Deprecated
CloseableIterable<JsonEvent> query(Date start, Date end, Node node, Set<String> selectFields, Auths auths);


CloseableIterable<JsonEvent> query(Date start, Date end, Set<String> types, Node node, Set<String> selectFields, Auths auths);

/**
* Query the store using criteria specified
*
* @param start
* @param end
* @param node
* @param auths
* @return
*/
@Deprecated
CloseableIterable<JsonEvent> query(Date start, Date end, Node node, Auths auths);


CloseableIterable<JsonEvent> query(Date start, Date end, Set<String> types, Node node, Auths auths);

/**
* If an event is already being indexed in another store, it's often useful to query a bunch
* back in batches. This method allows the selection of specific fields.
*
* @param indexes
* @param auths
* @return
*/
CloseableIterable<JsonEvent> get(Collection<EventIdentifier> indexes, Set<String> selectFields, Auths auths);

CloseableIterable<JsonEvent> getAllByType(Date start, Date stop, Set<String> types, Set<String> selectFields, Auths auths);

CloseableIterable<JsonEvent> getAllByType(Date start, Date stop, Set<String> types, Auths auths);

/**
* Queries events back by id and timestamp.
* @param indexes
* @param auths
* @return
*/
CloseableIterable<JsonEvent> get(Collection<EventIdentifier> indexes, Auths auths);


public CloseableIterable<Pair<String,String>> uniqueKeys(String prefix, String type, Auths auths);
public CloseableIterable<Object> uniqueValuesForKey(String prefix, String type, String alias, String key, Auths auths);
public CloseableIterable<String> getTypes(String prefix, Auths auths);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package org.calrissian.store.json.event.impl;

import static com.google.common.collect.Iterables.transform;
import static org.apache.commons.lang.StringUtils.replace;
import static org.calrissian.mango.json.util.store.JsonAttributeStore.toObject;
import java.util.Collection;
import java.util.Date;
import java.util.Map;
import java.util.Set;

import com.google.common.base.Function;
import org.calrissian.accumulorecipes.commons.domain.Auths;
import org.calrissian.accumulorecipes.eventstore.EventStore;
import org.calrissian.mango.collect.CloseableIterable;
import org.calrissian.mango.collect.CloseableIterables;
import org.calrissian.mango.criteria.domain.Node;
import org.calrissian.mango.domain.Pair;
import org.calrissian.mango.domain.event.Event;
import org.calrissian.mango.domain.event.EventBuilder;
import org.calrissian.mango.domain.event.EventIdentifier;
import org.calrissian.mango.json.util.store.JsonAttributeStore;
import org.calrissian.store.json.event.JsonEvent;
import org.calrissian.store.json.event.JsonEventStore;

public class AccumuloJsonEventStore implements JsonEventStore {

private EventStore eventStore;

public AccumuloJsonEventStore(EventStore eventStore) {
this.eventStore = eventStore;
}

@Override
public void save(Iterable<JsonEvent> jsonEvents) {
eventStore.save(transform(jsonEvents, jsonEventEventFunction));
}

@Override
public void flush() throws Exception {
eventStore.flush();
}

@Override
public CloseableIterable<JsonEvent> query(Date start, Date end, Node node, Set<String> selectFields, Auths auths) {
return CloseableIterables.transform(eventStore.query(start, end, node, selectFields, auths), eventJsonEventFunction);
}

@Override
public CloseableIterable<JsonEvent> query(Date start, Date end, Set<String> types, Node node, Set<String> selectFields, Auths auths) {
return CloseableIterables.transform(eventStore.query(start, end, types, node, selectFields, auths), eventJsonEventFunction);
}

@Override
public CloseableIterable<JsonEvent> query(Date start, Date end, Node node, Auths auths) {
return CloseableIterables.transform(eventStore.query(start, end, node, auths), eventJsonEventFunction);
}

@Override
public CloseableIterable<JsonEvent> query(Date start, Date end, Set<String> types, Node node, Auths auths) {
return CloseableIterables.transform(eventStore.query(start, end, types, node, auths), eventJsonEventFunction);
}

@Override
public CloseableIterable<JsonEvent> get(Collection<EventIdentifier> indexes, Set<String> selectFields, Auths auths) {
return CloseableIterables.transform(eventStore.get(indexes, selectFields, auths), eventJsonEventFunction);
}

@Override
public CloseableIterable<JsonEvent> getAllByType(Date start, Date stop, Set<String> types, Set<String> selectFields, Auths auths) {
return CloseableIterables.transform(eventStore.getAllByType(start, stop, types, selectFields, auths), eventJsonEventFunction);
}

@Override
public CloseableIterable<JsonEvent> getAllByType(Date start, Date stop, Set<String> types, Auths auths) {
return CloseableIterables.transform(eventStore.getAllByType(start, stop, types, auths), eventJsonEventFunction);
}

@Override
public CloseableIterable<JsonEvent> get(Collection<EventIdentifier> indexes, Auths auths) {
return CloseableIterables.transform(eventStore.get(indexes, auths), eventJsonEventFunction);
}

@Override
public CloseableIterable<Pair<String,String>> uniqueKeys(String prefix, String type, Auths auths) {
return CloseableIterables.transform(eventStore.uniqueKeys(prefix, type, auths), flattenedKeyToDottedKey);
}

@Override
public CloseableIterable<Object> uniqueValuesForKey(String prefix, String type, String alias, String key, Auths auths) {
return eventStore.uniqueValuesForKey(prefix, type, alias, key, auths);
}

@Override
public CloseableIterable<String> getTypes(String prefix, Auths auths) {
return eventStore.getTypes(prefix, auths);
}

private static Function<JsonEvent,Event> jsonEventEventFunction = new Function<JsonEvent,Event>() {
@Override
public Event apply(JsonEvent jsonEvent) {
EventBuilder eventBuilder = EventBuilder.create(jsonEvent.getType(), jsonEvent.getId(), jsonEvent.getTimestamp());
eventBuilder.attrs(JsonAttributeStore.fromMap(jsonEvent.getDocument()));
return eventBuilder.build();
}
};

private static Function<Event,JsonEvent> eventJsonEventFunction = new Function<Event,JsonEvent>() {
@Override
public JsonEvent apply(Event event) {
Map<String, Object> attrs = toObject(event.getAttributes());
return new JsonEvent(event.getType(), event.getId(), event.getTimestamp(), attrs);
}
};

private static Function<Pair<String, String>, Pair<String, String>> flattenedKeyToDottedKey = new Function<Pair<String, String>, Pair<String, String>>() {
@Override
public Pair<String, String> apply(Pair<String, String> s) {
return new Pair<String,String>(s.getOne(), replace(s.getTwo(), "_$", "."));
}
};

}
1 change: 1 addition & 0 deletions store/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ limitations under the License.
<module>blob-store</module>
<module>changelog-store</module>
<module>entity-store</module>
<module>json-event-store</module>
<module>event-store</module>
<module>geospatial-store</module>
<module>graph-store</module>
Expand Down

0 comments on commit 4b3acdb

Please sign in to comment.