File: ExternalAccess\UnitTesting\SolutionCrawler\UnitTestingWorkCoordinator.UnitTestingNormalPriorityProcessor.cs
Web Access
Project: ..\..\..\src\Features\Core\Portable\Microsoft.CodeAnalysis.Features.csproj (Microsoft.CodeAnalysis.Features)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
 
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.CodeAnalysis.ErrorReporting;
using Microsoft.CodeAnalysis.Internal.Log;
using Microsoft.CodeAnalysis.Notification;
using Microsoft.CodeAnalysis.Shared.Extensions;
using Microsoft.CodeAnalysis.Shared.TestHooks;
using Roslyn.Utilities;
using Microsoft.CodeAnalysis.ExternalAccess.UnitTesting.Api;
 
#if DEBUG
using System.Diagnostics;
#endif
 
namespace Microsoft.CodeAnalysis.ExternalAccess.UnitTesting.SolutionCrawler
{
    internal sealed partial class UnitTestingSolutionCrawlerRegistrationService
    {
        internal sealed partial class UnitTestingWorkCoordinator
        {
            private sealed partial class UnitTestingIncrementalAnalyzerProcessor
            {
                private sealed class UnitTestingNormalPriorityProcessor : AbstractUnitTestingPriorityProcessor
                {
                    private readonly UnitTestingAsyncDocumentWorkItemQueue _workItemQueue;
                    private readonly ConcurrentDictionary<DocumentId, /*unused*/ object?> _higherPriorityDocumentsNotProcessed;
 
                    private ProjectId? _currentProjectProcessing;
 
                    // this is only used in ResetState to find out solution has changed
                    // and reset some states such as logging some telemetry or
                    // priorities active,visible, opened files and etc
                    private Solution? _lastSolution = null;
 
                    // whether this processor is running or not
                    private Task _running;
 
                    public UnitTestingNormalPriorityProcessor(
                        IAsynchronousOperationListener listener,
                        UnitTestingIncrementalAnalyzerProcessor processor,
                        Lazy<ImmutableArray<IUnitTestingIncrementalAnalyzer>> lazyAnalyzers,
                        IGlobalOperationNotificationService? globalOperationNotificationService,
                        TimeSpan backOffTimeSpan,
                        CancellationToken shutdownToken)
                        : base(listener, processor, lazyAnalyzers, globalOperationNotificationService, backOffTimeSpan, shutdownToken)
                    {
                        _running = Task.CompletedTask;
                        _workItemQueue = new UnitTestingAsyncDocumentWorkItemQueue(processor._registration.ProgressReporter);
                        _higherPriorityDocumentsNotProcessed = new ConcurrentDictionary<DocumentId, object?>(concurrencyLevel: 2, capacity: 20);
 
                        _currentProjectProcessing = null;
 
                        Start();
                    }
 
                    public void Enqueue(UnitTestingWorkItem item)
                    {
                        Contract.ThrowIfFalse(item.DocumentId != null, "can only enqueue a document work item");
 
                        UpdateLastAccessTime();
 
                        var added = _workItemQueue.AddOrReplace(item);
 
                        Logger.Log(FunctionId.WorkCoordinator_DocumentWorker_Enqueue, s_enqueueLogger, Environment.TickCount, item.DocumentId, !added);
 
                        CheckHigherPriorityDocument(item);
 
                        UnitTestingSolutionCrawlerLogger.LogWorkItemEnqueue(
                            Processor._logAggregator, item.Language, item.DocumentId, item.InvocationReasons, item.IsLowPriority, item.ActiveMember, added);
                    }
 
                    private void CheckHigherPriorityDocument(UnitTestingWorkItem item)
                    {
                        Contract.ThrowIfFalse(item.DocumentId != null);
 
                        if (!item.InvocationReasons.Contains(UnitTestingPredefinedInvocationReasons.HighPriority))
                        {
                            return;
                        }
 
                        AddHigherPriorityDocument(item.DocumentId);
                    }
 
