diff --git a/agent/src/main/java/org/elasticsearch/marvel/agent/AgentService.java b/agent/src/main/java/org/elasticsearch/marvel/agent/AgentService.java index 95d740c81209f7..accb996182f547 100644 --- a/agent/src/main/java/org/elasticsearch/marvel/agent/AgentService.java +++ b/agent/src/main/java/org/elasticsearch/marvel/agent/AgentService.java @@ -36,6 +36,8 @@ import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.settings.ClusterDynamicSettings; +import org.elasticsearch.cluster.settings.DynamicSettings; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.ImmutableSet; @@ -54,6 +56,7 @@ import org.elasticsearch.marvel.agent.event.*; import org.elasticsearch.marvel.agent.exporter.Exporter; import org.elasticsearch.node.service.NodeService; +import org.elasticsearch.node.settings.NodeSettingsService; import java.util.ArrayList; import java.util.Collection; @@ -63,12 +66,12 @@ import static org.elasticsearch.common.collect.Lists.newArrayList; -public class AgentService extends AbstractLifecycleComponent { +public class AgentService extends AbstractLifecycleComponent implements NodeSettingsService.Listener { - public static final String SETTINGS_INTERVAL = "interval"; - public static final String SETTINGS_INDICES = "indices"; - public static final String SETTINGS_ENABLED = "enabled"; - public static final String SETTINGS_SHARD_STATS_ENABLED = "shard_stats.enabled"; + public static final String SETTINGS_INTERVAL = "marvel.agent.interval"; + public static final String SETTINGS_INDICES = "marvel.agent.indices"; + public static final String SETTINGS_ENABLED = "marvel.agent.enabled"; + public static final String SETTINGS_SHARD_STATS_ENABLED = "marvel.agent.shard_stats.enabled"; private final InternalIndicesService indicesService; private final NodeService nodeService; @@ -79,14 +82,14 @@ public class AgentService extends AbstractLifecycleComponent { private final IndicesLifecycle.Listener indicesLifeCycleListener; private final ClusterStateListener clusterStateEventListener; - private volatile ExportingWorker exp; - private volatile Thread thread; - private final TimeValue interval; + private volatile ExportingWorker exportingWorker; + private volatile Thread workerThread; + private volatile long samplingInterval; + volatile private String[] indicesToExport = Strings.EMPTY_ARRAY; + volatile private boolean exportShardStats; private Collection exporters; - volatile private String[] indicesToExport = Strings.EMPTY_ARRAY; - volatile private boolean exportShardStats; private final BlockingQueue pendingEventsQueue; @@ -94,14 +97,16 @@ public class AgentService extends AbstractLifecycleComponent { public AgentService(Settings settings, IndicesService indicesService, NodeService nodeService, ClusterService clusterService, Client client, ClusterName clusterName, + NodeSettingsService nodeSettingsService, + @ClusterDynamicSettings DynamicSettings dynamicSettings, Set exporters) { super(settings); this.indicesService = (InternalIndicesService) indicesService; this.clusterService = clusterService; this.nodeService = nodeService; - this.interval = componentSettings.getAsTime(SETTINGS_INTERVAL, TimeValue.timeValueSeconds(10)); - this.indicesToExport = componentSettings.getAsArray(SETTINGS_INDICES, this.indicesToExport, true); - this.exportShardStats = componentSettings.getAsBoolean(SETTINGS_SHARD_STATS_ENABLED, false); + this.samplingInterval = settings.getAsTime(SETTINGS_INTERVAL, TimeValue.timeValueSeconds(10)).millis(); + this.indicesToExport = settings.getAsArray(SETTINGS_INDICES, this.indicesToExport, true); + this.exportShardStats = settings.getAsBoolean(SETTINGS_SHARD_STATS_ENABLED, false); this.client = client; this.clusterName = clusterName.value(); @@ -109,12 +114,37 @@ public AgentService(Settings settings, IndicesService indicesService, clusterStateEventListener = new ClusterStateListener(); pendingEventsQueue = ConcurrentCollections.newBlockingQueue(); - if (componentSettings.getAsBoolean(SETTINGS_ENABLED, true)) { + if (settings.getAsBoolean(SETTINGS_ENABLED, true)) { this.exporters = ImmutableSet.copyOf(exporters); } else { this.exporters = ImmutableSet.of(); logger.info("collecting disabled by settings"); } + + nodeSettingsService.addListener(this); + dynamicSettings.addDynamicSetting(SETTINGS_INTERVAL); + dynamicSettings.addDynamicSetting(SETTINGS_INDICES + ".*"); // array settings + } + + protected void applyIntervalSettings() { + if (samplingInterval <= 0) { + logger.info("data sampling is disabled due to interval settings [{}]", samplingInterval); + if (workerThread != null) { + + // notify worker to stop on its leisure, not to disturb an exporting operation + exportingWorker.closed = true; + + exportingWorker = null; + workerThread = null; + } + } else if (workerThread == null || !workerThread.isAlive()) { + + exportingWorker = new ExportingWorker(); + workerThread = new Thread(exportingWorker, EsExecutors.threadName(settings, "marvel.exporters")); + workerThread.setDaemon(true); + workerThread.start(); + + } } @Override @@ -125,13 +155,10 @@ protected void doStart() { for (Exporter e : exporters) e.start(); - this.exp = new ExportingWorker(); - this.thread = new Thread(exp, EsExecutors.threadName(settings, "marvel.exporters")); - this.thread.setDaemon(true); - this.thread.start(); - indicesService.indicesLifecycle().addListener(indicesLifeCycleListener); clusterService.addLast(clusterStateEventListener); + + applyIntervalSettings(); } @Override @@ -139,19 +166,20 @@ protected void doStop() { if (exporters.size() == 0) { return; } - this.exp.closed = true; - this.thread.interrupt(); - try { - this.thread.join(60000); - } catch (InterruptedException e) { - // we don't care... + if (workerThread != null && workerThread.isAlive()) { + exportingWorker.closed = true; + workerThread.interrupt(); + try { + workerThread.join(60000); + } catch (InterruptedException e) { + // we don't care... + } } for (Exporter e : exporters) e.stop(); indicesService.indicesLifecycle().removeListener(indicesLifeCycleListener); clusterService.remove(clusterStateEventListener); - } @Override @@ -160,16 +188,32 @@ protected void doClose() { e.close(); } + @Override + public void onRefreshSettings(Settings settings) { + TimeValue newSamplingInterval = settings.getAsTime(SETTINGS_INTERVAL, null); + if (newSamplingInterval != null) { + logger.info("sampling interval updated to [{}]", newSamplingInterval); + samplingInterval = newSamplingInterval.millis(); + applyIntervalSettings(); + } + + String[] indices = settings.getAsArray(SETTINGS_INDICES, null, true); + if (indices != null) { + logger.info("sampling indices updated to [{}]", Strings.arrayToCommaDelimitedString(indices)); + indicesToExport = indices; + } + } + class ExportingWorker implements Runnable { - volatile boolean closed; + volatile boolean closed = false; @Override public void run() { while (!closed) { // sleep first to allow node to complete initialization before collecting the first start try { - Thread.sleep(interval.millis()); + Thread.sleep(samplingInterval); if (closed) { continue; } @@ -289,9 +333,15 @@ class ClusterStateListener implements org.elasticsearch.cluster.ClusterStateList @Override public void clusterChanged(ClusterChangedEvent event) { + if (samplingInterval <= 0) { + // ignore as we're not sampling + return; + } + if (!event.localNodeMaster()) { return; } + // only collect if i'm master. long timestamp = System.currentTimeMillis(); @@ -463,7 +513,10 @@ class IndicesLifeCycleListener extends IndicesLifecycle.Listener { @Override public void indexShardStateChanged(IndexShard indexShard, @Nullable IndexShardState previousState, IndexShardState currentState, @Nullable String reason) { - + if (samplingInterval <= 0) { + // ignore as we're not sampling + return; + } DiscoveryNode relocatingNode = null; if (indexShard.routingEntry() != null) { if (indexShard.routingEntry().relocatingNodeId() != null) { diff --git a/agent/src/main/java/org/elasticsearch/marvel/agent/exporter/ESExporter.java b/agent/src/main/java/org/elasticsearch/marvel/agent/exporter/ESExporter.java index 50032ffd169c18..aacd84d4105b90 100644 --- a/agent/src/main/java/org/elasticsearch/marvel/agent/exporter/ESExporter.java +++ b/agent/src/main/java/org/elasticsearch/marvel/agent/exporter/ESExporter.java @@ -28,7 +28,10 @@ import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.settings.ClusterDynamicSettings; +import org.elasticsearch.cluster.settings.DynamicSettings; import org.elasticsearch.common.Base64; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.ImmutableMap; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; @@ -40,37 +43,40 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.xcontent.*; -import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.common.xcontent.smile.SmileXContent; -import org.elasticsearch.env.Environment; -import org.elasticsearch.marvel.agent.Plugin; import org.elasticsearch.marvel.agent.Utils; import org.elasticsearch.marvel.agent.event.Event; +import org.elasticsearch.node.settings.NodeSettingsService; -import java.io.*; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.UnsupportedEncodingException; import java.net.HttpURLConnection; import java.net.URL; import java.net.URLEncoder; import java.util.ArrayList; import java.util.Map; -public class ESExporter extends AbstractLifecycleComponent implements Exporter { +public class ESExporter extends AbstractLifecycleComponent implements Exporter, NodeSettingsService.Listener { + + private static final String SETTINGS_PREFIX = "marvel.agent.exporter.es."; + public static final String SETTINGS_HOSTS = SETTINGS_PREFIX + "hosts"; + public static final String SETTINGS_INDEX_PREFIX = SETTINGS_PREFIX + "index.prefix"; + public static final String SETTINGS_INDEX_TIME_FORMAT = SETTINGS_PREFIX + "index.timeformat"; + public static final String SETTINGS_TIMEOUT = SETTINGS_PREFIX + "timeout"; volatile String[] hosts; final String indexPrefix; final DateTimeFormatter indexTimeFormatter; - final int timeout; - - // index to upload dashboards into. - final String kibanaIndex; - final String[] dashboardPathsToUpload; + volatile int timeout; final ClusterService clusterService; final ClusterName clusterName; public final static DateTimeFormatter defaultDatePrinter = Joda.forPattern("date_time").printer(); - volatile boolean checkedAndUploadedAllResources = false; + volatile boolean checkedAndUploadedIndexTemplate = false; final NodeStatsRenderer nodeStatsRenderer; final ShardStatsRenderer shardStatsRenderer; @@ -83,38 +89,20 @@ public class ESExporter extends AbstractLifecycleComponent implement Thread keepAliveThread; @Inject - public ESExporter(Settings settings, ClusterService clusterService, ClusterName clusterName, Environment environment, Plugin marvelPlugin) { + public ESExporter(Settings settings, ClusterService clusterService, ClusterName clusterName, + @ClusterDynamicSettings DynamicSettings dynamicSettings, NodeSettingsService nodeSettingsService) { super(settings); this.clusterService = clusterService; this.clusterName = clusterName; - hosts = componentSettings.getAsArray("es.hosts", new String[]{"localhost:9200"}); - indexPrefix = componentSettings.get("es.index.prefix", ".marvel"); - String indexTimeFormat = componentSettings.get("es.index.timeformat", "YYYY.MM.dd"); + hosts = settings.getAsArray(SETTINGS_HOSTS, new String[]{"localhost:9200"}); + indexPrefix = settings.get(SETTINGS_INDEX_PREFIX, ".marvel"); + String indexTimeFormat = settings.get(SETTINGS_INDEX_TIME_FORMAT, "YYYY.MM.dd"); indexTimeFormatter = DateTimeFormat.forPattern(indexTimeFormat).withZoneUTC(); - timeout = (int) componentSettings.getAsTime("es.timeout", new TimeValue(6000)).seconds(); - - kibanaIndex = componentSettings.get("es.kibana_index", ".marvel-kibana"); - - - String dashboardsBasePath = componentSettings.get("es.upload.dashboards.path"); - File dashboardsBase; - if (dashboardsBasePath != null) { - dashboardsBase = new File(dashboardsBasePath); - } else { - dashboardsBase = new File(new File(environment.pluginsFile(), marvelPlugin.name()), - "_site/app/dashboards/marvel".replace('/', File.separatorChar) - ); - } - ArrayList dashboardPaths = new ArrayList(); - for (String d : componentSettings.getAsArray("es.upload.dashboards", new String[]{})) { - dashboardPaths.add(new File(dashboardsBase, d).getAbsolutePath()); - } - - dashboardPathsToUpload = dashboardPaths.toArray(new String[dashboardPaths.size()]); + timeout = (int) settings.getAsTime(SETTINGS_TIMEOUT, new TimeValue(6000)).seconds(); nodeStatsRenderer = new NodeStatsRenderer(); shardStatsRenderer = new ShardStatsRenderer(); @@ -125,6 +113,10 @@ public ESExporter(Settings settings, ClusterService clusterService, ClusterName keepAliveWorker = new ConnectionKeepAliveWorker(); + dynamicSettings.addDynamicSetting(SETTINGS_HOSTS + ".*"); + dynamicSettings.addDynamicSetting(SETTINGS_TIMEOUT); + nodeSettingsService.addListener(this); + logger.debug("initialized with targets: {}, index prefix [{}], index time format [{}]", hosts, indexPrefix, indexTimeFormat); } @@ -179,11 +171,11 @@ public void exportClusterStats(ClusterStatsResponse clusterStats) { private HttpURLConnection openExportingConnection() { - if (!checkedAndUploadedAllResources) { + if (!checkedAndUploadedIndexTemplate) { try { - checkedAndUploadedAllResources = checkAndUploadAllResources(); + checkedAndUploadedIndexTemplate = checkAndUploadIndexTemplate(); } catch (RuntimeException e) { - logger.error("failed to upload critical resources, stopping export", e); + logger.error("failed to upload index template, stopping export", e); return null; } } @@ -333,10 +325,10 @@ private HttpURLConnection openConnection(String method, String path, String cont } } finally { if (hostIndex > 0 && hostIndex < hosts.length) { - logger.debug("moving [{}] failed hosts to the end of the list", hostIndex + 1); + logger.debug("moving [{}] failed hosts to the end of the list", hostIndex); String[] newHosts = new String[hosts.length]; System.arraycopy(hosts, hostIndex, newHosts, 0, hosts.length - hostIndex); - System.arraycopy(hosts, 0, newHosts, hosts.length - hostIndex - 1, hostIndex + 1); + System.arraycopy(hosts, 0, newHosts, hosts.length - hostIndex, hostIndex); hosts = newHosts; logger.debug("preferred target host is now [{}]", hosts[0]); } @@ -345,74 +337,6 @@ private HttpURLConnection openConnection(String method, String path, String cont return null; } - /** - * Checks if resources such as index templates and dashboards already exist and if not uploads them/ - * Any critical error that should prevent data exporting is communicated via an exception. - * - * @return true if all resources exist or are uploaded. - */ - private boolean checkAndUploadAllResources() { - boolean ret = checkAndUploadIndexTemplate(); - for (String dashPath : dashboardPathsToUpload) { - ret = checkAndUploadDashboard(dashPath) && ret; - } - return ret; - } - - private boolean checkAndUploadDashboard(String path) { - logger.debug("checking/uploading [{}]", path); - File dashboardFile = new File(path); - if (!dashboardFile.exists()) { - logger.warn("can't upload dashboard [{}] - file doesn't exist", path); - return true; - } - try { - byte[] dashboardBytes = Streams.copyToByteArray(dashboardFile); - XContentParser parser = XContentHelper.createParser(dashboardBytes, 0, dashboardBytes.length); - XContentParser.Token token = parser.nextToken(); - if (token == null) { - throw new IOException("no data"); - } - if (token != XContentParser.Token.START_OBJECT) { - throw new IOException("should start with an object"); - } - String dashboardTitle = null; - String currentFieldName = null; - while (dashboardTitle == null && (token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { - switch (token) { - case START_ARRAY: - parser.skipChildren(); - break; - case START_OBJECT: - parser.skipChildren(); - break; - case FIELD_NAME: - currentFieldName = parser.currentName(); - break; - case VALUE_STRING: - if ("title".equals(currentFieldName)) { - dashboardTitle = parser.text(); - } - break; - } - } - if (dashboardTitle == null) { - throw new IOException("failed to find dashboard title"); - } - - XContentBuilder builder = JsonXContent.contentBuilder(); - builder.startObject(); - builder.field("title", dashboardTitle); - builder.field("dashboard", new String(dashboardBytes, "UTF-8")); - builder.endObject(); - - return checkAndUpload(kibanaIndex, "dashboard", dashboardTitle, builder.bytes().toBytes()); - } catch (IOException e) { - logger.error("error while checking/uploading dashboard [{}]", path, e); - return false; - } - } - private String urlEncode(String s) { try { return URLEncoder.encode(s, "UTF-8"); @@ -465,6 +389,12 @@ private boolean checkAndUpload(String path, byte[] bytes) throws IOException { return hasDoc; } + /** + * Checks if the index templates already exist and if not uploads it + * Any critical error that should prevent data exporting is communicated via an exception. + * + * @return true if template exists or was uploaded. + */ private boolean checkAndUploadIndexTemplate() { byte[] template; try { @@ -497,6 +427,21 @@ private void logConnectionError(String msg, HttpURLConnection conn) { } } + @Override + public void onRefreshSettings(Settings settings) { + TimeValue newTimeout = settings.getAsTime(SETTINGS_TIMEOUT, null); + if (newTimeout != null) { + logger.info("connection timeout set to [{}]", newTimeout); + timeout = (int) newTimeout.seconds(); + } + + String[] newHosts = settings.getAsArray(SETTINGS_HOSTS, null); + if (newHosts != null) { + logger.info("hosts set to [{}]", Strings.arrayToCommaDelimitedString(newHosts)); + this.hosts = newHosts; + } + } + interface MultiXContentRenderer { int length();