File: ServerDispatcher.cs
Web Access
Project: ..\..\..\src\Compilers\Server\VBCSCompiler\AnyCpu\VBCSCompiler.csproj (VBCSCompiler)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
 
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.IO.Pipes;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Globalization;
using Microsoft.CodeAnalysis.CommandLine;
using Microsoft.CodeAnalysis.Symbols;
using Microsoft.CodeAnalysis.CSharp;
namespace Microsoft.CodeAnalysis.CompilerServer
{
    /// <summary>
    /// This class manages the connections, timeout and general scheduling of the client
    /// requests.
    /// </summary>
    internal sealed class ServerDispatcher
    {
        private enum State
        {
            /// <summary>
            /// Server running and accepting all requests
            /// </summary>
            Running,
 
            /// <summary>
            /// Server is in the process of shutting down. New connections will not be accepted.
            /// </summary>
            ShuttingDown,
 
            /// <summary>
            /// Server is done.
            /// </summary>
            Completed,
        }
 
        /// <summary>
        /// Default time the server will stay alive after the last request disconnects.
        /// </summary>
        internal static readonly TimeSpan DefaultServerKeepAlive = TimeSpan.FromMinutes(10);
 
        /// <summary>
        /// Time to delay after the last connection before initiating a garbage collection
        /// in the server.
        /// </summary>
        internal static readonly TimeSpan GCTimeout = TimeSpan.FromSeconds(30);
 
        private readonly ICompilerServerHost _compilerServerHost;
        private readonly ICompilerServerLogger _logger;
        private readonly IClientConnectionHost _clientConnectionHost;
        private readonly IDiagnosticListener _diagnosticListener;
        private State _state;
        private Task? _timeoutTask;
        private Task? _gcTask;
        private Task<IClientConnection>? _listenTask;
        private readonly List<Task<CompletionData>> _connectionList = new List<Task<CompletionData>>();
        private TimeSpan? _keepAlive;
        private bool _keepAliveIsDefault;
 
        internal ServerDispatcher(ICompilerServerHost compilerServerHost, IClientConnectionHost clientConnectionHost, IDiagnosticListener? diagnosticListener = null)
        {
            _compilerServerHost = compilerServerHost;
            _logger = compilerServerHost.Logger;
            _clientConnectionHost = clientConnectionHost;
            _diagnosticListener = diagnosticListener ?? new EmptyDiagnosticListener();
        }
 
        /// <summary>
        /// This function will accept and process new connections until an event causes
        /// the server to enter a passive shut down mode.  For example if analyzers change
        /// or the keep alive timeout is hit.  At which point this function will cease
        /// accepting new connections and wait for existing connections to complete before
        /// returning.
        /// </summary>
        public void ListenAndDispatchConnections(TimeSpan? keepAlive, CancellationToken cancellationToken = default)
        {
            _state = State.Running;
            _keepAlive = keepAlive;
            _keepAliveIsDefault = true;
 
            try
            {
                _clientConnectionHost.BeginListening();
                ListenAndDispatchConnectionsCore(cancellationToken);
            }
            finally
            {
                _state = State.Completed;
                _gcTask = null;
                _timeoutTask = null;
 
                if (_clientConnectionHost.IsListening)
                {
                    _clientConnectionHost.EndListening();
                }
 
                if (_listenTask is not null)
                {
                    // This type is responsible for cleaning up resources associated with _listenTask. Once EndListening
                    // is complete this task is guaranteed to be either completed or have a task scheduled to complete
                    // it. If it ran to completion we need to dispose of the value.
                    if (!_listenTask.IsCompleted)
                    {
                        // Wait for the task to complete
                        _listenTask.ContinueWith(_ => { }, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default)
                            .Wait(CancellationToken.None);
                    }
 
                    if (_listenTask.Status == TaskStatus.RanToCompletion)
                    {
                        try
                        {
                            _listenTask.Result.Dispose();
                        }
                        catch (Exception ex)
                        {
                            _logger.LogException(ex, $"Error disposing of {nameof(_listenTask)}");
                        }
                    }
                }
            }
            _logger.Log($"End ListenAndDispatchConnections");
        }
 
        public void ListenAndDispatchConnectionsCore(CancellationToken cancellationToken)
        {
            do
            {
                MaybeCreateListenTask();
                MaybeCreateTimeoutTask();
                MaybeCreateGCTask();
                WaitForAnyCompletion(cancellationToken);
                CheckCompletedTasks(cancellationToken);
            } while (_connectionList.Count > 0 || _state == State.Running);
        }
 
