// Copyright Keysight Technologies 2012-2019 // This Source Code Form is subject to the terms of the Mozilla Public // License, v. 2.0. If a copy of the MPL was not distributed with this // file, you can obtain one at http://mozilla.org/MPL/2.0/. using System; using System.Collections.Concurrent; using System.Diagnostics; using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; namespace OpenTap { /// /// Status of a . /// public enum TapThreadStatus { /// /// Work has been queued, but not started yet. /// Queued, /// /// Work is currently being processed. /// Running, /// /// Work has completed. /// Completed, /// /// This and all child threads have completed. /// HierarchyCompleted, } /// baseclass for ThreadField types. internal abstract class ThreadField { protected static readonly object DefaultCacheMarker = new object(); static int threadFieldIndexer = 0; /// Index of this thread field. protected readonly int Index = GetThreadFieldIndex(); static int GetThreadFieldIndex() => Interlocked.Increment(ref threadFieldIndexer); protected void SetFieldValue(object value) { var currentThread = TapThread.Current; if (currentThread.Fields == null) currentThread.Fields = new object[Index + 1]; else if(currentThread.Fields.Length <= Index) { var newArray = new object[Index + 1]; currentThread.Fields.CopyTo(newArray, 0); currentThread.Fields = newArray; } currentThread.Fields[Index] = value; } protected bool TryGetFieldValue(TapThread thread, out object value) { if (thread.Fields != null && thread.Fields.Length > Index) { var currentValue = thread.Fields[Index]; if (currentValue != null) { value = currentValue; return true; } } value = null; return false; } } [Flags] internal enum ThreadFieldMode { None = 0, /// Cached-mode ThreadFields are a bit faster as they dont need to iterate for finding commonly used values. /// A value found in the parent thread is upgraded to local cache. Changes in parent thread thread-field values has no effect after it has /// been cached the first time. Cached = 1, /// /// A flat is a kind of cache that is local to the current thread only. It never inherits to the parent thread value. /// Flat = 2 } /// /// Thread fields are static objects that manage the value of a thread field. /// /// internal class ThreadField : ThreadField { readonly int mode; bool isCached => (mode & (int) ThreadFieldMode.Cached) > 0; public ThreadField(ThreadFieldMode threadFieldMode = ThreadFieldMode.None) => mode = (int)threadFieldMode; /// /// Gets or sets the value of the thread field. Note that the getter may get a value from a parent thread, while the setter cannot override values from parent fields. /// public T Value { get => Get(); set => Set(value); } /// Gets the current value for things thread, if any. public T GetCached() { var thread = TapThread.Current; if (TryGetFieldValue(thread, out var value) && value is T x) return x; return default; } T Get() { var thread = TapThread.Current; bool isParent = false; // iterate through parent threads. while (thread != null) { if (TryGetFieldValue(thread, out var found)) { if (isCached) { if (isParent) SetFieldValue(found); // set the value on the current thread (not on parent). if (ReferenceEquals(found, DefaultCacheMarker)) return default; } return (T)found; } if ((mode & (int)ThreadFieldMode.Flat) > 0) { // flat mode: Dont iterate to parent. return default; } thread = thread.Parent; isParent = true; } if (isCached) SetFieldValue(DefaultCacheMarker); return default; } void Set(T value) => SetFieldValue(value); } /// /// Represents a item of work in the . Also allows access to the Parent (the thread that initially called) /// public class TapThread { #region fields static readonly ThreadManager manager = new ThreadManager(); static readonly SessionLocal sessionThreadManager = new SessionLocal(manager); Action action; CancellationTokenSource _abortTokenSource; readonly object tokenCreateLock = new object(); CancellationTokenSource abortTokenSource { get { if (_abortTokenSource == null) { lock (tokenCreateLock) { if (_abortTokenSource != null) return _abortTokenSource; if (Parent is TapThread parentThread) { // Create a new cancellation token source and link it to the thread's parent abort token. _abortTokenSource = CancellationTokenSource.CreateLinkedTokenSource(parentThread.AbortToken); } else { if (sessionThreadManager.Value is ThreadManager mng) _abortTokenSource = CancellationTokenSource.CreateLinkedTokenSource(mng.AbortToken); else _abortTokenSource = new CancellationTokenSource(); } } } return _abortTokenSource; } } #endregion internal object[] Fields; #region properties static int rootThreadNameId = 0; /// /// The currently running TapThread /// public static TapThread Current { get { if (ThreadManager.ThreadKey == null) { var id = Interlocked.Increment(ref rootThreadNameId); ThreadManager.ThreadKey = new TapThread(null, null, null, id == 1 ? "Main Thread" : $"Root Thread {id-1}"); } return ThreadManager.ThreadKey; } } /// Pretends that the current thread is a different thread while evaluating 'action'. /// This affects the functionality of ThreadHierarchyLocals and TapThread.Current. /// This overload also specifies which parent thread should be used. public static void WithNewContext(Action action, TapThread parent) { var currentThread = Current; ThreadManager.ThreadKey = new TapThread(parent, action, null, currentThread.Name) { Status = TapThreadStatus.Running }; try { action(); } finally { decrementThreadHierarchyCount(ThreadManager.ThreadKey); ThreadManager.ThreadKey = currentThread; } } /// This should be used through Session. internal static IDisposable UsingThreadContext(TapThread parent, Action onHierarchyCompleted = null) { var currentThread = Current; var newThread = new TapThread(parent, () => { }, onHierarchyCompleted, currentThread.Name) { Status = TapThreadStatus.Running }; ThreadManager.ThreadKey = newThread; return Utils.WithDisposable(() => { decrementThreadHierarchyCount(newThread); ThreadManager.ThreadKey = currentThread; }); } /// This should be used through Session. internal static IDisposable UsingThreadContext(Action onHierarchyCompleted = null) => UsingThreadContext(TapThread.Current, onHierarchyCompleted); /// Pretends that the current thread is a different thread while evaluating 'action'. /// This affects the functionality of ThreadHierarchyLocals and TapThread.Current. public static void WithNewContext(Action action) { WithNewContext(action, parent: Current); } /// An (optional) name identifying the OpenTAP thread. public readonly string Name; /// /// The execution status of the work /// public TapThreadStatus Status { get; private set; } /// /// The abort token for this thread. Provides an interface to check the cancellation status of the current thread. Note, the status of this token is inherited from parent threads. /// public CancellationToken AbortToken => abortTokenSource.Token; internal bool CanAbort { get; set; } = true; /// /// The parent TapThread that started this thread. In case it is null, then it is /// not a managed TapThread. /// public TapThread Parent { get; } #endregion #region ctor internal TapThread(TapThread parent, Action action, Action onHierarchyCompleted = null, string name = "") { Name = name; this.action = action; Parent = parent; threadHierarchyCompleted = onHierarchyCompleted ?? (() => {}); incrementThreadHeirarchy(this); Status = TapThreadStatus.Queued; } /// ~TapThread() { _abortTokenSource?.Dispose(); } #endregion /// /// Aborts the execution of this current instance of the TapThread. /// public void Abort() { Abort(null); } /// Abort this thread after a predetermined amount of time. /// The delay after which to cancel. public void AbortAfter(TimeSpan delay) { abortTokenSource.CancelAfter(delay); } internal void AbortNoThrow() { abortTokenSource.Cancel(); } /// /// Aborts the execution of this current instance of the TapThread with a /// specified reason. /// Thea reason to abort. /// internal void Abort(string reason) { abortTokenSource.Cancel(); if (Current.AbortToken.IsCancellationRequested) { // Check if the aborted thread is the current thread or a parent of it. // if so then throw. var trd = Current; while(trd != null) { if (this == trd) { if(reason != null) throw new OperationCanceledException(reason, Current.AbortToken); Current.AbortToken.ThrowIfCancellationRequested(); } trd = trd.Parent; } } } /// Enqueue an action to be executed asynchronously. /// The action to be executed. /// The (optional) name of the OpenTAP thread. public static TapThread Start(Action action, string name = "") { return Start(action, null, name); } internal static Task StartAwaitable(Action action, string name = "") { return StartAwaitable(action, null, name); } internal static Task StartAwaitable(Action action, CancellationToken? token, string name = "") { var result = new TaskCompletionSource(); Start(() => { try { if (token.HasValue) { var trd = TapThread.Current; using (token.Value.Register(trd.Abort)) action(); } else { action(); } result.SetResult(true); } catch (Exception inner) { result.SetException(inner); } }, null, name); return result.Task; } /// Starts a new Tap Thread. /// The action to run. /// Executed when this hierarchy level is completed (may be before child threads complete) /// The name of the thread. /// The parent context. null if the current context should be selected. /// A thread instance. /// internal static TapThread Start(Action action, Action onHierarchyCompleted, string name = "", TapThread threadContext = null) { if (action == null) throw new ArgumentNullException(nameof(action), "Action to be executed cannot be null."); var newThread = new TapThread(threadContext ?? Current, action, onHierarchyCompleted, name); manager.Enqueue(newThread); return newThread; } /// /// Blocks the current thread until the current System.Threading.WaitHandle receives /// a signal, using a 32-bit signed integer to specify the time interval in milliseconds. /// /// The number of milliseconds to wait, or 0 by default. public static void Sleep(int millisecondsTimeout) { Sleep(TimeSpan.FromMilliseconds(millisecondsTimeout)); } /// Throws an OperationCancelledException if the current TapThread has been aborted. This is the same as calling TapThread.Current.AbortToken.ThrowIfCancellationRequested(). public static void ThrowIfAborted() { Current.AbortToken.ThrowIfCancellationRequested(); } /// Blocks the current thread for a specified amount of time. Will throw an OperationCancelledException if the /// thread is aborted during this time. /// A System.TimeSpan that represents the number of milliseconds to wait. public static void Sleep(TimeSpan timeSpan) { var cancellationToken = Current.AbortToken; cancellationToken.ThrowIfCancellationRequested(); if (timeSpan <= TimeSpan.Zero) return; TimeSpan min(TimeSpan a, TimeSpan b) { return a < b ? a : b; } TimeSpan longTime = TimeSpan.FromHours(1); var sw = Stopwatch.StartNew(); while (true) { var timeleft = timeSpan - sw.Elapsed; if (timeleft <= TimeSpan.Zero) break; // if plan.abortAllowed is false, the token might be canceled, // but we still cannot abort, so in that case we need to default to Thread.Sleep if (cancellationToken.WaitHandle.WaitOne(min(timeleft, longTime))) { cancellationToken.ThrowIfCancellationRequested(); } } } // Reference counter for all threads in this hierarchy int threadHierarchyCount; readonly Action threadHierarchyCompleted; internal void Process() { if (action == null) throw new InvalidOperationException("TapThread cannot be executed twice."); Status = TapThreadStatus.Running; try { action(); } finally { Status = TapThreadStatus.Completed; // set action to null to signal that it has been processed. // also to allow GC to clean up closures. action = null; decrementThreadHierarchyCount(this); } } static void incrementThreadHeirarchy(TapThread trd) { for(; trd != null; trd = trd.Parent) Interlocked.Increment(ref trd.threadHierarchyCount); } static void decrementThreadHierarchyCount(TapThread t) { for (; t != null; t = t.Parent) { var dec = Interlocked.Decrement(ref t.threadHierarchyCount); if (dec == 0) { t.Status = TapThreadStatus.HierarchyCompleted; t.threadHierarchyCompleted(); } if (dec < 0) throw new InvalidOperationException("thread hierarchy count mismatch"); } } /// Returns a readable string. /// public override string ToString() { return $"[Thread '{Name}']"; } } /// Custom thread pool for fast thread startup. internal class ThreadManager : IDisposable { // this queue is generally empty, since a new thread will be started whenever no workers are available for processing. readonly ConcurrentQueue workQueue = new ConcurrentQueue(); [ThreadStatic] internal static TapThread ThreadKey; // used for cancelling currently running tasks. readonly CancellationTokenSource cancelSrc = new CancellationTokenSource(); // this semaphore counts up whenever there is work to be done. readonly Semaphore freeWorkSemaphore = new Semaphore(0, Int32.MaxValue); /// The number of currently available workers. int freeWorkers = 0; /// /// Max number of worker threads. /// int MaxWorkerThreads = 1024; // normally around 1024. /// Current number of threads. public uint ThreadCount => (uint)threads; /// Thread manager root abort token. This can cancel all thread and child threads. /// Canceled when the thread manager is disposed. public CancellationToken AbortToken => cancelSrc.Token; /// Enqueue an action to be executed in the future. /// The work to be processed. public void Enqueue(TapThread work) { if(cancelSrc.IsCancellationRequested) throw new Exception("ThreadManager has been disposed."); workQueue.Enqueue(work); freeWorkSemaphore.Release(); } /// Creates a new ThreadManager. internal ThreadManager() { var threadManagerThread = new Thread(threadManagerWork) { Name = "Thread Manager", IsBackground = true, Priority = ThreadPriority.Normal }; threadManagerThread.Start(); //ThreadPool.GetMaxThreads(out MaxWorkerThreads, out int _); } internal static int IdleThreadCount { get => idleThreadCount; set => idleThreadCount = value; } static int idleThreadCount = 4; void threadManagerWork() { var handles = new WaitHandle[2]; while (cancelSrc.IsCancellationRequested == false) { for(uint i = (uint)threads; i < idleThreadCount; i++) { newWorkerThread(); } handles[0] = freeWorkSemaphore; handles[1] = cancelSrc.Token.WaitHandle; int state = WaitHandle.WaitAny(handles); if (state == 1) break; freeWorkSemaphore.Release(); if (freeWorkers < workQueue.Count) { if (freeWorkers > 20) Thread.Sleep(freeWorkers); else if (threads > MaxWorkerThreads) { int delay = threads - MaxWorkerThreads; Thread.Sleep(delay); // throttle the creation of new threads. newWorkerThread(); } else { newWorkerThread(); } } else { Thread.Yield(); } } } void newWorkerThread() { var trd = new Thread(processQueue) { IsBackground = true, Name = "Unnamed Work Thread", Priority = ThreadPriority.BelowNormal }; trd.Start(); Interlocked.Increment(ref freeWorkers); } // if a thread waits for some time and no new work is fetched, it can stop. // 5000ms is a good number, because resultlisteners sometimes wait 3s if they have too much work. // this avoids the threads shutting down just because of this. static readonly int timeout = 5 * 60 * 1000; // 5 minutes int threads = 0; // This method can be processed by many threads at once. void processQueue() { Interlocked.Increment(ref threads); var handles = new WaitHandle[2]; try { handles[0] = freeWorkSemaphore; handles[1] = cancelSrc.Token.WaitHandle; while (cancelSrc.IsCancellationRequested == false) { int state = WaitHandle.WaitAny(handles, timeout); if (state == WaitHandle.WaitTimeout) { if (ThreadCount < idleThreadCount) continue; break; } if (workQueue.Count == 0) continue; // Someone already handled the work. go back to sleep. // once resumed, crunch as much as possible. // this will reduce latency and cause some threads to wake up just to go to sleep again. Interlocked.Decrement(ref freeWorkers); try { while (cancelSrc.IsCancellationRequested == false && workQueue.TryDequeue(out TapThread work)) { try { ThreadKey = work; work.Process(); } finally { ThreadKey = null; } } } finally { Interlocked.Increment(ref freeWorkers); } } } catch (OperationCanceledException) { } catch (ThreadAbortException) { // Can be thrown when the application exits. } catch (Exception e) { // exceptions should be handled at the 'work' level. log.Error("Exception unhandled in worker thread."); log.Debug(e); } finally { Interlocked.Decrement(ref freeWorkers); Interlocked.Decrement(ref threads); } } private static TraceSource _log; private static TraceSource log { get => (_log ?? (_log = Log.CreateSource("thread"))); } /// Disposes the ThreadManager. This can optionally be done at program exit. public void Dispose() { cancelSrc.Cancel(); while (threads > 0) Thread.Sleep(10); } } /// /// Has a separate value for each hierarchy of Threads that it is set on. /// If a thread sets this to a value, that value will be visible only to that thread and its child threads (as started using ) /// class ThreadHierarchyLocal where T : class { readonly ConditionalWeakTable threadObjects = new ConditionalWeakTable(); /// /// Has a separate value for each hierarchy of Threads that it is set on. /// If a thread sets this to a value, that value will be visible only to that thread and its child threads (as started using ) /// public T LocalValue { get { TapThread identifier = TapThread.Current; while (identifier != null) { if (threadObjects.TryGetValue(identifier, out var value)) return value; identifier = identifier.Parent; } return default(T); } set { var identifier = TapThread.Current; if (threadObjects.TryGetValue(identifier, out var _)) threadObjects.Remove(identifier); threadObjects.Add(identifier, value); } } /// /// Removes the thread-locally set value. /// public void ClearLocal() { var identifier = TapThread.Current; if (threadObjects.TryGetValue(identifier, out var _)) threadObjects.Remove(identifier); } } }