File: SolutionAssetProvider.cs
Web Access
Project: ..\..\..\src\Workspaces\Remote\Core\Microsoft.CodeAnalysis.Remote.Workspaces.csproj (Microsoft.CodeAnalysis.Remote.Workspaces)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
 
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.CodeAnalysis.ErrorReporting;
using Microsoft.CodeAnalysis.Host;
using Microsoft.CodeAnalysis.Serialization;
using Microsoft.VisualStudio.Threading;
using Roslyn.Utilities;
 
namespace Microsoft.CodeAnalysis.Remote
{
    /// <summary>
    /// Provides solution assets present locally (in the current process) to a remote process where the solution is being replicated to.
    /// </summary>
    internal sealed class SolutionAssetProvider : ISolutionAssetProvider
    {
        public const string ServiceName = "SolutionAssetProvider";
 
        internal static ServiceDescriptor ServiceDescriptor { get; } = ServiceDescriptor.CreateInProcServiceDescriptor(ServiceDescriptors.ComponentName, ServiceName, suffix: "", ServiceDescriptors.GetFeatureDisplayName);
 
        private readonly SolutionServices _services;
 
        public SolutionAssetProvider(SolutionServices services)
        {
            _services = services;
        }
 
        public async ValueTask GetAssetsAsync(PipeWriter pipeWriter, Checksum solutionChecksum, Checksum[] checksums, CancellationToken cancellationToken)
        {
            // The responsibility is on us (as per the requirements of RemoteCallback.InvokeAsync) to Complete the
            // pipewriter.  This will signal to streamjsonrpc that the writer passed into it is complete, which will
            // allow the calling side know to stop reading results.
            Exception? exception = null;
            try
            {
                await GetAssetsWorkerAsync(pipeWriter, solutionChecksum, checksums, cancellationToken).ConfigureAwait(false);
            }
            catch (Exception ex) when ((exception = ex) == null)
            {
                throw ExceptionUtilities.Unreachable();
            }
            finally
            {
                await pipeWriter.CompleteAsync(exception).ConfigureAwait(false);
            }
        }
 
        private async ValueTask GetAssetsWorkerAsync(PipeWriter pipeWriter, Checksum solutionChecksum, Checksum[] checksums, CancellationToken cancellationToken)
        {
            var assetStorage = _services.GetRequiredService<ISolutionAssetStorageProvider>().AssetStorage;
            var serializer = _services.GetRequiredService<ISerializerService>();
            var scope = assetStorage.GetScope(solutionChecksum);
 
            SolutionAsset? singleAsset = null;
            IReadOnlyDictionary<Checksum, SolutionAsset>? assetMap = null;
 
            if (checksums.Length == 1)
            {
                singleAsset = await scope.GetAssetAsync(checksums[0], cancellationToken).ConfigureAwait(false);
            }
            else
            {
                assetMap = await scope.GetAssetsAsync(checksums, cancellationToken).ConfigureAwait(false);
            }
 
            cancellationToken.ThrowIfCancellationRequested();
 
            using var stream = new PipeWriterStream(pipeWriter);
            await RemoteHostAssetSerialization.WriteDataAsync(
                stream, singleAsset, assetMap, serializer, scope.ReplicationContext,
                solutionChecksum, checksums, cancellationToken).ConfigureAwait(false);
 
            // Ensure any last data written into the stream makes it into the pipe.
            await stream.FlushAsync(cancellationToken).ConfigureAwait(false);
        }
 
        /// <summary>
        /// Simple port of
        /// https://github.com/AArnott/Nerdbank.Streams/blob/dafeb5846702bc29e261c9ddf60f42feae01654c/src/Nerdbank.Streams/BufferWriterStream.cs#L16.
        /// Wraps a <see cref="PipeWriter"/> in a <see cref="Stream"/> interface.  Preferred over <see
        /// cref="PipeWriter.AsStream(bool)"/> as that API produces a stream that will synchronously flush after
        /// <em>every</em> write.  That's undesirable as that will then block a thread pool thread on the actual
        /// asynchronous flush call to the underlying PipeWriter
        /// </summary>
        /// <remarks>
        /// Note: this stream does not have to <see cref="PipeWriter.Complete"/> the underlying <see cref="_writer"/> it
        /// is holding onto (including within <see cref="Flush"/>, <see cref="FlushAsync"/>, or <see cref="Dispose"/>).
        /// Responsibility for that is solely in the hands of <see cref="GetAssetsAsync"/>.
        /// </remarks>
        private class PipeWriterStream : Stream, IDisposableObservable
        {
            private readonly PipeWriter _writer;
 
