Skip to content

Commit

Permalink
Merge pull request #7065 from BigFunger/add-data-disable-processors
Browse files Browse the repository at this point in the history
[add data] ability to disable processors
  • Loading branch information
BigFunger committed Apr 27, 2016
2 parents 054d4f0 + f0e99c3 commit ff26c18
Show file tree
Hide file tree
Showing 9 changed files with 211 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import './processor_ui';
import pipelineSetupTemplate from '../views/pipeline_setup.html';

const app = uiModules.get('kibana');
function buildProcessorTypeList() {

function buildProcessorTypeList(enabledProcessorTypeIds) {
return _(ProcessorTypes)
.map(Type => {
const instance = new Type();
Expand All @@ -22,6 +23,8 @@ function buildProcessorTypeList() {
};
})
.compact()
.filter((processorType) => enabledProcessorTypeIds.includes(processorType.typeId))
.sortBy('title')
.value();
}

Expand All @@ -36,9 +39,15 @@ app.directive('pipelineSetup', function () {
controller: function ($scope, debounce, Private, Notifier) {
const ingest = Private(IngestProvider);
const notify = new Notifier({ location: `Ingest Pipeline Setup` });
$scope.processorTypes = _.sortBy(buildProcessorTypeList(), 'title');
$scope.sample = {};

//determines which processors are available on the cluster
ingest.getProcessors()
.then((enabledProcessorTypeIds) => {
$scope.processorTypes = buildProcessorTypeList(enabledProcessorTypeIds);
})
.catch(notify.error);

const pipeline = new Pipeline();
// Loads pre-existing pipeline which will exist if the user returns from
// a later step in the wizard
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import processESIngestProcessorsResponse from '../process_es_ingest_processors_response';
import expect from 'expect.js';
import _ from 'lodash';

describe('processESIngestSimulateResponse', function () {

it('should return a list of strings indicating the enabled processors', function () {
const response = {
nodes: {
node_foo: {
ingest: {
processors: [
{ type: 'proc_foo' },
{ type: 'proc_bar' }
]
}
}
}
};

const expected = [ 'proc_foo', 'proc_bar' ];
const actual = processESIngestProcessorsResponse(response);

expect(_.isEqual(actual, expected)).to.be.ok();
});

it('should return a unique list of processors', function () {
const response = {
nodes: {
node_foo: {
ingest: {
processors: [
{ type: 'proc_foo' },
{ type: 'proc_bar' }
]
}
},
node_bar: {
ingest: {
processors: [
{ type: 'proc_foo' },
{ type: 'proc_bar' }
]
}
}
}
};

const expected = [ 'proc_foo', 'proc_bar' ];
const actual = processESIngestProcessorsResponse(response);

expect(_.isEqual(actual, expected)).to.be.ok();
});

it('should combine the available processors from all nodes', function () {
const response = {
nodes: {
node_foo: {
ingest: {
processors: [
{ type: 'proc_foo' }
]
}
},
node_bar: {
ingest: {
processors: [
{ type: 'proc_bar' }
]
}
}
}
};

const expected = [ 'proc_foo', 'proc_bar' ];
const actual = processESIngestProcessorsResponse(response);

expect(_.isEqual(actual, expected)).to.be.ok();
});

it('should return an empty array for unexpected response', function () {
expect(_.isEqual(processESIngestProcessorsResponse({ nodes: {}}), [])).to.be.ok();
expect(_.isEqual(processESIngestProcessorsResponse({}), [])).to.be.ok();
expect(_.isEqual(processESIngestProcessorsResponse(undefined), [])).to.be.ok();
expect(_.isEqual(processESIngestProcessorsResponse(null), [])).to.be.ok();
expect(_.isEqual(processESIngestProcessorsResponse(''), [])).to.be.ok();
expect(_.isEqual(processESIngestProcessorsResponse(1), [])).to.be.ok();
});

});
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
const _ = require('lodash');

export default function processESIngestProcessorsResponse(response) {
const nodes = _.get(response, 'nodes');

const results = _.chain(nodes)
.map('ingest.processors')
.reduce((result, processors) => {
return result.concat(processors);
})
.map('type')
.unique()
.value();

return results;
};
2 changes: 2 additions & 0 deletions src/plugins/kibana/server/routes/api/ingest/index.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import { registerPost } from './register_post';
import { registerDelete } from './register_delete';
import { registerProcessors } from './register_processors';
import { registerSimulate } from './register_simulate';

export default function (server) {
registerPost(server);
registerDelete(server);
registerProcessors(server);
registerSimulate(server);
}
24 changes: 24 additions & 0 deletions src/plugins/kibana/server/routes/api/ingest/register_processors.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import _ from 'lodash';
import handleESError from '../../../lib/handle_es_error';
import handleResponse from '../../../lib/process_es_ingest_processors_response';
import { keysToCamelCaseShallow, keysToSnakeCaseShallow } from '../../../../common/lib/case_conversion';

export function registerProcessors(server) {
server.route({
path: '/api/kibana/ingest/processors',
method: 'GET',
handler: function (request, reply) {
const boundCallWithRequest = _.partial(server.plugins.elasticsearch.callWithRequest, request);

return boundCallWithRequest('transport.request', {
path: '/_nodes/ingest',
method: 'GET'
})
.then(handleResponse)
.then(reply)
.catch((error) => {
reply(handleESError(error));
});
}
});
};
32 changes: 32 additions & 0 deletions src/ui/public/ingest/__tests__/ingest.js
Original file line number Diff line number Diff line change
Expand Up @@ -113,4 +113,36 @@ describe('Ingest Service', function () {
expect($rootScope.$broadcast.calledWith('ingest:updated')).to.be.ok();
});
});

describe('getProcessors', () => {

it('Calls the processors GET endpoint of the ingest API', function () {
$httpBackend
.expectGET('../api/kibana/ingest/processors')
.respond('ok');

ingest.getProcessors();
$httpBackend.flush();
});

it('Throws user-friendly error when there is an error in the request', function (done) {
$httpBackend
.when('GET', '../api/kibana/ingest/processors')
.respond(404);

ingest.getProcessors()
.then(
() => {
throw new Error('expected an error response');
},
(error) => {
expect(error.message).to.be('Error fetching enabled processors');
done();
});

$httpBackend.flush();
});

});

});
16 changes: 14 additions & 2 deletions src/ui/public/ingest/ingest.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { keysToCamelCaseShallow, keysToSnakeCaseShallow } from '../../../plugins
import _ from 'lodash';
import angular from 'angular';

