// Copyright Keysight Technologies 2012-2019
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, you can obtain one at http://mozilla.org/MPL/2.0/.
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
namespace OpenTap
{
/// <summary>
/// Status of a <see cref="TapThread"/>.
/// </summary>
public enum TapThreadStatus
{
/// <summary>
/// Work has been queued, but not started yet.
/// </summary>
Queued,
/// <summary>
/// Work is currently being processed.
/// </summary>
Running,
/// <summary>
/// Work has completed.
/// </summary>
Completed,
/// <summary>
/// This and all child threads have completed.
/// </summary>
HierarchyCompleted,
}
/// <summary>
/// Base class for ThreadField types. Each instance owns one slot index into the
/// per-thread <c>TapThread.Fields</c> array.
/// </summary>
internal abstract class ThreadField
{
    /// <summary> Sentinel object that subclasses can store in a slot instead of a real value. </summary>
    protected static readonly object DefaultCacheMarker = new object();

    // Source of slot indexes; incremented atomically once per created field.
    static int threadFieldIndexer = 0;

    /// <summary> Index of this thread field. </summary>
    protected readonly int Index = GetThreadFieldIndex();

    static int GetThreadFieldIndex()
    {
        return Interlocked.Increment(ref threadFieldIndexer);
    }

    /// <summary>
    /// Stores <paramref name="value"/> in this field's slot on the current thread,
    /// allocating or growing the slot array when it is missing or too short.
    /// </summary>
    /// <param name="value">The object to store in the slot.</param>
    protected void SetFieldValue(object value)
    {
        var owner = TapThread.Current;
        var slots = owner.Fields;
        if (slots == null)
        {
            slots = new object[Index + 1];
            owner.Fields = slots;
        }
        else if (slots.Length <= Index)
        {
            // Grow to exactly fit this index, keeping the existing values.
            var grown = new object[Index + 1];
            slots.CopyTo(grown, 0);
            owner.Fields = grown;
            slots = grown;
        }

        slots[Index] = value;
    }

    /// <summary>
    /// Reads this field's slot on <paramref name="thread"/>.
    /// </summary>
    /// <param name="thread">The thread whose slot array is inspected.</param>
    /// <param name="value">The stored object, or null when nothing is stored.</param>
    /// <returns>True when the slot exists and holds a non-null object.</returns>
    protected bool TryGetFieldValue(TapThread thread, out object value)
    {
        value = null;
        var slots = thread.Fields;
        if (slots == null || slots.Length <= Index)
            return false;

        value = slots[Index];
        return value != null;
    }
}
/// <summary> Flags that select how a thread field resolves its value. </summary>
[Flags]
internal enum ThreadFieldMode
{
/// <summary> No caching and no flat lookup: the getter searches the current thread and then its parents on every read. </summary>
None = 0,
/// <summary>
/// Cached-mode ThreadFields are a bit faster as they don't need to iterate to find commonly used values.
/// A value found in a parent thread is copied to the current thread. Changes to the parent thread's value have no effect after it has
/// been cached the first time.
/// </summary>
Cached = 1,
/// <summary>
/// A flat field is local to the current thread only. It never inherits the value from a parent thread.
/// </summary>
Flat = 2
}
/// <summary>
/// Thread fields are static objects that manage the value of a thread field.
/// The value is stored per <see cref="TapThread"/> and, depending on the mode,
/// may be looked up in parent threads.
/// </summary>
/// <typeparam name="T">Type of the value stored in the field.</typeparam>
internal class ThreadField<T> : ThreadField
{
    // ThreadFieldMode flags stored as an int.
    readonly int mode;

    bool isCached => (mode & (int) ThreadFieldMode.Cached) > 0;

    /// <summary> Creates a thread field using the given lookup mode. </summary>
    /// <param name="threadFieldMode">Flags controlling caching and parent lookup.</param>
    public ThreadField(ThreadFieldMode threadFieldMode = ThreadFieldMode.None) => mode = (int)threadFieldMode;

    /// <summary>
    /// Gets or sets the value of the thread field. Note that the getter may get a value from a parent thread,
    /// while the setter only writes to the current thread and cannot override values in parent threads.
    /// </summary>
    public T Value
    {
        get => Get();
        set => Set(value);
    }

