Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

switch to JBoss netty for receiving GELF messages #68

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions misc/graylog2.conf
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ mongodb_threads_allowed_to_block_multiplier = 5
use_gelf = true
gelf_listen_address = 0.0.0.0
gelf_listen_port = 12201
use_gelf_tcp = false
# gelf_http_listen_port = 12202

# Drools Rule File (Use to rewrite incoming log messages)
# rules_file = /etc/graylog2.d/rules/graylog2.drl
Expand Down
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@
<artifactId>slf4j-log4j12</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
<version>3.2.7.Final</version>
</dependency>
</dependencies>

<build>
Expand Down
31 changes: 23 additions & 8 deletions src/main/java/org/graylog2/Configuration.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,15 @@

package org.graylog2;

import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.log4j.Logger;
import org.graylog2.messagehandlers.amqp.AMQPSubscribedQueue;
import org.graylog2.messagehandlers.amqp.InvalidQueueTypeException;

import com.github.joschi.jadconfig.Parameter;
import com.github.joschi.jadconfig.ValidationException;
import com.github.joschi.jadconfig.ValidatorMethod;
Expand All @@ -28,14 +37,6 @@
import com.github.joschi.jadconfig.validators.PositiveIntegerValidator;
import com.github.joschi.jadconfig.validators.PositiveLongValidator;
import com.mongodb.ServerAddress;
import org.apache.log4j.Logger;
import org.graylog2.messagehandlers.amqp.AMQPSubscribedQueue;
import org.graylog2.messagehandlers.amqp.InvalidQueueTypeException;