                    private void AddHigherPriorityDocument(DocumentId id)
                    {
                        _higherPriorityDocumentsNotProcessed.TryAdd(id, /*unused*/null);
                        UnitTestingSolutionCrawlerLogger.LogHigherPriority(Processor._logAggregator, id.Id);
                    }
 
                    protected override Task WaitAsync(CancellationToken cancellationToken)
                        => _workItemQueue.WaitAsync(cancellationToken);
 
                    public Task Running => _running;
                    public int WorkItemCount => _workItemQueue.WorkItemCount;
                    public bool HasAnyWork => _workItemQueue.HasAnyWork;
 
                    protected override async Task ExecuteAsync()
                    {
                        if (CancellationToken.IsCancellationRequested)
                        {
                            return;
                        }
 
                        var source = new TaskCompletionSource<object?>();
                        try
                        {
                            // mark it as running
                            _running = source.Task;
 
                            await WaitForHigherPriorityOperationsAsync().ConfigureAwait(false);
 
                            // okay, there must be at least one item in the map
                            ResetStates();
 
                            if (await TryProcessOneHigherPriorityDocumentAsync().ConfigureAwait(false))
                            {
                                // successfully processed a high priority document.
                                return;
                            }
 
                            // process one of documents remaining
                            if (!_workItemQueue.TryTakeAnyWork(
                                    _currentProjectProcessing,
#if false // Not used in unit testing crawling
                                    Processor.DependencyGraph,
                                    Processor.DiagnosticAnalyzerService,
#endif
                                    out var workItem,
                                    out var documentCancellation))
                            {
                                return;
                            }
 
                            // check whether we have been shutdown
                            if (CancellationToken.IsCancellationRequested)
                            {
                                return;
                            }
 
                            // check whether we have moved to new project
                            SetProjectProcessing(workItem.ProjectId);
 
                            // process the new document
                            await ProcessDocumentAsync(Analyzers, workItem, documentCancellation).ConfigureAwait(false);
                        }
                        catch (Exception e) when (FatalError.ReportAndPropagateUnlessCanceled(e))
                        {
                            throw ExceptionUtilities.Unreachable();
                        }
                        finally
                        {
                            // mark it as done running
                            source.SetResult(null);
                        }
                    }
 
                    protected override Task HigherQueueOperationTask
#if false // Not used in unit testing crawling
                    {
                        get
                        {
                            return Processor._highPriorityProcessor.Running;
                        }
                    }
#else
                        => Task.CompletedTask;
#endif
 
                    protected override bool HigherQueueHasWorkItem
#if false // Not used in unit testing crawling
                    {
                        get
                        {
                            return Processor._highPriorityProcessor.HasAnyWork;
                        }
                    }
#else
                        => false;
#endif
 
                    protected override void OnPaused()
                    {
                        base.OnPaused();
                        _workItemQueue.RequestCancellationOnRunningTasks();
                    }
 
                    private void SetProjectProcessing(ProjectId currentProject)
                    {
                        _currentProjectProcessing = currentProject;
                    }
 
                    private IEnumerable<DocumentId> GetPrioritizedPendingDocuments()
                    {
                        // First the active document
                        var activeDocumentId = Processor._documentTracker.TryGetActiveDocument();
                        if (activeDocumentId != null)
                        {
                            yield return activeDocumentId;
                        }
 
                        // Now any visible documents
                        foreach (var visibleDocumentId in Processor._documentTracker.GetVisibleDocuments())
                        {
                            yield return visibleDocumentId;
                        }
 
                        // Any other high priority documents
                        foreach (var (documentId, _) in _higherPriorityDocumentsNotProcessed)
                        {
                            yield return documentId;
                        }
                    }
 
