From ab676fb11e0c657bdc4b9e089192733795a376ce Mon Sep 17 00:00:00 2001 From: aloftus23 Date: Thu, 14 Mar 2024 11:19:55 -0400 Subject: [PATCH] Clean up P&E SQSS proccess --- backend/env.yml | 16 +- backend/serverless.yml | 7 - backend/src/tasks/functions.yml | 11 +- backend/src/tasks/scanExecution.ts | 221 ++++------ dev.env.example | 8 +- infrastructure/database.tf | 7 + infrastructure/pe.tf | 333 --------------- infrastructure/pe_worker.tf | 658 +---------------------------- infrastructure/prod.tfvars | 5 - infrastructure/stage.tfvars | 4 - infrastructure/vars.tf | 30 -- 11 files changed, 111 insertions(+), 1189 deletions(-) delete mode 100644 infrastructure/pe.tf diff --git a/backend/env.yml b/backend/env.yml index c51ef649..b6d6c7ac 100644 --- a/backend/env.yml +++ b/backend/env.yml @@ -47,17 +47,13 @@ staging: REPORTS_BUCKET_NAME: cisa-crossfeed-staging-reports CLOUDWATCH_BUCKET_NAME: cisa-crossfeed-staging-cloudwatch STAGE: staging - PE_CLUSTER_NAME: pe-staging-worker + PE_FARGATE_CLUSTER_NAME: pe-staging-worker + PE_FARGATE_TASK_DEFINITION_NAME: pe-staging-worker SHODAN_QUEUE_URL: ${ssm:/crossfeed/staging/SHODAN_QUEUE_URL} - SHODAN_SERVICE_NAME: pe-staging-shodan DNSTWIST_QUEUE_URL: ${ssm:/crossfeed/staging/DNSTWIST_QUEUE_URL} - DNSTWIST_SERVICE_NAME: pe-staging-dnstwist HIBP_QUEUE_URL: ${ssm:/crossfeed/staging/HIBP_QUEUE_URL} - HIBP_SERVICE_NAME: pe-staging-hibp INTELX_QUEUE_URL: ${ssm:/crossfeed/staging/INTELX_QUEUE_URL} - INTELX_SERVICE_NAME: pe-staging-intelx CYBERSIXGILL_QUEUE_URL: ${ssm:/crossfeed/staging/CYBERSIXGILL_QUEUE_URL} - CYBERSIXGILL_SERVICE_NAME: pe-staging-cybersixgill EMAIL_BUCKET_NAME: cisa-crossfeed-staging-html-email prod: @@ -97,9 +93,13 @@ prod: REPORTS_BUCKET_NAME: cisa-crossfeed-prod-reports CLOUDWATCH_BUCKET_NAME: cisa-crossfeed-prod-cloudwatch STAGE: prod - PE_CLUSTER_NAME: pe-prod-worker + PE_FARGATE_CLUSTER_NAME: pe-prod-worker + PE_FARGATE_TASK_DEFINITION_NAME: pe-prod-worker SHODAN_QUEUE_URL: 
${ssm:/crossfeed/prod/SHODAN_QUEUE_URL} - SHODAN_SERVICE_NAME: pe-prod-shodan + DNSTWIST_QUEUE_URL: ${ssm:/crossfeed/prod/DNSTWIST_QUEUE_URL} + HIBP_QUEUE_URL: ${ssm:/crossfeed/prod/HIBP_QUEUE_URL} + INTELX_QUEUE_URL: ${ssm:/crossfeed/prod/INTELX_QUEUE_URL} + CYBERSIXGILL_QUEUE_URL: ${ssm:/crossfeed/prod/CYBERSIXGILL_QUEUE_URL} EMAIL_BUCKET_NAME: cisa-crossfeed-staging-html-email dev-vpc: diff --git a/backend/serverless.yml b/backend/serverless.yml index b94a238f..caf161d3 100644 --- a/backend/serverless.yml +++ b/backend/serverless.yml @@ -108,13 +108,6 @@ provider: resources: Resources: - WorkerControlQueue: - Type: AWS::SQS::Queue - Properties: - QueueName: ${self:provider.stage}-worker-control-queue - VisibilityTimeout: 300 # Should match or exceed function timeout - MaximumMessageSize: 262144 # 256 KB - MessageRetentionPeriod: 604800 # 7 days ShodanQueue: Type: AWS::SQS::Queue Properties: diff --git a/backend/src/tasks/functions.yml b/backend/src/tasks/functions.yml index c821a00c..6c569e5f 100644 --- a/backend/src/tasks/functions.yml +++ b/backend/src/tasks/functions.yml @@ -35,17 +35,10 @@ checkUserExpiration: handler: src/tasks/checkUserExpiration.handler events: - schedule: cron(0 0 * * ? 
*) # Runs every day at midnight + scanExecution: + timeout: 900 handler: src/tasks/scanExecution.handler - timeout: 300 # 5 minutes - environment: - SQS_QUEUE_NAME: ${self:provider.stage}-worker-control-queue - events: - - sqs: - arn: - Fn::GetAtt: - - WorkerControlQueue - - Arn memorySize: 4096 updateScanTaskStatus: diff --git a/backend/src/tasks/scanExecution.ts b/backend/src/tasks/scanExecution.ts index 93bbd63b..8da85ad5 100644 --- a/backend/src/tasks/scanExecution.ts +++ b/backend/src/tasks/scanExecution.ts @@ -1,10 +1,8 @@ -import { Handler, SQSRecord } from 'aws-lambda'; +import { Handler } from 'aws-lambda'; import * as AWS from 'aws-sdk'; import { integer } from 'aws-sdk/clients/cloudfront'; -import { connect } from 'amqplib'; const ecs = new AWS.ECS(); -const sqs = new AWS.SQS(); let docker; if (process.env.IS_LOCAL) { docker = require('dockerode'); @@ -12,90 +10,76 @@ if (process.env.IS_LOCAL) { const toSnakeCase = (input) => input.replace(/ /g, '-'); -async function updateServiceAndQueue( - queueUrl: string, - serviceName: string, +async function startDesiredTasks( + scanType: string, desiredCount: integer, - message_body: any, // Add this parameter - clusterName: string // Add this parameter -) { - // Place message in scan specific queue - if (process.env.IS_LOCAL) { - // If running locally, use RabbitMQ instead of SQS - console.log('Publishing to rabbitMQ'); - await publishToRabbitMQ(queueUrl, message_body); - console.log('Done publishing to rabbitMQ'); - } else { - // Place in AWS SQS queue - console.log('Publishing to scan specific queue'); - await placeMessageInQueue(queueUrl, message_body); - } - - // Check if Fargate is running desired count and start if not - await updateServiceDesiredCount( - clusterName, - serviceName, - desiredCount, - queueUrl - ); - console.log('Done'); -} - -export async function updateServiceDesiredCount( - clusterName: string, - serviceName: string, - desiredCountNum: integer, queueUrl: string ) { try { - if 
(process.env.IS_LOCAL) { - console.log('starting local containers'); - await startLocalContainers(desiredCountNum, serviceName, queueUrl); - } else { - const describeServiceParams = { - cluster: clusterName, - services: [serviceName] - }; - const serviceDescription = await ecs - .describeServices(describeServiceParams) - .promise(); - if ( - serviceDescription && - serviceDescription.services && - serviceDescription.services.length > 0 - ) { - const service = serviceDescription.services[0]; - - // Check if the desired task count is less than # provided - if (service.desiredCount !== desiredCountNum) { - console.log('Setting desired count.'); - const updateServiceParams = { - cluster: clusterName, - service: serviceName, - desiredCount: desiredCountNum // Set to desired # of Fargate tasks - }; - - await ecs.updateService(updateServiceParams).promise(); - } else { - console.log('Desired count already set.'); - } + // ECS can only start 10 tasks at a time. Split up into batches + const batchSize = 10; + let remainingCount = desiredCount; + while (remainingCount > 0) { + const currentBatchCount = Math.min(remainingCount, batchSize); + + if (process.env.IS_LOCAL) { + // If running locally, use RabbitMQ and Docker instead of SQS and ECS + console.log('Starting local containers'); + await startLocalContainers(currentBatchCount, scanType, queueUrl); + } else { + await ecs + .runTask({ + cluster: process.env.PE_FARGATE_CLUSTER_NAME!, + taskDefinition: process.env.PE_FARGATE_TASK_DEFINITION_NAME!, + networkConfiguration: { + awsvpcConfiguration: { + assignPublicIp: 'ENABLED', + securityGroups: [process.env.FARGATE_SG_ID!], + subnets: [process.env.FARGATE_SUBNET_ID!] 
+ } + }, + platformVersion: '1.4.0', + launchType: 'FARGATE', + count: currentBatchCount, + overrides: { + containerOverrides: [ + { + name: 'main', + environment: [ + { + name: 'SERVICE_TYPE', + value: scanType + }, + { + name: 'SERVICE_QUEUE_URL', + value: queueUrl + } + ] + } + ] + } + }) + .promise(); } + console.log('Tasks started:', currentBatchCount); + remainingCount -= currentBatchCount; } } catch (error) { - console.error('Error: ', error); + console.error('Error starting tasks:', error); + throw error; } } async function startLocalContainers( count: number, - serviceName: string, + scanType: string, queueUrl: string ) { // Start 'count' number of local Docker containers for (let i = 0; i < count; i++) { try { const containerName = toSnakeCase( - `crossfeed_worker_${serviceName}_${i}_` + + `crossfeed_worker_${scanType}_${i}_` + Math.floor(Math.random() * 10000000) ); const container = await docker!.createContainer({ @@ -138,93 +122,68 @@ async function startLocalContainers( `LG_API_KEY=${process.env.LG_API_KEY}`, `LG_WORKSPACE_NAME=${process.env.LG_WORKSPACE_NAME}`, `SERVICE_QUEUE_URL=${queueUrl}`, - `SERVICE_TYPE=${serviceName}` + `SERVICE_TYPE=${scanType}` ] } as any); await container.start(); - console.log(`done starting container ${i}`); + console.log(`Done starting container ${i}`); } catch (e) { console.error(e); } } } -// Place message in AWS SQS Queue -async function placeMessageInQueue(queueUrl: string, message: any) { - const sendMessageParams = { - QueueUrl: queueUrl, - MessageBody: JSON.stringify(message) - }; - - await sqs.sendMessage(sendMessageParams).promise(); -} - -// Function to connect to RabbitMQ and publish a message -async function publishToRabbitMQ(queue: string, message: any) { - const connection = await connect('amqp://rabbitmq'); - const channel = await connection.createChannel(); - - await channel.assertQueue(queue, { durable: true }); - await channel.sendToQueue(queue, Buffer.from(JSON.stringify(message))); - - await 
channel.close(); - await connection.close(); -} - export const handler: Handler = async (event) => { - try { - let desiredCount; - const clusterName = process.env.PE_CLUSTER_NAME!; + let desiredCount: integer; + let scanType: string; + if (event.desiredCount) { + desiredCount = event.desiredCount; + } else { + console.log('Desired count not found. Setting to 1.'); + desiredCount = 1; + } - // Get the Control SQS record and message body - const sqsRecord: SQSRecord = event.Records[0]; - const message_body = JSON.parse(sqsRecord.body); - console.log(message_body); + if (event.scanType) { + scanType = event.scanType; + } else { + console.error('scanType must be provided.'); + return 'Failed no scanType'; + } - if (message_body.scriptType === 'shodan') { - desiredCount = 5; - await updateServiceAndQueue( - process.env.SHODAN_QUEUE_URL!, - process.env.SHODAN_SERVICE_NAME!, + try { + if (scanType === 'shodan') { + await startDesiredTasks( + scanType, desiredCount, - message_body, - clusterName + process.env.SHODAN_QUEUE_URL! ); - } else if (message_body.scriptType === 'dnstwist') { + } else if (scanType === 'dnstwist') { desiredCount = 30; - await updateServiceAndQueue( - process.env.DNSTWIST_QUEUE_URL!, - process.env.DNSTWIST_SERVICE_NAME!, + await startDesiredTasks( + scanType, desiredCount, - message_body, - clusterName + process.env.DNSTWIST_QUEUE_URL! ); - } else if (message_body.scriptType === 'hibp') { + } else if (scanType === 'hibp') { desiredCount = 20; - await updateServiceAndQueue( - process.env.HIBP_QUEUE_URL!, - process.env.HIBP_SERVICE_NAME!, + await startDesiredTasks( + scanType, desiredCount, - message_body, - clusterName + process.env.HIBP_QUEUE_URL! 
); - } else if (message_body.scriptType === 'intelx') { + } else if (scanType === 'intelx') { desiredCount = 10; - await updateServiceAndQueue( - process.env.INTELX_QUEUE_URL!, - process.env.INTELX_SERVICE_NAME!, + await startDesiredTasks( + scanType, desiredCount, - message_body, - clusterName + process.env.INTELX_QUEUE_URL! ); - } else if (message_body.scriptType === 'cybersixgill') { + } else if (scanType === 'cybersixgill') { desiredCount = 10; - await updateServiceAndQueue( - process.env.CYBERSIXGILL_QUEUE_URL!, - process.env.CYBERSIXGILL_SERVICE_NAME!, + await startDesiredTasks( + scanType, desiredCount, - message_body, - clusterName + process.env.CYBERSIXGILL_QUEUE_URL! ); } else { console.log( diff --git a/dev.env.example b/dev.env.example index be23dba0..9116f102 100644 --- a/dev.env.example +++ b/dev.env.example @@ -92,15 +92,11 @@ PE_DB_PASSWORD=password SHODAN_QUEUE_URL =shodanQueue -SHODAN_SERVICE_NAME=pe-shodan PE_SHODAN_API_KEYS= DNSTWIST_QUEUE_URL=dnstwistQueue -DNSTWIST_SERVICE_NAME=pe-dnstwist HIBP_QUEUE_URL=hibpQueue -HIBP_SERVICE_NAME=pe-hibp INTELX_QUEUE_URL=intelxQueue -INTELX_SERVICE_NAME=pe-intelx CYBERSIXGILL_QUEUE_URL=cybersixgillQueue -CYBERSIXGILL_SERVICE_NAME=pe-cybersixgill -PE_CLUSTER_NAME=pe-staging-worker +PE_FARGATE_CLUSTER_NAME=pe-staging-worker +PE_FARGATE_TASK_DEFINITION_NAME=pe-staging-worker diff --git a/infrastructure/database.tf b/infrastructure/database.tf index ebda69f9..f1b0dc01 100644 --- a/infrastructure/database.tf +++ b/infrastructure/database.tf @@ -160,6 +160,13 @@ resource "aws_iam_role_policy" "db_accessor_s3_policy" { "s3:*" ], "Resource": ["${aws_s3_bucket.reports_bucket.arn}", "${aws_s3_bucket.reports_bucket.arn}/*", "${aws_s3_bucket.pe_db_backups_bucket.arn}", "${aws_s3_bucket.pe_db_backups_bucket.arn}/*"] + }, + { + "Effect": "Allow", + "Action": [ + "lambda:InvokeFunction" + ], + "Resource": "*" } ] } diff --git a/infrastructure/pe.tf b/infrastructure/pe.tf deleted file mode 100644 index 14150e91..00000000 --- 
a/infrastructure/pe.tf +++ /dev/null @@ -1,333 +0,0 @@ - -resource "aws_cloudwatch_event_rule" "scheduled_pe_task" { - - name = "scheduled-pe-event-rule" - schedule_expression = "cron(0 0 1,16 * ? *)" -} - -resource "aws_cloudwatch_event_rule" "scheduled_pe_cybersixgill_task" { - - name = "scheduled-pe-event-cybersixgill-rule" - schedule_expression = "cron(0 0 1,16 * ? *)" -} - -resource "aws_iam_role" "cloudwatch_scheduled_task_execution" { - name = "crossfeed-pe-cloudwatch-role-${var.stage}" - assume_role_policy = <