Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DataMovement Test Infra: In-Memory StorageResources #38941

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using NUnit.Framework;

namespace Azure.Storage.DataMovement.Tests
{
internal class MemoryStorageResourceTests
{
[Test]
public async Task MemoryItemCopyFromStream([Values(true, false)] bool overwrite)
{
Random r = new Random();
byte[] data = new byte[r.Next(1, 9999)];
r.NextBytes(data);

MemoryStorageResourceItem item = new();

await item.CopyFromStreamAsync(
new MemoryStream(data),
data.Length,
overwrite,
data.Length);

Assert.That(item.Buffer.ToArray(), Is.EquivalentTo(data));
}

[Test]
public async Task MemoryItemCopyFromStreamOverwrite()
{
Random r = new Random();
byte[] oldData = new byte[r.Next(1, 9999)];
byte[] newData = new byte[r.Next(1, 9999)];
r.NextBytes(oldData);
r.NextBytes(newData);

MemoryStorageResourceItem item = new()
{
Buffer = new Memory<byte>(oldData)
};

await item.CopyFromStreamAsync(
new MemoryStream(newData),
newData.Length,
overwrite: true,
newData.Length);

Assert.That(item.Buffer.ToArray(), Is.EquivalentTo(newData));
}

[Test]
public async Task MemoryItemDeleteIfExists([Values(true, false)] bool alreadyExists)
{
Random r = new Random();
byte[] data = new byte[r.Next(1, 9999)];
r.NextBytes(data);

MemoryStorageResourceItem item = new()
{
Buffer = alreadyExists ? new Memory<byte>(data) : Memory<byte>.Empty
};

bool deleted = await item.DeleteIfExistsAsync();

Assert.That(deleted, Is.EqualTo(alreadyExists));
}

[Test]
public async Task MemoryItemReadStream(
[Values(true, false)] bool nonZeroOffset,
[Values(true, false)] bool sliceLength)
{
Random r = new Random();
byte[] data = new byte[r.Next(1024, 4096)];
r.NextBytes(data);

MemoryStorageResourceItem item = new()
{
Buffer = new Memory<byte>(data)
};

int position = nonZeroOffset ? r.Next(r.Next(data.Length / 3, data.Length * 2 / 3)) : 0;
int? length = sliceLength ? r.Next(1, data.Length - position) : null;

MemoryStream dest = new MemoryStream();
await (await item.ReadStreamAsync(position, length)).Content.CopyToAsync(dest);

Memory<byte> expectedData = length.HasValue
? new Memory<byte>(data).Slice(position, length.Value)
: new Memory<byte>(data).Slice(position);
Assert.That(dest.ToArray(), Is.EquivalentTo(expectedData.ToArray()));
}

[Test]
public async Task MemoryContainerEnumerate([Values(true, false)] bool returnsContainers)
{
const string baseUri = "memory://localhost/my/path";
List<StorageResource> allChildren = new();

MemoryStorageResourceContainer baseContainer = new(new Uri(baseUri))
{
ReturnsContainersOnEnumeration = returnsContainers,
};

MemoryStorageResourceItem child1 = new(new Uri(baseUri + "/item"));
baseContainer.Children.Add(child1);
allChildren.Add(child1);

MemoryStorageResourceContainer child2 = new(new Uri(baseUri + "/container"));
baseContainer.Children.Add(child2);
allChildren.Add(child2);

MemoryStorageResourceItem child2_1 = new(new Uri(baseUri + "/container/item1"));
child2.Children.Add(child1);
allChildren.Add(child1);

MemoryStorageResourceItem child2_2 = new(new Uri(baseUri + "/container/item2"));
child2.Children.Add(child2_2);
allChildren.Add(child2_2);

MemoryStorageResourceContainer child2_3 = new(new Uri(baseUri + "/container/container3"));
child2.Children.Add(child2_3);
allChildren.Add(child2_3);

List<StorageResource> result = new();
await foreach (StorageResource resource in baseContainer.GetStorageResourcesAsync())
{
result.Add(resource);
}

List<StorageResource> expected = allChildren
.Where(sr => returnsContainers ? true : sr is MemoryStorageResourceItem)
.ToList();
Assert.That(result, Is.EquivalentTo(expected));
}

[Test]
public void MemoryContainerGetResource()
{
const string baseUri = "memory://localhost/my/path";
List<StorageResource> allChildren = new();

MemoryStorageResourceContainer baseContainer = new(new Uri(baseUri));

MemoryStorageResourceItem foo = new(new Uri(baseUri + "/foo"));
baseContainer.Children.Add(foo);

MemoryStorageResourceContainer fizz = new(new Uri(baseUri + "/fizz"));
baseContainer.Children.Add(fizz);

MemoryStorageResourceItem buzz = new(new Uri(baseUri + "/fizz/buzz"));
fizz.Children.Add(buzz);

Assert.That(baseContainer.GetStorageResourceReference("foo"), Is.EqualTo(foo));
Assert.That(baseContainer.GetStorageResourceReference("fizz/buzz"), Is.EqualTo(buzz));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

namespace Azure.Storage.DataMovement.Tests
{
internal class MemoryStorageResourceContainer : StorageResourceContainer
{
public bool ReturnsContainersOnEnumeration { get; set; }

public List<StorageResource> Children { get; } = new();

public override Uri Uri { get; }

public MemoryStorageResourceContainer(Uri uri)
{
Uri = uri ?? new Uri($"memory://localhost/mycontainer/mypath-{Guid.NewGuid()}/resource-item-{Guid.NewGuid()}");
}

protected internal override StorageResourceItem GetStorageResourceReference(string path)
{
UriBuilder builder = new(Uri);
builder.Path = string.Join("/", new List<string>()
{
builder.Path.Trim('/'),
path.Trim('/'),
}.Where(s => !string.IsNullOrWhiteSpace(s)));
Uri expected = builder.Uri;

foreach (StorageResourceItem item in GetStorageResources(false))
{
if (item.Uri == expected)
{
return item;
}
}
return null;
}

protected internal override async IAsyncEnumerable<StorageResource> GetStorageResourcesAsync(
[EnumeratorCancellation]CancellationToken cancellationToken = default)
{
foreach (StorageResource storageResource in GetStorageResources(ReturnsContainersOnEnumeration))
{
yield return await Task.FromResult(storageResource);
}
}

private IEnumerable<StorageResource> GetStorageResources(bool includeContainers)
{
Queue<MemoryStorageResourceContainer> queue = new();
queue.Enqueue(this);

while (queue.Count > 0)
{
MemoryStorageResourceContainer container = queue.Dequeue();
foreach (var child in container.Children)
{
if (child is MemoryStorageResourceItem)
{
yield return child;
}
else if (child is MemoryStorageResourceContainer)
{
queue.Enqueue(child as MemoryStorageResourceContainer);
if (includeContainers)
{
yield return child;
}
}
else
{
throw new Exception($"Do not combine other StorageResource implementations with {nameof(MemoryStorageResourceContainer)}");
}
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

namespace Azure.Storage.DataMovement.Tests
{
internal class MemoryStorageResourceItem : StorageResourceItem
{
public Memory<byte> Buffer { get; set; } = Memory<byte>.Empty;

public override Uri Uri { get; }

protected internal override string ResourceId => "MemoryBuffer";

protected internal override DataTransferOrder TransferType => DataTransferOrder.Unordered;

protected internal override long MaxChunkSize => long.MaxValue;

protected internal override long? Length => Buffer.Length;

public MemoryStorageResourceItem(Uri uri = default)
{
Uri = uri ?? new Uri($"memory://localhost/mycontainer/mypath-{Guid.NewGuid()}/resource-item-{Guid.NewGuid()}");
}

protected internal override Task CompleteTransferAsync(bool overwrite, CancellationToken cancellationToken = default)
{
return Task.CompletedTask;
}

protected internal override Task CopyBlockFromUriAsync(StorageResourceItem sourceResource, HttpRange range, bool overwrite, long completeLength, StorageResourceCopyFromUriOptions options = null, CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
}

protected internal override async Task CopyFromStreamAsync(Stream stream, long streamLength, bool overwrite, long completeLength, StorageResourceWriteToOffsetOptions options = null, CancellationToken cancellationToken = default)
{
if (!overwrite && !Buffer.IsEmpty)
{
return;
}
byte[] buf = new byte[streamLength];
MemoryStream dest = new(buf);
await stream.CopyToAsync(dest);
Buffer = new Memory<byte>(buf);
}

protected internal override Task CopyFromUriAsync(StorageResourceItem sourceResource, bool overwrite, long completeLength, StorageResourceCopyFromUriOptions options = null, CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
}

protected internal override Task<bool> DeleteIfExistsAsync(CancellationToken cancellationToken = default)
{
bool result = !Buffer.IsEmpty;
Buffer = Memory<byte>.Empty;
return Task.FromResult(result);
}

protected internal override Task<HttpAuthorization> GetCopyAuthorizationHeaderAsync(CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
}

protected internal override StorageResourceCheckpointData GetDestinationCheckpointData()
{
throw new NotImplementedException();
}

protected internal override Task<StorageResourceProperties> GetPropertiesAsync(CancellationToken token = default)
{
return Task.FromResult(new StorageResourceProperties(default, default, Buffer.Length, default));
}

protected internal override StorageResourceCheckpointData GetSourceCheckpointData()
{
throw new NotImplementedException();
}

protected internal override Task<StorageResourceReadStreamResult> ReadStreamAsync(long position = 0, long? length = null, CancellationToken cancellationToken = default)
{
var slice = length.HasValue ? Buffer.Slice((int)position, (int)length.Value) : Buffer.Slice((int)position);
return Task.FromResult(new StorageResourceReadStreamResult(new MemoryStream(slice.ToArray())));
}
}
}