Skip to content

Commit

Permalink
fix: use a singleton http agent for queue events
Browse files Browse the repository at this point in the history
  • Loading branch information
fenos committed Nov 1, 2023
1 parent 23650d2 commit 3325fd4
Show file tree
Hide file tree
Showing 14 changed files with 88 additions and 87 deletions.
8 changes: 5 additions & 3 deletions src/config.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import dotenv from 'dotenv'

type StorageBackendType = 'file' | 's3'
export type StorageBackendType = 'file' | 's3'

type StorageConfigType = {
version: string
Expand All @@ -12,7 +12,7 @@ type StorageConfigType = {
encryptionKey: string
fileSizeLimit: number
fileStoragePath?: string
globalS3Protocol?: 'http' | 'https' | string
globalS3Protocol: 'http' | 'https'
globalS3MaxSockets?: number
globalS3Bucket: string
globalS3Endpoint?: string
Expand Down Expand Up @@ -111,7 +111,9 @@ export function getConfig(): StorageConfigType {
fileSizeLimit: Number(getConfigFromEnv('FILE_SIZE_LIMIT')),
fileStoragePath: getOptionalConfigFromEnv('FILE_STORAGE_BACKEND_PATH'),
globalS3MaxSockets: parseInt(getOptionalConfigFromEnv('GLOBAL_S3_MAX_SOCKETS') || '200', 10),
globalS3Protocol: getOptionalConfigFromEnv('GLOBAL_S3_PROTOCOL') || 'https',
globalS3Protocol: (getOptionalConfigFromEnv('GLOBAL_S3_PROTOCOL') || 'https') as
| 'http'
| 'https',
globalS3Bucket: getConfigFromEnv('GLOBAL_S3_BUCKET'),
globalS3Endpoint: getOptionalConfigFromEnv('GLOBAL_S3_ENDPOINT'),
globalS3ForcePathStyle: getOptionalConfigFromEnv('GLOBAL_S3_FORCE_PATH_STYLE') === 'true',
Expand Down
6 changes: 2 additions & 4 deletions src/database/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,24 +85,22 @@ export class TenantConnection {
acquireConnectionTimeout: databaseConnectionTimeout,
})

DbActivePool.inc({ tenant_id: options.tenantId, is_external: isExternalPool.toString() })
DbActivePool.inc({ is_external: isExternalPool.toString() })

knexPool.client.pool.on('createSuccess', () => {
DbActiveConnection.inc({
tenant_id: options.tenantId,
is_external: isExternalPool.toString(),
})
})

knexPool.client.pool.on('destroySuccess', () => {
DbActiveConnection.dec({
tenant_id: options.tenantId,
is_external: isExternalPool.toString(),
})
})

knexPool.client.pool.on('poolDestroySuccess', () => {
DbActivePool.dec({ tenant_id: options.tenantId, is_external: isExternalPool.toString() })
DbActivePool.dec({ is_external: isExternalPool.toString() })
})

if (!isExternalPool) {
Expand Down
26 changes: 3 additions & 23 deletions src/http/plugins/metrics.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import fastifyPlugin from 'fastify-plugin'

Check failure on line 1 in src/http/plugins/metrics.ts

View workflow job for this annotation

GitHub Actions / Test / OS ubuntu-20.04 / Node 18

Property 'metricsRoutes' does not exist on type 'StorageConfigType'.
import { MetricsRegistrar, RequestErrors } from '../../monitoring/metrics'
import { MetricsRegistrar } from '../../monitoring/metrics'
import fastifyMetrics from 'fastify-metrics'
import { getConfig } from '../../config'

const { region, enableDefaultMetrics } = getConfig()
const { region, enableDefaultMetrics, metricsRoutes } = getConfig()

interface MetricsOptions {
enabledEndpoint?: boolean
Expand All @@ -22,7 +22,7 @@ export const metrics = ({ enabledEndpoint }: MetricsOptions) =>
},
},
routeMetrics: {
enabled: enableDefaultMetrics,
enabled: metricsRoutes,
overrides: {
summary: {
name: 'storage_api_http_request_summary_seconds',
Expand All @@ -33,26 +33,6 @@ export const metrics = ({ enabledEndpoint }: MetricsOptions) =>
},
registeredRoutesOnly: true,
groupStatusCodes: true,
customLabels: {
tenant_id: (req) => {
return req.tenantId
},
},
},
})

// Errors
fastify.addHook('onResponse', async (request, reply) => {
const error = (reply.raw as any).executionError || reply.executionError

if (error) {
RequestErrors.inc({
name: error.name || error.constructor.name,
tenant_id: request.tenantId,
path: request.routerPath,
method: request.routerMethod,
status: reply.statusCode,
})
}
})
})
6 changes: 5 additions & 1 deletion src/http/plugins/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import fastifyPlugin from 'fastify-plugin'
import { StorageBackendAdapter, createStorageBackend } from '../../storage/backend'
import { Storage } from '../../storage'
import { StorageKnexDB } from '../../storage/database'
import { getConfig } from '../../config'

