Skip to content

Commit

Permalink
Transmit "hasMoreBatches" info in the enumeration
Browse files Browse the repository at this point in the history
  • Loading branch information
roji committed Mar 2, 2022
1 parent b7f6ec6 commit 711c567
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2189,7 +2189,7 @@ private IEnumerable<MigrationOperation> GetDataOperations(
var commandBatches = new CommandBatchPreparer(CommandBatchPreparerDependencies)
.BatchCommands(entries, updateAdapter);

foreach (var commandBatch in commandBatches)
foreach (var (commandBatch, _) in commandBatches)
{
InsertDataOperation? batchInsertOperation = null;
foreach (var command in commandBatch.ModificationCommands)
Expand Down
12 changes: 8 additions & 4 deletions src/EFCore.Relational/Update/IBatchExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,21 @@ public interface IBatchExecutor
/// <summary>
/// Executes the commands in the batches against the given database connection.
/// </summary>
/// <param name="commandBatches">The batches to execute.</param>
/// <param name="commandBatches">
/// A list of value tuples, each of which contains a batch to execute, and whether more batches are available.
/// </param>
/// <param name="connection">The database connection to use.</param>
/// <returns>The total number of rows affected.</returns>
int Execute(
IEnumerable<ModificationCommandBatch> commandBatches,
IEnumerable<(ModificationCommandBatch Batch, bool HasMore)> commandBatches,
IRelationalConnection connection);

/// <summary>
/// Executes the commands in the batches against the given database connection.
/// </summary>
/// <param name="commandBatches">The batches to execute.</param>
/// <param name="commandBatches">
/// A list of value tuples, each of which contains a batch to execute, and whether more batches are available.
/// </param>
/// <param name="connection">The database connection to use.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken" /> to observe while waiting for the task to complete.</param>
/// <returns>
Expand All @@ -47,7 +51,7 @@ int Execute(
/// </returns>
/// <exception cref="OperationCanceledException">If the <see cref="CancellationToken" /> is canceled.</exception>
Task<int> ExecuteAsync(
IEnumerable<ModificationCommandBatch> commandBatches,
IEnumerable<(ModificationCommandBatch Batch, bool HasMore)> commandBatches,
IRelationalConnection connection,
CancellationToken cancellationToken = default);
}
6 changes: 2 additions & 4 deletions src/EFCore.Relational/Update/ICommandBatchPreparer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ public interface ICommandBatchPreparer
/// </summary>
/// <param name="entries">The entries that represent the entities to be modified.</param>
/// <param name="updateAdapter">The model data.</param>
/// <returns>The list of batches to execute.</returns>
IEnumerable<ModificationCommandBatch> BatchCommands(
IList<IUpdateEntry> entries,
IUpdateAdapter updateAdapter);
/// <returns>A list of value tuples, each of which contains a batch to execute, and whether more batches are available.</returns>
IEnumerable<(ModificationCommandBatch Batch, bool HasMore)> BatchCommands(IList<IUpdateEntry> entries, IUpdateAdapter updateAdapter);
}
36 changes: 16 additions & 20 deletions src/EFCore.Relational/Update/Internal/BatchExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public BatchExecutor(
/// doing so can result in application failures when updating to a new Entity Framework Core release.
/// </summary>
public virtual int Execute(
IEnumerable<ModificationCommandBatch> commandBatches,
IEnumerable<(ModificationCommandBatch Batch, bool HasMore)> commandBatches,
IRelationalConnection connection)
{
using var batchEnumerator = commandBatches.GetEnumerator();
Expand All @@ -59,8 +59,7 @@ public virtual int Execute(
return 0;
}

var currentBatch = batchEnumerator.Current;
var nextBatch = batchEnumerator.MoveNext() ? batchEnumerator.Current : null;
var (batch, hasMoreBatches) = batchEnumerator.Current;

var rowsAffected = 0;
var transaction = connection.CurrentTransaction;
Expand All @@ -74,7 +73,7 @@ public virtual int Execute(
&& transactionEnlistManager?.CurrentAmbientTransaction is null
&& CurrentContext.Context.Database.AutoTransactionsEnabled
// Don't start a transaction if we have a single batch which doesn't require a transaction (single command), for perf.
&& (nextBatch is not null || currentBatch.RequiresTransaction))
&& (hasMoreBatches || batch.RequiresTransaction))
{
transaction = connection.BeginTransaction();
beganTransaction = true;
Expand All @@ -91,14 +90,13 @@ public virtual int Execute(
}
}

while (currentBatch is not null)
do
{
currentBatch.Execute(connection);
rowsAffected += currentBatch.ModificationCommands.Count;

currentBatch = nextBatch;
nextBatch = batchEnumerator.MoveNext() ? batchEnumerator.Current : null;
batch = batchEnumerator.Current.Batch;
batch.Execute(connection);
rowsAffected += batch.ModificationCommands.Count;
}
while (batchEnumerator.MoveNext());

if (beganTransaction)
{
Expand Down Expand Up @@ -158,7 +156,7 @@ public virtual int Execute(
/// doing so can result in application failures when updating to a new Entity Framework Core release.
/// </summary>
public virtual async Task<int> ExecuteAsync(
IEnumerable<ModificationCommandBatch> commandBatches,
IEnumerable<(ModificationCommandBatch Batch, bool HasMore)> commandBatches,
IRelationalConnection connection,
CancellationToken cancellationToken = default)
{
Expand All @@ -169,8 +167,7 @@ public virtual async Task<int> ExecuteAsync(
return 0;
}

var currentBatch = batchEnumerator.Current;
var nextBatch = batchEnumerator.MoveNext() ? batchEnumerator.Current : null;
var (batch, hasMoreBatches) = batchEnumerator.Current;

var rowsAffected = 0;
var transaction = connection.CurrentTransaction;
Expand All @@ -184,7 +181,7 @@ public virtual async Task<int> ExecuteAsync(
&& transactionEnlistManager?.CurrentAmbientTransaction is null
&& CurrentContext.Context.Database.AutoTransactionsEnabled
// Don't start a transaction if we have a single batch which doesn't require a transaction (single command), for perf.
&& (nextBatch is not null || currentBatch.RequiresTransaction))
&& (hasMoreBatches || batch.RequiresTransaction))
{
transaction = await connection.BeginTransactionAsync(cancellationToken).ConfigureAwait(false);
beganTransaction = true;
Expand All @@ -201,14 +198,13 @@ public virtual async Task<int> ExecuteAsync(
}
}

while (currentBatch is not null)
do
{
await currentBatch.ExecuteAsync(connection, cancellationToken).ConfigureAwait(false);
rowsAffected += currentBatch.ModificationCommands.Count;

currentBatch = nextBatch;
nextBatch = batchEnumerator.MoveNext() ? batchEnumerator.Current : null;
batch = batchEnumerator.Current.Batch;
await batch.ExecuteAsync(connection, cancellationToken).ConfigureAwait(false);
rowsAffected += batch.ModificationCommands.Count;
}
while (batchEnumerator.MoveNext());

if (beganTransaction)
{
Expand Down
22 changes: 13 additions & 9 deletions src/EFCore.Relational/Update/Internal/CommandBatchPreparer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,18 @@ public CommandBatchPreparer(CommandBatchPreparerDependencies dependencies)
/// any release. You should only use it directly in your code with extreme caution and knowing that
/// doing so can result in application failures when updating to a new Entity Framework Core release.
/// </summary>
public virtual IEnumerable<ModificationCommandBatch> BatchCommands(
public virtual IEnumerable<(ModificationCommandBatch Batch, bool HasMore)> BatchCommands(
IList<IUpdateEntry> entries,
IUpdateAdapter updateAdapter)
{
var parameterNameGenerator = Dependencies.ParameterNameGeneratorFactory.Create();
var commands = CreateModificationCommands(entries, updateAdapter, parameterNameGenerator.GenerateNext);
var sortedCommandSets = TopologicalSort(commands);

foreach (var independentCommandSet in sortedCommandSets)
for (var commandSetIndex = 0; commandSetIndex < sortedCommandSets.Count; commandSetIndex++)
{
var independentCommandSet = sortedCommandSets[commandSetIndex];

independentCommandSet.Sort(Dependencies.ModificationCommandComparer);

var batch = Dependencies.ModificationCommandBatchFactory.Create();
Expand All @@ -85,7 +87,7 @@ public virtual IEnumerable<ModificationCommandBatch> BatchCommands(

batch.Complete();

yield return batch;
yield return (batch, true);
}
else
{
Expand All @@ -97,14 +99,16 @@ public virtual IEnumerable<ModificationCommandBatch> BatchCommands(
batch = StartNewBatch(parameterNameGenerator, command);
batch.Complete();

yield return batch;
yield return (batch, true);
}
}

batch = StartNewBatch(parameterNameGenerator, modificationCommand);
}
}

var hasMoreCommandSets = commandSetIndex < sortedCommandSets.Count - 1;

if (batch.ModificationCommands.Count == 1
|| batch.ModificationCommands.Count >= _minBatchSize)
{
Expand All @@ -116,19 +120,19 @@ public virtual IEnumerable<ModificationCommandBatch> BatchCommands(

batch.Complete();

yield return batch;
yield return (batch, hasMoreCommandSets);
}
else
{
Dependencies.UpdateLogger.BatchSmallerThanMinBatchSize(
batch.ModificationCommands.SelectMany(c => c.Entries), batch.ModificationCommands.Count, _minBatchSize);

foreach (var command in batch.ModificationCommands)
for (var commandIndex = 0; commandIndex < batch.ModificationCommands.Count; commandIndex++)
{
batch = StartNewBatch(parameterNameGenerator, command);
batch.Complete();
var singleCommandBatch = StartNewBatch(parameterNameGenerator, batch.ModificationCommands[commandIndex]);
singleCommandBatch.Complete();

yield return batch;
yield return (singleCommandBatch, hasMoreCommandSets || commandIndex < batch.ModificationCommands.Count - 1);
}
}
}
Expand Down
10 changes: 8 additions & 2 deletions test/EFCore.Relational.Tests/Update/BatchExecutorTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ public async Task ExecuteAsync_calls_Commit_if_no_transaction(bool async)
using var context = new TestContext();
var connection = SetupConnection(context);

context.Add(
new Foo { Id = "1" });
context.Add(new Foo { Id = "1" });
context.Add(new Bar { Id = "1" });

if (async)
{
Expand Down Expand Up @@ -83,10 +83,16 @@ public TestContext()
}

public DbSet<Foo> Foos { get; set; }
public DbSet<Bar> Bars { get; set; }
}

private class Foo
{
public string Id { get; set; }
}

private class Bar
{
public string Id { get; set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -953,6 +953,7 @@ public List<ModificationCommandBatch> CreateBatches(
bool sensitiveLogging = false)
=> CreateCommandBatchPreparer(updateAdapter: updateAdapter, sensitiveLogging: sensitiveLogging)
.BatchCommands(entries, updateAdapter)
.Select(t => t.Batch)
.ToList();

public ICommandBatchPreparer CreateCommandBatchPreparer(
Expand Down

0 comments on commit 711c567

Please sign in to comment.