Post filter phrase suggestions
This implementation has a bunch of problems that'll need to be worked
out before it is a valid candidate for merging.  I don't have time to
rebase it right now but would still love feedback on the problems.  The
ones I remember:

1.  It performs the filtering by blocking the suggesting thread.
2.  Because there is no "exists" query type it uses a limit.  I now know
that isn't as efficient as just using a count, but it might be worth
implementing an exists query type for it anyway (see the sketch after
this list).
3.  It feels like there are a lot of plumbing changes required for this
feature.  My guess is that's because I'm going about it wrong.  This
correlates with #1 pretty well.
4.  I have to wrap the filter through the map nodes and parse it during
the reduce step.  That feels silly.
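
A sketch of the limit-based, blocking check those first two points describe,
reconstructed from the ReduceContext plumbing added below.  The helper class,
method name, and field handling are invented for illustration; this is not
the committed filter code:

    import org.elasticsearch.index.query.QueryBuilders;
    import org.elasticsearch.search.suggest.Suggest;

    class OptionExistenceCheck {
        // One search per surviving suggestion option.  actionGet() blocks the
        // suggesting thread (problem 1) and setSize(1) stands in for the
        // missing "exists" query type (problem 2).
        static boolean anyDocMatches(Suggest.ReduceContext context, String field, String text) {
            return context.getClient().prepareSearch(context.getIndices())
                    .setTypes(context.getTypes())
                    .setRouting(context.getRouting())
                    .setPreference(context.getPreference())
                    .setQuery(QueryBuilders.matchPhraseQuery(field, text))
                    .setSize(1)
                    .execute().actionGet()
                    .getHits().getTotalHits() > 0;
        }
    }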

Closes elastic#3482
nik9000 committed Nov 4, 2013
1 parent fddb742 commit 53cbce7
Showing 20 changed files with 419 additions and 35 deletions.
@@ -73,7 +73,7 @@ protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchRequest requ
@Override
protected void moveToSecondPhase() throws Exception {
// no need to sort, since we know we have no hits back
- final InternalSearchResponse internalResponse = searchPhaseController.merge(SearchPhaseController.EMPTY_DOCS, firstResults, (AtomicArray<? extends FetchSearchResultProvider>) AtomicArray.empty());
+ final InternalSearchResponse internalResponse = searchPhaseController.merge(SearchPhaseController.EMPTY_DOCS, firstResults, (AtomicArray<? extends FetchSearchResultProvider>) AtomicArray.empty(), request);
String scrollId = null;
if (request.scroll() != null) {
scrollId = buildScrollId(request.searchType(), firstResults, null);
@@ -180,7 +180,7 @@ void finishHim() {

void innerFinishHim() throws Exception {
sortedShardList = searchPhaseController.sortDocs(queryFetchResults);
- final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults, queryFetchResults);
+ final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults, queryFetchResults, request);
String scrollId = null;
if (request.scroll() != null) {
scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null);
@@ -289,7 +289,7 @@ void finishHim() {
}

void innerFinishHim() throws Exception {
- final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults, fetchResults);
+ final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults, fetchResults, request);
String scrollId = null;
if (request.scroll() != null) {
scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null);
@@ -86,7 +86,7 @@ protected void moveToSecondPhase() throws Exception {

private void innerFinishHim() throws IOException {
sortedShardList = searchPhaseController.sortDocs(firstResults);
- final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, firstResults);
+ final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, firstResults, request);
String scrollId = null;
if (request.scroll() != null) {
scrollId = buildScrollId(request.searchType(), firstResults, null);
@@ -190,7 +190,7 @@ void finishHim() {
}

void innerFinishHim() throws Exception {
- InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, fetchResults);
+ InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, fetchResults, request);
String scrollId = null;
if (request.scroll() != null) {
scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null);
@@ -70,7 +70,7 @@ protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchRequest requ

@Override
protected void moveToSecondPhase() throws Exception {
- final InternalSearchResponse internalResponse = searchPhaseController.merge(SearchPhaseController.EMPTY_DOCS, firstResults, (AtomicArray<? extends FetchSearchResultProvider>) AtomicArray.empty());
+ final InternalSearchResponse internalResponse = searchPhaseController.merge(SearchPhaseController.EMPTY_DOCS, firstResults, (AtomicArray<? extends FetchSearchResultProvider>) AtomicArray.empty(), request);
String scrollId = null;
if (request.scroll() != null) {
scrollId = buildScrollId(request.searchType(), firstResults, ImmutableMap.of("total_hits", Long.toString(internalResponse.hits().totalHits())));
@@ -242,7 +242,7 @@ private void finishHim() {

private void innerFinishHim() {
ScoreDoc[] sortedShardList = searchPhaseController.sortDocs(queryFetchResults);
- final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults, queryFetchResults);
+ final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults, queryFetchResults, null);
String scrollId = null;
if (request.scroll() != null) {
scrollId = request.scrollId();
@@ -273,7 +273,7 @@ private void finishHim() {
}

private void innerFinishHim() {
- InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults, fetchResults);
+ InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults, fetchResults, null);
String scrollId = null;
if (request.scroll() != null) {
scrollId = request.scrollId();
@@ -260,7 +260,7 @@ private void innerFinishHim() throws IOException {
docs[counter++] = scoreDoc;
}
}
- final InternalSearchResponse internalResponse = searchPhaseController.merge(docs, queryFetchResults, queryFetchResults);
+ final InternalSearchResponse internalResponse = searchPhaseController.merge(docs, queryFetchResults, queryFetchResults, null);
((InternalSearchHits) internalResponse.hits()).totalHits = Long.parseLong(this.scrollId.getAttributes().get("total_hits"));


@@ -25,6 +25,7 @@
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAction;
+ import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
@@ -62,13 +63,16 @@ public class TransportSuggestAction extends TransportBroadcastOperationAction<Su
private final IndicesService indicesService;

private final SuggestPhase suggestPhase;

+ private final Client client;

@Inject
public TransportSuggestAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService,
- IndicesService indicesService, SuggestPhase suggestPhase) {
+ IndicesService indicesService, SuggestPhase suggestPhase, Client client) {
super(settings, threadPool, clusterService, transportService);
this.indicesService = indicesService;
this.suggestPhase = suggestPhase;
+ this.client = client;
}

@Override
@@ -142,7 +146,10 @@ protected SuggestResponse newResponse(SuggestRequest request, AtomicReferenceArr
}
}

- return new SuggestResponse(new Suggest(Suggest.reduce(groupedSuggestions)), shardsResponses.length(), successfulShards, failedShards, shardFailures);
+ Suggest.ReduceContext reduceContext = new Suggest.ReduceContext(client, request.indices());
+ reduceContext.setPreference(request.preference());
+ reduceContext.setRouting(request.routing());
+ return new SuggestResponse(new Suggest(Suggest.reduce(groupedSuggestions, reduceContext)), shardsResponses.length(), successfulShards, failedShards, shardFailures);
}

@Override
@@ -20,6 +20,7 @@
package org.elasticsearch.index.query;

import org.elasticsearch.common.Nullable;
+ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.geo.ShapeRelation;
import org.elasticsearch.common.geo.builders.ShapeBuilder;
@@ -553,6 +554,10 @@ public static WrapperFilterBuilder wrapperFilter(String filter) {
public static WrapperFilterBuilder wrapperFilter(byte[] data, int offset, int length) {
return new WrapperFilterBuilder(data, offset, length);
}

+ public static WrapperFilterBuilder wrapperFilter(BytesReference bytes) {
+ return new WrapperFilterBuilder(bytes);
+ }

private FilterBuilders() {

@@ -25,7 +25,8 @@
* Time: 11:30
*/

- import com.google.common.base.Charsets;
+ import org.elasticsearch.common.bytes.BytesArray;
+ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.XContentBuilder;

import java.io.IOException;
@@ -36,27 +37,24 @@
* query builders.
*/
public class WrapperFilterBuilder extends BaseFilterBuilder {

- private final byte[] source;
- private final int offset;
- private final int length;
+ private final BytesReference bytes;

public WrapperFilterBuilder(String source) {
- this.source = source.getBytes(Charsets.UTF_8);
- this.offset = 0;
- this.length = this.source.length;
+ this(new BytesArray(source));
}

public WrapperFilterBuilder(byte[] source, int offset, int length) {
- this.source = source;
- this.offset = offset;
- this.length = length;
+ this(new BytesArray(source, offset, length));
}

+ public WrapperFilterBuilder(BytesReference bytes) {
+ this.bytes = bytes;
+ }

@Override
protected void doXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(WrapperFilterParser.NAME);
builder.field("filter", source, offset, length);
builder.field("filter", bytes);
builder.endObject();
}
}
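
For reference, a usage sketch of the new BytesReference overload added above;
the wrapped term filter source here is invented for illustration:

    import org.elasticsearch.common.bytes.BytesArray;
    import org.elasticsearch.index.query.FilterBuilders;
    import org.elasticsearch.index.query.WrapperFilterBuilder;

    class WrapperFilterExample {
        // Wrap already-serialized filter bytes without copying them into a byte[].
        static WrapperFilterBuilder rawTermFilter() {
            return FilterBuilders.wrapperFilter(new BytesArray("{\"term\":{\"user\":\"kimchy\"}}"));
        }
    }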
@@ -23,7 +23,9 @@
import org.apache.lucene.index.Term;
import org.apache.lucene.search.*;
import org.apache.lucene.util.PriorityQueue;
+ import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.cache.recycler.CacheRecycler;
+ import org.elasticsearch.client.Client;
import org.elasticsearch.common.collect.XMaps;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
@@ -66,12 +68,14 @@ public int compare(AtomicArray.Entry<? extends QuerySearchResultProvider> o1, At

private final CacheRecycler cacheRecycler;
private final boolean optimizeSingleShard;
+ private final Client client;

@Inject
- public SearchPhaseController(Settings settings, CacheRecycler cacheRecycler) {
+ public SearchPhaseController(Settings settings, CacheRecycler cacheRecycler, Client client) {
super(settings);
this.cacheRecycler = cacheRecycler;
this.optimizeSingleShard = componentSettings.getAsBoolean("optimize_single_shard", true);
+ this.client = client;
}

public boolean optimizeSingleShard() {
@@ -296,7 +300,7 @@ public void fillDocIdsToLoad(AtomicArray<ExtTIntArrayList> docsIdsToLoad, ScoreD
}
}

- public InternalSearchResponse merge(ScoreDoc[] sortedDocs, AtomicArray<? extends QuerySearchResultProvider> queryResultsArr, AtomicArray<? extends FetchSearchResultProvider> fetchResultsArr) {
+ public InternalSearchResponse merge(ScoreDoc[] sortedDocs, AtomicArray<? extends QuerySearchResultProvider> queryResultsArr, AtomicArray<? extends FetchSearchResultProvider> fetchResultsArr, SearchRequest request) {

List<? extends AtomicArray.Entry<? extends QuerySearchResultProvider>> queryResults = queryResultsArr.asList();
List<? extends AtomicArray.Entry<? extends FetchSearchResultProvider>> fetchResults = fetchResultsArr.asList();
@@ -411,7 +415,14 @@ public InternalSearchResponse merge(ScoreDoc[] sortedDocs, AtomicArray<? extends
Suggest.group(groupedSuggestions, shardResult);
}

- suggest = hasSuggestions ? new Suggest(Suggest.Fields.SUGGEST, Suggest.reduce(groupedSuggestions)) : null;
+ Suggest.ReduceContext reduceContext = null;
+ if (request != null) {
+ reduceContext = new Suggest.ReduceContext(client, request.indices());
+ reduceContext.setPreference(request.preference());
+ reduceContext.setRouting(request.routing());
+ reduceContext.setTypes(request.types());
+ }
+ suggest = hasSuggestions ? new Suggest(Suggest.Fields.SUGGEST, Suggest.reduce(groupedSuggestions, reduceContext)) : null;
}

InternalSearchHits searchHits = new InternalSearchHits(hits.toArray(new InternalSearchHit[hits.size()]), totalHits, maxScore);
62 changes: 60 additions & 2 deletions src/main/java/org/elasticsearch/search/suggest/Suggest.java
@@ -21,6 +21,9 @@
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.Version;
+ import org.elasticsearch.client.Client;
+ import org.elasticsearch.common.Nullable;
+ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
@@ -176,11 +179,12 @@ public static Map<String, List<Suggest.Suggestion>> group(Map<String, List<Sugge
return groupedSuggestions;
}

- public static List<Suggestion<? extends Entry<? extends Option>>> reduce(Map<String, List<Suggest.Suggestion>> groupedSuggestions) {
+ public static List<Suggestion<? extends Entry<? extends Option>>> reduce(Map<String, List<Suggest.Suggestion>> groupedSuggestions, ReduceContext context) {
List<Suggestion<? extends Entry<? extends Option>>> reduced = new ArrayList<Suggestion<? extends Entry<? extends Option>>>(groupedSuggestions.size());
for (java.util.Map.Entry<String, List<Suggestion>> unmergedResults : groupedSuggestions.entrySet()) {
List<Suggestion> value = unmergedResults.getValue();
Suggestion reduce = value.get(0).reduce(value);
+ reduce.filter(context);
reduce.trim();
reduced.add(reduce);
}
@@ -265,7 +269,14 @@ public Suggestion<T> reduce(List<Suggestion<T>> toReduce) {
protected Comparator<Option> sortComparator() {
return COMPARATOR;
}


+ /**
+ * Filter options after they've been reduced.
+ */
+ protected void filter(ReduceContext context) {
+ // Default implementation is a noop
+ }

/**
* Trims the number of options per suggest text term to the requested size.
* For internal usage.
@@ -647,4 +658,51 @@ public static Sort fromId(byte id) {
}
}
}

+ public static class ReduceContext {
+ private final Client client;
+ private final String[] indices;
+ @Nullable
+ private String routing;
+ @Nullable
+ private String preference;
+ private String[] types = Strings.EMPTY_ARRAY;
+
+ public ReduceContext(Client client, String[] indices) {
+ this.client = client;
+ this.indices = indices;
+ }
+
+ public Client getClient() {
+ return client;
+ }
+
+ public String[] getIndices() {
+ return indices;
+ }
+
+ public void setRouting(String routing) {
+ this.routing = routing;
+ }
+
+ public String getRouting() {
+ return routing;
+ }
+
+ public void setPreference(String preference) {
+ this.preference = preference;
+ }
+
+ public String getPreference() {
+ return preference;
+ }
+
+ public void setTypes(String[] types) {
+ this.types = types;
+ }
+
+ public String[] getTypes() {
+ return types;
+ }
+ }
}
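
For reviewers: the filter(ReduceContext) hook added above is the seam the
post-filtering hangs off of.  A minimal sketch of the shape such an override
takes in a Suggestion subclass, assuming the existence check sketched under
the commit message, a Text field member set by the suggester, and direct
access to each entry's options list; the committed PhraseSuggestion
implementation may differ:

    // Fragment of a hypothetical Suggestion subclass, not a compilable unit.
    @Override
    protected void filter(ReduceContext context) {
        if (context == null) {
            return; // the scroll paths pass a null SearchRequest, so there is no context
        }
        for (Entry<? extends Option> entry : getEntries()) {
            java.util.Iterator<? extends Option> options = entry.getOptions().iterator();
            while (options.hasNext()) {
                // Drop options that match no documents; one blocking search each.
                if (!OptionExistenceCheck.anyDocMatches(context, field.string(), options.next().getText().string())) {
                    options.remove();
                }
            }
        }
    }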
@@ -23,6 +23,8 @@
import org.apache.lucene.index.Terms;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
+ import org.elasticsearch.common.xcontent.XContentBuilder;
+ import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentParser.Token;
import org.elasticsearch.index.analysis.ShingleTokenFilterFactory;
@@ -46,7 +48,8 @@ public SuggestionSearchContext.SuggestionContext parse(XContentParser parser, Ma
PhraseSuggestionContext suggestion = new PhraseSuggestionContext(suggester);
XContentParser.Token token;
String fieldName = null;
boolean gramSizeSet = false;
+ boolean filterTypeSet = false;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
fieldName = parser.currentName();
@@ -124,6 +127,25 @@ public SuggestionSearchContext.SuggestionContext parse(XContentParser parser, Ma
}
}
}
} else if ("filter".equals(fieldName)) {
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
fieldName = parser.currentName();
} else if (token.isValue()) {
if ("type".equals(fieldName)) {
suggestion.setFilterType(PhraseSuggestion.FilterType.fromString(parser.text()));
filterTypeSet = true;
} else if ("extra".equals(fieldName)) {
// Copy the filter data
XContentBuilder copier = XContentFactory.contentBuilder(parser.contentType());
copier.copyCurrentStructure(parser);
suggestion.setFilterExtra(copier.bytes());
} else {
throw new ElasticSearchIllegalArgumentException(
"suggester[phrase][highlight] doesn't support field [" + fieldName + "]");
}
}
}
} else {
throw new ElasticSearchIllegalArgumentException("suggester[phrase] doesn't support array field [" + fieldName + "]");
}
@@ -166,7 +188,10 @@ public SuggestionSearchContext.SuggestionContext parse(XContentParser parser, Ma
}
}


+ // filterType defaults to MATCH_PHRASE when filterExtra is set but no explicit type was given
+ if (!filterTypeSet && suggestion.getFilterExtra().length() > 0) {
+ suggestion.setFilterType(PhraseSuggestion.FilterType.MATCH_PHRASE);
+ }

return suggestion;
}
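
The new "filter" object this parser accepts can be exercised with a suggest
body like the following sketch.  The suggestion name, field, and extra filter
source are invented, the exact strings FilterType.fromString() accepts are
assumed, and note that as parsed above "extra" arrives as a single value, so
the raw filter source is passed as a string:

    import org.elasticsearch.common.xcontent.XContentBuilder;
    import org.elasticsearch.common.xcontent.XContentFactory;

    class PhraseFilterBodyExample {
        static XContentBuilder body() throws java.io.IOException {
            return XContentFactory.jsonBuilder()
                .startObject()
                    .field("text", "noble prize")
                    .startObject("simple_phrase")
                        .startObject("phrase")
                            .field("field", "title")
                            .startObject("filter")
                                // "type" is optional; it defaults to MATCH_PHRASE when "extra" is set
                                .field("type", "match_phrase")
                                .field("extra", "{\"term\":{\"user\":\"kimchy\"}}")
                            .endObject()
                        .endObject()
                    .endObject()
                .endObject();
        }
    }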
@@ -55,6 +55,9 @@ public Suggestion<? extends Entry<? extends Option>> innerExecute(String name, P
IndexReader indexReader, CharsRef spare) throws IOException {
double realWordErrorLikelihood = suggestion.realworldErrorLikelyhood();
final PhraseSuggestion response = new PhraseSuggestion(name, suggestion.getSize());
+ response.setFilterType(suggestion.getFilterType());
+ response.setFilterExtra(suggestion.getFilterExtra());
+ response.setField(new StringText(suggestion.getField()));

List<PhraseSuggestionContext.DirectCandidateGenerator> generators = suggestion.generators();
final int numGenerators = generators.size();