From 0e81002b3a5e560c874d814d28a35a102311d9ef Mon Sep 17 00:00:00 2001 From: Brenna N Epp Date: Wed, 3 Jul 2024 12:22:37 -0700 Subject: [PATCH] fix(storage/transfermanager): WaitAndClose waits for Callbacks to finish (#10504) Fixes #10502 --- storage/transfermanager/downloader.go | 35 +++++++++++++++------------ 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/storage/transfermanager/downloader.go b/storage/transfermanager/downloader.go index 6c4598c6c138..bed6a3dfa5f1 100644 --- a/storage/transfermanager/downloader.go +++ b/storage/transfermanager/downloader.go @@ -139,7 +139,8 @@ func (d *Downloader) DownloadDirectory(ctx context.Context, input *DownloadDirec } if d.config.asynchronous { - go input.gatherObjectOutputs(outs, len(inputs)) + d.downloadsInProgress.Add(1) + go d.gatherObjectOutputs(input, outs, len(inputs)) } d.addNewInputs(inputs) return nil @@ -359,6 +360,22 @@ func (d *Downloader) gatherShards(in *DownloadObjectInput, outs <-chan *Download d.addResult(in, shardOut) } +// gatherObjectOutputs receives from the given channel exactly numObjects times. +// It will execute the callback once all object outputs are received. +// It does not do any verification on the outputs nor does it cancel other +// objects on error. +func (d *Downloader) gatherObjectOutputs(in *DownloadDirectoryInput, gatherOuts <-chan DownloadOutput, numObjects int) { + outs := make([]DownloadOutput, 0, numObjects) + for i := 0; i < numObjects; i++ { + obj := <-gatherOuts + outs = append(outs, obj) + } + + // All objects have been gathered; execute the callback. + in.Callback(outs) + d.downloadsInProgress.Done() +} + func (d *Downloader) validateObjectInput(in *DownloadObjectInput) error { if d.config.asynchronous && in.Callback == nil { return errors.New("transfermanager: input.Callback must not be nil when the WithCallbacks option is set") @@ -567,27 +584,13 @@ type DownloadDirectoryInput struct { // Callback will run after all the objects in the directory as selected by // the provided filters are finished downloading. // It must be set if and only if the [WithCallbacks] option is set. + // WaitAndClose will wait for all callbacks to finish. Callback func([]DownloadOutput) // OnObjectDownload will run after every finished object download. Optional. OnObjectDownload func(*DownloadOutput) } -// gatherObjectOutputs receives from the given channel exactly numObjects times. -// It will call the callback once all object outputs are received. -// It does not do any verification on the outputs nor does it cancel other -// objects on error. -func (dirin *DownloadDirectoryInput) gatherObjectOutputs(gatherOuts <-chan DownloadOutput, numObjects int) { - outs := make([]DownloadOutput, 0, numObjects) - for i := 0; i < numObjects; i++ { - obj := <-gatherOuts - outs = append(outs, obj) - } - - // All objects have been gathered; execute the callback. - dirin.Callback(outs) -} - // DownloadOutput provides output for a single object download, including all // errors received while downloading object parts. If the download was successful, // Attrs will be populated.