// Copyright Keysight Technologies 2012-2019
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, you can obtain one at http://mozilla.org/MPL/2.0/.
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
namespace OpenTap
{
///
/// Work Queue used for result processing in sequence but asynchronously. It uses the ThreadManager to automatically clean up threads that have been idle for a while.
/// When the WorkQueue is disposed, the used thread is immediately returned to the ThreadManager.
///
public class WorkQueue : IDisposable
{
/// Options for WorkQueues.
[Flags]
public enum Options
{
/// No options.
None = 0,
/// The thread is not returned to the ThreadManager when it has been idle for some time. In this situation the WorkQueue must be disposed manually.
LongRunning = 1,
/// Time averaging is enabled. Each piece of work will have measured time spent.
TimeAveraging = 2
}
///
/// The amount of idle time to wait before giving the thread back to the threading manager. This has no effect if the LongRunning option is selected.
///
public int Timeout = 5;
// list of things to do sequentially.
readonly ConcurrentQueue workItems = new ConcurrentQueue();
readonly TimeSpanAverager average;
internal object Peek()
{
if (workItems.TryPeek(out var inv))
{
if (inv is IWrappedInvokable wrap)
return wrap.InnerInvokable;
return inv;
}
return null;
}
/// The average time spent for each task. Only available if Options.TImeAveraging is enabled.
public TimeSpan AverageTimeSpent
{
get
{
if (average == null)
throw new InvalidOperationException("The TimeAveraging option has not been selected.");
return average.GetAverage();
}
}
/// The current number of items in the work queue. If called from the worker thread, this number will be 0 for that last worker.
public int QueueSize => workItems.Count;
internal bool IsBusy => QueueSize > 0 || countdown > 0;
readonly object threadCreationLock = new object();
readonly CancellationTokenSource cancel = new CancellationTokenSource();
readonly TapThread threadContext = null;
//this should always be either 1(thread was started) or 0(thread is not started yet)
int threadCount = 0;
internal static int semaphoreMaxCount = 1024 * 1024;
// the addSemaphore counts the current number of things in the tasklist.
readonly SemaphoreSlim addSemaphore = new SemaphoreSlim(0, semaphoreMaxCount);
int countdown = 0;
/// A name of identifying the work queue.
public readonly string Name;
readonly bool longRunning;
/// Creates a new instance of WorkQueue.
/// Options.
/// A name to identify a work queue.
public WorkQueue(Options options, string name = "")
{
longRunning = options.HasFlag(Options.LongRunning);
if (options.HasFlag(Options.TimeAveraging))
average = new TimeSpanAverager();
Name = name;
}
/// Creates a new instance of WorkQueue.
/// Options.
/// A name to identify a work queue.
/// The thread context in which to run work jobs. The default value causes the context to be the parent of an enqueuing thread.
public WorkQueue(Options options, string name = "", TapThread threadContext = null) :this(options, name)
{
this.threadContext = threadContext;
}
/// Enqueue a new piece of work to be handled in the future.
public void EnqueueWork(Action a) => EnqueueWork(new ActionInvokable(a));
internal void EnqueueWork(IInvokable v, T1 a1, T2 a2) => EnqueueWork(new WrappedInvokable(v, a1, a2));
///
/// This method in in charge of processing the work queue.
///
void WorkerFunction()
{
try
{
var awaitArray = new WaitHandle[] {addSemaphore.AvailableWaitHandle, cancel.Token.WaitHandle};
while (true)
{
retry:
awaitArray[1] = cancel.Token.WaitHandle;
int cancelIndex = 0;
if (longRunning)
cancelIndex = WaitHandle.WaitAny(awaitArray);
else
cancelIndex = WaitHandle.WaitAny(awaitArray, Timeout);
if (cancelIndex == 0 && !addSemaphore.Wait(0))
goto retry;
bool ok = cancelIndex == 0;
if (!ok)
{
if (cancel.IsCancellationRequested == false && longRunning) continue;
lock (threadCreationLock)
{
if (workItems.Count > 0)
goto retry;
break;
}
}
IInvokable run;
while (!workItems.TryDequeue(out run))
Thread.Yield();
try
{
if (average != null)
{
var sw = Stopwatch.StartNew();
run.Invoke();
average.PushTimeSpan(sw.Elapsed);
}
else
{
run.Invoke();
}
}
finally
{
Interlocked.Decrement(ref countdown);
}
}
}
finally
{
lock (threadCreationLock)
{
threadCount--;
}
}
}
/// Enqueue a new piece of work to be handled in the future.
internal void EnqueueWork(IInvokable f)
{
while (addSemaphore.CurrentCount >= semaphoreMaxCount - 10)
{
// #4246: this is incredibly rare, but can happen if millions of results are pushed at once.
// the solution is to just slow a bit down when it happens.
// 100 ms sleep is OK, because it needs to do around 1M things before it's idle.
Thread.Sleep(100);
}
Interlocked.Increment(ref countdown);
workItems.Enqueue(f);
addSemaphore.Release();
if (threadCount == 0)
{
lock (threadCreationLock)
{
if (threadCount == 0)
{
TapThread.Start(WorkerFunction, null, Name, threadContext);
threadCount++;
}
}
}
}
/// Give the thread back to the thread manager.
public void Dispose()
{
cancel.Cancel(false);
}
/// Waits for the workqueue to become empty.
public void Wait()
{
// This is not often called spin-wait is fine in this case.
while(countdown > 0)
{
Thread.Sleep(5);
}
}
internal object Dequeue()
{
// Take the semaphore then take an object, just like WorkerFunction does.
if (!addSemaphore.Wait(0))
{
// there is a slight delay between pushing the work item and releasing the semaphore
// in the case that we are in that space, just return.
return null;
}
if (workItems.TryDequeue(out var inv))
{
// when taking an item from the work queue the countdown and the semaphore must be decremented
Interlocked.Decrement(ref countdown);
Debug.Assert(inv != null);
if (inv is IWrappedInvokable wrap)
return wrap.InnerInvokable;
return inv;
}
// Of some reason we are not able to dequeue the work item.
// in this case bump the semaphore so somebody else can take it.
// this should never happen.
Debug.Fail("Unable to dequeue element");
addSemaphore.Release(1);
return null;
}
interface IWrappedInvokable: IInvokable
{
object InnerInvokable { get; }
}
/// Wraps an Action in an IInvokable.
class ActionInvokable : IWrappedInvokable
{
readonly Action action;
public ActionInvokable(Action inv)
{
action = inv;
}
public void Invoke() => action();
public object InnerInvokable => action;
}
/// Wraps an IInvokable(T,T2) in an IInvokable.
class WrappedInvokable : IWrappedInvokable
{
readonly T arg1;
readonly T2 arg2;
readonly IInvokable wrapped;
public WrappedInvokable(IInvokable invokable, T argument1, T2 argument2)
{
arg1 = argument1;
arg2 = argument2;
wrapped = invokable;
}
public void Invoke() => wrapped.Invoke(arg1, arg2);
public object InnerInvokable => wrapped;
}
}
}