        private void CheckCompletedTasks(CancellationToken cancellationToken)
        {
            if (cancellationToken.IsCancellationRequested)
            {
                ChangeToShuttingDown("Server cancellation");
                Debug.Assert(_gcTask is null);
                Debug.Assert(_timeoutTask is null);
            }
 
            if (_listenTask?.IsCompleted == true)
            {
                _diagnosticListener.ConnectionReceived();
                var connectionTask = ProcessClientConnectionAsync(
                    _compilerServerHost,
                    _listenTask,
                    allowCompilationRequests: _state == State.Running,
                    cancellationToken);
                _connectionList.Add(connectionTask);
 
                // Timeout and GC are only done when there are no active connections.  Now that we have a new
                // connection cancel out these tasks.
                _timeoutTask = null;
                _gcTask = null;
                _listenTask = null;
            }
 
            if (_timeoutTask?.IsCompleted == true)
            {
                _diagnosticListener.KeepAliveReached();
                ChangeToShuttingDown("Keep alive hit");
            }
 
            if (_gcTask?.IsCompleted == true)
            {
                RunGC();
            }
 
            HandleCompletedConnections();
        }
 
        /// <summary>
        /// The server farms out work to Task values and this method needs to wait until at least one of them
        /// has completed.
        /// </summary>
        private void WaitForAnyCompletion(CancellationToken cancellationToken)
        {
            var all = new List<Task>();
            all.AddRange(_connectionList);
            AddNonNull(_timeoutTask);
            AddNonNull(_listenTask);
            AddNonNull(_gcTask);
 
            try
            {
                Task.WaitAny(all.ToArray(), cancellationToken);
            }
            catch (OperationCanceledException)
            {
                // Thrown when the provided cancellationToken is cancelled.  This is handled in the caller,
                // here it just serves to break out of the WaitAny call.
            }
 
            void AddNonNull(Task? task)
            {
                if (task is object)
                {
                    all.Add(task);
                }
            }
        }
 
        private void ChangeToShuttingDown(string reason)
        {
            if (_state == State.ShuttingDown)
            {
                return;
            }
 
            _logger.Log($"Shutting down server: {reason}");
            Debug.Assert(_state == State.Running);
            Debug.Assert(_clientConnectionHost.IsListening);
 
            _state = State.ShuttingDown;
            _timeoutTask = null;
            _gcTask = null;
        }
 
        private void RunGC()
        {
            _gcTask = null;
            GC.GetTotalMemory(forceFullCollection: true);
        }
 
        private void MaybeCreateListenTask()
        {
            if (_listenTask is null)
            {
                _listenTask = _clientConnectionHost.GetNextClientConnectionAsync();
            }
        }
 
        private void MaybeCreateTimeoutTask()
        {
            // If there are no active clients running then the server needs to be in a timeout mode.
            if (_state == State.Running && _connectionList.Count == 0 && _timeoutTask is null && _keepAlive.HasValue)
            {
                Debug.Assert(_listenTask != null);
                _timeoutTask = Task.Delay(_keepAlive.Value);
            }
        }
 
        private void MaybeCreateGCTask()
        {
            if (_state == State.Running && _connectionList.Count == 0 && _gcTask is null)
            {
                _gcTask = Task.Delay(GCTimeout);
            }
        }
 
        /// <summary>
        /// Checks the completed connection objects and updates the server state based on their 
        /// results.
        /// </summary>
        private void HandleCompletedConnections()
        {
            var shutdown = false;
            var i = 0;
            while (i < _connectionList.Count)
            {
                var current = _connectionList[i];
                if (!current.IsCompleted)
                {
                    i++;
                    continue;
                }
 
                _connectionList.RemoveAt(i);
 
                // These task should never fail. Unexpected errors will be caught and translated into
                // a RequestError message
                Debug.Assert(current.Status == TaskStatus.RanToCompletion);
                var completionData = current.Result;
                switch (completionData.Reason)
                {
                    case CompletionReason.RequestCompleted:
                        _logger.Log("Client request completed");
 
                        if (completionData.NewKeepAlive is { } keepAlive)
                        {
                            _logger.Log($"Client changed keep alive to {keepAlive}");
                            ChangeKeepAlive(keepAlive);
                        }
 
                        if (completionData.ShutdownRequest)
                        {
                            _logger.Log("Client requested shutdown");
                            shutdown = true;
                        }
 
                        break;
                    case CompletionReason.RequestError:
                        _logger.LogError("Client request failed");
                        shutdown = true;
                        break;
                    default:
                        _logger.LogError("Unexpected enum value");
                        shutdown = true;
                        break;
                }
 
                _diagnosticListener.ConnectionCompleted(completionData);
            }
 
            if (shutdown)
            {
                ChangeToShuttingDown("Error handling client connection");
            }
        }
 
        private void ChangeKeepAlive(TimeSpan keepAlive)
        {
            if (_keepAliveIsDefault || !_keepAlive.HasValue || keepAlive > _keepAlive.Value)
            {
                _keepAlive = keepAlive;
                _keepAliveIsDefault = false;
                _diagnosticListener.UpdateKeepAlive(keepAlive);
            }
        }
 
        internal static async Task<CompletionData> ProcessClientConnectionAsync(
            ICompilerServerHost compilerServerHost,
            Task<IClientConnection> clientStreamTask,
            bool allowCompilationRequests,
            CancellationToken cancellationToken)
        {
            var clientHandler = new ClientConnectionHandler(compilerServerHost);
            return await clientHandler.ProcessAsync(clientStreamTask, allowCompilationRequests, cancellationToken).ConfigureAwait(false);
        }
    }
}