Skip to content

Commit

Permalink
Add clone_message() function (#153)
Browse files Browse the repository at this point in the history
* Add clone_message() function

Closes #138

* Add $message parameter to clone_message()
  • Loading branch information
joschi authored and kroepke committed Jan 4, 2017
1 parent ff9ec56 commit d58accf
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.graylog.plugins.pipelineprocessor.functions.ips.IpAddressConversion;
import org.graylog.plugins.pipelineprocessor.functions.json.JsonParse;
import org.graylog.plugins.pipelineprocessor.functions.json.SelectJsonPath;
import org.graylog.plugins.pipelineprocessor.functions.messages.CloneMessage;
import org.graylog.plugins.pipelineprocessor.functions.messages.CreateMessage;
import org.graylog.plugins.pipelineprocessor.functions.messages.DropMessage;
import org.graylog.plugins.pipelineprocessor.functions.messages.HasField;
Expand Down Expand Up @@ -97,6 +98,7 @@ protected void configure() {

addMessageProcessorFunction(DropMessage.NAME, DropMessage.class);
addMessageProcessorFunction(CreateMessage.NAME, CreateMessage.class);
addMessageProcessorFunction(CloneMessage.NAME, CloneMessage.class);
addMessageProcessorFunction(RouteToStream.NAME, RouteToStream.class);

// input related functions
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/**
* This file is part of Graylog Pipeline Processor.
*
* Graylog Pipeline Processor is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Graylog Pipeline Processor is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Graylog Pipeline Processor. If not, see <http://www.gnu.org/licenses/>.
*/
package org.graylog.plugins.pipelineprocessor.functions.messages;

import com.google.common.collect.ImmutableList;
import org.graylog.plugins.pipelineprocessor.EvaluationContext;
import org.graylog.plugins.pipelineprocessor.ast.functions.AbstractFunction;
import org.graylog.plugins.pipelineprocessor.ast.functions.FunctionArgs;
import org.graylog.plugins.pipelineprocessor.ast.functions.FunctionDescriptor;
import org.graylog.plugins.pipelineprocessor.ast.functions.ParameterDescriptor;
import org.graylog2.plugin.Message;

import static org.graylog.plugins.pipelineprocessor.ast.functions.ParameterDescriptor.type;

public class CloneMessage extends AbstractFunction<Message> {
public static final String NAME = "clone_message";

private final ParameterDescriptor<Message, Message> messageParam;

public CloneMessage() {
messageParam = type("message", Message.class).optional().description("The message to use, defaults to '$message'").build();
}

@Override
public Message evaluate(FunctionArgs args, EvaluationContext context) {
final Message currentMessage = messageParam.optional(args, context).orElse(context.currentMessage());
final Message clonedMessage = new Message(currentMessage.getMessage(), currentMessage.getSource(), currentMessage.getTimestamp());
clonedMessage.addFields(currentMessage.getFields());
clonedMessage.addStreams(currentMessage.getStreams());

// register in context so the processor can inject it later on
context.addCreatedMessage(clonedMessage);
return clonedMessage;
}

@Override
public FunctionDescriptor<Message> descriptor() {
return FunctionDescriptor.<Message>builder()
.name(NAME)
.params(ImmutableList.of(messageParam))
.returnType(Message.class)
.description("Clones a message")
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.graylog.plugins.pipelineprocessor.functions.ips.IpAddressConversion;
import org.graylog.plugins.pipelineprocessor.functions.json.JsonParse;
import org.graylog.plugins.pipelineprocessor.functions.json.SelectJsonPath;
import org.graylog.plugins.pipelineprocessor.functions.messages.CloneMessage;
import org.graylog.plugins.pipelineprocessor.functions.messages.CreateMessage;
import org.graylog.plugins.pipelineprocessor.functions.messages.DropMessage;
import org.graylog.plugins.pipelineprocessor.functions.messages.HasField;
Expand Down Expand Up @@ -135,6 +136,7 @@ public static void registerFunctions() {

functions.put(DropMessage.NAME, new DropMessage());
functions.put(CreateMessage.NAME, new CreateMessage());
functions.put(CloneMessage.NAME, new CloneMessage());

// route to stream mocks
final StreamService streamService = mock(StreamService.class);
Expand Down Expand Up @@ -392,18 +394,52 @@ public void evalErrorSuppressed() {

@Test
public void newlyCreatedMessage() {
final Message message = new Message("test", "test", Tools.nowUTC());
message.addField("foo", "bar");
message.addStream(mock(Stream.class));
final Rule rule = parser.parseRule(ruleForTest(), false);
final EvaluationContext context = contextForRuleEval(rule, new Message("test", "test", Tools.nowUTC()));
final EvaluationContext context = contextForRuleEval(rule, message);

final Message origMessage = context.currentMessage();
final Message newMessage = Iterables.getOnlyElement(context.createdMessages());

assertThat(origMessage).isNotSameAs(newMessage);
assertThat(newMessage.getMessage()).isEqualTo("new");
assertThat(newMessage.getSource()).isEqualTo("synthetic");
assertThat(newMessage.getStreams()).isEmpty();
assertThat(newMessage.hasField("removed_again")).isFalse();
assertThat(newMessage.getFieldAs(Boolean.class, "has_source")).isTrue();
assertThat(newMessage.getFieldAs(String.class, "only_in")).isEqualTo("new message");
assertThat(newMessage.getFieldAs(String.class, "multi")).isEqualTo("new message");
assertThat(newMessage.getFieldAs(String.class, "foo")).isNull();
}

@Test
public void clonedMessage() {
final Message message = new Message("test", "test", Tools.nowUTC());
message.addField("foo", "bar");
message.addStream(mock(Stream.class));
final Rule rule = parser.parseRule(ruleForTest(), false);
final EvaluationContext context = contextForRuleEval(rule, message);

final Message origMessage = context.currentMessage();
final Message clonedMessage = Iterables.get(context.createdMessages(), 0);
final Message otherMessage = Iterables.get(context.createdMessages(), 1);

assertThat(origMessage).isNotSameAs(clonedMessage);
assertThat(clonedMessage).isNotNull();
assertThat(clonedMessage.getMessage()).isEqualTo(origMessage.getMessage());
assertThat(clonedMessage.getSource()).isEqualTo(origMessage.getSource());
assertThat(clonedMessage.getTimestamp()).isEqualTo(origMessage.getTimestamp());
assertThat(clonedMessage.getStreams()).isEqualTo(origMessage.getStreams());
assertThat(clonedMessage.hasField("removed_again")).isFalse();
assertThat(clonedMessage.getFieldAs(Boolean.class, "has_source")).isTrue();
assertThat(clonedMessage.getFieldAs(String.class, "only_in")).isEqualTo("new message");
assertThat(clonedMessage.getFieldAs(String.class, "multi")).isEqualTo("new message");
assertThat(clonedMessage.getFieldAs(String.class, "foo")).isEqualTo("bar");
assertThat(otherMessage).isNotNull();
assertThat(otherMessage.getMessage()).isEqualTo("foo");
assertThat(otherMessage.getSource()).isEqualTo("source");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
rule "operate on cloned message"
when true
then
let x = clone_message();
let new = create_message("foo", "source");
let cloned = clone_message(new);

set_field(field: "removed_again", value: "foo", message: x);
set_field(field: "only_in", value: "new message", message: x);
set_fields(fields: { multi: "new message" }, message: x);
set_field(field: "has_source", value: has_field("source", x), message: x);
route_to_stream(name: "some stream", message: x);
remove_field("removed_again", x);
end

0 comments on commit d58accf

Please sign in to comment.