Clean up P&E SQS process
aloftus23 committed Mar 14, 2024
1 parent 8da3f57 commit ab676fb
Showing 11 changed files with 111 additions and 1,189 deletions.
16 changes: 8 additions & 8 deletions backend/env.yml
@@ -47,17 +47,13 @@ staging:
REPORTS_BUCKET_NAME: cisa-crossfeed-staging-reports
CLOUDWATCH_BUCKET_NAME: cisa-crossfeed-staging-cloudwatch
STAGE: staging
PE_CLUSTER_NAME: pe-staging-worker
PE_FARGATE_CLUSTER_NAME: pe-staging-worker
PE_FARGATE_TASK_DEFINITION_NAME: pe-staging-worker
SHODAN_QUEUE_URL: ${ssm:/crossfeed/staging/SHODAN_QUEUE_URL}
SHODAN_SERVICE_NAME: pe-staging-shodan
DNSTWIST_QUEUE_URL: ${ssm:/crossfeed/staging/DNSTWIST_QUEUE_URL}
DNSTWIST_SERVICE_NAME: pe-staging-dnstwist
HIBP_QUEUE_URL: ${ssm:/crossfeed/staging/HIBP_QUEUE_URL}
HIBP_SERVICE_NAME: pe-staging-hibp
INTELX_QUEUE_URL: ${ssm:/crossfeed/staging/INTELX_QUEUE_URL}
INTELX_SERVICE_NAME: pe-staging-intelx
CYBERSIXGILL_QUEUE_URL: ${ssm:/crossfeed/staging/CYBERSIXGILL_QUEUE_URL}
CYBERSIXGILL_SERVICE_NAME: pe-staging-cybersixgill
EMAIL_BUCKET_NAME: cisa-crossfeed-staging-html-email

prod:
@@ -97,9 +93,13 @@ prod:
REPORTS_BUCKET_NAME: cisa-crossfeed-prod-reports
CLOUDWATCH_BUCKET_NAME: cisa-crossfeed-prod-cloudwatch
STAGE: prod
PE_CLUSTER_NAME: pe-prod-worker
PE_FARGATE_CLUSTER_NAME: pe-prod-worker
PE_FARGATE_TASK_DEFINITION_NAME: pe-prod-worker
SHODAN_QUEUE_URL: ${ssm:/crossfeed/prod/SHODAN_QUEUE_URL}
SHODAN_SERVICE_NAME: pe-prod-shodan
DNSTWIST_QUEUE_URL: ${ssm:/crossfeed/prod/DNSTWIST_QUEUE_URL}
HIBP_QUEUE_URL: ${ssm:/crossfeed/prod/HIBP_QUEUE_URL}
INTELX_QUEUE_URL: ${ssm:/crossfeed/prod/INTELX_QUEUE_URL}
CYBERSIXGILL_QUEUE_URL: ${ssm:/crossfeed/prod/CYBERSIXGILL_QUEUE_URL}
EMAIL_BUCKET_NAME: cisa-crossfeed-staging-html-email

dev-vpc:
7 changes: 0 additions & 7 deletions backend/serverless.yml
@@ -108,13 +108,6 @@ provider:

resources:
Resources:
WorkerControlQueue:
Type: AWS::SQS::Queue
Properties:
QueueName: ${self:provider.stage}-worker-control-queue
VisibilityTimeout: 300 # Should match or exceed function timeout
MaximumMessageSize: 262144 # 256 KB
MessageRetentionPeriod: 604800 # 7 days
ShodanQueue:
Type: AWS::SQS::Queue
Properties:
11 changes: 2 additions & 9 deletions backend/src/tasks/functions.yml
@@ -35,17 +35,10 @@ checkUserExpiration:
handler: src/tasks/checkUserExpiration.handler
events:
- schedule: cron(0 0 * * ? *) # Runs every day at midnight

scanExecution:
timeout: 900
handler: src/tasks/scanExecution.handler
timeout: 300 # 5 minutes
environment:
SQS_QUEUE_NAME: ${self:provider.stage}-worker-control-queue
events:
- sqs:
arn:
Fn::GetAtt:
- WorkerControlQueue
- Arn
memorySize: 4096

updateScanTaskStatus:
221 changes: 90 additions & 131 deletions backend/src/tasks/scanExecution.ts
@@ -1,101 +1,85 @@
import { Handler, SQSRecord } from 'aws-lambda';
import { Handler } from 'aws-lambda';
import * as AWS from 'aws-sdk';
import { integer } from 'aws-sdk/clients/cloudfront';
import { connect } from 'amqplib';

const ecs = new AWS.ECS();
const sqs = new AWS.SQS();
let docker;
if (process.env.IS_LOCAL) {
docker = require('dockerode');
}

const toSnakeCase = (input) => input.replace(/ /g, '-');

async function updateServiceAndQueue(
queueUrl: string,
serviceName: string,
async function startDesiredTasks(
scanType: string,
desiredCount: integer,
message_body: any, // Add this parameter
clusterName: string // Add this parameter
) {
// Place message in scan specific queue
if (process.env.IS_LOCAL) {
// If running locally, use RabbitMQ instead of SQS
console.log('Publishing to rabbitMQ');
await publishToRabbitMQ(queueUrl, message_body);
console.log('Done publishing to rabbitMQ');
} else {
// Place in AWS SQS queue
console.log('Publishing to scan specific queue');
await placeMessageInQueue(queueUrl, message_body);
}

// Check if Fargate is running desired count and start if not
await updateServiceDesiredCount(
clusterName,
serviceName,
desiredCount,
queueUrl
);
console.log('Done');
}

export async function updateServiceDesiredCount(
clusterName: string,
serviceName: string,
desiredCountNum: integer,
queueUrl: string
) {
try {
if (process.env.IS_LOCAL) {
console.log('starting local containers');
await startLocalContainers(desiredCountNum, serviceName, queueUrl);
} else {
const describeServiceParams = {
cluster: clusterName,
services: [serviceName]
};
const serviceDescription = await ecs
.describeServices(describeServiceParams)
.promise();
if (
serviceDescription &&
serviceDescription.services &&
serviceDescription.services.length > 0
) {
const service = serviceDescription.services[0];

// Check if the desired task count is less than # provided
if (service.desiredCount !== desiredCountNum) {
console.log('Setting desired count.');
const updateServiceParams = {
cluster: clusterName,
service: serviceName,
desiredCount: desiredCountNum // Set to desired # of Fargate tasks
};

await ecs.updateService(updateServiceParams).promise();
} else {
console.log('Desired count already set.');
}
// ECS can only start 10 tasks at a time. Split up into batches
const batchSize = 10;
let remainingCount = desiredCount;
while (remainingCount > 0) {
const currentBatchCount = Math.min(remainingCount, batchSize);

if (process.env.IS_LOCAL) {
// If running locally, use RabbitMQ and Docker instead of SQS and ECS
console.log('Starting local containers');
await startLocalContainers(currentBatchCount, scanType, queueUrl);
} else {
await ecs
.runTask({
cluster: process.env.PE_FARGATE_CLUSTER_NAME!,
taskDefinition: process.env.PE_FARGATE_TASK_DEFINITION_NAME!,
networkConfiguration: {
awsvpcConfiguration: {
assignPublicIp: 'ENABLED',
securityGroups: [process.env.FARGATE_SG_ID!],
subnets: [process.env.FARGATE_SUBNET_ID!]
}
},
platformVersion: '1.4.0',
launchType: 'FARGATE',
count: currentBatchCount,
overrides: {
containerOverrides: [
{
name: 'main',
environment: [
{
name: 'SERVICE_TYPE',
value: scanType
},
{
name: 'SERVICE_QUEUE_URL',
value: queueUrl
}
]
}
]
}
})
.promise();
}
console.log('Tasks started:', currentBatchCount);
remainingCount -= currentBatchCount;
}
} catch (error) {
console.error('Error: ', error);
console.error('Error starting tasks:', error);
throw error;
}
}

async function startLocalContainers(
count: number,
serviceName: string,
scanType: string,
queueUrl: string
) {
// Start 'count' number of local Docker containers
for (let i = 0; i < count; i++) {
try {
const containerName = toSnakeCase(
`crossfeed_worker_${serviceName}_${i}_` +
`crossfeed_worker_${scanType}_${i}_` +
Math.floor(Math.random() * 10000000)
);
const container = await docker!.createContainer({
@@ -138,93 +122,68 @@ async function startLocalContainers(
`LG_API_KEY=${process.env.LG_API_KEY}`,
`LG_WORKSPACE_NAME=${process.env.LG_WORKSPACE_NAME}`,
`SERVICE_QUEUE_URL=${queueUrl}`,
`SERVICE_TYPE=${serviceName}`
`SERVICE_TYPE=${scanType}`
]
} as any);
await container.start();
console.log(`done starting container ${i}`);
console.log(`Done starting container ${i}`);
} catch (e) {
console.error(e);
}
}
}

// Place message in AWS SQS Queue
async function placeMessageInQueue(queueUrl: string, message: any) {
const sendMessageParams = {
QueueUrl: queueUrl,
MessageBody: JSON.stringify(message)
};

await sqs.sendMessage(sendMessageParams).promise();
}

// Function to connect to RabbitMQ and publish a message
async function publishToRabbitMQ(queue: string, message: any) {
const connection = await connect('amqp://rabbitmq');
const channel = await connection.createChannel();

await channel.assertQueue(queue, { durable: true });
await channel.sendToQueue(queue, Buffer.from(JSON.stringify(message)));

await channel.close();
await connection.close();
}

export const handler: Handler = async (event) => {
try {
let desiredCount;
const clusterName = process.env.PE_CLUSTER_NAME!;
let desiredCount: integer;
let scanType: string;
if (event.desiredCount) {
desiredCount = event.desiredCount;
} else {
console.log('Desired count not found. Setting to 1.');
desiredCount = 1;
}

// Get the Control SQS record and message body
const sqsRecord: SQSRecord = event.Records[0];
const message_body = JSON.parse(sqsRecord.body);
console.log(message_body);
if (event.scanType) {
scanType = event.scanType;
} else {
console.error('scanType must be provided.');
return 'Failed no scanType';
}

if (message_body.scriptType === 'shodan') {
desiredCount = 5;
await updateServiceAndQueue(
process.env.SHODAN_QUEUE_URL!,
process.env.SHODAN_SERVICE_NAME!,
try {
if (scanType === 'shodan') {
await startDesiredTasks(
scanType,
desiredCount,
message_body,
clusterName
process.env.SHODAN_QUEUE_URL!
);
} else if (message_body.scriptType === 'dnstwist') {
} else if (scanType === 'dnstwist') {
desiredCount = 30;
await updateServiceAndQueue(
process.env.DNSTWIST_QUEUE_URL!,
process.env.DNSTWIST_SERVICE_NAME!,
await startDesiredTasks(
scanType,
desiredCount,
message_body,
clusterName
process.env.DNSTWIST_QUEUE_URL!
);
} else if (message_body.scriptType === 'hibp') {
} else if (scanType === 'hibp') {
desiredCount = 20;
await updateServiceAndQueue(
process.env.HIBP_QUEUE_URL!,
process.env.HIBP_SERVICE_NAME!,
await startDesiredTasks(
scanType,
desiredCount,
message_body,
clusterName
process.env.HIBP_QUEUE_URL!
);
} else if (message_body.scriptType === 'intelx') {
} else if (scanType === 'intelx') {
desiredCount = 10;
await updateServiceAndQueue(
process.env.INTELX_QUEUE_URL!,
process.env.INTELX_SERVICE_NAME!,
await startDesiredTasks(
scanType,
desiredCount,
message_body,
clusterName
process.env.INTELX_QUEUE_URL!
);
} else if (message_body.scriptType === 'cybersixgill') {
} else if (scanType === 'cybersixgill') {
desiredCount = 10;
await updateServiceAndQueue(
process.env.CYBERSIXGILL_QUEUE_URL!,
process.env.CYBERSIXGILL_SERVICE_NAME!,
await startDesiredTasks(
scanType,
desiredCount,
message_body,
clusterName
process.env.CYBERSIXGILL_QUEUE_URL!
);
} else {
console.log(
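
With the control queue and the per-scan ECS services gone, scanExecution reads scanType and desiredCount directly from the invocation event and starts Fargate tasks with ecs.runTask in batches of ten. A minimal caller sketch in TypeScript, using AWS SDK v2 as the handler does; the function name and region are assumptions (Serverless typically deploys the function as <service>-<stage>-scanExecution), not values taken from this commit:

import * as AWS from 'aws-sdk';

const lambda = new AWS.Lambda({ region: 'us-east-1' }); // region is an assumption

async function triggerShodanScan() {
  // Fire-and-forget invocation; the handler rejects a missing scanType and
  // defaults desiredCount to 1 when it is omitted.
  const result = await lambda
    .invoke({
      FunctionName: 'crossfeed-staging-scanExecution', // hypothetical deployed name
      InvocationType: 'Event',
      Payload: JSON.stringify({ scanType: 'shodan', desiredCount: 5 })
    })
    .promise();
  console.log('Invocation accepted with status', result.StatusCode);
}

triggerShodanScan().catch(console.error);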
8 changes: 2 additions & 6 deletions dev.env.example
@@ -92,15 +92,11 @@ PE_DB_PASSWORD=password


SHODAN_QUEUE_URL=shodanQueue
SHODAN_SERVICE_NAME=pe-shodan
PE_SHODAN_API_KEYS=
DNSTWIST_QUEUE_URL=dnstwistQueue
DNSTWIST_SERVICE_NAME=pe-dnstwist
HIBP_QUEUE_URL=hibpQueue
HIBP_SERVICE_NAME=pe-hibp
INTELX_QUEUE_URL=intelxQueue
INTELX_SERVICE_NAME=pe-intelx
CYBERSIXGILL_QUEUE_URL=cybersixgillQueue
CYBERSIXGILL_SERVICE_NAME=pe-cybersixgill

PE_CLUSTER_NAME=pe-staging-worker
PE_FARGATE_CLUSTER_NAME=pe-staging-worker
PE_FARGATE_TASK_DEFINITION_NAME=pe-staging-worker
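
The queue URLs above are handed to each Fargate task as SERVICE_QUEUE_URL, with the scan name in SERVICE_TYPE, through the container overrides in scanExecution.ts. The worker code itself is not part of this diff; purely as an illustration of that contract, a hedged TypeScript sketch of a consumer polling its assigned queue with AWS SDK v2:

import * as AWS from 'aws-sdk';

const sqs = new AWS.SQS();
const queueUrl = process.env.SERVICE_QUEUE_URL!; // injected via containerOverrides

async function pollOnce() {
  // Long-poll for a single message, process it, then delete it.
  const res = await sqs
    .receiveMessage({ QueueUrl: queueUrl, MaxNumberOfMessages: 1, WaitTimeSeconds: 20 })
    .promise();
  for (const msg of res.Messages ?? []) {
    console.log(`${process.env.SERVICE_TYPE} worker received:`, msg.Body);
    // ... run the actual scan for this message here ...
    await sqs
      .deleteMessage({ QueueUrl: queueUrl, ReceiptHandle: msg.ReceiptHandle! })
      .promise();
  }
}

pollOnce().catch(console.error);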
7 changes: 7 additions & 0 deletions infrastructure/database.tf
@@ -160,6 +160,13 @@ resource "aws_iam_role_policy" "db_accessor_s3_policy" {
"s3:*"
],
"Resource": ["${aws_s3_bucket.reports_bucket.arn}", "${aws_s3_bucket.reports_bucket.arn}/*", "${aws_s3_bucket.pe_db_backups_bucket.arn}", "${aws_s3_bucket.pe_db_backups_bucket.arn}/*"]
},
{
"Effect": "Allow",
"Action": [
"lambda:InvokeFunction"
],
"Resource": *
}
]
}