                    private async Task<bool> TryProcessOneHigherPriorityDocumentAsync()
                    {
                        try
                        {
                            if (!Processor._documentTracker.SupportsDocumentTracking)
                            {
                                return false;
                            }
 
                            foreach (var documentId in GetPrioritizedPendingDocuments())
                            {
                                if (CancellationToken.IsCancellationRequested)
                                {
                                    return true;
                                }
 
                                // this is a best effort algorithm with some shortcomings.
                                //
                                // the most obvious issue is if there is a new work item (without a solution change - but very unlikely) 
                                // for a opened document we already processed, the work item will be treated as a regular one rather than higher priority one
                                // (opened document)
                                // see whether we have work item for the document
                                if (!_workItemQueue.TryTake(documentId, out var workItem, out var documentCancellation))
                                {
                                    RemoveHigherPriorityDocument(documentId);
                                    continue;
                                }
 
                                // okay now we have work to do
                                await ProcessDocumentAsync(Analyzers, workItem, documentCancellation).ConfigureAwait(false);
 
                                RemoveHigherPriorityDocument(documentId);
                                return true;
                            }
 
                            return false;
                        }
                        catch (Exception e) when (FatalError.ReportAndPropagateUnlessCanceled(e))
                        {
                            throw ExceptionUtilities.Unreachable();
                        }
                    }
 
                    private void RemoveHigherPriorityDocument(DocumentId documentId)
                    {
                        // remove opened document processed
                        _higherPriorityDocumentsNotProcessed.TryRemove(documentId, out _);
                    }
 
