Skip to content

Commit

Permalink
Add ability to search stream categories on Filter and Aggregation eve…
Browse files Browse the repository at this point in the history
…nts (#20160)

Add ability to search stream categories on Filter and Aggregation events
  • Loading branch information
kingzacko1 committed Sep 3, 2024
1 parent a73d630 commit eb4b131
Show file tree
Hide file tree
Showing 11 changed files with 122 additions and 21 deletions.
2 changes: 1 addition & 1 deletion changelog/unreleased/pr-20110.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ type = "a"
message = "Added categories to Streams to allow Illuminate content to be scoped to multiple products."

issues = ["graylog-plugin-enterprise#7945"]
pulls = ["20110"]
pulls = ["20110", "20160"]
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public abstract class AggregationEventProcessorConfigEntity implements EventProc
private static final String FIELD_QUERY = "query";
private static final String FIELD_FILTERS = "filters";
private static final String FIELD_STREAMS = "streams";
private static final String FIELD_STREAM_CATEGORIES = "stream_categories";
private static final String FIELD_GROUP_BY = "group_by";
private static final String FIELD_SERIES = "series";
private static final String FIELD_CONDITIONS = "conditions";
Expand All @@ -73,6 +74,9 @@ public abstract class AggregationEventProcessorConfigEntity implements EventProc
@JsonProperty(FIELD_STREAMS)
public abstract ImmutableSet<String> streams();

@JsonProperty(FIELD_STREAM_CATEGORIES)
public abstract ImmutableSet<String> streamCategories();

@JsonProperty(FIELD_GROUP_BY)
public abstract List<String> groupBy();

Expand Down Expand Up @@ -115,7 +119,8 @@ public static Builder create() {
.type(TYPE_NAME)
.filters(Collections.emptyList())
.useCronScheduling(false)
.eventLimit(0);
.eventLimit(0)
.streamCategories(ImmutableSet.of());
}

@JsonProperty(FIELD_QUERY)
Expand All @@ -127,6 +132,9 @@ public static Builder create() {
@JsonProperty(FIELD_STREAMS)
public abstract Builder streams(ImmutableSet<String> streams);

@JsonProperty(FIELD_STREAM_CATEGORIES)
public abstract Builder streamCategories(ImmutableSet<String> streamCategories);

@JsonProperty(FIELD_GROUP_BY)
public abstract Builder groupBy(List<String> groupBy);

Expand Down Expand Up @@ -189,6 +197,7 @@ public EventProcessorConfig toNativeEntity(Map<String, ValueReference> parameter
.cronExpression(cronExpression().orElse(null))
.cronTimezone(cronTimezone().orElse(null))
.eventLimit(eventLimit())
.streamCategories(streamCategories())
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,17 @@ private ElasticsearchQueryString groupByQueryString(Event event) {
* @return the actual streams
*/
private Set<String> getStreams(AggregationEventProcessorParameters parameters) {
return parameters.streams().isEmpty() ? config.streams() : parameters.streams();
if (parameters.streams().isEmpty()) {
Set<String> configStreams = new HashSet<>(config.streams());
if (!config.streamCategories().isEmpty()) {
// TODO: We need to account for permissions of the user who created the event here in place of
// a blanket `true` here.
configStreams.addAll(permittedStreams.loadWithCategories(config.streamCategories(), streamId -> true));
}
return configStreams;
} else {
return parameters.streams();
}
}

private void filterSearch(EventFactory eventFactory, AggregationEventProcessorParameters parameters,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public abstract class AggregationEventProcessorConfig implements EventProcessorC
private static final String FIELD_QUERY_PARAMETERS = "query_parameters";
private static final String FIELD_FILTERS = "filters";
private static final String FIELD_STREAMS = "streams";
private static final String FIELD_STREAM_CATEGORIES = "stream_categories";
private static final String FIELD_GROUP_BY = "group_by";
static final String FIELD_SERIES = "series";
private static final String FIELD_CONDITIONS = "conditions";
Expand All @@ -94,6 +95,9 @@ public abstract class AggregationEventProcessorConfig implements EventProcessorC
@JsonProperty(FIELD_STREAMS)
public abstract ImmutableSet<String> streams();

@JsonProperty(FIELD_STREAM_CATEGORIES)
public abstract ImmutableSet<String> streamCategories();

@JsonProperty(FIELD_GROUP_BY)
public abstract List<String> groupBy();

Expand Down Expand Up @@ -183,7 +187,8 @@ public static Builder create() {
.filters(Collections.emptyList())
.type(TYPE_NAME)
.useCronScheduling(false)
.eventLimit(0);
.eventLimit(0)
.streamCategories(ImmutableSet.of());
}

@JsonProperty(FIELD_QUERY)
Expand All @@ -198,6 +203,9 @@ public static Builder create() {
@JsonProperty(FIELD_STREAMS)
public abstract Builder streams(Set<String> streams);

@JsonProperty(FIELD_STREAM_CATEGORIES)
public abstract Builder streamCategories(Set<String> streamCategories);

@JsonProperty(FIELD_GROUP_BY)
public abstract Builder groupBy(List<String> groupBy);

Expand Down Expand Up @@ -326,6 +334,7 @@ public EventProcessorConfigEntity toContentPackEntity(EntityDescriptorIds entity
.query(ValueReference.of(query()))
.filters(filters().stream().map(filter -> filter.toContentPackEntity(entityDescriptorIds)).toList())
.streams(streamRefs)
.streamCategories(streamCategories())
.groupBy(groupBy())
.series(series().stream().map(SeriesSpecEntity::fromNativeEntity).toList())
.conditions(conditions().orElse(null))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand Down Expand Up @@ -521,6 +522,11 @@ private Filter filteringForStreamIds(Set<String> streamIds) {
private Set<String> getStreams(AggregationEventProcessorParameters parameters) {
// Streams in parameters should override the ones in the config
Set<String> streamIds = parameters.streams().isEmpty() ? config.streams() : parameters.streams();
if (parameters.streams().isEmpty() && !config.streamCategories().isEmpty()) {
streamIds = new HashSet<>(streamIds);
// TODO: How to take into consideration StreamPermissions here???
streamIds.addAll(permittedStreams.loadWithCategories(config.streamCategories(), (streamId) -> true));
}
final Set<String> existingStreams = moreSearch.loadStreams(streamIds).stream()
.map(Stream::getId)
.collect(toSet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,31 @@
package org.graylog.plugins.views.search.rest;

import com.google.common.collect.ImmutableSet;
import jakarta.inject.Inject;
import org.graylog.plugins.views.search.permissions.StreamPermissions;
import org.graylog2.streams.StreamService;

import jakarta.inject.Inject;

import java.util.Collection;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;

import static org.graylog2.plugin.streams.Stream.NON_MESSAGE_STREAM_IDS;

public class PermittedStreams {
private final Supplier<Stream<String>> allStreamsProvider;
private final Function<Collection<String>, Stream<String>> streamCategoryMapper;


public PermittedStreams(Supplier<Stream<String>> allStreamsProvider) {
public PermittedStreams(Supplier<Stream<String>> allStreamsProvider, Function<Collection<String>, Stream<String>> streamCategoryMapper) {
this.allStreamsProvider = allStreamsProvider;
this.streamCategoryMapper = streamCategoryMapper;
}

@Inject
public PermittedStreams(StreamService streamService) {
this(() -> streamService.loadAll().stream().map(org.graylog2.plugin.streams.Stream::getId));
this(() -> streamService.loadAll().stream().map(org.graylog2.plugin.streams.Stream::getId),
(categories) -> streamService.mapCategoriesToIds(categories).stream());
}

public ImmutableSet<String> loadAllMessageStreams(final StreamPermissions streamPermissions) {
Expand All @@ -56,4 +60,11 @@ public ImmutableSet<String> loadAll(final StreamPermissions streamPermissions) {
.filter(streamPermissions::canReadStream)
.collect(ImmutableSet.toImmutableSet());
}

public ImmutableSet<String> loadWithCategories(final Collection<String> categories,
final StreamPermissions streamPermissions) {
return streamCategoryMapper.apply(categories)
.filter(streamPermissions::canReadStream)
.collect(ImmutableSet.toImmutableSet());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public class PivotAggregationSearchTest {
@Mock
private NotificationService notificationService;

private final PermittedStreams permittedStreams = new PermittedStreams(Stream::of);
private final PermittedStreams permittedStreams = new PermittedStreams(Stream::of, (categories) -> Stream.of());

@Test
public void testExtractValuesWithGroupBy() throws Exception {
Expand Down Expand Up @@ -489,7 +489,7 @@ public void testQueryParameterSubstitution() {
queryEngine,
EventsConfigurationTestProvider.create(),
moreSearch,
new PermittedStreams(() -> Stream.of("00001")),
new PermittedStreams(() -> Stream.of("00001"), (categories) -> Stream.of()),
notificationService,
new QueryStringDecorators(Optional.of((queryString, parameterProvider, query) -> {
if (queryString.equals("source:$secret$") && parameterProvider.getParameter("secret").isPresent()) {
Expand Down Expand Up @@ -540,7 +540,7 @@ public void testAdditionalSearchTypes() {
queryEngine,
EventsConfigurationTestProvider.create(),
moreSearch,
new PermittedStreams(() -> Stream.of("00001")),
new PermittedStreams(() -> Stream.of("00001"), (categories) -> Stream.of()),
notificationService,
new QueryStringDecorators(Optional.empty())
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ private User mockUser() {
}

private SearchUser searchUser() {
return new SearchUser(mockUser(), (perm) -> true, (perm, id) -> true, new PermittedStreams(Stream::of),
return new SearchUser(mockUser(), (perm) -> true, (perm, id) -> true, new PermittedStreams(Stream::of, (categories) -> Stream.of()),
new HashMap<>());
}

Expand Down Expand Up @@ -111,7 +111,7 @@ private SearchUser searchUserRequiringPermission(String permission, String id) {
return new SearchUser(mockUser(),
(perm) -> perm.equals(permission),
(perm, pId) -> perm.equals(permission) && id.equals(pId),
new PermittedStreams(Stream::of),
new PermittedStreams(Stream::of, (categories) -> Stream.of()),
viewResolvers);
}

Expand Down Expand Up @@ -140,7 +140,7 @@ private SearchUser searchUserResolvedRequiringPermission(String expectedPermissi
return new SearchUser(mockUser(),
(perm) -> perm.equals("allowed-permission"),
(perm, pId) -> perm.equals("allowed-permission") && "resolved-id".equals(pId),
new PermittedStreams(Stream::of),
new PermittedStreams(Stream::of, (categories) -> Stream.of()),
viewResolvers);
}

Expand Down Expand Up @@ -182,7 +182,7 @@ private SearchUser searchUserResolvedRequiringPermissionEntity(String expectedPe
return new SearchUser(mockUser(),
(perm) -> perm.equals("allowed-permission"),
(perm, pId) -> perm.equals("allowed-permission") && "resolved-id".equals(pId),
new PermittedStreams(Stream::of),
new PermittedStreams(Stream::of, (categories) -> Stream.of()),
viewResolvers);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ void setUp() {
when(commandFactory.buildFromRequest(any())).thenReturn(ExportMessagesCommand.withDefaults());
when(commandFactory.buildWithSearchOnly(any(), any())).thenReturn(ExportMessagesCommand.withDefaults());
when(commandFactory.buildWithMessageList(any(), any(), any())).thenReturn(ExportMessagesCommand.withDefaults());
final PermittedStreams permittedStreams = new PermittedStreams(() -> Stream.of("a-default-stream"));
final PermittedStreams permittedStreams = new PermittedStreams(() -> Stream.of("a-default-stream"), (categories) -> Stream.of());
executionGuard = mock(SearchExecutionGuard.class);
SearchDomain searchDomain = mock(SearchDomain.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,36 +20,91 @@
import com.google.common.collect.Streams;
import org.junit.jupiter.api.Test;

import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Stream;

import static org.assertj.core.api.Assertions.assertThat;
import static org.graylog2.plugin.streams.Stream.NON_MESSAGE_STREAM_IDS;

public class PermittedStreamsTest {

@Test
public void findsStreams() {
final PermittedStreams sut = new PermittedStreams(() -> java.util.stream.Stream.of("oans", "zwoa", "gsuffa"));
final PermittedStreams sut = new PermittedStreams(() -> java.util.stream.Stream.of("oans", "zwoa", "gsuffa"), (categories) -> Stream.of());
ImmutableSet<String> result = sut.loadAllMessageStreams(id -> true);
assertThat(result).containsExactlyInAnyOrder("oans", "zwoa", "gsuffa");
}

@Test
public void filtersOutNonPermittedStreams() {
final PermittedStreams sut = new PermittedStreams(() -> java.util.stream.Stream.of("oans", "zwoa", "gsuffa"));
final PermittedStreams sut = new PermittedStreams(() -> java.util.stream.Stream.of("oans", "zwoa", "gsuffa"), (categories) -> Stream.of());
ImmutableSet<String> result = sut.loadAllMessageStreams(id -> id.equals("gsuffa"));
assertThat(result).containsExactly("gsuffa");
}

@Test
public void returnsEmptyListIfNoStreamsFound() {
final PermittedStreams sut = new PermittedStreams(() -> java.util.stream.Stream.of("oans", "zwoa", "gsuffa"));
final PermittedStreams sut = new PermittedStreams(() -> java.util.stream.Stream.of("oans", "zwoa", "gsuffa"), (categories) -> Stream.of());
ImmutableSet<String> result = sut.loadAllMessageStreams(id -> false);
assertThat(result).isEmpty();
}

@Test
public void filtersDefaultStreams() {
final PermittedStreams sut = new PermittedStreams(() -> Streams.concat(NON_MESSAGE_STREAM_IDS.stream(), java.util.stream.Stream.of("i'm ok")));
final PermittedStreams sut = new PermittedStreams(() -> Streams.concat(NON_MESSAGE_STREAM_IDS.stream(), java.util.stream.Stream.of("i'm ok")), (categories) -> Stream.of());
ImmutableSet<String> result = sut.loadAllMessageStreams(id -> true);
assertThat(result).containsExactly("i'm ok");
}

@Test
public void findsStreamsFromCategories() {
final PermittedStreams sut = new PermittedStreams(Stream::of, this::categoryMapping);
ImmutableSet<String> result = sut.loadWithCategories(List.of("colors"), (streamId) -> true);
assertThat(result).containsExactlyInAnyOrder("red", "yellow", "blue");
}

@Test
public void findsStreamsFromMultipleCategories() {
final PermittedStreams sut = new PermittedStreams(Stream::of, this::categoryMapping);
ImmutableSet<String> result = sut.loadWithCategories(List.of("colors", "numbers"), (streamId) -> true);
assertThat(result).containsExactlyInAnyOrder("red", "yellow", "blue", "one", "two", "three");
}

@Test
public void findsStreamsFromCategoriesWithPermissions() {
final PermittedStreams sut = new PermittedStreams(Stream::of, this::categoryMapping);
ImmutableSet<String> result = sut.loadWithCategories(List.of("colors"), (streamId) -> streamId.equals("red") || streamId.equals("blue"));
assertThat(result).containsExactlyInAnyOrder("red", "blue");
}

@Test
public void returnsEmptyIfNoStreamCategoriesMatch() {
final PermittedStreams sut = new PermittedStreams(Stream::of, this::categoryMapping);
ImmutableSet<String> result = sut.loadWithCategories(List.of("invalid_category"), (streamId) -> true);
assertThat(result).isEmpty();
}

@Test
public void returnsEmptyIfPermissionsFilter() {
final PermittedStreams sut = new PermittedStreams(Stream::of, this::categoryMapping);
ImmutableSet<String> result = sut.loadWithCategories(List.of("colors", "numbers", "animals"), (streamId) -> false);
assertThat(result).isEmpty();
}

private Stream<String> categoryMapping(Collection<String> categories) {
Set<String> streams = new HashSet<>();
if (categories.contains("colors")) {
streams.addAll(List.of("red", "yellow", "blue"));
}
if (categories.contains("numbers")) {
streams.addAll(List.of("one", "two", "three"));
}
if (categories.contains("animals")) {
streams.addAll(List.of("cat", "dog", "fox"));
}
return streams.stream();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.HashMap;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.stream.Stream;

import static org.graylog2.shared.security.RestPermissions.DASHBOARDS_READ;
import static org.graylog2.shared.security.RestPermissions.STREAMS_READ;
Expand Down Expand Up @@ -125,7 +126,7 @@ public SearchUser build() {
Optional.ofNullable(user).orElseGet(() -> Mockito.mock(User.class)),
permission -> verifyPermission(permissions, permission),
(permission, entityid) -> verifyPermission(permissions, permission, entityid),
new PermittedStreams(knownStreamIDs::stream),
new PermittedStreams(knownStreamIDs::stream, (categories) -> Stream.of()),
new HashMap<>());
}

Expand Down

0 comments on commit eb4b131

Please sign in to comment.