Skip to content

Commit

Permalink
Revert "Prevent max_signals from being exceeded on threat match rule …
Browse files Browse the repository at this point in the history
…executions"

This reverts commit ba3b2f7.
  • Loading branch information
madirey committed Jun 14, 2021
1 parent b301793 commit 4752688
Show file tree
Hide file tree
Showing 6 changed files with 4 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,10 @@ import {
mergeReturns,
mergeSearchResults,
getSafeSortIds,
getLock,
releaseLock,
} from './utils';
import { SearchAfterAndBulkCreateParams, SearchAfterAndBulkCreateReturnType } from './types';

// search_after through documents and re-index using bulk endpoint.
// eslint-disable-next-line complexity
export const searchAfterAndBulkCreate = async ({
tuple,
ruleSO,
Expand All @@ -40,7 +37,6 @@ export const searchAfterAndBulkCreate = async ({
enrichment = identity,
bulkCreate,
wrapHits,
state,
}: SearchAfterAndBulkCreateParams): Promise<SearchAfterAndBulkCreateReturnType> => {
const ruleParams = ruleSO.attributes.params;
let toReturn = createSearchAfterReturnType();
Expand All @@ -52,22 +48,16 @@ export const searchAfterAndBulkCreate = async ({
// signalsCreatedCount keeps track of how many signals we have created,
// to ensure we don't exceed maxSignals
let signalsCreatedCount = 0;
const signalsAlreadyCreated = () => state?.signalsCreated || 0;
const totalSignalsCreated = (_signalsCreatedCount: number): number => {
return _signalsCreatedCount + signalsAlreadyCreated();
};

if (tuple == null || tuple.to == null || tuple.from == null) {
logger.error(buildRuleMessage(`[-] malformed date tuple`));
if (state != null) {
releaseLock(state);
}
return createSearchAfterReturnType({
success: false,
errors: ['malformed date tuple'],
});
}
while (totalSignalsCreated(signalsCreatedCount) < tuple.maxSignals) {
signalsCreatedCount = 0;
while (signalsCreatedCount < tuple.maxSignals) {
try {
let mergedSearchResults = createSearchResultReturnType();
logger.debug(buildRuleMessage(`sortIds: ${sortIds}`));
Expand Down Expand Up @@ -144,25 +134,11 @@ export const searchAfterAndBulkCreate = async ({
// skip the call to bulk create and proceed to the next search_after,
// if there is a sort id to continue the search_after with.
if (filteredEvents.hits.hits.length !== 0) {
if (state != null) {
const error = await getLock(state);
if (error != null) {
logger.error(buildRuleMessage(error));
return createSearchAfterReturnType({
success: false,
errors: [error],
});
}
}

// make sure we are not going to create more signals than maxSignals allows
if (
totalSignalsCreated(signalsCreatedCount) + filteredEvents.hits.hits.length >
tuple.maxSignals
) {
if (signalsCreatedCount + filteredEvents.hits.hits.length > tuple.maxSignals) {
filteredEvents.hits.hits = filteredEvents.hits.hits.slice(
0,
tuple.maxSignals - totalSignalsCreated(signalsCreatedCount)
tuple.maxSignals - signalsCreatedCount
);
}
const enrichedEvents = await enrichment(filteredEvents);
Expand All @@ -186,12 +162,6 @@ export const searchAfterAndBulkCreate = async ({
}),
]);
signalsCreatedCount += createdCount;
if (state != null) {
// Protected by lock
// eslint-disable-next-line require-atomic-updates
state.signalsCreated += createdCount;
releaseLock(state);
}
logger.debug(buildRuleMessage(`created ${createdCount} signals`));
logger.debug(buildRuleMessage(`signalsCreatedCount: ${signalsCreatedCount}`));
logger.debug(
Expand All @@ -207,9 +177,6 @@ export const searchAfterAndBulkCreate = async ({
}
} catch (exc: unknown) {
logger.error(buildRuleMessage(`[-] search_after and bulk threw an error ${exc}`));
if (state != null) {
releaseLock(state);
}
return mergeReturns([
toReturn,
createSearchAfterReturnType({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ export const createThreatSignal = async ({
currentResult,
bulkCreate,
wrapHits,
signalState,
}: CreateThreatSignalOptions): Promise<SearchAfterAndBulkCreateReturnType> => {
const threatFilter = buildThreatMappingFilter({
threatMapping,
Expand Down Expand Up @@ -87,7 +86,6 @@ export const createThreatSignal = async ({
enrichment: threatEnrichment,
bulkCreate,
wrapHits,
state: signalState,
});
logger.debug(
buildRuleMessage(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,6 @@ export const createThreatSignals = async ({
while (threatList.hits.hits.length !== 0) {
const chunks = chunk(itemsPerSearch, threatList.hits.hits);
logger.debug(buildRuleMessage(`${chunks.length} concurrent indicator searches are starting.`));
const signalState = {
isLocked: false,
signalsCreated: 0,
};
const concurrentSearchesPerformed = chunks.map<Promise<SearchAfterAndBulkCreateReturnType>>(
(slicedChunk) =>
createThreatSignal({
Expand All @@ -131,7 +127,6 @@ export const createThreatSignals = async ({
currentResult: results,
bulkCreate,
wrapHits,
signalState,
})
);
const searchesPerformed = await Promise.all(concurrentSearchesPerformed);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import {
BulkCreate,
RuleRangeTuple,
SearchAfterAndBulkCreateReturnType,
SearchAfterAndBulkCreateState,
SignalsEnrichment,
WrapHits,
} from '../types';
Expand Down Expand Up @@ -94,7 +93,6 @@ export interface CreateThreatSignalOptions {
currentResult: SearchAfterAndBulkCreateReturnType;
bulkCreate: BulkCreate;
wrapHits: WrapHits;
signalState: SearchAfterAndBulkCreateState;
}

export interface BuildThreatMappingFilterOptions {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,10 +261,6 @@ export type WrapHits = (
hits: Array<estypes.SearchHit<unknown>>
) => Array<BaseHit<{ '@timestamp': string }>>;

export interface SearchAfterAndBulkCreateState {
isLocked: boolean;
signalsCreated: number;
}
export interface SearchAfterAndBulkCreateParams {
tuple: {
to: moment.Moment;
Expand All @@ -286,7 +282,6 @@ export interface SearchAfterAndBulkCreateParams {
enrichment?: SignalsEnrichment;
bulkCreate: BulkCreate;
wrapHits: WrapHits;
state?: SearchAfterAndBulkCreateState | undefined;
}

export interface SearchAfterAndBulkCreateReturnType {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -914,23 +914,3 @@ export const buildChunkedOrFilter = (field: string, values: string[], chunkSize:
})
.join(' OR ');
};

const LOCK_INTERVAL_MS = 250;
const LOCK_MAX_TRIES = 100;
const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));

export const getLock = async (signalState: { isLocked: boolean }): Promise<string | undefined> => {
let tries = 0;
while (signalState.isLocked && tries < LOCK_MAX_TRIES) {
await sleep(LOCK_INTERVAL_MS);
tries++;
}
if (!signalState.isLocked) {
signalState.isLocked = true;
}
return `Error retrieving lock after {tries} tries.`;
};

export const releaseLock = (signalState: { isLocked: boolean }) => {
signalState.isLocked = false;
};

0 comments on commit 4752688

Please sign in to comment.