                    private async Task ProcessDocumentAsync(ImmutableArray<IUnitTestingIncrementalAnalyzer> analyzers, UnitTestingWorkItem workItem, CancellationToken cancellationToken)
                    {
                        Contract.ThrowIfNull(workItem.DocumentId);
 
                        if (CancellationToken.IsCancellationRequested)
                        {
                            return;
                        }
 
                        var processedEverything = false;
                        var documentId = workItem.DocumentId;
 
                        // we should always use solution snapshot after workitem is removed from the queue.
                        // otherwise, we can have a race such as below.
                        //
                        // 1.solution crawler picked up a solution
                        // 2.before processing the solution, an workitem got changed
                        // 3.and then the work item got picked up from the queue
                        // 4.and use the work item with the solution that got picked up in step 1
                        // 
                        // step 2 is happening because solution has changed, but step 4 used old solution from step 1
                        // that doesn't have effects of the solution changes.
                        // 
                        // solution crawler must remove the work item from the queue first and then pick up the soluton,
                        // so that the queue gets new work item if there is any solution changes after the work item is removed
                        // from the queue
                        // 
                        // using later version of solution is always fine since, as long as there is new work item in the queue,
                        // solution crawler will eventually call the last workitem with the lastest solution
                        // making everything to catch up
                        var solution = Processor._registration.GetSolutionToAnalyze();
                        try
                        {
                            using (Logger.LogBlock(FunctionId.WorkCoordinator_ProcessDocumentAsync, w => w.ToString(), workItem, cancellationToken))
                            {
                                var textDocument = solution.GetTextDocument(documentId) ?? await solution.GetSourceGeneratedDocumentAsync(documentId, cancellationToken).ConfigureAwait(false);
 
                                if (textDocument != null)
                                {
#if false // Not used in unit testing crawling
                                    // if we are called because a document is opened, we invalidate the document so that
                                    // it can be re-analyzed. otherwise, since newly opened document has same version as before
                                    // analyzer will simply return same data back
                                    if (workItem.MustRefresh && !workItem.IsRetry)
                                    {
                                        var isOpen = textDocument.IsOpen();
 
                                        await ProcessOpenDocumentIfNeededAsync(analyzers, workItem, textDocument, isOpen, cancellationToken).ConfigureAwait(false);
                                        await ProcessCloseDocumentIfNeededAsync(analyzers, workItem, textDocument, isOpen, cancellationToken).ConfigureAwait(false);
                                    }
#endif
 
                                    // check whether we are having special reanalyze request
                                    await ProcessReanalyzeDocumentAsync(workItem, textDocument, cancellationToken).ConfigureAwait(false);
 
                                    await Processor.ProcessDocumentAnalyzersAsync(textDocument, analyzers, workItem, cancellationToken).ConfigureAwait(false);
                                }
                                else
                                {
                                    UnitTestingSolutionCrawlerLogger.LogProcessDocumentNotExist(Processor._logAggregator);
 
                                    await RemoveDocumentAsync(documentId, cancellationToken).ConfigureAwait(false);
                                }
 
                                if (!cancellationToken.IsCancellationRequested)
                                {
                                    processedEverything = true;
                                }
                            }
                        }
                        catch (Exception e) when (FatalError.ReportAndPropagateUnlessCanceled(e, cancellationToken))
                        {
                            throw ExceptionUtilities.Unreachable();
                        }
                        finally
                        {
                            // we got cancelled in the middle of processing the document.
                            // let's make sure newly enqueued work item has all the flag needed.
                            // Avoid retry attempts after cancellation is requested, since work will not be processed
                            // after that point.
                            if (!processedEverything && !CancellationToken.IsCancellationRequested)
                            {
                                _workItemQueue.AddOrReplace(workItem.Retry(Listener.BeginAsyncOperation("ReenqueueWorkItem")));
                            }
 
                            UnitTestingSolutionCrawlerLogger.LogProcessDocument(Processor._logAggregator, documentId.Id, processedEverything);
 
                            // remove one that is finished running
                            _workItemQueue.MarkWorkItemDoneFor(workItem.DocumentId);
                        }
                    }
 
#if false // Not used in unit testing crawling
                    private async Task ProcessOpenDocumentIfNeededAsync(ImmutableArray<IUnitTestingIncrementalAnalyzer> analyzers, UnitTestingWorkItem workItem, TextDocument textDocument, bool isOpen, CancellationToken cancellationToken)
                    {
                        if (!isOpen || !workItem.InvocationReasons.Contains(UnitTestingPredefinedInvocationReasons.DocumentOpened))
                        {
                            return;
                        }
 
                        UnitTestingSolutionCrawlerLogger.LogProcessOpenDocument(Processor._logAggregator, textDocument.Id.Id);
 
                        await Processor.RunAnalyzersAsync(analyzers, textDocument, workItem, DocumentOpenAsync, cancellationToken).ConfigureAwait(false);
                        return;
 
                        static async Task DocumentOpenAsync(IUnitTestingIncrementalAnalyzer analyzer, TextDocument textDocument, CancellationToken cancellationToken)
                        {
                            if (textDocument is Document document)
                            {
                                await analyzer.DocumentOpenAsync(document, cancellationToken).ConfigureAwait(false);
                            }
                            else
                            {
                                await analyzer.NonSourceDocumentOpenAsync(textDocument, cancellationToken).ConfigureAwait(false);
                            }
                        }
                    }
 
                    private async Task ProcessCloseDocumentIfNeededAsync(ImmutableArray<IUnitTestingIncrementalAnalyzer> analyzers, UnitTestingWorkItem workItem, TextDocument textDocument, bool isOpen, CancellationToken cancellationToken)
                    {
                        if (isOpen || !workItem.InvocationReasons.Contains(UnitTestingPredefinedInvocationReasons.DocumentClosed))
                        {
                            return;
                        }
 
                        UnitTestingSolutionCrawlerLogger.LogProcessCloseDocument(Processor._logAggregator, textDocument.Id.Id);
 
                        await Processor.RunAnalyzersAsync(analyzers, textDocument, workItem, DocumentCloseAsync, cancellationToken).ConfigureAwait(false);
                        return;
 
                        static async Task DocumentCloseAsync(IUnitTestingIncrementalAnalyzer analyzer, TextDocument textDocument, CancellationToken cancellationToken)
                        {
                            if (textDocument is Document document)
                            {
                                await analyzer.DocumentCloseAsync(document, cancellationToken).ConfigureAwait(false);
                            }
                            else
                            {
                                await analyzer.NonSourceDocumentCloseAsync(textDocument, cancellationToken).ConfigureAwait(false);
                            }
                        }
                    }
#endif
 
