Skip to content

Commit

Permalink
Added Close to DataLakeOpenWriteOptions, removed RetainedUncommittedD…
Browse files Browse the repository at this point in the history
…ata from DataLakeUploadOptions (#15140)
  • Loading branch information
seanmcc-msft authored Sep 15, 2020
1 parent a1a0dd1 commit 0d53cf5
Show file tree
Hide file tree
Showing 12 changed files with 1,073 additions and 436 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ public partial class DataLakeFileOpenWriteOptions
{
public DataLakeFileOpenWriteOptions() { }
public long? BufferSize { get { throw null; } set { } }
public bool? Close { get { throw null; } set { } }
public Azure.Storage.Files.DataLake.Models.DataLakeRequestConditions OpenConditions { get { throw null; } set { } }
public System.IProgress<long> ProgressHandler { get { throw null; } set { } }
}
Expand All @@ -324,7 +325,6 @@ public DataLakeFileUploadOptions() { }
public System.Collections.Generic.IDictionary<string, string> Metadata { get { throw null; } set { } }
public string Permissions { get { throw null; } set { } }
public System.IProgress<long> ProgressHandler { get { throw null; } set { } }
public bool? RetainUncommittedData { get { throw null; } set { } }
public Azure.Storage.StorageTransferOptions TransferOptions { get { throw null; } set { } }
public string Umask { get { throw null; } set { } }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4136,7 +4136,8 @@ private async Task<Stream> OpenWriteInternal(
bufferSize: options?.BufferSize ?? Constants.DefaultBufferSize,
position: position,
conditions: conditions,
progressHandler: options?.ProgressHandler);
progressHandler: options?.ProgressHandler,
closeEvent: options?.Close);
}
catch (Exception ex)
{
Expand Down Expand Up @@ -4196,7 +4197,7 @@ await client.AppendInternal(
// Flush data
return await client.FlushInternal(
position: newPosition,
retainUncommittedData: args.RetainUncommittedData,
retainUncommittedData: default,
close: args.Close,
args.HttpHeaders,
args.Conditions,
Expand All @@ -4223,7 +4224,7 @@ await client.AppendInternal(
return await client.FlushInternal(
offset + size,
retainUncommittedData: args.RetainUncommittedData,
retainUncommittedData: default,
close: args.Close,
httpHeaders: args.HttpHeaders,
conditions: args.Conditions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,16 @@ internal class DataLakeFileWriteStream : StorageWriteStream
{
private readonly DataLakeFileClient _fileClient;
private readonly DataLakeRequestConditions _conditions;
private readonly bool? _closeEvent;
private long _writeIndex;

public DataLakeFileWriteStream(
DataLakeFileClient fileClient,
long bufferSize,
long position,
DataLakeRequestConditions conditions,
IProgress<long> progressHandler) : base(
IProgress<long> progressHandler,
bool? closeEvent) : base(
position,
bufferSize,
progressHandler)
Expand All @@ -29,6 +31,7 @@ public DataLakeFileWriteStream(
_fileClient = fileClient;
_conditions = conditions ?? new DataLakeRequestConditions();
_writeIndex = position;
_closeEvent = closeEvent;
}

protected override async Task AppendInternal(bool async, CancellationToken cancellationToken)
Expand Down Expand Up @@ -59,7 +62,7 @@ protected override async Task FlushInternal(bool async, CancellationToken cancel
Response<PathInfo> response = await _fileClient.FlushInternal(
position: _writeIndex,
retainUncommittedData: default,
close: default,
close: _closeEvent,
httpHeaders: default,
conditions: _conditions,
async: async,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,17 @@ public class DataLakeFileOpenWriteOptions
/// progress updates about data transfers.
/// </summary>
public IProgress<long> ProgressHandler { get; set; }

/// <summary>
/// Azure Storage Events allow applications to receive notifications when files change. When Azure Storage Events are enabled,
/// a file changed event is raised. This event has a property indicating whether this is the final change to distinguish the
/// difference between an intermediate flush to a file stream and the final close of a file stream. The close query parameter
/// is valid only when the action is "flush" and change notifications are enabled. If the value of close is "true" and the
/// flush operation completes successfully, the service raises a file change notification with a property indicating that
/// this is the final update (the file stream has been closed). If "false" a change notification is raised indicating the
/// file has changed. The default is false. This query parameter is set to true by the Hadoop ABFS driver to indicate that
/// the file stream has been closed.
/// </summary>
public bool? Close { get; set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,6 @@ public class DataLakeFileUploadOptions
/// </summary>
public IProgress<long> ProgressHandler { get; set; }

/// <summary>
/// If "true", uncommitted data is retained after the flush operation completes; otherwise, the uncommitted data is deleted
/// after the flush operation. The default is false. Data at offsets less than the specified position are written to the
/// file when flush succeeds, but this optional parameter allows data after the flush position to be retained for a future
/// flush operation.
/// </summary>
public bool? RetainUncommittedData { get; set; }

/// <summary>
/// Azure Storage Events allow applications to receive notifications when files change. When Azure Storage Events are enabled,
/// a file changed event is raised. This event has a property indicating whether this is the final change to distinguish the
Expand Down
29 changes: 27 additions & 2 deletions sdk/storage/Azure.Storage.Files.DataLake/tests/FileClientTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2878,7 +2878,7 @@ await file.UploadAsync(
}

[Test]
public async Task UploadAsync_CloseAndRetainData()
public async Task UploadAsync_Close()
{
// Arrange
await using DisposingFileSystem test = await GetNewFileSystem();
Expand All @@ -2889,7 +2889,6 @@ public async Task UploadAsync_CloseAndRetainData()
DataLakeFileUploadOptions options = new DataLakeFileUploadOptions
{
Close = true,
RetainUncommittedData = true
};

// Act
Expand Down Expand Up @@ -4201,6 +4200,7 @@ await TestHelper.AssertExpectedExceptionAsync<RequestFailedException>(
openWriteStream.FlushAsync(),
e => Assert.AreEqual("ConditionNotMet", e.ErrorCode));
}

[Test]
public async Task OpenWriteAsync_ProgressReporting()
{
Expand Down Expand Up @@ -4337,5 +4337,30 @@ await file.OpenWriteAsync(
});
}
}

[Test]
public async Task OpenWriteAsync_Close()
{
// Arrange
await using DisposingFileSystem test = await GetNewFileSystem();
DataLakeFileClient file = InstrumentClient(test.FileSystem.GetFileClient(GetNewFileName()));
await file.CreateAsync();

byte[] data = GetRandomBuffer(Constants.KB);
using Stream stream = new MemoryStream(data);

DataLakeFileOpenWriteOptions options = new DataLakeFileOpenWriteOptions
{
Close = true,
BufferSize = 256
};

// Act
Stream openWriteStream = await file.OpenWriteAsync(
overwrite: false,
options);
await stream.CopyToAsync(openWriteStream);
await openWriteStream.FlushAsync();
}
}
}
Loading

0 comments on commit 0d53cf5

Please sign in to comment.