declare module 'fastify' {
interface FastifyRequest {
Expand All @@ -10,15 +11,18 @@ declare module 'fastify' {
}
}

const { storageBackendType, metricsDbPerformance } = getConfig()

export const storage = fastifyPlugin(async (fastify) => {
const storageBackend = createStorageBackend()
const storageBackend = createStorageBackend(storageBackendType)

fastify.decorateRequest('storage', undefined)
fastify.addHook('preHandler', async (request) => {
const database = new StorageKnexDB(request.db, {
tenantId: request.tenantId,
host: request.headers['x-forwarded-host'] as string,
reqId: request.id,
enableMetrics: metricsDbPerformance,
})
request.backend = storageBackend
request.storage = new Storage(storageBackend, database)
Expand Down
5 changes: 1 addition & 4 deletions src/http/routes/tus/s3-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,8 @@ export class S3Store extends BaseS3Store {
const timer = S3UploadPart.startTimer()

const result = await super.uploadPart(metadata, readStream, partNumber)
const resource = UploadId.fromString(metadata.file.id)

timer({
tenant_id: resource.tenant,
})
timer()

return result
}
Expand Down
28 changes: 11 additions & 17 deletions src/monitoring/metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,71 +6,65 @@ export const MetricsRegistrar = new Registry()
export const FileUploadStarted = new client.Gauge({
name: 'storage_api_upload_started',
help: 'Upload started',
labelNames: ['tenant_id', 'region', 'is_multipart'],
labelNames: ['region', 'is_multipart'],
})

export const FileUploadedSuccess = new client.Gauge({
name: 'storage_api_upload_success',
help: 'Successful uploads',
labelNames: ['tenant_id', 'region', 'is_multipart'],
labelNames: ['region', 'is_multipart'],
})

export const DbQueryPerformance = new client.Histogram({
name: 'storage_api_database_query_performance',
help: 'Database query performance',
labelNames: ['tenant_id', 'region', 'name'],
})

export const RequestErrors = new client.Gauge({
name: 'storage_api_request_errors',
labelNames: ['tenant_id', 'region', 'method', 'path', 'status', 'name'],
help: 'Response Errors',
labelNames: ['region', 'name'],
})

export const QueueJobSchedulingTime = new client.Histogram({
name: 'storage_api_queue_job_scheduled_time',
help: 'Time taken to schedule a job in the queue',
labelNames: ['region', 'name', 'tenant_id'],
labelNames: ['region', 'name'],
})

export const QueueJobScheduled = new client.Gauge({
name: 'storage_api_queue_job_scheduled',
help: 'Current number of pending messages in the queue',
labelNames: ['region', 'name', 'tenant_id'],
labelNames: ['region', 'name'],
})

export const QueueJobCompleted = new client.Gauge({
name: 'storage_api_queue_job_completed',
help: 'Current number of processed messages in the queue',
labelNames: ['tenant_id', 'region', 'name'],
labelNames: ['region', 'name'],
})

export const QueueJobRetryFailed = new client.Gauge({
name: 'storage_api_queue_job_retry_failed',
help: 'Current number of failed attempts messages in the queue',
labelNames: ['tenant_id', 'region', 'name'],
labelNames: ['region', 'name'],
})

export const QueueJobError = new client.Gauge({
name: 'storage_api_queue_job_error',
help: 'Current number of errored messages in the queue',
labelNames: ['tenant_id', 'region', 'name'],
labelNames: ['region', 'name'],
})

export const S3UploadPart = new client.Histogram({
name: 'storage_api_s3_upload_part',
help: 'S3 upload part performance',
labelNames: ['tenant_id', 'region'],
labelNames: ['region'],
})

export const DbActivePool = new client.Gauge({
name: 'storage_api_db_pool',
help: 'Number of database pools created',
labelNames: ['tenant_id', 'region', 'is_external'],
labelNames: ['region', 'is_external'],
})

export const DbActiveConnection = new client.Gauge({
name: 'storage_api_db_connections',
help: 'Number of database connections',
labelNames: ['tenant_id', 'region', 'is_external'],
labelNames: ['region', 'is_external'],
})
11 changes: 6 additions & 5 deletions src/queue/events/base-event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { getServiceKeyUser } from '../../database/tenant'
import { getPostgresConnection } from '../../database'
import { Storage } from '../../storage'
import { StorageKnexDB } from '../../storage/database'
import { createStorageBackend } from '../../storage/backend'
import { createAgent, createStorageBackend } from '../../storage/backend'
import { getConfig } from '../../config'
import { QueueJobScheduled, QueueJobSchedulingTime } from '../../monitoring/metrics'
import { logger } from '../../monitoring'
Expand All @@ -20,7 +20,8 @@ export interface BasePayload {

export type StaticThis<T> = { new (...args: any): T }

const { enableQueueEvents } = getConfig()
const { enableQueueEvents, storageBackendType, globalS3Protocol } = getConfig()
const httpAgent = createAgent(globalS3Protocol)

export abstract class BaseEvent<T extends Omit<BasePayload, '$version'>> {
public static readonly version: string = 'v1'
Expand Down Expand Up @@ -113,7 +114,9 @@ export abstract class BaseEvent<T extends Omit<BasePayload, '$version'>> {
host: payload.tenant.host,
})

const storageBackend = createStorageBackend()
const storageBackend = createStorageBackend(storageBackendType, {
httpAgent,
})

return new Storage(storageBackend, db)
}
Expand Down Expand Up @@ -145,12 +148,10 @@ export abstract class BaseEvent<T extends Omit<BasePayload, '$version'>> {

timer({
name: constructor.getQueueName(),
tenant_id: this.payload.tenant.ref,
})

QueueJobScheduled.inc({
name: constructor.getQueueName(),
tenant_id: this.payload.tenant.ref,
})

return res
Expand Down
3 changes: 0 additions & 3 deletions src/queue/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,12 @@ export abstract class Queue {
const res = await event.handle(job)

QueueJobCompleted.inc({
tenant_id: job.data.tenant.ref,
name: event.getQueueName(),
})

return res
} catch (e) {
QueueJobRetryFailed.inc({
tenant_id: job.data.tenant.ref,
name: event.getQueueName(),
})

Expand All @@ -106,7 +104,6 @@ export abstract class Queue {
}
if (dbJob.retrycount === dbJob.retrylimit) {
QueueJobError.inc({
tenant_id: job.data.tenant.ref,
name: event.getQueueName(),
})
}
Expand Down
25 changes: 19 additions & 6 deletions src/storage/backend/index.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,34 @@
import { StorageBackendAdapter } from './generic'
import { FileBackend } from './file'
import { S3Backend } from './s3'
import { getConfig } from '../../config'
import { S3Backend, S3ClientOptions } from './s3'
import { getConfig, StorageBackendType } from '../../config'

export * from './s3'
export * from './file'
export * from './generic'

const { region, globalS3Endpoint, globalS3ForcePathStyle, storageBackendType } = getConfig()
const { region, globalS3Endpoint, globalS3ForcePathStyle } = getConfig()

export function createStorageBackend() {
type ConfigForStorage<Type extends StorageBackendType> = Type extends 's3'
? S3ClientOptions
: undefined

export function createStorageBackend<Type extends StorageBackendType>(
type: Type,
config?: ConfigForStorage<Type>
) {
let storageBackend: StorageBackendAdapter

if (storageBackendType === 'file') {
if (type === 'file') {
storageBackend = new FileBackend()
} else {
storageBackend = new S3Backend(region, globalS3Endpoint, globalS3ForcePathStyle)
const defaultOptions: S3ClientOptions = {
region: region,
endpoint: globalS3Endpoint,
forcePathStyle: globalS3ForcePathStyle,
...(config ? config : {}),
}
storageBackend = new S3Backend(defaultOptions)
}

return storageBackend
Expand Down
Loading

0 comments on commit 3325fd4

Please sign in to comment.