                    private async Task ProcessReanalyzeDocumentAsync(UnitTestingWorkItem workItem, TextDocument document, CancellationToken cancellationToken)
                    {
                        try
                        {
#if DEBUG
                            Debug.Assert(!workItem.InvocationReasons.Contains(UnitTestingPredefinedInvocationReasons.Reanalyze) || workItem.SpecificAnalyzers.Count > 0);
#endif
 
                            // No-reanalyze request or we already have a request to re-analyze every thing
                            if (
#if false // Not used in unit testing crawling
                                workItem.MustRefresh ||
#endif
                                !workItem.InvocationReasons.Contains(UnitTestingPredefinedInvocationReasons.Reanalyze))
                            {
                                return;
                            }
 
                            // First reset the document state in analyzers.
                            var reanalyzers = workItem.SpecificAnalyzers.ToImmutableArray();
#if false // Not used in unit testing crawling
                            await Processor.RunAnalyzersAsync(reanalyzers, document, workItem, DocumentResetAsync, cancellationToken).ConfigureAwait(false);
#endif
 
                            // No request to re-run syntax change analysis. run it here
                            var reasons = workItem.InvocationReasons;
#if false // Not used in unit testing crawling
                            if (!reasons.Contains(UnitTestingPredefinedInvocationReasons.SyntaxChanged))
                            {
                                await Processor.RunAnalyzersAsync(reanalyzers, document, workItem, (a, d, c) => AnalyzeSyntaxAsync(a, d, reasons, c), cancellationToken).ConfigureAwait(false);
                            }
#endif
 
                            // No request to re-run semantic change analysis. run it here
                            // Note: Semantic analysis is not supported for non-source documents.
                            if (document is Document sourceDocument &&
                                !workItem.InvocationReasons.Contains(UnitTestingPredefinedInvocationReasons.SemanticChanged))
                            {
                                await Processor.RunAnalyzersAsync(reanalyzers, sourceDocument, workItem,
                                    (a, d, c) => a.AnalyzeDocumentAsync(
                                        d,
#if false // Not used in unit testing crawling
                                        bodyOpt: null,
#endif
                                        reasons,
                                        c), cancellationToken).ConfigureAwait(false);
                            }
                        }
                        catch (Exception e) when (FatalError.ReportAndPropagateUnlessCanceled(e, cancellationToken))
                        {
                            throw ExceptionUtilities.Unreachable();
                        }
 
                        return;
 
#if false // Not used in unit testing crawling
                        static async Task DocumentResetAsync(IUnitTestingIncrementalAnalyzer analyzer, TextDocument textDocument, CancellationToken cancellationToken)
                        {
                            if (textDocument is Document document)
                            {
                                await analyzer.DocumentResetAsync(document, cancellationToken).ConfigureAwait(false);
                            }
                            else
                            {
                                await analyzer.NonSourceDocumentResetAsync(textDocument, cancellationToken).ConfigureAwait(false);
                            }
                        }
#endif
 
#if false // Not used in unit testing crawling
                        static async Task AnalyzeSyntaxAsync(IUnitTestingIncrementalAnalyzer analyzer, TextDocument textDocument, UnitTestingInvocationReasons reasons, CancellationToken cancellationToken)
                        {
                            if (textDocument is Document document)
                            {
                                await analyzer.AnalyzeSyntaxAsync(document, reasons, cancellationToken).ConfigureAwait(false);
                            }
                            else
                            {
                                await analyzer.AnalyzeNonSourceDocumentAsync(textDocument, reasons, cancellationToken).ConfigureAwait(false);
                            }
                        }
#endif
                    }
 
                    private Task RemoveDocumentAsync(DocumentId documentId, CancellationToken cancellationToken)
                        => RemoveDocumentAsync(Analyzers, documentId, cancellationToken);
 
