Skip to content

Commit

Permalink
Core TransferManager tests (#46014)
Browse files Browse the repository at this point in the history
* Transfermanager public constructor calls DI

* testing and minor changes

move delegate out of class for easier typing.
privatize some fields
testing transfer manager item transfer with mocked dependencies.

* container test

* usings

* rename test

* address nitpicks

* fix
  • Loading branch information
jaschrep-msft committed Sep 18, 2024
1 parent f328c76 commit 70567f2
Show file tree
Hide file tree
Showing 7 changed files with 557 additions and 24 deletions.
12 changes: 12 additions & 0 deletions sdk/storage/Azure.Storage.Common/tests/Shared/MockExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,5 +62,17 @@ public static void BasicSetup(this Mock<Stream> stream, bool canRead, bool canWr
.Throws<NotSupportedException>();
}
}

public static void VerifyDisposal<T>(this Mock<T> mock)
where T : class, IDisposable
{
mock.Verify(m => m.Dispose(), Times.Once);
}

public static void VerifyAsyncDisposal<T>(this Mock<T> mock)
where T : class, IAsyncDisposable
{
mock.Verify(m => m.DisposeAsync(), Times.Once);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@
using System.Threading.Channels;
using System.Threading.Tasks;
using Azure.Storage.Common;
using static Azure.Storage.DataMovement.ChannelProcessing;

namespace Azure.Storage.DataMovement;

internal delegate Task ProcessAsync<T>(T item, CancellationToken cancellationToken);

internal interface IProcessor<TItem> : IDisposable
{
ValueTask QueueAsync(TItem item, CancellationToken cancellationToken = default);
Expand All @@ -19,8 +20,6 @@ internal interface IProcessor<TItem> : IDisposable

internal static class ChannelProcessing
{
public delegate Task ProcessAsync<T>(T item, CancellationToken cancellationToken);

public static IProcessor<T> NewProcessor<T>(int parallelism)
{
Argument.AssertInRange(parallelism, 1, int.MaxValue, nameof(parallelism));
Expand Down
12 changes: 9 additions & 3 deletions sdk/storage/Azure.Storage.DataMovement/src/JobBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,23 @@ namespace Azure.Storage.DataMovement;

internal class JobBuilder
{
internal readonly ArrayPool<byte> _arrayPool;
private readonly ArrayPool<byte> _arrayPool;

/// <summary>
/// Defines the error handling method to follow when an error is seen. Defaults to
/// <see cref="DataTransferErrorMode.StopOnAnyFailure"/>.
///
/// See <see cref="DataTransferErrorMode"/>.
/// </summary>
internal readonly DataTransferErrorMode _errorHandling;
private readonly DataTransferErrorMode _errorHandling;

internal ClientDiagnostics ClientDiagnostics { get; }
private ClientDiagnostics ClientDiagnostics { get; }

/// <summary>
/// Mocking constructor.
/// </summary>
protected JobBuilder()
{ }

internal JobBuilder(
ArrayPool<byte> arrayPool,
Expand Down
34 changes: 18 additions & 16 deletions sdk/storage/Azure.Storage.DataMovement/src/TransferManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ public class TransferManager : IAsyncDisposable
private readonly CancellationTokenSource _cancellationTokenSource = new();
private CancellationToken _cancellationToken => _cancellationTokenSource.Token;

private readonly Func<string> _generateTransferId;

/// <summary>
/// Protected constructor for mocking.
/// </summary>
Expand All @@ -55,20 +57,18 @@ protected TransferManager()
/// </summary>
/// <param name="options">Options that will apply to all transfers started by this TransferManager.</param>
public TransferManager(TransferManagerOptions options = default)
{
_jobsProcessor = ChannelProcessing.NewProcessor<TransferJobInternal>(parallelism: 1);
_partsProcessor = ChannelProcessing.NewProcessor<JobPartInternal>(DataMovementConstants.MaxJobPartReaders);
_chunksProcessor = ChannelProcessing.NewProcessor<Func<Task>>(options?.MaximumConcurrency ?? DataMovementConstants.MaxJobChunkTasks);
_jobBuilder = new(
ArrayPool<byte>.Shared,
: this(
ChannelProcessing.NewProcessor<TransferJobInternal>(parallelism: 1),
ChannelProcessing.NewProcessor<JobPartInternal>(DataMovementConstants.MaxJobPartReaders),
ChannelProcessing.NewProcessor<Func<Task>>(options?.MaximumConcurrency ?? DataMovementConstants.MaxJobChunkTasks),
new(ArrayPool<byte>.Shared,
options?.ErrorHandling ?? DataTransferErrorMode.StopOnAnyFailure,
new ClientDiagnostics(options?.ClientOptions ?? ClientOptions.Default));
TransferCheckpointStoreOptions checkpointerOptions = options?.CheckpointerOptions != default ? new TransferCheckpointStoreOptions(options.CheckpointerOptions) : default;
_checkpointer = checkpointerOptions != default ? checkpointerOptions.GetCheckpointer() : CreateDefaultCheckpointer();
_resumeProviders = options?.ResumeProviders != null ? new(options.ResumeProviders) : new();

ConfigureProcessorCallbacks();
}
new ClientDiagnostics(options?.ClientOptions ?? ClientOptions.Default)),
(options?.CheckpointerOptions != default ? new TransferCheckpointStoreOptions(options.CheckpointerOptions) : default)
?.GetCheckpointer() ?? CreateDefaultCheckpointer(),
options?.ResumeProviders != null ? new List<StorageResourceProvider>(options.ResumeProviders) : new(),
default)
{}

/// <summary>
/// Dependency injection constructor.
Expand All @@ -79,14 +79,16 @@ internal TransferManager(
IProcessor<Func<Task>> chunksProcessor,
JobBuilder jobBuilder,
TransferCheckpointer checkpointer,
ICollection<StorageResourceProvider> resumeProviders)
ICollection<StorageResourceProvider> resumeProviders,
Func<string> generateTransferId = default)
{
_jobsProcessor = jobsProcessor;
_partsProcessor = partsProcessor;
_chunksProcessor = chunksProcessor;
_jobBuilder = jobBuilder;
_resumeProviders = new(resumeProviders);
_resumeProviders = new(resumeProviders ?? new List<StorageResourceProvider>());
_checkpointer = checkpointer;
_generateTransferId = generateTransferId ?? (() => Guid.NewGuid().ToString());

ConfigureProcessorCallbacks();
}
Expand Down Expand Up @@ -345,7 +347,7 @@ public virtual async Task<DataTransfer> StartTransferAsync(

transferOptions ??= new DataTransferOptions();

string transferId = Guid.NewGuid().ToString();
string transferId = _generateTransferId();
await _checkpointer.AddNewJobAsync(
transferId,
sourceResource,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
<Compile Include="$(AzureStorageSharedTestSources)\**\*.cs" LinkBase="Shared\Storage" />
<Compile Include="$(MSBuildThisFileDirectory)..\..\Azure.Storage.Blobs\tests\BlobsClientTestFixtureAttribute.cs" LinkBase="Shared\Storage" />
<Compile Include="$(MSBuildThisFileDirectory)..\..\Azure.Storage.Blobs\tests\DisposingContainer.cs" LinkBase="Shared\Storage" />
<Compile Remove="$(AzureStorageSharedTestSources)\AzuriteFixture.cs"/>
<Compile Remove="$(AzureStorageSharedTestSources)\AzuriteNUnitFixture.cs"/>
<Compile Remove="$(AzureStorageSharedTestSources)\AzuriteFixture.cs" />
<Compile Remove="$(AzureStorageSharedTestSources)\AzuriteNUnitFixture.cs" />
<Compile Remove="$(AzureStorageSharedTestSources)\ClientSideEncryptionTestExtensions.cs" />
<Compile Remove="$(AzureStorageSharedTestSources)\StorageTestBase.SasVersion.cs" />
<Compile Remove="$(AzureStorageSharedTestSources)\TransferValidationTestBase.cs" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace Azure.Storage.DataMovement.Tests
{
/// <summary>
/// Processor that only processes items one by one from
/// its queue only when invoked for each item.
/// </summary>
/// <typeparam name="T"></typeparam>
internal class StepProcessor<T> : IProcessor<T>
{
private readonly Queue<T> _queue = new();

public int ItemsInQueue => _queue.Count;

/// <inheritdoc/>
public ProcessAsync<T> Process { get; set; }

/// <inheritdoc/>
public ValueTask QueueAsync(T item, CancellationToken cancellationToken = default)
{
_queue.Enqueue(item);
return new(Task.CompletedTask);
}

/// <summary>
/// Attmpts to read an item from internal queue, then completes
/// a call to <see cref="Process"/> on it.
/// </summary>
/// <returns>
/// Whether or not an item was successfully read from the queue.
/// </returns>
public async ValueTask<bool> TryStepAsync(CancellationToken cancellationToken = default)
{
if (_queue.Count > 0)
{
await Process?.Invoke(_queue.Dequeue(), cancellationToken);
return true;
}
else
{
return false;
}
}

public async ValueTask<int> StepMany(int maxSteps, CancellationToken cancellationToken = default)
{
int steps = 0;
while (steps < maxSteps && await TryStepAsync(cancellationToken))
{
steps++;
}
return steps;
}

public async ValueTask<int> StepAll(CancellationToken cancellationToken = default)
{
int steps = 0;
while (await TryStepAsync(cancellationToken))
{
steps++;
}
return steps;
}

public void Dispose()
{ }
}
}
Loading

0 comments on commit 70567f2

Please sign in to comment.