Skip to content

Commit

Permalink
Add http/1 response streaming to Canvas batches (#32027)
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisdavies authored Mar 1, 2019
1 parent bda704a commit b8a784e
Show file tree
Hide file tree
Showing 9 changed files with 570 additions and 63 deletions.
38 changes: 19 additions & 19 deletions packages/kbn-interpreter/src/public/batched_fetch.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import { FUNCTIONS_URL } from './consts';
* Create a function which executes an Expression function on the
* server as part of a larger batch of executions.
*/
export function batchedFetch({ kfetch, serialize, ms = 10 }) {
export function batchedFetch({ ajaxStream, serialize, ms = 10 }) {
// Uniquely identifies each function call in a batch operation
// so that the appropriate promise can be resolved / rejected later.
let id = 0;
Expand All @@ -42,7 +42,7 @@ export function batchedFetch({ kfetch, serialize, ms = 10 }) {
};

const runBatch = () => {
processBatch(kfetch, batch);
processBatch(ajaxStream, batch);
reset();
};

Expand Down Expand Up @@ -70,37 +70,37 @@ export function batchedFetch({ kfetch, serialize, ms = 10 }) {
function createFuture() {
let resolve;
let reject;
const promise = new Promise((res, rej) => {
resolve = res;
reject = rej;
});

return {
resolve(val) { return resolve(val); },
reject(val) { return reject(val); },
promise: new Promise((res, rej) => {
resolve = res;
reject = rej;
}),
resolve,
reject,
promise,
};
}

/**
* Runs the specified batch of functions on the server, then resolves
* the related promises.
*/
async function processBatch(kfetch, batch) {
async function processBatch(ajaxStream, batch) {
try {
const { results } = await kfetch({
pathname: FUNCTIONS_URL,
method: 'POST',
await ajaxStream({
url: FUNCTIONS_URL,
body: JSON.stringify({
functions: Object.values(batch).map(({ request }) => request),
}),
});
onResponse({ id, statusCode, result }) {
const { future } = batch[id];

results.forEach(({ id, result }) => {
const { future } = batch[id];
if (result.statusCode && result.err) {
future.reject(result);
} else {
future.resolve(result);
if (statusCode >= 400) {
future.reject(result);
} else {
future.resolve(result);
}
}
});
} catch (err) {
Expand Down
72 changes: 72 additions & 0 deletions packages/kbn-interpreter/src/public/batched_fetch.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import { batchedFetch } from './batched_fetch';

const serialize = (o) => JSON.stringify(o);

describe('batchedFetch', () => {
it('resolves the correct promise', async () => {
const ajaxStream = jest.fn(async ({ body, onResponse }) => {
const { functions } = JSON.parse(body);
functions.map(({ id, functionName, context, args }) => onResponse({
id,
statusCode: 200,
result: `${functionName}${context}${args}`,
}));
});

const ajax = batchedFetch({ ajaxStream, serialize, ms: 1 });

const result = await Promise.all([
ajax({ functionName: 'a', context: 1, args: 'aaa' }),
ajax({ functionName: 'b', context: 2, args: 'bbb' }),
]);

expect(result).toEqual([
'a1aaa',
'b2bbb',
]);
});

it('rejects responses whose statusCode is >= 300', async () => {
const ajaxStream = jest.fn(async ({ body, onResponse }) => {
const { functions } = JSON.parse(body);
functions.map(({ id, functionName, context, args }) => onResponse({
id,
statusCode: context,
result: context >= 400 ? { err: {} } : `${functionName}${context}${args}`,
}));
});

const ajax = batchedFetch({ ajaxStream, serialize, ms: 1 });

const result = await Promise.all([
ajax({ functionName: 'a', context: 500, args: 'aaa' }).catch(() => 'fail'),
ajax({ functionName: 'b', context: 400, args: 'bbb' }).catch(() => 'fail'),
ajax({ functionName: 'c', context: 200, args: 'ccc' }),
]);

expect(result).toEqual([
'fail',
'fail',
'c200ccc'
]);
});
});
4 changes: 2 additions & 2 deletions packages/kbn-interpreter/src/public/interpreter.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ import { createHandlers } from './create_handlers';
import { batchedFetch } from './batched_fetch';
import { FUNCTIONS_URL } from './consts';

export async function initializeInterpreter(kfetch, typesRegistry, functionsRegistry) {
export async function initializeInterpreter({ kfetch, ajaxStream, typesRegistry, functionsRegistry }) {
const serverFunctionList = await kfetch({ pathname: FUNCTIONS_URL });
const types = typesRegistry.toJS();
const { serialize } = serializeProvider(types);
const batch = batchedFetch({ kfetch, serialize });
const batch = batchedFetch({ ajaxStream, serialize });

// For every sever-side function, register a client-side
// function that matches its definition, but which simply
Expand Down
37 changes: 20 additions & 17 deletions packages/kbn-interpreter/src/public/interpreter.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,35 +35,38 @@ jest.mock('./create_handlers', () => ({
describe('kbn-interpreter/interpreter', () => {
it('loads server-side functions', async () => {
const kfetch = jest.fn(async () => ({}));
const ajaxStream = jest.fn(async () => ({}));

await initializeInterpreter(kfetch, { toJS: () => ({}) }, ({ register: () => {} }));
await initializeInterpreter({
kfetch,
ajaxStream,
typesRegistry: { toJS: () => ({}) },
functionsRegistry: ({ register: () => {} }),
});

expect(kfetch).toHaveBeenCalledTimes(1);
expect(kfetch).toHaveBeenCalledWith({ pathname: FUNCTIONS_URL });
});

it('registers client-side functions that pass through to the server', async () => {
const kfetch = jest.fn(async ({ method }) => {
if (method === 'POST') {
return {
results: [{
id: 1,
result: {
hello: 'world',
},
}],
};
}

const kfetch = jest.fn(async () => {
return {
hello: { name: 'hello' },
world: { name: 'world' },
};
});

const register = jest.fn();
const ajaxStream = jest.fn(async ({ onResponse }) => {
onResponse({ id: 1, result: { hello: 'world' } });
});

await initializeInterpreter(kfetch, { toJS: () => ({}) }, ({ register }));
await initializeInterpreter({
kfetch,
ajaxStream,
typesRegistry: { toJS: () => ({}) },
functionsRegistry: ({ register }),
});

expect(register).toHaveBeenCalledTimes(2);

Expand All @@ -81,9 +84,9 @@ describe('kbn-interpreter/interpreter', () => {

expect(result).toEqual({ hello: 'world' });

expect(kfetch).toHaveBeenCalledWith({
pathname: FUNCTIONS_URL,
method: 'POST',
expect(ajaxStream).toHaveBeenCalledWith({
url: FUNCTIONS_URL,
onResponse: expect.any(Function),
body: JSON.stringify({
functions: [{
id: 1,
Expand Down
8 changes: 7 additions & 1 deletion src/legacy/core_plugins/interpreter/public/interpreter.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import { register } from '@kbn/interpreter/common';
import { initializeInterpreter, registries } from '@kbn/interpreter/public';
import { kfetch } from 'ui/kfetch';
import { ajaxStream } from 'ui/ajax_stream';
import { functions } from './functions';
import { visualization } from './renderers/visualization';

Expand All @@ -32,7 +33,12 @@ let _resolve;
let _interpreterPromise;

const initialize = async () => {
initializeInterpreter(kfetch, registries.types, registries.browserFunctions).then(interpreter => {
initializeInterpreter({
kfetch,
ajaxStream,
typesRegistry: registries.types,
functionsRegistry: registries.browserFunctions,
}).then(interpreter => {
_resolve({ interpreter });
});
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,37 +65,64 @@ function runServerFunctions(server) {
const handlers = await createHandlers(req, server);
const { functions } = req.payload;

// Process each function individually, and bundle up respones / errors into
// the format expected by the front-end batcher.
const results = await Promise.all(functions.map(async ({ id, ...fnCall }) => {
const result = await runFunction(server, handlers, fnCall)
.catch(err => {
if (Boom.isBoom(err)) {
return { err, statusCode: err.statusCode, message: err.output.payload };
}
return { err: 'Internal Server Error', statusCode: 500, message: 'See server logs for details.' };
});

if (result == null) {
const { functionName } = fnCall;
return {
id,
result: {
err: `No result from '${functionName}'`,
statusCode: 500,
message: `Function '${functionName}' did not return anything`
}
};
// Grab the raw Node response object.
const res = req.raw.res;

// Tell Hapi not to manage the response https://github.com/hapijs/hapi/issues/3884
req._isReplied = true;

// Send the initial headers.
res.writeHead(200, {
'Content-Type': 'text/plain',
'Connection': 'keep-alive',
'Transfer-Encoding': 'chunked',
'Cache-Control': 'no-cache',
});

// Write a length-delimited response
const streamResult = (result) => {
const payload = JSON.stringify(result) + '\n';
res.write(`${payload.length}:${payload}`);
};

// Tries to run an interpreter function, and ensures a consistent error payload on failure.
const tryFunction = async (id, fnCall) => {
try {
const result = await runFunction(server, handlers, fnCall);

if (result != null) {
return { id, statusCode: 200, result };
}

return batchError(id, `Function ${fnCall.functionName} did not return anything.`);
} catch (err) {
if (Boom.isBoom(err)) {
return batchError(id, err.output.payload, err.statusCode);
}
return batchError(id, 'See server logs for details.');
}
};

return { id, result };
}));
// Process each function individually, and stream the responses back to the client
await Promise.all(functions.map(({ id, ...fnCall }) => tryFunction(id, fnCall).then(streamResult)));

return { results };
// All of the responses have been written, so we can close the response.
res.end();
},
});
}

/**
* A helper function for bundling up errors.
*/
function batchError(id, message, statusCode = 500) {
return {
id,
statusCode,
result: { statusCode, message },
};
}

/**
* Register the endpoint that returns the list of server-only functions.
* @param {*} server - The Kibana server
Expand Down
Loading

0 comments on commit b8a784e

Please sign in to comment.