                    private static async Task RemoveDocumentAsync(ImmutableArray<IUnitTestingIncrementalAnalyzer> analyzers, DocumentId documentId, CancellationToken cancellationToken)
                    {
                        foreach (var analyzer in analyzers)
                        {
                            await analyzer.RemoveDocumentAsync(documentId, cancellationToken).ConfigureAwait(false);
                        }
                    }
 
                    private void ResetStates()
                    {
                        try
                        {
                            if (!IsSolutionChanged())
                            {
                                return;
                            }
 
#if false // Not used in unit testing crawling
                            await Processor.RunAnalyzersAsync(
                                Analyzers,
                                Processor._registration.GetSolutionToAnalyze(),
                                workItem: new UnitTestingWorkItem(), (a, s, c) => a.NewSolutionSnapshotAsync(s, c), CancellationToken).ConfigureAwait(false);
#endif
 
#if false // Not used in unit testing crawling
                            foreach (var id in Processor.GetOpenDocumentIds())
                            {
                                AddHigherPriorityDocument(id);
                            }
#endif
 
                            UnitTestingSolutionCrawlerLogger.LogResetStates(Processor._logAggregator);
                        }
                        catch (Exception e) when (FatalError.ReportAndPropagateUnlessCanceled(e))
                        {
                            throw ExceptionUtilities.Unreachable();
                        }
 
                        return;
 
                        bool IsSolutionChanged()
                        {
                            var currentSolution = Processor._registration.GetSolutionToAnalyze();
                            var oldSolution = _lastSolution;
 
                            if (currentSolution == oldSolution)
                            {
                                return false;
                            }
 
                            _lastSolution = currentSolution;
 
                            ResetLogAggregatorIfNeeded(currentSolution, oldSolution);
 
                            return true;
                        }
 
                        void ResetLogAggregatorIfNeeded(Solution currentSolution, Solution? oldSolution)
                        {
                            if (oldSolution == null || currentSolution.Id == oldSolution.Id)
                            {
                                // we log aggregated info when solution is changed such as
                                // new solution is opened or solution is closed
                                return;
                            }
 
                            // this log things like how many time we analyzed active files, how many times other files are analyzed,
                            // avg time to analyze files, how many solution snapshot got analyzed and etc.
                            // all accumultation is done in VS side and we only send statistics to VS telemetry otherwise, it is too much
                            // data to send
                            UnitTestingSolutionCrawlerLogger.LogIncrementalAnalyzerProcessorStatistics(
                                Processor._registration.CorrelationId, oldSolution, Processor._logAggregator, Analyzers);
 
                            Processor.ResetLogAggregator();
                        }
                    }
 
                    public override void Shutdown()
                    {
                        base.Shutdown();
 
                        _workItemQueue.Dispose();
                    }
 
                    internal TestAccessor GetTestAccessor()
                    {
                        return new TestAccessor(this);
                    }
 
                    internal readonly struct TestAccessor
                    {
                        private readonly UnitTestingNormalPriorityProcessor _normalPriorityProcessor;
 
                        internal TestAccessor(UnitTestingNormalPriorityProcessor normalPriorityProcessor)
                        {
                            _normalPriorityProcessor = normalPriorityProcessor;
                        }
 
                        internal void WaitUntilCompletion(ImmutableArray<IUnitTestingIncrementalAnalyzer> analyzers, List<UnitTestingWorkItem> items)
                        {
                            foreach (var item in items)
                            {
                                _normalPriorityProcessor.ProcessDocumentAsync(analyzers, item, CancellationToken.None).Wait();
                            }
                        }
 
                        internal void WaitUntilCompletion()
                        {
                            // this shouldn't happen. would like to get some diagnostic
                            while (_normalPriorityProcessor._workItemQueue.HasAnyWork)
                            {
                                FailFast.Fail("How?");
                            }
                        }
                    }
                }
            }
        }
    }
}