Skip to content

Commit

Permalink
Merge pull request #48230 from dotnet/merges/release/dev16.8-to-master
Browse files Browse the repository at this point in the history
Merge release/dev16.8 to master
  • Loading branch information
msftbot[bot] authored Oct 2, 2020
2 parents 2cd81a1 + 9de2d24 commit 2e37d69
Show file tree
Hide file tree
Showing 13 changed files with 85 additions and 82 deletions.
15 changes: 7 additions & 8 deletions azure-pipelines-integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,20 @@ jobs:
name: NetCorePublic-Pool
queue: buildpool.windows.10.amd64.vs2019.pre.open
strategy:
maxParallel: 2
maxParallel: 4
matrix:
debug_32:
_configuration: Debug
_oop64bit: false
# 64-bit disabled for https://github.com/dotnet/roslyn/issues/40476
# debug_64:
# _configuration: Debug
# _oop64bit: true
debug_64:
_configuration: Debug
_oop64bit: true
release_32:
_configuration: Release
_oop64bit: false
# release_64:
# _configuration: Release
# _oop64bit: true
release_64:
_configuration: Release
_oop64bit: true
timeoutInMinutes: 135

steps:
Expand Down
9 changes: 7 additions & 2 deletions src/Workspaces/Remote/Core/BrokeredServiceConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,11 @@ internal static async ValueTask<TResult> InvokeStreamingServiceAsync<TResult>(
Func<PipeReader, CancellationToken, ValueTask<TResult>> reader,
CancellationToken cancellationToken)
{
// We can cancel at entry, but once the pipe operations are scheduled we rely on both operations running to
// avoid deadlocks (the exception handler in 'writerTask' ensures progress is made in 'readerTask').
cancellationToken.ThrowIfCancellationRequested();
var mustNotCancelToken = CancellationToken.None;

var pipe = new Pipe();

// Create new tasks that both start executing, rather than invoking the delegates directly
Expand All @@ -168,7 +173,7 @@ internal static async ValueTask<TResult> InvokeStreamingServiceAsync<TResult>(
throw;
}
}, cancellationToken);
}, mustNotCancelToken);

var readerTask = Task.Run(
async () =>
Expand All @@ -187,7 +192,7 @@ internal static async ValueTask<TResult> InvokeStreamingServiceAsync<TResult>(
{
await pipe.Reader.CompleteAsync(exception).ConfigureAwait(false);
}
}, cancellationToken);
}, mustNotCancelToken);

await Task.WhenAll(writerTask, readerTask).ConfigureAwait(false);

Expand Down
34 changes: 14 additions & 20 deletions src/Workspaces/Remote/Core/RemoteCallback.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;
using MessagePack;
using Microsoft.CodeAnalysis.ErrorReporting;
using Nerdbank.Streams;
using Newtonsoft.Json;
using Roslyn.Utilities;
using StreamJsonRpc;

Expand All @@ -28,12 +25,9 @@ internal readonly struct RemoteCallback<T>
{
private readonly T _callback;

public readonly CancellationTokenSource ClientDisconnectedSource;

public RemoteCallback(T callback, CancellationTokenSource clientDisconnectedSource)
public RemoteCallback(T callback)
{
_callback = callback;
ClientDisconnectedSource = clientDisconnectedSource;
}

public async ValueTask InvokeAsync(Func<T, CancellationToken, ValueTask> invocation, CancellationToken cancellationToken)
Expand All @@ -44,7 +38,7 @@ public async ValueTask InvokeAsync(Func<T, CancellationToken, ValueTask> invocat
}
catch (Exception exception) when (ReportUnexpectedException(exception, cancellationToken))
{
throw OnUnexpectedException(cancellationToken);
throw OnUnexpectedException(exception, cancellationToken);
}
}

Expand All @@ -56,7 +50,7 @@ public async ValueTask<TResult> InvokeAsync<TResult>(Func<T, CancellationToken,
}
catch (Exception exception) when (ReportUnexpectedException(exception, cancellationToken))
{
throw OnUnexpectedException(cancellationToken);
throw OnUnexpectedException(exception, cancellationToken);
}
}

Expand All @@ -74,7 +68,7 @@ public async ValueTask<TResult> InvokeAsync<TResult>(
}
catch (Exception exception) when (ReportUnexpectedException(exception, cancellationToken))
{
throw OnUnexpectedException(cancellationToken);
throw OnUnexpectedException(exception, cancellationToken);
}
}