            public bool IsDisposed { get; private set; }
 
            public override bool CanRead => false;
            public override bool CanSeek => false;
            public override bool CanWrite => !this.IsDisposed;
 
            internal PipeWriterStream(PipeWriter writer)
            {
                _writer = writer;
            }
 
            protected override void Dispose(bool disposing)
            {
                this.IsDisposed = true;
                base.Dispose(disposing);
 
                // DO NOT CALL .Complete on the PipeWriter here (see remarks on type).
            }
 
            private Exception ThrowDisposedOr(Exception ex)
            {
                Verify.NotDisposed(this);
                throw ex;
            }
 
            /// <summary>
            /// Intentionally a no op. We know that we and <see cref="RemoteHostAssetSerialization.WriteDataAsync"/>
            /// will call <see cref="FlushAsync"/> at appropriate times to ensure data is being sent through the writer
            /// at a reasonable cadence (once per asset).
            /// </summary>
            public override void Flush()
            {
                Verify.NotDisposed(this);
 
                // DO NOT CALL .Complete on the PipeWriter here (see remarks on type).
            }
 
            public override async Task FlushAsync(CancellationToken cancellationToken)
            {
                await _writer.FlushAsync(cancellationToken).ConfigureAwait(false);
 
                // DO NOT CALL .Complete on the PipeWriter here (see remarks on type).
            }
 
            public override void Write(byte[] buffer, int offset, int count)
            {
                Requires.NotNull(buffer, nameof(buffer));
                Verify.NotDisposed(this);
 
                var span = _writer.GetSpan(count);
                buffer.AsSpan(offset, count).CopyTo(span);
                _writer.Advance(count);
            }
 
            public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
            {
                cancellationToken.ThrowIfCancellationRequested();
                this.Write(buffer, offset, count);
                return Task.CompletedTask;
            }
 
            public override void WriteByte(byte value)
            {
                Verify.NotDisposed(this);
                var span = _writer.GetSpan(1);
                span[0] = value;
                _writer.Advance(1);
            }
 
#if !NETSTANDARD
 
            public override void Write(ReadOnlySpan<byte> buffer)
            {
                Verify.NotDisposed(this);
                var span = _writer.GetSpan(buffer.Length);
                buffer.CopyTo(span);
                _writer.Advance(buffer.Length);
            }
 
            public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken)
            {
                cancellationToken.ThrowIfCancellationRequested();
                this.Write(buffer.Span);
                return default;
            }
 
#endif
 
            #region read/seek api (not supported)
 
            public override long Length => throw this.ThrowDisposedOr(new NotSupportedException());
            public override long Position
            {
                get => throw this.ThrowDisposedOr(new NotSupportedException());
                set => this.ThrowDisposedOr(new NotSupportedException());
            }
 
            public override int Read(byte[] buffer, int offset, int count)
                => throw this.ThrowDisposedOr(new NotSupportedException());
 
            public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
                => throw this.ThrowDisposedOr(new NotSupportedException());
 
#if !NETSTANDARD
 
            public override int Read(Span<byte> buffer)
                => throw this.ThrowDisposedOr(new NotSupportedException());
 
            public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken)
                => throw this.ThrowDisposedOr(new NotSupportedException());
 
#endif
 
            public override int ReadByte()
                => throw this.ThrowDisposedOr(new NotSupportedException());
 
            public override long Seek(long offset, SeekOrigin origin)
                => throw this.ThrowDisposedOr(new NotSupportedException());
 
            public override void SetLength(long value)
                => this.ThrowDisposedOr(new NotSupportedException());
 
            #endregion
        }
    }
}