Skip to content

Commit

Permalink
DataMovement Test Infra: In-Memory StorageResources (#38941)
Browse files Browse the repository at this point in the history
* memoryresources initial implementation

* revised implementaoin

* testing memoryitem

* test container
  • Loading branch information
jaschrep-msft committed Sep 26, 2023
1 parent 9c05757 commit 8ec3689
Show file tree
Hide file tree
Showing 3 changed files with 340 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using NUnit.Framework;

namespace Azure.Storage.DataMovement.Tests
{
internal class MemoryStorageResourceTests
{
[Test]
public async Task MemoryItemCopyFromStream([Values(true, false)] bool overwrite)
{
Random r = new Random();
byte[] data = new byte[r.Next(1, 9999)];
r.NextBytes(data);

MemoryStorageResourceItem item = new();

await item.CopyFromStreamAsync(
new MemoryStream(data),
data.Length,
overwrite,
data.Length);

Assert.That(item.Buffer.ToArray(), Is.EquivalentTo(data));
}

[Test]
public async Task MemoryItemCopyFromStreamOverwrite()
{
Random r = new Random();
byte[] oldData = new byte[r.Next(1, 9999)];
byte[] newData = new byte[r.Next(1, 9999)];
r.NextBytes(oldData);
r.NextBytes(newData);

MemoryStorageResourceItem item = new()
{
Buffer = new Memory<byte>(oldData)
};

await item.CopyFromStreamAsync(
new MemoryStream(newData),
newData.Length,
overwrite: true,
newData.Length);

Assert.That(item.Buffer.ToArray(), Is.EquivalentTo(newData));
}

[Test]
public async Task MemoryItemDeleteIfExists([Values(true, false)] bool alreadyExists)
{
Random r = new Random();
byte[] data = new byte[r.Next(1, 9999)];
r.NextBytes(data);

MemoryStorageResourceItem item = new()
{
Buffer = alreadyExists ? new Memory<byte>(data) : Memory<byte>.Empty
};

bool deleted = await item.DeleteIfExistsAsync();

Assert.That(deleted, Is.EqualTo(alreadyExists));
}

[Test]
public async Task MemoryItemReadStream(
[Values(true, false)] bool nonZeroOffset,
[Values(true, false)] bool sliceLength)
{
Random r = new Random();
byte[] data = new byte[r.Next(1024, 4096)];
r.NextBytes(data);

MemoryStorageResourceItem item = new()
{
Buffer = new Memory<byte>(data)
};

int position = nonZeroOffset ? r.Next(r.Next(data.Length / 3, data.Length * 2 / 3)) : 0;
int? length = sliceLength ? r.Next(1, data.Length - position) : null;

MemoryStream dest = new MemoryStream();
await (await item.ReadStreamAsync(position, length)).Content.CopyToAsync(dest);

Memory<byte> expectedData = length.HasValue
? new Memory<byte>(data).Slice(position, length.Value)
: new Memory<byte>(data).Slice(position);
Assert.That(dest.ToArray(), Is.EquivalentTo(expectedData.ToArray()));
}

[Test]
public async Task MemoryContainerEnumerate([Values(true, false)] bool returnsContainers)
{
const string baseUri = "memory://localhost/my/path";
List<StorageResource> allChildren = new();

MemoryStorageResourceContainer baseContainer = new(new Uri(baseUri))
{
ReturnsContainersOnEnumeration = returnsContainers,
};

MemoryStorageResourceItem child1 = new(new Uri(baseUri + "/item"));
baseContainer.Children.Add(child1);
allChildren.Add(child1);

MemoryStorageResourceContainer child2 = new(new Uri(baseUri + "/container"));
baseContainer.Children.Add(child2);
allChildren.Add(child2);

MemoryStorageResourceItem child2_1 = new(new Uri(baseUri + "/container/item1"));
child2.Children.Add(child1);
allChildren.Add(child1);

MemoryStorageResourceItem child2_2 = new(new Uri(baseUri + "/container/item2"));
child2.Children.Add(child2_2);
allChildren.Add(child2_2);

MemoryStorageResourceContainer child2_3 = new(new Uri(baseUri + "/container/container3"));
child2.Children.Add(child2_3);
allChildren.Add(child2_3);

List<StorageResource> result = new();
await foreach (StorageResource resource in baseContainer.GetStorageResourcesAsync())
{
result.Add(resource);
}

List<StorageResource> expected = allChildren
.Where(sr => returnsContainers ? true : sr is MemoryStorageResourceItem)
.ToList();
Assert.That(result, Is.EquivalentTo(expected));
}

[Test]
public void MemoryContainerGetResource()
{
const string baseUri = "memory://localhost/my/path";
List<StorageResource> allChildren = new();

MemoryStorageResourceContainer baseContainer = new(new Uri(baseUri));

MemoryStorageResourceItem foo = new(new Uri(baseUri + "/foo"));
baseContainer.Children.Add(foo);

MemoryStorageResourceContainer fizz = new(new Uri(baseUri + "/fizz"));
baseContainer.Children.Add(fizz);

MemoryStorageResourceItem buzz = new(new Uri(baseUri + "/fizz/buzz"));
fizz.Children.Add(buzz);

Assert.That(baseContainer.GetStorageResourceReference("foo"), Is.EqualTo(foo));
Assert.That(baseContainer.GetStorageResourceReference("fizz/buzz"), Is.EqualTo(buzz));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

namespace Azure.Storage.DataMovement.Tests
{
internal class MemoryStorageResourceContainer : StorageResourceContainer
{
public bool ReturnsContainersOnEnumeration { get; set; }

public List<StorageResource> Children { get; } = new();

public override Uri Uri { get; }

public MemoryStorageResourceContainer(Uri uri)
{
Uri = uri ?? new Uri($"memory://localhost/mycontainer/mypath-{Guid.NewGuid()}/resource-item-{Guid.NewGuid()}");
}

protected internal override StorageResourceItem GetStorageResourceReference(string path)
{
UriBuilder builder = new(Uri);
builder.Path = string.Join("/", new List<string>()
{
builder.Path.Trim('/'),
path.Trim('/'),
}.Where(s => !string.IsNullOrWhiteSpace(s)));
Uri expected = builder.Uri;

foreach (StorageResourceItem item in GetStorageResources(false))
{
if (item.Uri == expected)
{
return item;
}
}
return null;
}

protected internal override async IAsyncEnumerable<StorageResource> GetStorageResourcesAsync(
[EnumeratorCancellation]CancellationToken cancellationToken = default)
{
foreach (StorageResource storageResource in GetStorageResources(ReturnsContainersOnEnumeration))
{
yield return await Task.FromResult(storageResource);
}
}

private IEnumerable<StorageResource> GetStorageResources(bool includeContainers)
{
Queue<MemoryStorageResourceContainer> queue = new();
queue.Enqueue(this);

while (queue.Count > 0)
{
MemoryStorageResourceContainer container = queue.Dequeue();
foreach (var child in container.Children)
{
if (child is MemoryStorageResourceItem)
{
yield return child;
}
else if (child is MemoryStorageResourceContainer)
{
queue.Enqueue(child as MemoryStorageResourceContainer);
if (includeContainers)
{
yield return child;
}
}
else
{
throw new Exception($"Do not combine other StorageResource implementations with {nameof(MemoryStorageResourceContainer)}");
}
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

namespace Azure.Storage.DataMovement.Tests
{
internal class MemoryStorageResourceItem : StorageResourceItem
{
public Memory<byte> Buffer { get; set; } = Memory<byte>.Empty;

public override Uri Uri { get; }

protected internal override string ResourceId => "MemoryBuffer";

protected internal override DataTransferOrder TransferType => DataTransferOrder.Unordered;

protected internal override long MaxChunkSize => long.MaxValue;

protected internal override long? Length => Buffer.Length;

public MemoryStorageResourceItem(Uri uri = default)
{
Uri = uri ?? new Uri($"memory://localhost/mycontainer/mypath-{Guid.NewGuid()}/resource-item-{Guid.NewGuid()}");
}

protected internal override Task CompleteTransferAsync(bool overwrite, CancellationToken cancellationToken = default)
{
return Task.CompletedTask;
}

protected internal override Task CopyBlockFromUriAsync(StorageResourceItem sourceResource, HttpRange range, bool overwrite, long completeLength, StorageResourceCopyFromUriOptions options = null, CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
}

protected internal override async Task CopyFromStreamAsync(Stream stream, long streamLength, bool overwrite, long completeLength, StorageResourceWriteToOffsetOptions options = null, CancellationToken cancellationToken = default)
{
if (!overwrite && !Buffer.IsEmpty)
{
return;
}
byte[] buf = new byte[streamLength];
MemoryStream dest = new(buf);
await stream.CopyToAsync(dest);
Buffer = new Memory<byte>(buf);
}

protected internal override Task CopyFromUriAsync(StorageResourceItem sourceResource, bool overwrite, long completeLength, StorageResourceCopyFromUriOptions options = null, CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
}

protected internal override Task<bool> DeleteIfExistsAsync(CancellationToken cancellationToken = default)
{
bool result = !Buffer.IsEmpty;
Buffer = Memory<byte>.Empty;
return Task.FromResult(result);
}

protected internal override Task<HttpAuthorization> GetCopyAuthorizationHeaderAsync(CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
}

protected internal override StorageResourceCheckpointData GetDestinationCheckpointData()
{
throw new NotImplementedException();
}

protected internal override Task<StorageResourceProperties> GetPropertiesAsync(CancellationToken token = default)
{
return Task.FromResult(new StorageResourceProperties(default, default, Buffer.Length, default));
}

protected internal override StorageResourceCheckpointData GetSourceCheckpointData()
{
throw new NotImplementedException();
}

protected internal override Task<StorageResourceReadStreamResult> ReadStreamAsync(long position = 0, long? length = null, CancellationToken cancellationToken = default)
{
var slice = length.HasValue ? Buffer.Slice((int)position, (int)length.Value) : Buffer.Slice((int)position);
return Task.FromResult(new StorageResourceReadStreamResult(new MemoryStream(slice.ToArray())));
}
}
}

0 comments on commit 8ec3689

Please sign in to comment.