Skip to content

Commit

Permalink
Move the error handling to routes
Browse files Browse the repository at this point in the history
  • Loading branch information
bhapas committed Sep 24, 2024
1 parent 7b0b264 commit 6486b9c
Show file tree
Hide file tree
Showing 31 changed files with 219 additions and 301 deletions.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified x-pack/plugins/integration_assistant/docs/imgs/ecs_graph.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified x-pack/plugins/integration_assistant/docs/imgs/ecs_subgraph.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified x-pack/plugins/integration_assistant/docs/imgs/related_graph.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,12 @@
* 2.0.
*/
import { JsonOutputParser } from '@langchain/core/output_parsers';
import { GraphRecursionError } from '@langchain/langgraph';
import type { Pipeline } from '../../../common';
import type { CategorizationState, SimplifiedProcessor, SimplifiedProcessors } from '../../types';
import { combineProcessors } from '../../util/processors';
import { CATEGORIZATION_EXAMPLE_PROCESSORS } from './constants';
import { CATEGORIZATION_MAIN_PROMPT } from './prompts';
import type { CategorizationNodeParams } from './types';
import { RecursionLimitError } from '../../lib/errors';

export async function handleCategorization({
state,
Expand All @@ -21,22 +19,13 @@ export async function handleCategorization({
const categorizationMainPrompt = CATEGORIZATION_MAIN_PROMPT;
const outputParser = new JsonOutputParser();
const categorizationMainGraph = categorizationMainPrompt.pipe(model).pipe(outputParser);
let currentProcessors: SimplifiedProcessor[] = [];
try {
currentProcessors = (await categorizationMainGraph.invoke({
pipeline_results: JSON.stringify(state.pipelineResults, null, 2),
example_processors: CATEGORIZATION_EXAMPLE_PROCESSORS,
ex_answer: state?.exAnswer,
ecs_categories: state?.ecsCategories,
ecs_types: state?.ecsTypes,
})) as SimplifiedProcessor[];
} catch (e) {
if (e instanceof GraphRecursionError) {
throw new RecursionLimitError(e.message);
} else {
throw e;
}
}
const currentProcessors = (await categorizationMainGraph.invoke({
pipeline_results: JSON.stringify(state.pipelineResults, null, 2),
example_processors: CATEGORIZATION_EXAMPLE_PROCESSORS,
ex_answer: state?.exAnswer,
ecs_categories: state?.ecsCategories,
ecs_types: state?.ecsTypes,
})) as SimplifiedProcessor[];

const processors = {
type: 'categorization',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,11 @@
*/

import { JsonOutputParser } from '@langchain/core/output_parsers';
import { GraphRecursionError } from '@langchain/langgraph';
import type { Pipeline } from '../../../common';
import type { CategorizationNodeParams } from './types';
import type { SimplifiedProcessors, SimplifiedProcessor, CategorizationState } from '../../types';
import { combineProcessors } from '../../util/processors';
import { CATEGORIZATION_ERROR_PROMPT } from './prompts';
import { RecursionLimitError } from '../../lib/errors';

export async function handleErrors({
state,
Expand All @@ -22,23 +20,15 @@ export async function handleErrors({

const outputParser = new JsonOutputParser();
const categorizationErrorGraph = categorizationErrorPrompt.pipe(model).pipe(outputParser);
let currentProcessors: SimplifiedProcessor[] = [];

try {
currentProcessors = (await categorizationErrorGraph.invoke({
current_processors: JSON.stringify(state.currentProcessors, null, 2),
ex_answer: state.exAnswer,
errors: JSON.stringify(state.errors, null, 2),
package_name: state.packageName,
data_stream_name: state.dataStreamName,
})) as SimplifiedProcessor[];
} catch (e) {
if (e instanceof GraphRecursionError) {
throw new RecursionLimitError(e.message);
} else {
throw e;
}
}
const currentProcessors = (await categorizationErrorGraph.invoke({
current_processors: JSON.stringify(state.currentProcessors, null, 2),
ex_answer: state.exAnswer,
errors: JSON.stringify(state.errors, null, 2),
package_name: state.packageName,
data_stream_name: state.dataStreamName,
})) as SimplifiedProcessor[];

const processors = {
type: 'categorization',
processors: currentProcessors,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,12 @@
*/

import { JsonOutputParser } from '@langchain/core/output_parsers';
import { GraphRecursionError } from '@langchain/langgraph';
import type { Pipeline } from '../../../common';
import type { CategorizationNodeParams } from './types';
import type { SimplifiedProcessors, SimplifiedProcessor, CategorizationState } from '../../types';
import { combineProcessors } from '../../util/processors';
import { ECS_EVENT_TYPES_PER_CATEGORY } from './constants';
import { CATEGORIZATION_VALIDATION_PROMPT } from './prompts';
import { RecursionLimitError } from '../../lib/errors';

export async function handleInvalidCategorization({
state,
Expand All @@ -23,22 +21,14 @@ export async function handleInvalidCategorization({

const outputParser = new JsonOutputParser();
const categorizationInvalidGraph = categorizationInvalidPrompt.pipe(model).pipe(outputParser);
let currentProcessors: SimplifiedProcessor[] = [];

try {
currentProcessors = (await categorizationInvalidGraph.invoke({
current_processors: JSON.stringify(state.currentProcessors, null, 2),
invalid_categorization: JSON.stringify(state.invalidCategorization, null, 2),
ex_answer: state.exAnswer,
compatible_types: JSON.stringify(ECS_EVENT_TYPES_PER_CATEGORY, null, 2),
})) as SimplifiedProcessor[];
} catch (e) {
if (e instanceof GraphRecursionError) {
throw new RecursionLimitError(e.message);
} else {
throw e;
}
}
const currentProcessors = (await categorizationInvalidGraph.invoke({
current_processors: JSON.stringify(state.currentProcessors, null, 2),
invalid_categorization: JSON.stringify(state.invalidCategorization, null, 2),
ex_answer: state.exAnswer,
compatible_types: JSON.stringify(ECS_EVENT_TYPES_PER_CATEGORY, null, 2),
})) as SimplifiedProcessor[];

const processors = {
type: 'categorization',
processors: currentProcessors,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,12 @@
*/

import { JsonOutputParser } from '@langchain/core/output_parsers';
import { GraphRecursionError } from '@langchain/langgraph';
import { CATEGORIZATION_REVIEW_PROMPT } from './prompts';
import type { Pipeline } from '../../../common';
import type { CategorizationNodeParams } from './types';
import type { SimplifiedProcessors, SimplifiedProcessor, CategorizationState } from '../../types';
import { combineProcessors } from '../../util/processors';
import { ECS_EVENT_TYPES_PER_CATEGORY } from './constants';
import { RecursionLimitError } from '../../lib/errors';

export async function handleReview({
state,
Expand All @@ -22,25 +20,17 @@ export async function handleReview({
const categorizationReviewPrompt = CATEGORIZATION_REVIEW_PROMPT;
const outputParser = new JsonOutputParser();
const categorizationReview = categorizationReviewPrompt.pipe(model).pipe(outputParser);
let currentProcessors: SimplifiedProcessor[] = [];

try {
currentProcessors = (await categorizationReview.invoke({
current_processors: JSON.stringify(state.currentProcessors, null, 2),
pipeline_results: JSON.stringify(state.pipelineResults, null, 2),
previous_invalid_categorization: state.previousInvalidCategorization,
previous_error: state.previousError,
ex_answer: state?.exAnswer,
package_name: state?.packageName,
compatibility_matrix: JSON.stringify(ECS_EVENT_TYPES_PER_CATEGORY, null, 2),
})) as SimplifiedProcessor[];
} catch (e) {
if (e instanceof GraphRecursionError) {
throw new RecursionLimitError(e.message);
} else {
throw e;
}
}
const currentProcessors = (await categorizationReview.invoke({
current_processors: JSON.stringify(state.currentProcessors, null, 2),
pipeline_results: JSON.stringify(state.pipelineResults, null, 2),
previous_invalid_categorization: state.previousInvalidCategorization,
previous_error: state.previousError,
ex_answer: state?.exAnswer,
package_name: state?.packageName,
compatibility_matrix: JSON.stringify(ECS_EVENT_TYPES_PER_CATEGORY, null, 2),
})) as SimplifiedProcessor[];

const processors = {
type: 'categorization',
processors: currentProcessors,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,9 @@
*/

import { JsonOutputParser } from '@langchain/core/output_parsers';
import { GraphRecursionError } from '@langchain/langgraph';
import type { EcsMappingState } from '../../types';
import { ECS_DUPLICATES_PROMPT } from './prompts';
import type { EcsNodeParams } from './types';
import { RecursionLimitError } from '../../lib/errors';

export async function handleDuplicates({
state,
Expand All @@ -21,22 +19,12 @@ export async function handleDuplicates({
const usesFinalMapping = state?.useFinalMapping;
const mapping = usesFinalMapping ? state.finalMapping : state.currentMapping;

let result;

try {
result = await ecsDuplicatesGraph.invoke({
ecs: state.ecs,
current_mapping: JSON.stringify(mapping, null, 2),
ex_answer: state.exAnswer,
duplicate_fields: state.duplicateFields,
});
} catch (e) {
if (e instanceof GraphRecursionError) {
throw new RecursionLimitError(e.message);
} else {
throw e;
}
}
const result = await ecsDuplicatesGraph.invoke({
ecs: state.ecs,
current_mapping: JSON.stringify(mapping, null, 2),
ex_answer: state.exAnswer,
duplicate_fields: state.duplicateFields,
});

return {
[usesFinalMapping ? 'finalMapping' : 'currentMapping']: result,
Expand Down
25 changes: 7 additions & 18 deletions x-pack/plugins/integration_assistant/server/graphs/ecs/invalid.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,9 @@
* 2.0.
*/
import { JsonOutputParser } from '@langchain/core/output_parsers';
import { GraphRecursionError } from '@langchain/langgraph';
import type { EcsMappingState } from '../../types';
import { ECS_INVALID_PROMPT } from './prompts';
import type { EcsNodeParams } from './types';
import { RecursionLimitError } from '../../lib/errors';

export async function handleInvalidEcs({
state,
Expand All @@ -20,22 +18,13 @@ export async function handleInvalidEcs({
const usesFinalMapping = state?.useFinalMapping;
const mapping = usesFinalMapping ? state.finalMapping : state.currentMapping;

let result;
try {
result = await ecsInvalidEcsGraph.invoke({
ecs: state.ecs,
current_mapping: JSON.stringify(mapping, null, 2),
ex_answer: state.exAnswer,
combined_samples: state.combinedSamples,
invalid_ecs_fields: state.invalidEcsFields,
});
} catch (e) {
if (e instanceof GraphRecursionError) {
throw new RecursionLimitError(e.message);
} else {
throw e;
}
}
const result = await ecsInvalidEcsGraph.invoke({
ecs: state.ecs,
current_mapping: JSON.stringify(mapping, null, 2),
ex_answer: state.exAnswer,
combined_samples: state.combinedSamples,
invalid_ecs_fields: state.invalidEcsFields,
});

return {
[usesFinalMapping ? 'finalMapping' : 'currentMapping']: result,
Expand Down
25 changes: 7 additions & 18 deletions x-pack/plugins/integration_assistant/server/graphs/ecs/mapping.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,34 +6,23 @@
*/

import { JsonOutputParser } from '@langchain/core/output_parsers';
import { GraphRecursionError } from '@langchain/langgraph';
import type { EcsMappingState } from '../../types';
import { ECS_MAIN_PROMPT } from './prompts';
import type { EcsNodeParams } from './types';
import { RecursionLimitError } from '../../lib/errors';

export async function handleEcsMapping({
state,
model,
}: EcsNodeParams): Promise<Partial<EcsMappingState>> {
const outputParser = new JsonOutputParser();
const ecsMainGraph = ECS_MAIN_PROMPT.pipe(model).pipe(outputParser);
let currentMapping;

try {
currentMapping = await ecsMainGraph.invoke({
ecs: state.ecs,
combined_samples: state.combinedSamples,
package_name: state.packageName,
data_stream_name: state.dataStreamName,
ex_answer: state.exAnswer,
});
} catch (e) {
if (e instanceof GraphRecursionError) {
throw new RecursionLimitError(e.message);
} else {
throw e;
}
}
const currentMapping = await ecsMainGraph.invoke({
ecs: state.ecs,
combined_samples: state.combinedSamples,
package_name: state.packageName,
data_stream_name: state.dataStreamName,
ex_answer: state.exAnswer,
});
return { currentMapping, hasTriedOnce: true, lastExecutedChain: 'ecsMapping' };
}
27 changes: 9 additions & 18 deletions x-pack/plugins/integration_assistant/server/graphs/ecs/missing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,9 @@
*/

import { JsonOutputParser } from '@langchain/core/output_parsers';
import { GraphRecursionError } from '@langchain/langgraph';
import { EcsMappingState } from '../../types';
import { ECS_MISSING_KEYS_PROMPT } from './prompts';
import { EcsNodeParams } from './types';
import { RecursionLimitError } from '../../lib/errors';

export async function handleMissingKeys({
state,
Expand All @@ -20,22 +18,15 @@ export async function handleMissingKeys({
const ecsMissingGraph = ECS_MISSING_KEYS_PROMPT.pipe(model).pipe(outputParser);
const usesFinalMapping = state?.useFinalMapping;
const mapping = usesFinalMapping ? state.finalMapping : state.currentMapping;
let result;
try {
result = await ecsMissingGraph.invoke({
ecs: state.ecs,
current_mapping: JSON.stringify(mapping, null, 2),
ex_answer: state.exAnswer,
combined_samples: state.combinedSamples,
missing_keys: state?.missingKeys,
});
} catch (e) {
if (e instanceof GraphRecursionError) {
throw new RecursionLimitError(e.message);
} else {
throw e;
}
}

const result = await ecsMissingGraph.invoke({
ecs: state.ecs,
current_mapping: JSON.stringify(mapping, null, 2),
ex_answer: state.exAnswer,
combined_samples: state.combinedSamples,
missing_keys: state?.missingKeys,
});

return {
[usesFinalMapping ? 'finalMapping' : 'currentMapping']: result,
lastExecutedChain: 'missingKeys',
Expand Down
19 changes: 4 additions & 15 deletions x-pack/plugins/integration_assistant/server/graphs/kv/header.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,10 @@
*/

import { JsonOutputParser } from '@langchain/core/output_parsers';
import { GraphRecursionError } from '@langchain/langgraph';
import type { KVState } from '../../types';
import type { HandleKVNodeParams } from './types';
import { KV_HEADER_PROMPT } from './prompts';
import { KV_HEADER_EXAMPLE_ANSWER } from './constants';
import { RecursionLimitError } from '../../lib/errors';

export async function handleHeader({
state,
Expand All @@ -20,20 +18,11 @@ export async function handleHeader({
}: HandleKVNodeParams): Promise<Partial<KVState>> {
const outputParser = new JsonOutputParser();
const kvHeaderGraph = KV_HEADER_PROMPT.pipe(model).pipe(outputParser);
let pattern;

try {
pattern = await kvHeaderGraph.invoke({
samples: state.logSamples,
ex_answer: JSON.stringify(KV_HEADER_EXAMPLE_ANSWER, null, 2),
});
} catch (e) {
if (e instanceof GraphRecursionError) {
throw new RecursionLimitError(e.message);
} else {
throw e;
}
}
const pattern = await kvHeaderGraph.invoke({
samples: state.logSamples,
ex_answer: JSON.stringify(KV_HEADER_EXAMPLE_ANSWER, null, 2),
});

return {
grokPattern: pattern.grok_pattern,
Expand Down
Loading

0 comments on commit 6486b9c

Please sign in to comment.