Skip to content
This repository has been archived by the owner on Mar 31, 2024. It is now read-only.

Commit

Permalink
Make basic settings dynamically updatable.
Browse files Browse the repository at this point in the history
The following currently support dynamic changes:
marvel.agent.interval  (also supports setting to -1 to disable exporting)
marvel.agent.indices
marvel.agent.exporter.es.hosts
marvel.agent.exporter.es.timeout

Also removed dashboard uploading code in es exporter as it is not needed.

Closes #20
  • Loading branch information
bleskes committed Mar 6, 2014
1 parent 48d6281 commit f5ed021
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 137 deletions.
111 changes: 82 additions & 29 deletions agent/src/main/java/org/elasticsearch/marvel/agent/AgentService.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.settings.ClusterDynamicSettings;
import org.elasticsearch.cluster.settings.DynamicSettings;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableSet;
Expand All @@ -54,6 +56,7 @@
import org.elasticsearch.marvel.agent.event.*;
import org.elasticsearch.marvel.agent.exporter.Exporter;
import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.node.settings.NodeSettingsService;

import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -63,12 +66,12 @@

import static org.elasticsearch.common.collect.Lists.newArrayList;

public class AgentService extends AbstractLifecycleComponent<AgentService> {
public class AgentService extends AbstractLifecycleComponent<AgentService> implements NodeSettingsService.Listener {

public static final String SETTINGS_INTERVAL = "interval";
public static final String SETTINGS_INDICES = "indices";
public static final String SETTINGS_ENABLED = "enabled";
public static final String SETTINGS_SHARD_STATS_ENABLED = "shard_stats.enabled";
public static final String SETTINGS_INTERVAL = "marvel.agent.interval";
public static final String SETTINGS_INDICES = "marvel.agent.indices";
public static final String SETTINGS_ENABLED = "marvel.agent.enabled";
public static final String SETTINGS_SHARD_STATS_ENABLED = "marvel.agent.shard_stats.enabled";

private final InternalIndicesService indicesService;
private final NodeService nodeService;
Expand All @@ -79,42 +82,69 @@ public class AgentService extends AbstractLifecycleComponent<AgentService> {
private final IndicesLifecycle.Listener indicesLifeCycleListener;
private final ClusterStateListener clusterStateEventListener;

private volatile ExportingWorker exp;
private volatile Thread thread;
private final TimeValue interval;
private volatile ExportingWorker exportingWorker;
private volatile Thread workerThread;
private volatile long samplingInterval;
volatile private String[] indicesToExport = Strings.EMPTY_ARRAY;
volatile private boolean exportShardStats;

private Collection<Exporter> exporters;

volatile private String[] indicesToExport = Strings.EMPTY_ARRAY;
volatile private boolean exportShardStats;

private final BlockingQueue<Event> pendingEventsQueue;

@Inject
public AgentService(Settings settings, IndicesService indicesService,
NodeService nodeService, ClusterService clusterService,
Client client, ClusterName clusterName,
NodeSettingsService nodeSettingsService,
@ClusterDynamicSettings DynamicSettings dynamicSettings,
Set<Exporter> exporters) {
super(settings);
this.indicesService = (InternalIndicesService) indicesService;
this.clusterService = clusterService;
this.nodeService = nodeService;
this.interval = componentSettings.getAsTime(SETTINGS_INTERVAL, TimeValue.timeValueSeconds(10));
this.indicesToExport = componentSettings.getAsArray(SETTINGS_INDICES, this.indicesToExport, true);
this.exportShardStats = componentSettings.getAsBoolean(SETTINGS_SHARD_STATS_ENABLED, false);
this.samplingInterval = settings.getAsTime(SETTINGS_INTERVAL, TimeValue.timeValueSeconds(10)).millis();
this.indicesToExport = settings.getAsArray(SETTINGS_INDICES, this.indicesToExport, true);
this.exportShardStats = settings.getAsBoolean(SETTINGS_SHARD_STATS_ENABLED, false);
this.client = client;
this.clusterName = clusterName.value();

indicesLifeCycleListener = new IndicesLifeCycleListener();
clusterStateEventListener = new ClusterStateListener();
pendingEventsQueue = ConcurrentCollections.newBlockingQueue();

if (componentSettings.getAsBoolean(SETTINGS_ENABLED, true)) {
if (settings.getAsBoolean(SETTINGS_ENABLED, true)) {
this.exporters = ImmutableSet.copyOf(exporters);
} else {
this.exporters = ImmutableSet.of();
logger.info("collecting disabled by settings");
}

nodeSettingsService.addListener(this);
dynamicSettings.addDynamicSetting(SETTINGS_INTERVAL);
dynamicSettings.addDynamicSetting(SETTINGS_INDICES + ".*"); // array settings
}

protected void applyIntervalSettings() {
if (samplingInterval <= 0) {
logger.info("data sampling is disabled due to interval settings [{}]", samplingInterval);
if (workerThread != null) {

// notify worker to stop on its leisure, not to disturb an exporting operation
exportingWorker.closed = true;

exportingWorker = null;
workerThread = null;
}
} else if (workerThread == null || !workerThread.isAlive()) {

exportingWorker = new ExportingWorker();
workerThread = new Thread(exportingWorker, EsExecutors.threadName(settings, "marvel.exporters"));
workerThread.setDaemon(true);
workerThread.start();

}
}

@Override
Expand All @@ -125,33 +155,31 @@ protected void doStart() {
for (Exporter e : exporters)
e.start();

this.exp = new ExportingWorker();
this.thread = new Thread(exp, EsExecutors.threadName(settings, "marvel.exporters"));
this.thread.setDaemon(true);
this.thread.start();

indicesService.indicesLifecycle().addListener(indicesLifeCycleListener);
clusterService.addLast(clusterStateEventListener);

applyIntervalSettings();
}

@Override
protected void doStop() {
if (exporters.size() == 0) {
return;
}
this.exp.closed = true;
this.thread.interrupt();
try {
this.thread.join(60000);
} catch (InterruptedException e) {
// we don't care...
if (workerThread != null && workerThread.isAlive()) {
exportingWorker.closed = true;
workerThread.interrupt();
try {
workerThread.join(60000);
} catch (InterruptedException e) {
// we don't care...
}
}
for (Exporter e : exporters)
e.stop();

indicesService.indicesLifecycle().removeListener(indicesLifeCycleListener);
clusterService.remove(clusterStateEventListener);

}

@Override
Expand All @@ -160,16 +188,32 @@ protected void doClose() {
e.close();
}

@Override
public void onRefreshSettings(Settings settings) {
TimeValue newSamplingInterval = settings.getAsTime(SETTINGS_INTERVAL, null);
if (newSamplingInterval != null) {
logger.info("sampling interval updated to [{}]", newSamplingInterval);
samplingInterval = newSamplingInterval.millis();
applyIntervalSettings();
}

String[] indices = settings.getAsArray(SETTINGS_INDICES, null, true);
if (indices != null) {
logger.info("sampling indices updated to [{}]", Strings.arrayToCommaDelimitedString(indices));
indicesToExport = indices;
}
}

class ExportingWorker implements Runnable {

volatile boolean closed;
volatile boolean closed = false;

@Override
public void run() {
while (!closed) {
// sleep first to allow node to complete initialization before collecting the first start
try {
Thread.sleep(interval.millis());
Thread.sleep(samplingInterval);
if (closed) {
continue;
}
Expand Down Expand Up @@ -289,9 +333,15 @@ class ClusterStateListener implements org.elasticsearch.cluster.ClusterStateList

@Override
public void clusterChanged(ClusterChangedEvent event) {
if (samplingInterval <= 0) {
// ignore as we're not sampling
return;
}

if (!event.localNodeMaster()) {
return;
}

// only collect if i'm master.
long timestamp = System.currentTimeMillis();

Expand Down Expand Up @@ -463,7 +513,10 @@ class IndicesLifeCycleListener extends IndicesLifecycle.Listener {

@Override
public void indexShardStateChanged(IndexShard indexShard, @Nullable IndexShardState previousState, IndexShardState currentState, @Nullable String reason) {

if (samplingInterval <= 0) {
// ignore as we're not sampling
return;
}
DiscoveryNode relocatingNode = null;
if (indexShard.routingEntry() != null) {
if (indexShard.routingEntry().relocatingNodeId() != null) {
Expand Down
Loading

0 comments on commit f5ed021

Please sign in to comment.