import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
* Helper class to hold configuration of Graylog2
Expand Down Expand Up @@ -113,6 +114,12 @@ public class Configuration {
@Parameter(value = "gelf_listen_port", required = true, validator = InetPortValidator.class)
private int gelfListenPort = 12201;

@Parameter(value = "use_gelf_tcp")
private boolean useGelfTcp = false;

@Parameter(value = "gelf_http_listen_port")
private int gelfHttpListenPort = 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does 0 mean disabled?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, 0 is disabled.
I didn't want to add yet another flag: thus if it's not set explicitly it's not turned on.


@Parameter("amqp_enabled")
private boolean amqpEnabled = false;

Expand Down Expand Up @@ -231,6 +238,14 @@ public int getGelfListenPort() {
return gelfListenPort;
}

public boolean isUseGelfTcp() {
return useGelfTcp;
}

public int getGelfHttpListenPort() {
return gelfHttpListenPort;
}

public boolean isAmqpEnabled() {
return amqpEnabled;
}
Expand Down
49 changes: 29 additions & 20 deletions src/main/java/org/graylog2/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,15 @@

package org.graylog2;

import com.beust.jcommander.JCommander;
import com.github.joschi.jadconfig.JadConfig;
import com.github.joschi.jadconfig.RepositoryException;
import com.github.joschi.jadconfig.ValidationException;
import com.github.joschi.jadconfig.repositories.PropertiesRepository;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.commons.io.IOUtils;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
Expand All @@ -35,7 +39,7 @@
import org.graylog2.messagehandlers.amqp.AMQPSubscribedQueue;
import org.graylog2.messagehandlers.amqp.AMQPSubscriberThread;
import org.graylog2.messagehandlers.gelf.ChunkedGELFClientManager;
import org.graylog2.messagehandlers.gelf.GELFMainThread;
import org.graylog2.messagehandlers.gelf.GELFServer;
import org.graylog2.messagehandlers.syslog.SyslogServerThread;
import org.graylog2.messagequeue.MessageQueue;
import org.graylog2.messagequeue.MessageQueueFlusher;
Expand All @@ -46,14 +50,11 @@
import org.graylog2.periodical.MessageRetentionThread;
import org.graylog2.periodical.ServerValueWriterThread;

import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import com.beust.jcommander.JCommander;
import com.github.joschi.jadconfig.JadConfig;
import com.github.joschi.jadconfig.RepositoryException;
import com.github.joschi.jadconfig.ValidationException;
import com.github.joschi.jadconfig.repositories.PropertiesRepository;

/**
* Main class of Graylog2.
Expand Down Expand Up @@ -168,7 +169,10 @@ public static void main(String[] args) {

// Start GELF threads
if (configuration.isUseGELF()) {
initializeGELFThreads(configuration.getGelfListenAddress(), configuration.getGelfListenPort(), scheduler);
initializeGELFThreads(configuration.getGelfListenAddress(),
configuration.getGelfListenPort(),
configuration.isUseGelfTcp(),
configuration.getGelfHttpListenPort(), scheduler);
}

// Initialize AMQP Broker if enabled
Expand Down Expand Up @@ -231,13 +235,18 @@ private static void initializeMessageRetentionThread(ScheduledExecutorService sc
LOG.info("Retention time management active.");
}

private static void initializeGELFThreads(String gelfAddress, int gelfPort, ScheduledExecutorService scheduler) {
GELFMainThread gelfThread = new GELFMainThread(new InetSocketAddress(gelfAddress, gelfPort));
gelfThread.start();
private static void initializeGELFThreads(String gelfAddress, int gelfPort, boolean useTcp, int httpListenPort, ScheduledExecutorService scheduler) {

scheduler.scheduleAtFixedRate(new ChunkedGELFClientManagerThread(ChunkedGELFClientManager.getInstance()), ChunkedGELFClientManagerThread.INITIAL_DELAY, ChunkedGELFClientManagerThread.PERIOD, TimeUnit.SECONDS);
if (GELFServer.create(new InetSocketAddress(gelfAddress, gelfPort), useTcp, httpListenPort)) {
scheduler.scheduleAtFixedRate(new ChunkedGELFClientManagerThread(
ChunkedGELFClientManager.getInstance()),
ChunkedGELFClientManagerThread.INITIAL_DELAY,
ChunkedGELFClientManagerThread.PERIOD, TimeUnit.SECONDS);
LOG.info("GELF threads started");
} else {
LOG.error("GELF threads could not be started.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should exit hard here IMO

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

}

LOG.info("GELF threads started");
}

private static void initializeSyslogServer(String syslogProtocol, int syslogPort) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@

package org.graylog2.messagehandlers.gelf;

import java.io.IOException;
import java.net.DatagramPacket;
import java.util.zip.DataFormatException;

import org.apache.log4j.Logger;
import org.graylog2.Tools;
import org.graylog2.blacklists.Blacklist;
Expand All @@ -28,10 +32,6 @@
import org.graylog2.messagehandlers.common.MessageCountUpdateHook;
import org.graylog2.messagehandlers.common.MessageParserHook;
import org.graylog2.messagehandlers.common.ReceiveHookManager;

import java.io.IOException;
import java.net.DatagramPacket;
import java.util.zip.DataFormatException;
import org.graylog2.messagequeue.MessageQueue;

/**
Expand Down Expand Up @@ -109,6 +109,10 @@ public ChunkedGELFClientHandler(DatagramPacket clientMessage) throws GELFExcepti
}
}

public ChunkedGELFClientHandler(byte[] data) throws GELFException, IOException, DataFormatException {
this(new DatagramPacket(data, data.length));
}

private void decompress(byte[] data, String hash) throws InvalidGELFCompressionMethodException, IOException {
// Determine compression type.
int type = GELF.getGELFType(data);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/**
* Copyright 2012 Kay Roepke <kroepke@classdump.org>
*
* This file is part of Graylog2.
*
* Graylog2 is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Graylog2 is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Graylog2. If not, see <http://www.gnu.org/licenses/>.
*
*/

package org.graylog2.messagehandlers.gelf;

import org.apache.log4j.Logger;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;

public class ChunkedGELFHandler extends SimpleChannelUpstreamHandler {
private static final Logger LOG = Logger.getLogger(ChunkedGELFHandler.class);

@Override
public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e)
throws Exception {
final ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
final byte[] data = new byte[buffer.readableBytes()];
buffer.getBytes(0, data);
LOG.info("received chunked gelf messages, passing on to handler");
new ChunkedGELFClientHandler(data).handle();
}

@Override
public void exceptionCaught(final ChannelHandlerContext ctx, final ExceptionEvent e)
throws Exception {
LOG.error("Could not handle chunked GELF message", e.getCause());
e.getChannel().close();
}


}

This file was deleted.

Loading