diff --git a/src/http/plugins/log-request.ts b/src/http/plugins/log-request.ts index ab06a168..15117e89 100644 --- a/src/http/plugins/log-request.ts +++ b/src/http/plugins/log-request.ts @@ -1,6 +1,8 @@ import fastifyPlugin from 'fastify-plugin' import { logSchema, redactQueryParamFromRequest } from '@internal/monitoring' import { trace } from '@opentelemetry/api' +import { FastifyRequest } from 'fastify/types/request' +import { FastifyReply } from 'fastify/types/reply' interface RequestLoggerOptions { excludeUrls?: string[] @@ -20,13 +22,32 @@ declare module 'fastify' { } } +/** + * Request logger plugin + * @param options + */ export const logRequest = (options: RequestLoggerOptions) => fastifyPlugin( async (fastify) => { - fastify.addHook('onRequest', async (req) => { + fastify.addHook('onRequest', async (req, res) => { req.startTime = Date.now() + + // Request was aborted before the server finishes to return a response + res.raw.once('close', () => { + const aborted = !res.raw.writableFinished + if (aborted) { + doRequestLog(req, { + excludeUrls: options.excludeUrls, + statusCode: 'ABORTED RES', + responseTime: (Date.now() - req.startTime) / 1000, + }) + } + }) }) + /** + * Adds req.resources and req.operation to the request object + */ fastify.addHook('preHandler', async (req) => { const resourceFromParams = Object.values(req.params || {}).join('/') const resources = getFirstDefined( @@ -53,75 +74,67 @@ export const logRequest = (options: RequestLoggerOptions) => }) fastify.addHook('onRequestAbort', async (req) => { - if (options.excludeUrls?.includes(req.url)) { - return - } - - const rMeth = req.method - const rUrl = redactQueryParamFromRequest(req, [ - 'token', - 'X-Amz-Credential', - 'X-Amz-Signature', - 'X-Amz-Security-Token', - ]) - const uAgent = req.headers['user-agent'] - const rId = req.id - const cIP = req.ip - const error = (req.raw as any).executionError || req.executionError - const tenantId = req.tenantId - - const buildLogMessage = `${tenantId} | ${rMeth} | ABORTED | ${cIP} | ${rId} | ${rUrl} | ${uAgent}` - - logSchema.request(req.log, buildLogMessage, { - type: 'request', - req, + doRequestLog(req, { + excludeUrls: options.excludeUrls, + statusCode: 'ABORTED REQ', responseTime: (Date.now() - req.startTime) / 1000, - error: error, - owner: req.owner, - operation: req.operation?.type ?? req.routeConfig.operation?.type, - resources: req.resources, - serverTimes: req.serverTimings, }) }) fastify.addHook('onResponse', async (req, reply) => { - if (options.excludeUrls?.includes(req.url)) { - return - } - - const rMeth = req.method - const rUrl = redactQueryParamFromRequest(req, [ - 'token', - 'X-Amz-Credential', - 'X-Amz-Signature', - 'X-Amz-Security-Token', - ]) - const uAgent = req.headers['user-agent'] - const rId = req.id - const cIP = req.ip - const statusCode = reply.statusCode - const error = (req.raw as any).executionError || req.executionError - const tenantId = req.tenantId - - const buildLogMessage = `${tenantId} | ${rMeth} | ${statusCode} | ${cIP} | ${rId} | ${rUrl} | ${uAgent}` - - logSchema.request(req.log, buildLogMessage, { - type: 'request', - req, - res: reply, - responseTime: reply.getResponseTime(), - error: error, - owner: req.owner, - role: req.jwtPayload?.role, - resources: req.resources, - operation: req.operation?.type ?? req.routeConfig.operation?.type, - serverTimes: req.serverTimings, + doRequestLog(req, { + reply, + excludeUrls: options.excludeUrls, + statusCode: reply.statusCode, + responseTime: reply.elapsedTime, }) }) }, { name: 'log-request' } ) +interface LogRequestOptions { + reply?: FastifyReply + excludeUrls?: string[] + statusCode: number | 'ABORTED REQ' | 'ABORTED RES' + responseTime: number +} + +function doRequestLog(req: FastifyRequest, options: LogRequestOptions) { + if (options.excludeUrls?.includes(req.url)) { + return + } + + const rMeth = req.method + const rUrl = redactQueryParamFromRequest(req, [ + 'token', + 'X-Amz-Credential', + 'X-Amz-Signature', + 'X-Amz-Security-Token', + ]) + const uAgent = req.headers['user-agent'] + const rId = req.id + const cIP = req.ip + const statusCode = options.statusCode + const error = (req.raw as any).executionError || req.executionError + const tenantId = req.tenantId + + const buildLogMessage = `${tenantId} | ${rMeth} | ${statusCode} | ${cIP} | ${rId} | ${rUrl} | ${uAgent}` + + logSchema.request(req.log, buildLogMessage, { + type: 'request', + req, + res: options.reply, + responseTime: options.responseTime, + error: error, + owner: req.owner, + role: req.jwtPayload?.role, + resources: req.resources, + operation: req.operation?.type ?? req.routeConfig.operation?.type, + serverTimes: req.serverTimings, + }) +} + function getFirstDefined(...values: any[]): T | undefined { for (const value of values) { if (value !== undefined) { diff --git a/src/http/plugins/signals.ts b/src/http/plugins/signals.ts index 06bf14e4..930474e6 100644 --- a/src/http/plugins/signals.ts +++ b/src/http/plugins/signals.ts @@ -20,7 +20,7 @@ export const signals = fastifyPlugin( disconnect: new AbortController(), } - // Client terminated the request before the body was fully received + // Client terminated the request before server finished sending the response res.raw.once('close', () => { const aborted = !res.raw.writableFinished if (aborted) { diff --git a/src/http/plugins/tracing.ts b/src/http/plugins/tracing.ts index 8a85d9f2..c29f3ab4 100644 --- a/src/http/plugins/tracing.ts +++ b/src/http/plugins/tracing.ts @@ -65,6 +65,34 @@ export const traceServerTime = fastifyPlugin( if (!tracingEnabled) { return } + fastify.addHook('onRequest', async (req, res) => { + // Request was aborted before the server finishes to return a response + res.raw.once('close', () => { + const aborted = !res.raw.writableFinished + if (aborted) { + try { + const span = trace.getSpan(context.active()) + const traceId = span?.spanContext().traceId + + span?.setAttribute('res_aborted', true) + + if (traceId) { + const spans = traceCollector.getSpansForTrace(traceId) + if (spans) { + req.serverTimings = spansToServerTimings(spans, true) + } + traceCollector.clearTrace(traceId) + } + } catch (e) { + logSchema.error(logger, 'failed parsing server times on abort', { + error: e, + type: 'otel', + }) + } + } + }) + }) + fastify.addHook('onResponse', async (request, reply) => { try { const traceId = trace.getSpan(context.active())?.spanContext().traceId diff --git a/src/http/routes/object/createObject.ts b/src/http/routes/object/createObject.ts index 30548dfb..502e9a1d 100644 --- a/src/http/routes/object/createObject.ts +++ b/src/http/routes/object/createObject.ts @@ -80,6 +80,7 @@ export default async function routes(fastify: FastifyInstance) { objectName, owner, isUpsert, + signal: request.signals.body.signal, }) return response.status(objectMetadata?.httpStatusCode ?? 200).send({ diff --git a/src/http/routes/s3/index.ts b/src/http/routes/s3/index.ts index 2eba32c2..5dbd2f13 100644 --- a/src/http/routes/s3/index.ts +++ b/src/http/routes/s3/index.ts @@ -80,7 +80,12 @@ export default async function routes(fastify: FastifyInstance) { if (route.disableContentTypeParser) { reply.header('connection', 'close') reply.raw.on('finish', () => { - req.raw.destroy() + // wait sometime so that the client can receive the response + setTimeout(() => { + if (!req.raw.destroyed) { + req.raw.destroy() + } + }, 3000) }) } throw e diff --git a/src/internal/auth/jwt.ts b/src/internal/auth/jwt.ts index beee533a..80fb3e5b 100644 --- a/src/internal/auth/jwt.ts +++ b/src/internal/auth/jwt.ts @@ -1,4 +1,4 @@ -import * as crypto from 'crypto' +import * as crypto from 'node:crypto' import jwt from 'jsonwebtoken' import { ERRORS } from '@internal/errors' diff --git a/src/internal/database/tenant.ts b/src/internal/database/tenant.ts index 3da5e21c..978fee5d 100644 --- a/src/internal/database/tenant.ts +++ b/src/internal/database/tenant.ts @@ -1,4 +1,4 @@ -import crypto from 'crypto' +import crypto from 'node:crypto' import { getConfig } from '../../config' import { decrypt, encrypt, verifyJWT } from '../auth' import { multitenantKnex } from './multitenant-db' diff --git a/src/internal/monitoring/logger.ts b/src/internal/monitoring/logger.ts index 16b0d154..540245bd 100644 --- a/src/internal/monitoring/logger.ts +++ b/src/internal/monitoring/logger.ts @@ -1,7 +1,7 @@ import pino, { BaseLogger } from 'pino' import { getConfig } from '../../config' import { FastifyReply, FastifyRequest } from 'fastify' -import { URL } from 'url' +import { URL } from 'node:url' import { normalizeRawError } from '@internal/errors' const { logLevel, logflareApiKey, logflareSourceToken, logflareEnabled, region } = getConfig() diff --git a/src/internal/monitoring/otel.ts b/src/internal/monitoring/otel.ts index 68e188cb..fc3bb4d1 100644 --- a/src/internal/monitoring/otel.ts +++ b/src/internal/monitoring/otel.ts @@ -19,7 +19,7 @@ import { CompressionAlgorithm } from '@opentelemetry/otlp-exporter-base' import { SpanExporter, BatchSpanProcessor } from '@opentelemetry/sdk-trace-base' import * as grpc from '@grpc/grpc-js' import { HttpInstrumentation } from '@opentelemetry/instrumentation-http' -import { IncomingMessage } from 'http' +import { IncomingMessage } from 'node:http' import { logger, logSchema } from '@internal/monitoring/logger' import { traceCollector } from '@internal/monitoring/otel-processor' import { ClassInstrumentation } from './otel-instrumentation' @@ -32,6 +32,7 @@ import { StorageKnexDB } from '@storage/database' import { TenantConnection } from '@internal/database' import { S3Store } from '@tus/s3-store' import { Upload } from '@aws-sdk/lib-storage' +import { StreamSplitter } from '@tus/server' const tracingEnabled = process.env.TRACING_ENABLED === 'true' const headersEnv = process.env.OTEL_EXPORTER_OTLP_TRACES_HEADERS || '' @@ -222,11 +223,31 @@ const sdk = new NodeSDK({ 'getUpload', 'declareUploadLength', 'uploadIncompletePart', + 'uploadPart', 'downloadIncompletePart', 'uploadParts', ], setName: (name) => 'Tus.' + name, }), + new ClassInstrumentation({ + targetClass: StreamSplitter, + enabled: true, + methodsToInstrument: ['emitEvent'], + setName: (name: string, attrs: any) => { + if (attrs.event) { + return name + '.' + attrs.event + } + return name + }, + setAttributes: { + emitEvent: function (event) { + return { + part: this.part as any, + event, + } + }, + }, + }), new ClassInstrumentation({ targetClass: S3Client, enabled: true, @@ -243,7 +264,14 @@ const sdk = new NodeSDK({ new ClassInstrumentation({ targetClass: Upload, enabled: true, - methodsToInstrument: ['done', '__notifyProgress'], + methodsToInstrument: [ + 'done', + '__doConcurrentUpload', + '__uploadUsingPut', + '__createMultipartUpload', + '__notifyProgress', + 'markUploadAsAborted', + ], }), getNodeAutoInstrumentations({ '@opentelemetry/instrumentation-http': { diff --git a/src/internal/pubsub/postgres.ts b/src/internal/pubsub/postgres.ts index b0b6020a..dc0263e9 100644 --- a/src/internal/pubsub/postgres.ts +++ b/src/internal/pubsub/postgres.ts @@ -1,4 +1,4 @@ -import EventEmitter from 'events' +import EventEmitter from 'node:events' import createSubscriber, { Subscriber } from 'pg-listen' import { ERRORS } from '@internal/errors' import { logger, logSchema } from '@internal/monitoring' diff --git a/src/start/server.ts b/src/start/server.ts index 03293b33..34e69353 100644 --- a/src/start/server.ts +++ b/src/start/server.ts @@ -1,6 +1,6 @@ import '@internal/monitoring/otel' import { FastifyInstance } from 'fastify' -import { IncomingMessage, Server, ServerResponse } from 'http' +import { IncomingMessage, Server, ServerResponse } from 'node:http' import build from '../app' import buildAdmin from '../admin-app' diff --git a/src/storage/backend/s3.ts b/src/storage/backend/s3.ts index e0654f20..88914689 100644 --- a/src/storage/backend/s3.ts +++ b/src/storage/backend/s3.ts @@ -1,7 +1,6 @@ import { AbortMultipartUploadCommand, CompleteMultipartUploadCommand, - CompleteMultipartUploadCommandOutput, CopyObjectCommand, CreateMultipartUploadCommand, DeleteObjectCommand, @@ -228,7 +227,7 @@ export class S3Backend implements StorageBackendAdapter { { once: true } ) - const data = (await paralellUploadS3.done()) as CompleteMultipartUploadCommandOutput + const data = await paralellUploadS3.done() const metadata = await this.headObject(bucketName, key, version) diff --git a/src/storage/object.ts b/src/storage/object.ts index 290ec040..8dfb674d 100644 --- a/src/storage/object.ts +++ b/src/storage/object.ts @@ -1,5 +1,5 @@ import { FastifyRequest } from 'fastify' -import { randomUUID } from 'crypto' +import { randomUUID } from 'node:crypto' import { SignedUploadToken, signJWT, verifyJWT } from '@internal/auth' import { ERRORS } from '@internal/errors' import { getJwtSecret } from '@internal/database' diff --git a/src/storage/protocols/s3/s3-handler.ts b/src/storage/protocols/s3/s3-handler.ts index c3ad826c..902c8099 100644 --- a/src/storage/protocols/s3/s3-handler.ts +++ b/src/storage/protocols/s3/s3-handler.ts @@ -1275,7 +1275,7 @@ function toAwsMeatadataHeaders(records: Record) { if (records) { Object.keys(records).forEach((key) => { const value = records[key] - if (isUSASCII(value)) { + if (value && isUSASCII(value)) { metadataHeaders['x-amz-meta-' + key.toLowerCase()] = value } else { missingCount++