    /// <summary> Gets the value stored on the current thread, if any, without consulting parent threads. </summary>
    /// <returns>The stored value when it is a <typeparamref name="T"/>; otherwise the default value.</returns>
    public T GetCached()
    {
        var thread = TapThread.Current;
        if (TryGetFieldValue(thread, out var value) && value is T x)
            return x;
        return default;
    }

    T Get()
    {
        var thread = TapThread.Current;
        bool isParent = false;
        // Iterate through the current thread and then its parent threads.
        while (thread != null)
        {
            if (TryGetFieldValue(thread, out var found))
            {
                if (isCached)
                {
                    if (isParent)
                        SetFieldValue(found); // copy the value to the current thread (not the parent).
                    // The marker records an earlier lookup that found nothing.
                    if (ReferenceEquals(found, DefaultCacheMarker))
                        return default;
                }
                return (T)found;
            }
            if ((mode & (int)ThreadFieldMode.Flat) > 0)
            {
                // Flat mode: don't iterate to the parent.
                return default;
            }
            thread = thread.Parent;
            isParent = true;
        }
        // Nothing found anywhere in the hierarchy: remember that on the current thread.
        if (isCached)
            SetFieldValue(DefaultCacheMarker);
        return default;
    }

    void Set(T value) => SetFieldValue(value);
}
/// <summary>
/// Represents an item of work in the thread manager. Also allows access to the Parent (the thread that initially called).
/// </summary>
public class TapThread
{
#region fields
// Shared thread manager; Start() enqueues new TapThreads on it.
static readonly ThreadManager manager = new ThreadManager();
// Session-local holder for the thread manager. Its value's AbortToken is linked into the abort token of threads that have no parent.
static readonly SessionLocal sessionThreadManager = new SessionLocal(manager);
// The work to run; set to null by Process() once it has executed.
Action action;
// Backing field for abortTokenSource; created on first access.
CancellationTokenSource _abortTokenSource;
// Guards creation of _abortTokenSource.
readonly object tokenCreateLock = new object();

/// <summary>
/// The cancellation source behind <see cref="AbortToken"/>, created on first access.
/// It is linked to the parent's abort token when there is a parent, otherwise to the
/// session thread manager's abort token when one is available.
/// </summary>
CancellationTokenSource abortTokenSource
{
    get
    {
        if (_abortTokenSource != null)
            return _abortTokenSource;

        lock (tokenCreateLock)
        {
            // Check again under the lock; another caller may have created it meanwhile.
            if (_abortTokenSource == null)
            {
                if (Parent is TapThread parentThread)
                    _abortTokenSource = CancellationTokenSource.CreateLinkedTokenSource(parentThread.AbortToken);
                else if (sessionThreadManager.Value is ThreadManager mng)
                    _abortTokenSource = CancellationTokenSource.CreateLinkedTokenSource(mng.AbortToken);
                else
                    _abortTokenSource = new CancellationTokenSource();
            }
            return _abortTokenSource;
        }
    }
}
#endregion
// Per-thread storage for ThreadField values, indexed by ThreadField.Index. Null until a field is first set on this thread.
internal object[] Fields;
#region properties
// Counts the root threads created by Current; used only to build their names.
static int rootThreadNameId = 0;

/// <summary>
/// The currently running TapThread. When the calling OS thread has no TapThread yet,
/// a new parentless one is created, stored for that thread and returned.
/// </summary>
public static TapThread Current
{
    get
    {
        var existing = ThreadManager.ThreadKey;
        if (existing != null)
            return existing;

        var id = Interlocked.Increment(ref rootThreadNameId);
        var rootName = id == 1 ? "Main Thread" : $"Root Thread {id-1}";
        var root = new TapThread(null, null, null, rootName);
        ThreadManager.ThreadKey = root;
        return root;
    }
}
/// <summary>
/// Pretends that the current thread is a different thread while evaluating 'action'.
/// This affects the functionality of ThreadHierarchyLocals and TapThread.Current.
/// This overload also specifies which parent thread should be used.
/// </summary>
/// <param name="action">The work to run inside the temporary thread context.</param>
/// <param name="parent">The parent of the temporary TapThread.</param>
public static void WithNewContext(Action action, TapThread parent)
{
    var currentThread = Current;
    var newThread = new TapThread(parent, action, null, currentThread.Name)
    {
        Status = TapThreadStatus.Running
    };
    ThreadManager.ThreadKey = newThread;
    try
    {
        action();
    }
    finally
    {
        // Decrement the thread created here, not whatever ThreadKey holds now:
        // 'action' may have left a different context installed on this OS thread.
        decrementThreadHierarchyCount(newThread);
        ThreadManager.ThreadKey = currentThread;
    }
}
/// <summary>
/// Installs a new TapThread with the given parent as the current thread context and returns
/// a disposable that restores the previous context. This should be used through Session.
/// </summary>
/// <param name="parent">The parent of the temporary TapThread.</param>
/// <param name="onHierarchyCompleted">Optional callback passed on to the new TapThread.</param>
/// <returns>A disposable that decrements the new thread's hierarchy count and restores the previous context.</returns>
internal static IDisposable UsingThreadContext(TapThread parent, Action onHierarchyCompleted = null)
{
    var previous = Current;
    var scoped = new TapThread(parent, () => { }, onHierarchyCompleted, previous.Name);
    scoped.Status = TapThreadStatus.Running;
    ThreadManager.ThreadKey = scoped;

    return Utils.WithDisposable(() =>
    {
        decrementThreadHierarchyCount(scoped);
        ThreadManager.ThreadKey = previous;
    });
}
/// <summary>
/// Same as the two-argument overload, using the current TapThread as parent.
/// This should be used through Session.
/// </summary>
/// <param name="onHierarchyCompleted">Optional callback passed on to the new TapThread.</param>
internal static IDisposable UsingThreadContext(Action onHierarchyCompleted = null)
{
    return UsingThreadContext(TapThread.Current, onHierarchyCompleted);
}
/// <summary>
/// Pretends that the current thread is a different thread while evaluating 'action'.
/// This affects the functionality of ThreadHierarchyLocals and TapThread.Current.
/// The current TapThread is used as parent of the temporary context.
/// </summary>
/// <param name="action">The work to run inside the temporary thread context.</param>
public static void WithNewContext(Action action) => WithNewContext(action, parent: Current);
/// <summary> An (optional) name identifying the OpenTAP thread. </summary>
public readonly string Name;
/// <summary>
/// The execution status of the work.
/// </summary>
public TapThreadStatus Status { get; private set; }
/// <summary>
/// The abort token for this thread. Provides an interface to check the cancellation status of the current thread. Note, the status of this token is inherited from parent threads.
/// </summary>
public CancellationToken AbortToken => abortTokenSource.Token;
// Defaults to true. NOTE(review): presumably tells callers whether this thread may be aborted - confirm against the users of this flag.
internal bool CanAbort { get; set; } = true;
/// <summary>
/// The parent TapThread that started this thread. In case it is null, then it is
/// not a managed TapThread.
/// </summary>
public TapThread Parent { get; }
#endregion
#region ctor
/// <summary>
/// Creates a TapThread in the Queued state and adds one to the hierarchy count of
/// itself and every ancestor.
/// </summary>
/// <param name="parent">The parent thread, or null for a root thread.</param>
/// <param name="action">The work to execute in Process().</param>
/// <param name="onHierarchyCompleted">Callback invoked when this thread's hierarchy count reaches zero; may be null.</param>
/// <param name="name">The name of the thread.</param>
internal TapThread(TapThread parent, Action action, Action onHierarchyCompleted = null, string name = "")
{
    Parent = parent;
    Name = name;
    this.action = action;

    // Always keep a callable delegate so the completion path needs no null check.
    if (onHierarchyCompleted != null)
        threadHierarchyCompleted = onHierarchyCompleted;
    else
        threadHierarchyCompleted = () => {};

    incrementThreadHeirarchy(this);
    Status = TapThreadStatus.Queued;
}
/// <summary> Finalizer. Disposes the abort token source if one was created for this thread. </summary>
~TapThread()
{
_abortTokenSource?.Dispose();
}
#endregion
/// <summary>
/// Aborts the execution of this instance of the TapThread, without a specific reason.
/// </summary>
public void Abort() => Abort(null);
/// <summary> Abort this thread after a predetermined amount of time. </summary>
/// <param name="delay">The delay after which to cancel.</param>
public void AbortAfter(TimeSpan delay) => abortTokenSource.CancelAfter(delay);
/// <summary>
/// Cancels this thread's abort token. Unlike Abort, this does not go on to check
/// whether the calling thread is affected and throw on its behalf.
/// </summary>
internal void AbortNoThrow() => abortTokenSource.Cancel();
/// <summary>
/// Aborts the execution of this instance of the TapThread with a specified reason.
/// If this thread is the calling thread or one of its ancestors, an
/// OperationCanceledException is thrown on the calling thread.
/// </summary>
/// <param name="reason">The reason to abort, used as the exception message; may be null.</param>
internal void Abort(string reason)
{
    abortTokenSource.Cancel();

    var caller = Current;
    if (!caller.AbortToken.IsCancellationRequested)
        return;

    // Check if the aborted thread is the calling thread or a parent of it; if so, throw.
    for (var ancestor = caller; ancestor != null; ancestor = ancestor.Parent)
    {
        if (ancestor != this)
            continue;

        if (reason != null)
            throw new OperationCanceledException(reason, caller.AbortToken);
        caller.AbortToken.ThrowIfCancellationRequested();
    }
}
/// <summary> Enqueue an action to be executed asynchronously. </summary>
/// <param name="action">The action to be executed.</param>
/// <param name="name">The (optional) name of the OpenTAP thread.</param>
/// <returns>The TapThread created for the action.</returns>
public static TapThread Start(Action action, string name = "") => Start(action, null, name);
/// <summary> Starts the action on a new TapThread and returns a task that follows its outcome. </summary>
/// <param name="action">The action to be executed.</param>
/// <param name="name">The (optional) name of the OpenTAP thread.</param>
internal static Task StartAwaitable(Action action, string name = "") => StartAwaitable(action, null, name);
/// <summary>
/// Starts the action on a new TapThread and returns a task that completes when the action
/// returns, or faults with the exception the action threw.
/// </summary>
/// <param name="action">The action to be executed.</param>
/// <param name="token">Optional token; while the action runs, cancelling it aborts the new TapThread.</param>
/// <param name="name">The (optional) name of the OpenTAP thread.</param>
/// <returns>A task following the outcome of the action.</returns>
internal static Task StartAwaitable(Action action, CancellationToken? token, string name = "")
{
    // The generic source is required: SetResult(true) below needs a bool result type.
    var result = new TaskCompletionSource<bool>();
    Start(() =>
    {
        try
        {
            if (token.HasValue)
            {
                var trd = TapThread.Current;
                // The registration only lives for the duration of the action.
                using (token.Value.Register(trd.Abort))
                    action();
            }
            else
            {
                action();
            }
            result.SetResult(true);
        }
        catch (Exception inner)
        {
            result.SetException(inner);
        }
    }, null, name);
    return result.Task;
}
/// <summary> Starts a new Tap Thread. </summary>
/// <param name="action">The action to run.</param>
/// <param name="onHierarchyCompleted">Callback passed to the new thread; it is invoked when that thread's hierarchy count reaches zero.</param>
/// <param name="name">The name of the thread.</param>
/// <param name="threadContext">The parent context. null if the current context should be selected.</param>
/// <returns>A thread instance.</returns>
/// <exception cref="ArgumentNullException">If <paramref name="action"/> is null.</exception>
internal static TapThread Start(Action action, Action onHierarchyCompleted, string name = "", TapThread threadContext = null)
{
    if (action == null)
        throw new ArgumentNullException(nameof(action), "Action to be executed cannot be null.");

    var parent = threadContext ?? Current;
    var queued = new TapThread(parent, action, onHierarchyCompleted, name);
    manager.Enqueue(queued);
    return queued;
}
/// <summary>
/// Blocks the current thread for the given number of milliseconds.
/// Forwards to the TimeSpan overload, so it throws if the thread is aborted meanwhile.
/// </summary>
/// <param name="millisecondsTimeout">The number of milliseconds to wait.</param>
public static void Sleep(int millisecondsTimeout) => Sleep(TimeSpan.FromMilliseconds(millisecondsTimeout));
/// <summary>
/// Throws an OperationCanceledException if the current TapThread has been aborted.
/// This is the same as calling TapThread.Current.AbortToken.ThrowIfCancellationRequested().
/// </summary>
public static void ThrowIfAborted() => Current.AbortToken.ThrowIfCancellationRequested();
/// <summary>
/// Blocks the current thread for a specified amount of time. Will throw an OperationCanceledException if the
/// thread is aborted before or during this time.
/// </summary>
/// <param name="timeSpan">The amount of time to wait. Zero or negative returns immediately.</param>
public static void Sleep(TimeSpan timeSpan)
{
    var abortToken = Current.AbortToken;
    abortToken.ThrowIfCancellationRequested();
    if (timeSpan <= TimeSpan.Zero)
        return;

    // A single wait is capped at one hour; longer sleeps are done in several waits.
    var maxSlice = TimeSpan.FromHours(1);
    var elapsed = Stopwatch.StartNew();
    for (var remaining = timeSpan - elapsed.Elapsed;
         remaining > TimeSpan.Zero;
         remaining = timeSpan - elapsed.Elapsed)
    {
        var slice = remaining < maxSlice ? remaining : maxSlice;
        // The wait handle being signalled means cancellation was requested.
        if (abortToken.WaitHandle.WaitOne(slice))
            abortToken.ThrowIfCancellationRequested();
    }
}
// Reference counter for all threads in this hierarchy: incremented on this thread and every ancestor when a
// TapThread is constructed, decremented again when that thread finishes.
int threadHierarchyCount;
// Invoked when threadHierarchyCount reaches zero. Never null; the constructor substitutes an empty delegate.
readonly Action threadHierarchyCompleted;
/// <summary>
/// Runs the action of this thread once, updating Status before and after,
/// and decrements the hierarchy count when done, also if the action throws.
/// </summary>
/// <exception cref="InvalidOperationException">If the action has already been processed.</exception>
internal void Process()
{
    var work = action;
    if (work == null)
        throw new InvalidOperationException("TapThread cannot be executed twice.");

    Status = TapThreadStatus.Running;
    try
    {
        work();
    }
    finally
    {
        Status = TapThreadStatus.Completed;
        // Clearing the action marks it as processed and lets the GC clean up closures.
        action = null;
        decrementThreadHierarchyCount(this);
    }
}
// Adds one to the hierarchy count of 'trd' and each of its ancestors.
static void incrementThreadHeirarchy(TapThread trd)
{
    var node = trd;
    while (node != null)
    {
        Interlocked.Increment(ref node.threadHierarchyCount);
        node = node.Parent;
    }
}
// Subtracts one from the hierarchy count of 't' and each of its ancestors.
// A thread whose count reaches zero is marked HierarchyCompleted and its callback is invoked.
// A negative count means increments and decrements are out of balance, which throws.
static void decrementThreadHierarchyCount(TapThread t)
{
    var node = t;
    while (node != null)
    {
        var remaining = Interlocked.Decrement(ref node.threadHierarchyCount);
        if (remaining == 0)
        {
            node.Status = TapThreadStatus.HierarchyCompleted;
            node.threadHierarchyCompleted();
        }
        else if (remaining < 0)
        {
            throw new InvalidOperationException("thread hierarchy count mismatch");
        }
        node = node.Parent;
    }
}
/// <summary> Returns a readable string containing the thread name. </summary>
/// <returns>A string of the form [Thread 'Name'].</returns>
public override string ToString() => $"[Thread '{Name}']";
}
/// Custom thread pool for fast thread startup.
internal class ThreadManager : IDisposable
{
// This queue is generally empty, since a new thread will be started whenever no workers are available for processing.
readonly ConcurrentQueue<TapThread> workQueue = new ConcurrentQueue<TapThread>();
// The TapThread associated with the calling OS thread; one value per OS thread.
[ThreadStatic]
internal static TapThread ThreadKey;
// Used for cancelling currently running tasks.
readonly CancellationTokenSource cancelSrc = new CancellationTokenSource();
// This semaphore counts up whenever there is work to be done.
readonly Semaphore freeWorkSemaphore = new Semaphore(0, Int32.MaxValue);
/// <summary> The number of currently available workers. </summary>
int freeWorkers = 0;
/// <summary>
/// Max number of worker threads. Above this count the creation of new threads is throttled.
/// </summary>
int MaxWorkerThreads = 1024; // normally around 1024.
/// <summary> Current number of threads. </summary>
public uint ThreadCount => (uint)threads;
/// <summary>
/// Thread manager root abort token. This can cancel all threads and child threads.
/// Canceled when the thread manager is disposed.
/// </summary>
public CancellationToken AbortToken => cancelSrc.Token;
/// <summary> Enqueue an action to be executed in the future. </summary>
/// <param name="work">The work to be processed.</param>
/// <exception cref="ObjectDisposedException">If the thread manager has been disposed.</exception>
public void Enqueue(TapThread work)
{
    // ObjectDisposedException derives from Exception, so existing catch blocks still match.
    if (cancelSrc.IsCancellationRequested)
        throw new ObjectDisposedException(nameof(ThreadManager), "ThreadManager has been disposed.");
    workQueue.Enqueue(work);
    // Signal that there is work to be done.
    freeWorkSemaphore.Release();
}
/// <summary> Creates a new ThreadManager and starts its background management thread. </summary>
internal ThreadManager()
{
    var dispatcher = new Thread(threadManagerWork);
    dispatcher.Name = "Thread Manager";
    dispatcher.IsBackground = true;
    dispatcher.Priority = ThreadPriority.Normal;
    dispatcher.Start();
    // MaxWorkerThreads keeps its fixed default; it is not read from ThreadPool.GetMaxThreads.
}
/// <summary> The number of worker threads that are kept alive while there is no work. </summary>
internal static int IdleThreadCount
{
    get { return idleThreadCount; }
    set { idleThreadCount = value; }
}
static int idleThreadCount = 4;
// Body of the "Thread Manager" thread. Until the manager is cancelled it keeps at least
// idleThreadCount workers alive and starts additional workers when more work is queued
// than there are free workers.
void threadManagerWork()
{
var handles = new WaitHandle[2];
while (cancelSrc.IsCancellationRequested == false)
{
// Start workers while the thread count is below idleThreadCount.
for(uint i = (uint)threads; i < idleThreadCount; i++)
{
newWorkerThread();
}
handles[0] = freeWorkSemaphore;
handles[1] = cancelSrc.Token.WaitHandle;
// Block until work is signalled (index 0) or the manager is cancelled (index 1).
int state = WaitHandle.WaitAny(handles);
if (state == 1) break;
// The wait above consumed one semaphore count; release it again so a worker can take it.
freeWorkSemaphore.Release();
if (freeWorkers < workQueue.Count)
{
// More queued items than free workers.
if (freeWorkers > 20)
Thread.Sleep(freeWorkers);
else if (threads > MaxWorkerThreads)
{
int delay = threads - MaxWorkerThreads;
Thread.Sleep(delay); // throttle the creation of new threads.
newWorkerThread();
}
else
{
newWorkerThread();
}
}
else
{
// Enough free workers for the queued work; let other threads run.
Thread.Yield();
}
}
}
// Starts one background worker running processQueue and counts it as a free worker.
void newWorkerThread()
{
    var worker = new Thread(processQueue);
    worker.IsBackground = true;
    worker.Name = "Unnamed Work Thread";
    worker.Priority = ThreadPriority.BelowNormal;
    worker.Start();
    Interlocked.Increment(ref freeWorkers);
}
// If a worker waits this long without being woken for new work, it can stop (see processQueue).
// The value is 5 minutes, in milliseconds.
// NOTE(review): an earlier version of this comment cited 5000ms and result listeners waiting about 3s; confirm whether that rationale still applies.
static readonly int timeout = 5 * 60 * 1000; // 5 minutes
// Number of live worker threads; each worker increments it on entry to processQueue and decrements it on exit.
int threads = 0;
// Body of a worker thread. This method can be processed by many threads at once.
// A worker waits for the work semaphore or cancellation, then dequeues and runs TapThreads
// until the queue is empty. It exits on cancellation, or after a wait timeout when the
// thread count is not below idleThreadCount.
void processQueue()
{
Interlocked.Increment(ref threads);
var handles = new WaitHandle[2];
try
{
handles[0] = freeWorkSemaphore;
handles[1] = cancelSrc.Token.WaitHandle;
while (cancelSrc.IsCancellationRequested == false)
{
int state = WaitHandle.WaitAny(handles, timeout);
if (state == WaitHandle.WaitTimeout)
{
// Idle for the whole timeout: keep waiting only while the thread count is below idleThreadCount.
if (ThreadCount < idleThreadCount)
continue;
break;
}
if (workQueue.Count == 0)
continue; // Someone already handled the work. go back to sleep.
// once resumed, crunch as much as possible.
// this will reduce latency and cause some threads to wake up just to go to sleep again.
// This worker is busy from here until the queue has been drained.
Interlocked.Decrement(ref freeWorkers);
try
{
while (cancelSrc.IsCancellationRequested == false && workQueue.TryDequeue(out TapThread work))
{
try
{
// Make the work item the current TapThread of this OS thread while it runs.
ThreadKey = work;
work.Process();
}
finally
{
ThreadKey = null;
}
}
}
finally
{
Interlocked.Increment(ref freeWorkers);
}
}
}
catch (OperationCanceledException)
{
// Cancellation ends the worker without logging.
}
catch (ThreadAbortException)
{
// Can be thrown when the application exits.
}
catch (Exception e)
{
// exceptions should be handled at the 'work' level.
log.Error("Exception unhandled in worker thread.");
log.Debug(e);
}
finally
{
// Undo the free-worker count added in newWorkerThread and the thread count added on entry.
Interlocked.Decrement(ref freeWorkers);
Interlocked.Decrement(ref threads);
}
}
private static TraceSource _log;

// Log source named "thread", created on first use.
private static TraceSource log
{
    get
    {
        if (_log == null)
            _log = Log.CreateSource("thread");
        return _log;
    }
}
/// <summary>
/// Disposes the ThreadManager. This can optionally be done at program exit.
/// Cancels the manager and blocks until all worker threads have exited.
/// </summary>
public void Dispose()
{
    cancelSrc.Cancel();
    while (true)
    {
        if (threads <= 0)
            return;
        // Poll in 10 ms steps until the workers have counted themselves out.
        Thread.Sleep(10);
    }
}
}
/// <summary>
/// Has a separate value for each hierarchy of Threads that it is set on.
/// If a thread sets this to a value, that value will be visible only to that thread and its child threads
/// (the threads that have it in their Parent chain).
/// </summary>
/// <typeparam name="T">Type of the stored value; must be a reference type.</typeparam>
class ThreadHierarchyLocal<T> where T : class
{
    // Values keyed by the TapThread they were set on.
    readonly ConditionalWeakTable<TapThread, T> threadObjects = new ConditionalWeakTable<TapThread, T>();

    /// <summary>
    /// Has a separate value for each hierarchy of Threads that it is set on.
    /// The getter returns the value set on the current thread or, failing that, on its nearest ancestor;
    /// null when none of them has a value. The setter stores the value for the current thread only.
    /// </summary>
    public T LocalValue
    {
        get
        {
            for (var identifier = TapThread.Current; identifier != null; identifier = identifier.Parent)
            {
                if (threadObjects.TryGetValue(identifier, out var value))
                    return value;
            }
            return default(T);
        }
        set
        {
            var identifier = TapThread.Current;
            // Add throws on an existing key, so drop any previous entry first.
            // Remove simply returns false when there is none.
            threadObjects.Remove(identifier);
            threadObjects.Add(identifier, value);
        }
    }

    /// <summary>
    /// Removes the value set on the current thread, if any. Values set on parent threads are untouched.
    /// </summary>
    public void ClearLocal()
    {
        threadObjects.Remove(TapThread.Current);
    }
}
}