From b45260bd7432e84debcdb232ac26a6d1c255d998 Mon Sep 17 00:00:00 2001 From: Ignazio Bovo Date: Thu, 16 May 2024 12:49:23 +0200 Subject: [PATCH] feat: :art: handle multipart uploads --- .../storageProviders/IConnectionHandler.ts | 4 +-- .../storageProviders/awsConnectionHandler.ts | 25 +++++++++++++------ .../src/services/sync/acceptPendingObjects.ts | 2 +- .../sync/tasks/StorageProviderSyncTask.ts | 3 +-- storage-node/src/services/webApi/app.ts | 1 - 5 files changed, 22 insertions(+), 13 deletions(-) diff --git a/storage-node/src/services/storageProviders/IConnectionHandler.ts b/storage-node/src/services/storageProviders/IConnectionHandler.ts index 82b8bd4d04..eda429baa0 100644 --- a/storage-node/src/services/storageProviders/IConnectionHandler.ts +++ b/storage-node/src/services/storageProviders/IConnectionHandler.ts @@ -7,10 +7,10 @@ export interface IConnectionHandler { /** * Asynchronously uploads a file to the remote bucket. * @param filename - The key of the file in the remote bucket. - * @param filestream - The file stream to upload. + * @param filePath - The file path of the file to upload. * @returns A promise that resolves when the upload is complete or rejects with an error. */ - uploadFileToRemoteBucket(filename: string, filestream: ColossusFileStream): Promise + uploadFileToRemoteBucket(filename: string, filePath: string): Promise /** * Asynchronously retrieves a file from the remote bucket. 
diff --git a/storage-node/src/services/storageProviders/awsConnectionHandler.ts b/storage-node/src/services/storageProviders/awsConnectionHandler.ts index 929f17220e..e49d837d66 100644 --- a/storage-node/src/services/storageProviders/awsConnectionHandler.ts +++ b/storage-node/src/services/storageProviders/awsConnectionHandler.ts @@ -1,5 +1,6 @@ import { IConnectionHandler, ColossusFileStream } from './IConnectionHandler' import { + CreateMultipartUploadCommand, GetObjectCommand, ListObjectsCommand, ListObjectsCommandInput, @@ -7,6 +8,7 @@ import { S3Client, } from '@aws-sdk/client-s3' import { getSignedUrl } from '@aws-sdk/s3-request-presigner' +import fs from 'fs' export type AwsConnectionHandlerParams = { accessKeyId: string @@ -19,6 +21,9 @@ export class AwsConnectionHandler implements IConnectionHandler { private client: S3Client private bucket: string + // Official doc at https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html: Upload an object in a single operation by using the AWS SDKs, REST API, or AWS CLI – With a single PUT operation, you can upload a single object up to 5 GB in size. 
+ private multiPartThresholdGB = 5 + constructor(opts: AwsConnectionHandlerParams) { this.client = new S3Client({ credentials: { @@ -34,18 +39,24 @@ export class AwsConnectionHandler implements IConnectionHandler { // Response status code info: https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html return response.$metadata.httpStatusCode === 200 } + private isMultiPartNeeded(filePath: string): boolean { + const stats = fs.statSync(filePath) + const fileSizeInBytes = stats.size + const fileSizeInGigabytes = fileSizeInBytes / (1024 * 1024 * 1024) + return fileSizeInGigabytes > this.multiPartThresholdGB + } - async uploadFileToRemoteBucket(filename: string, filestream: ColossusFileStream): Promise { - // Setting up S3 upload parameters - + async uploadFileToRemoteBucket(filename: string, filePath: string): Promise { const input = { Bucket: this.bucket, - Key: filename, // File name you want to save as in S3 - Body: filestream, + Key: filename, + Body: fs.createReadStream(filePath), } - // Uploading files to the bucket - const command = new PutObjectCommand(input) + // FIXME: CreateMultipartUploadCommand only initiates a multipart upload (it returns an UploadId and sends no data); the parts must still be sent with UploadPartCommand and finalized with CompleteMultipartUploadCommand + const command = this.isMultiPartNeeded(filePath) + ? new CreateMultipartUploadCommand(input) + : new PutObjectCommand(input) const response = await this.client.send(command) if (!this.isSuccessfulResponse(response)) { throw new Error('Failed to upload file to S3') diff --git a/storage-node/src/services/sync/acceptPendingObjects.ts b/storage-node/src/services/sync/acceptPendingObjects.ts index 8251528871..bd60bdf635 100644 --- a/storage-node/src/services/sync/acceptPendingObjects.ts +++ b/storage-node/src/services/sync/acceptPendingObjects.ts @@ -185,7 +185,7 @@ export class AcceptPendingObjectsService { await moveFile(currentPath, newPath) } else { const connection = getStorageProviderConnection()! 
- await connection.uploadFileToRemoteBucket(dataObjectId, fs.createReadStream(currentPath)) // NOTE: consider converting to non blocking promise + await connection.uploadFileToRemoteBucket(dataObjectId, currentPath) // NOTE: consider converting to non blocking promise await fsPromises.unlink(currentPath) // delete the file from the local storage after successful upload } registerNewDataObjectId(dataObjectId) diff --git a/storage-node/src/services/sync/tasks/StorageProviderSyncTask.ts b/storage-node/src/services/sync/tasks/StorageProviderSyncTask.ts index b4d46a1756..deae0b0caa 100644 --- a/storage-node/src/services/sync/tasks/StorageProviderSyncTask.ts +++ b/storage-node/src/services/sync/tasks/StorageProviderSyncTask.ts @@ -37,8 +37,7 @@ export class ProviderSyncTask extends DownloadFileTask { // TODO: I have added a HashFileVerificationError to the utils file, but it is not used here, we should establish what to do in case the file is corrupted await withRandomUrls(operatorUrls, async (chosenBaseUrl) => { await this.tryDownloadTemp(chosenBaseUrl, this.dataObjectId) - const fileStream = fs.createReadStream(tempFilePath) - await this.connection.uploadFileToRemoteBucket(this.dataObjectId, fileStream) // NOTE: consider converting to non blocking promise + await this.connection.uploadFileToRemoteBucket(this.dataObjectId, tempFilePath) // NOTE: consider converting to non blocking promise }) } catch (err) { logger.error(`Sync - error when synching asset ${this.dataObjectId} with remote storage provider: ${err}`, { diff --git a/storage-node/src/services/webApi/app.ts b/storage-node/src/services/webApi/app.ts index 809fd39b4a..30d3b87463 100644 --- a/storage-node/src/services/webApi/app.ts +++ b/storage-node/src/services/webApi/app.ts @@ -91,7 +91,6 @@ export async function createApp(config: AppConfig): Promise { // For multipart forms, the max number of file fields (Default: Infinity) files: 1, // For multipart forms, the max file size (in bytes) (Default: Infinity) - // 
TODO CS3 : This should be set to the maximum file size allowed by the storage provider. fileSize: config.maxFileSize, }, },