Expand All @@ -85,7 +79,7 @@ public async ValueTask<TResult> InvokeAsync<TResult>(
// 3) Remote exception - an exception was thrown by the callee
// 4) Cancelation
//
private bool ReportUnexpectedException(Exception exception, CancellationToken cancellationToken)
private static bool ReportUnexpectedException(Exception exception, CancellationToken cancellationToken)
{
if (exception is IOException)
{
Expand All @@ -97,14 +91,10 @@ private bool ReportUnexpectedException(Exception exception, CancellationToken ca
{
if (cancellationToken.IsCancellationRequested)
{
// Cancellation was requested and expected
return false;
}

// It is not guaranteed that RPC only throws OCE when our token is signaled.
// Signal the cancelation source that our token is linked to and throw new cancellation
// exception in OnUnexpectedException.
ClientDisconnectedSource.Cancel();

return true;
}

Expand All @@ -116,20 +106,24 @@ private bool ReportUnexpectedException(Exception exception, CancellationToken ca
// as any observation of ConnectionLostException indicates a bug (e.g. https://github.com/microsoft/vs-streamjsonrpc/issues/549).
if (exception is ConnectionLostException)
{
ClientDisconnectedSource.Cancel();

return true;
}

// Indicates bug on client side or in serialization, report NFW and propagate the exception.
return FatalError.ReportWithoutCrashAndPropagate(exception);
}

private static Exception OnUnexpectedException(CancellationToken cancellationToken)
private static Exception OnUnexpectedException(Exception exception, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();

// If this is hit the cancellation token passed to the service implementation did not use the correct token.
if (exception is ConnectionLostException)
{
throw new OperationCanceledException(exception.Message, exception);
}

// If this is hit the cancellation token passed to the service implementation did not use the correct token,
// and the resulting exception was not a ConnectionLostException.
return ExceptionUtilities.Unreachable;
}
}
Expand Down
19 changes: 15 additions & 4 deletions src/Workspaces/Remote/Core/RemoteHostAssetSerialization.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,13 @@ static void WriteAsset(ObjectWriter writer, ISerializerService serializer, Check
}
}

public static ValueTask<ImmutableArray<(Checksum, object)>> ReadDataAsync(PipeReader pipeReader, int scopeId, ISet<Checksum> checksums, ISerializerService serializerService, CancellationToken cancellationToken)
public static async ValueTask<ImmutableArray<(Checksum, object)>> ReadDataAsync(PipeReader pipeReader, int scopeId, ISet<Checksum> checksums, ISerializerService serializerService, CancellationToken cancellationToken)
{
// We can cancel at entry, but once the pipe operations are scheduled we rely on both operations running to
// avoid deadlocks (the exception handler in 'copyTask' ensures progress is made in the blocking read).
cancellationToken.ThrowIfCancellationRequested();
var mustNotCancelToken = CancellationToken.None;

// Workaround for ObjectReader not supporting async reading.
// Unless we read from the RPC stream asynchronously and with cancallation support we might hang when the server cancels.
// https://github.com/dotnet/roslyn/issues/47861
Expand All @@ -96,7 +101,7 @@ static void WriteAsset(ObjectWriter writer, ISerializerService serializer, Check
Exception? exception = null;

// start a task on a thread pool thread copying from the RPC pipe to a local pipe:
Task.Run(async () =>
var copyTask = Task.Run(async () =>
{
try
{
Expand All @@ -111,20 +116,26 @@ static void WriteAsset(ObjectWriter writer, ISerializerService serializer, Check
await localPipe.Writer.CompleteAsync(exception).ConfigureAwait(false);
await pipeReader.CompleteAsync(exception).ConfigureAwait(false);
}
}, cancellationToken).Forget();
}, mustNotCancelToken);

// blocking read from the local pipe on the current thread:
try
{
using var stream = localPipe.Reader.AsStream(leaveOpen: false);
return new(ReadData(stream, scopeId, checksums, serializerService, cancellationToken));
return ReadData(stream, scopeId, checksums, serializerService, cancellationToken);
}
catch (EndOfStreamException)
{
cancellationToken.ThrowIfCancellationRequested();

throw exception ?? ExceptionUtilities.Unreachable;
}
finally
{
// Make sure to complete the copy and pipes before returning, otherwise the caller could complete the
// reader and/or writer while they are still in use.
await copyTask.ConfigureAwait(false);
}
}

public static ImmutableArray<(Checksum, object)> ReadData(Stream stream, int scopeId, ISet<Checksum> checksums, ISerializerService serializerService, CancellationToken cancellationToken)
Expand Down
21 changes: 21 additions & 0 deletions src/Workspaces/Remote/Core/ServiceDescriptor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

using System;
using System.IO.Pipelines;
using System.Reflection;
using MessagePack;
using MessagePack.Resolvers;
using Microsoft.ServiceHub.Framework;
Expand Down Expand Up @@ -76,6 +77,26 @@ protected override JsonRpcConnection CreateConnection(JsonRpc jsonRpc)
return connection;
}

public override ServiceRpcDescriptor WithMultiplexingStream(MultiplexingStream? multiplexingStream)
{
var baseResult = base.WithMultiplexingStream(multiplexingStream);
if (baseResult is ServiceDescriptor)
return baseResult;

// work around incorrect implementation in 16.8 Preview 2
if (MultiplexingStream == multiplexingStream)
return this;

var result = (ServiceDescriptor)Clone();
typeof(ServiceRpcDescriptor).GetProperty(nameof(MultiplexingStream))!.SetValue(result, multiplexingStream);
if (result.MultiplexingStreamOptions is null)
return result;

result = (ServiceDescriptor)result.Clone();
typeof(ServiceJsonRpcDescriptor).GetProperty(nameof(MultiplexingStreamOptions))!.SetValue(result, value: null);
return result;
}

internal static class TestAccessor
{
public static MessagePackSerializerOptions Options => s_options;
Expand Down
13 changes: 10 additions & 3 deletions src/Workspaces/Remote/Core/SolutionAssetProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,19 @@ public async ValueTask GetAssetsAsync(PipeWriter pipeWriter, int scopeId, Checks
assetMap = await assetStorage.GetAssetsAsync(scopeId, checksums, cancellationToken).ConfigureAwait(false);
}

// We can cancel early, but once the pipe operations are scheduled we rely on both operations running to
// avoid deadlocks (the exception handler in 'task1' ensures progress is made in 'task2').
cancellationToken.ThrowIfCancellationRequested();
var mustNotCancelToken = CancellationToken.None;

// Work around the lack of async stream writing in ObjectWriter, which is required when writing to the RPC pipe.
// Run two tasks - the first synchronously writes to a local pipe and the second asynchronosly transfers the data to the RPC pipe.
//
// Configure the pipe to never block on write (waiting for the reader to read). This prevents deadlocks but might result in more
// (non-contiguous) memory allocated for the underlying buffers. The amount of memory is bounded by the total size of the serialized assets.
var localPipe = new Pipe(RemoteHostAssetSerialization.PipeOptionsWithUnlimitedWriterBuffer);

Task.Run(() =>
var task1 = Task.Run(() =>
{
try
{
Expand All @@ -69,12 +74,14 @@ public async ValueTask GetAssetsAsync(PipeWriter pipeWriter, int scopeId, Checks
{
// no-op
}
}, cancellationToken).Forget();
}, mustNotCancelToken);

// Complete RPC once we send the initial piece of data and start waiting for the writer to send more,
// so the client can start reading from the stream. Once CopyPipeDataAsync completes the pipeWriter
// the corresponding client-side pipeReader will complete and the data transfer will be finished.
CopyPipeDataAsync().Forget();
var task2 = CopyPipeDataAsync();

await Task.WhenAll(task1, task2).ConfigureAwait(false);

async Task CopyPipeDataAsync()
{
Expand Down
8 changes: 3 additions & 5 deletions src/Workspaces/Remote/ServiceHub/Host/SolutionAssetSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,10 @@ namespace Microsoft.CodeAnalysis.Remote
internal sealed class SolutionAssetSource : IAssetSource
{
private readonly ServiceBrokerClient _client;
private readonly CancellationTokenSource _clientDisconnectedSource;

public SolutionAssetSource(ServiceBrokerClient client, CancellationTokenSource clientDisconnectedSource)
public SolutionAssetSource(ServiceBrokerClient client)
{
_client = client;
_clientDisconnectedSource = clientDisconnectedSource;
}

public async ValueTask<ImmutableArray<(Checksum, object)>> GetAssetsAsync(int scopeId, ISet<Checksum> checksums, ISerializerService serializerService, CancellationToken cancellationToken)
Expand All @@ -33,7 +31,7 @@ public SolutionAssetSource(ServiceBrokerClient client, CancellationTokenSource c
using var provider = await _client.GetProxyAsync<ISolutionAssetProvider>(SolutionAssetProvider.ServiceDescriptor, cancellationToken).ConfigureAwait(false);
Contract.ThrowIfNull(provider.Proxy);

return await new RemoteCallback<ISolutionAssetProvider>(provider.Proxy, _clientDisconnectedSource).InvokeAsync(
return await new RemoteCallback<ISolutionAssetProvider>(provider.Proxy).InvokeAsync(
(proxy, pipeWriter, cancellationToken) => proxy.GetAssetsAsync(pipeWriter, scopeId, checksums.ToArray(), cancellationToken),
(pipeReader, cancellationToken) => RemoteHostAssetSerialization.ReadDataAsync(pipeReader, scopeId, checksums, serializerService, cancellationToken),
cancellationToken).ConfigureAwait(false);
Expand All @@ -47,7 +45,7 @@ public async ValueTask<bool> IsExperimentEnabledAsync(string experimentName, Can
using var provider = await _client.GetProxyAsync<ISolutionAssetProvider>(SolutionAssetProvider.ServiceDescriptor, cancellationToken).ConfigureAwait(false);
Contract.ThrowIfNull(provider.Proxy);

return await new RemoteCallback<ISolutionAssetProvider>(provider.Proxy, _clientDisconnectedSource).InvokeAsync(
return await new RemoteCallback<ISolutionAssetProvider>(provider.Proxy).InvokeAsync(
(self, cancellationToken) => provider.Proxy.IsExperimentEnabledAsync(experimentName, cancellationToken),
cancellationToken).ConfigureAwait(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
using System.Diagnostics;
using System.IO;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.ServiceHub.Framework;
using Microsoft.ServiceHub.Framework.Services;
Expand Down Expand Up @@ -72,7 +71,7 @@ internal TService Create(
var serviceHubTraceSource = (TraceSource)hostProvidedServices.GetService(typeof(TraceSource));
var serverConnection = descriptor.WithTraceSource(serviceHubTraceSource).ConstructRpcConnection(pipe);

var args = new ServiceConstructionArguments(hostProvidedServices, serviceBroker, new CancellationTokenSource());
var args = new ServiceConstructionArguments(hostProvidedServices, serviceBroker);
var service = CreateService(args, descriptor, serverConnection, serviceActivationOptions.ClientRpcTarget);

serverConnection.AddLocalRpcTarget(service);
Expand Down Expand Up @@ -104,7 +103,7 @@ protected sealed override TService CreateService(
{
Contract.ThrowIfNull(descriptor.ClientInterface);
var callback = (TCallback)(clientRpcTarget ?? serverConnection.ConstructRpcClient(descriptor.ClientInterface));
return CreateService(arguments, new RemoteCallback<TCallback>(callback, arguments.ClientDisconnectedSource));
return CreateService(arguments, new RemoteCallback<TCallback>(callback));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
// See the LICENSE file in the project root for more information.

using System;
using System.Threading;
using Microsoft.ServiceHub.Framework;

namespace Microsoft.CodeAnalysis.Remote
Expand All @@ -14,13 +13,11 @@ internal readonly struct ServiceConstructionArguments
{
public readonly IServiceProvider ServiceProvider;
public readonly IServiceBroker ServiceBroker;
public readonly CancellationTokenSource ClientDisconnectedSource;

public ServiceConstructionArguments(IServiceProvider serviceProvider, IServiceBroker serviceBroker, CancellationTokenSource clientDisconnectedSource)
public ServiceConstructionArguments(IServiceProvider serviceProvider, IServiceBroker serviceBroker)
{
ServiceProvider = serviceProvider;
ServiceBroker = serviceBroker;
ClientDisconnectedSource = clientDisconnectedSource;
}
}
}
Expand Down
Loading

0 comments on commit 2e37d69

Please sign in to comment.