export default function IngestProvider($rootScope, $http, config) {
export default function IngestProvider($rootScope, $http, config, $q) {

const ingestAPIPrefix = '../api/kibana/ingest';

Expand Down Expand Up @@ -55,7 +55,19 @@ export default function IngestProvider($rootScope, $http, config) {
return $http.post(`${ingestAPIPrefix}/simulate`, pack(pipeline))
.then(unpack)
.catch(err => {
throw ('Error communicating with Kibana server');
return $q.reject(new Error('Error simulating pipeline'));
});
};

this.getProcessors = function () {
function unpack(response) {
return response.data;
}

return $http.get(`${ingestAPIPrefix}/processors`)
.then(unpack)
.catch(err => {
return $q.reject(new Error('Error fetching enabled processors'));
});
};

Expand Down
19 changes: 19 additions & 0 deletions test/unit/api/ingest/_processors.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
define(function (require) {
var Promise = require('bluebird');
var _ = require('intern/dojo/node!lodash');
var expect = require('intern/dojo/node!expect.js');

return function (bdd, scenarioManager, request) {
bdd.describe('processors', () => {

bdd.it('should return 200 for a successful run', function () {
return request.get('/kibana/ingest/processors')
.expect(200)
.then((response) => {
expect(_.isArray(response.body)).to.be(true);
});
});

});
};
});
4 changes: 3 additions & 1 deletion test/unit/api/ingest/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ define(function (require) {
var post = require('./_post');
var del = require('./_del');
var simulate = require('./_simulate');
var processors = require('./processors/index');
var processors = require('./_processors');
var processorTypes = require('./processors/index');

bdd.describe('ingest API', function () {
var scenarioManager = new ScenarioManager(url.format(serverConfig.servers.elasticsearch));
Expand All @@ -27,5 +28,6 @@ define(function (require) {
del(bdd, scenarioManager, request);
simulate(bdd, scenarioManager, request);
processors(bdd, scenarioManager, request);
processorTypes(bdd, scenarioManager, request);
});
});

0 comments on commit ff26c18

Please sign in to comment.