// Copyright Keysight Technologies 2012-2019 // This Source Code Form is subject to the terms of the Mozilla Public // License, v. 2.0. If a copy of the MPL was not distributed with this // file, you can obtain one at http://mozilla.org/MPL/2.0/. using System; using System.Collections.Concurrent; using System.Diagnostics; using System.Threading; namespace OpenTap { /// /// Work Queue used for result processing in sequence but asynchronously. It uses the ThreadManager to automatically clean up threads that have been idle for a while. /// When the WorkQueue is disposed, the used thread is immediately returned to the ThreadManager. /// public class WorkQueue : IDisposable { /// Options for WorkQueues. [Flags] public enum Options { /// No options. None = 0, /// The thread is not returned to the ThreadManager when it has been idle for some time. In this situation the WorkQueue must be disposed manually. LongRunning = 1, /// Time averaging is enabled. Each piece of work will have measured time spent. TimeAveraging = 2 } /// /// The amount of idle time to wait before giving the thread back to the threading manager. This has no effect if the LongRunning option is selected. /// public int Timeout = 5; // list of things to do sequentially. readonly ConcurrentQueue workItems = new ConcurrentQueue(); readonly TimeSpanAverager average; internal object Peek() { if (workItems.TryPeek(out var inv)) { if (inv is IWrappedInvokable wrap) return wrap.InnerInvokable; return inv; } return null; } /// The average time spent for each task. Only available if Options.TImeAveraging is enabled. public TimeSpan AverageTimeSpent { get { if (average == null) throw new InvalidOperationException("The TimeAveraging option has not been selected."); return average.GetAverage(); } } /// The current number of items in the work queue. If called from the worker thread, this number will be 0 for that last worker. public int QueueSize => workItems.Count; internal bool IsBusy => QueueSize > 0 || countdown > 0; readonly object threadCreationLock = new object(); readonly CancellationTokenSource cancel = new CancellationTokenSource(); readonly TapThread threadContext = null; //this should always be either 1(thread was started) or 0(thread is not started yet) int threadCount = 0; internal static int semaphoreMaxCount = 1024 * 1024; // the addSemaphore counts the current number of things in the tasklist. readonly SemaphoreSlim addSemaphore = new SemaphoreSlim(0, semaphoreMaxCount); int countdown = 0; /// A name of identifying the work queue. public readonly string Name; readonly bool longRunning; /// Creates a new instance of WorkQueue. /// Options. /// A name to identify a work queue. public WorkQueue(Options options, string name = "") { longRunning = options.HasFlag(Options.LongRunning); if (options.HasFlag(Options.TimeAveraging)) average = new TimeSpanAverager(); Name = name; } /// Creates a new instance of WorkQueue. /// Options. /// A name to identify a work queue. /// The thread context in which to run work jobs. The default value causes the context to be the parent of an enqueuing thread. public WorkQueue(Options options, string name = "", TapThread threadContext = null) :this(options, name) { this.threadContext = threadContext; } /// Enqueue a new piece of work to be handled in the future. public void EnqueueWork(Action a) => EnqueueWork(new ActionInvokable(a)); internal void EnqueueWork(IInvokable v, T1 a1, T2 a2) => EnqueueWork(new WrappedInvokable(v, a1, a2)); /// /// This method in in charge of processing the work queue. /// void WorkerFunction() { try { var awaitArray = new WaitHandle[] {addSemaphore.AvailableWaitHandle, cancel.Token.WaitHandle}; while (true) { retry: awaitArray[1] = cancel.Token.WaitHandle; int cancelIndex = 0; if (longRunning) cancelIndex = WaitHandle.WaitAny(awaitArray); else cancelIndex = WaitHandle.WaitAny(awaitArray, Timeout); if (cancelIndex == 0 && !addSemaphore.Wait(0)) goto retry; bool ok = cancelIndex == 0; if (!ok) { if (cancel.IsCancellationRequested == false && longRunning) continue; lock (threadCreationLock) { if (workItems.Count > 0) goto retry; break; } } IInvokable run; while (!workItems.TryDequeue(out run)) Thread.Yield(); try { if (average != null) { var sw = Stopwatch.StartNew(); run.Invoke(); average.PushTimeSpan(sw.Elapsed); } else { run.Invoke(); } } finally { Interlocked.Decrement(ref countdown); } } } finally { lock (threadCreationLock) { threadCount--; } } } /// Enqueue a new piece of work to be handled in the future. internal void EnqueueWork(IInvokable f) { while (addSemaphore.CurrentCount >= semaphoreMaxCount - 10) { // #4246: this is incredibly rare, but can happen if millions of results are pushed at once. // the solution is to just slow a bit down when it happens. // 100 ms sleep is OK, because it needs to do around 1M things before it's idle. Thread.Sleep(100); } Interlocked.Increment(ref countdown); workItems.Enqueue(f); addSemaphore.Release(); if (threadCount == 0) { lock (threadCreationLock) { if (threadCount == 0) { TapThread.Start(WorkerFunction, null, Name, threadContext); threadCount++; } } } } /// Give the thread back to the thread manager. public void Dispose() { cancel.Cancel(false); } /// Waits for the workqueue to become empty. public void Wait() { // This is not often called spin-wait is fine in this case. while(countdown > 0) { Thread.Sleep(5); } } internal object Dequeue() { // Take the semaphore then take an object, just like WorkerFunction does. if (!addSemaphore.Wait(0)) { // there is a slight delay between pushing the work item and releasing the semaphore // in the case that we are in that space, just return. return null; } if (workItems.TryDequeue(out var inv)) { // when taking an item from the work queue the countdown and the semaphore must be decremented Interlocked.Decrement(ref countdown); Debug.Assert(inv != null); if (inv is IWrappedInvokable wrap) return wrap.InnerInvokable; return inv; } // Of some reason we are not able to dequeue the work item. // in this case bump the semaphore so somebody else can take it. // this should never happen. Debug.Fail("Unable to dequeue element"); addSemaphore.Release(1); return null; } interface IWrappedInvokable: IInvokable { object InnerInvokable { get; } } /// Wraps an Action in an IInvokable. class ActionInvokable : IWrappedInvokable { readonly Action action; public ActionInvokable(Action inv) { action = inv; } public void Invoke() => action(); public object InnerInvokable => action; } /// Wraps an IInvokable(T,T2) in an IInvokable. class WrappedInvokable : IWrappedInvokable { readonly T arg1; readonly T2 arg2; readonly IInvokable wrapped; public WrappedInvokable(IInvokable invokable, T argument1, T2 argument2) { arg1 = argument1; arg2 = argument2; wrapped = invokable; } public void Invoke() => wrapped.Invoke(arg1, arg2); public object InnerInvokable => wrapped; } } }