chr
2026-04-05 fe750b791d5b517cc4e9bc8e99a9a75139a0cfba
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
//            Copyright Keysight Technologies 2012-2019
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, you can obtain one at http://mozilla.org/MPL/2.0/.
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
 
namespace OpenTap
{
    /// <summary> 
    /// Work Queue used for result processing in sequence but asynchronously. It uses the ThreadManager to automatically clean up threads that have been idle for a while.
    /// When the WorkQueue is disposed, the used thread is immediately returned to the ThreadManager.
    /// </summary>
    public class WorkQueue : IDisposable
    {
        /// <summary> Options for WorkQueues. </summary>
        [Flags]
        public enum Options
        {
            /// <summary> No options. </summary>
            None = 0,
            /// <summary> The thread is not returned to the ThreadManager when it has been idle for some time. In this situation the WorkQueue must be disposed manually. </summary>
            LongRunning = 1,
            /// <summary> Time averaging is enabled. Each piece of work will have measured time spent. </summary>
            TimeAveraging = 2
        }
        /// <summary>
        /// The amount of idle time to wait before giving the thread back to the threading manager. This has no effect if the LongRunning option is selected. 
        /// </summary>
        public int Timeout = 5;
        
        // list of things to do sequentially.
        readonly ConcurrentQueue<IInvokable> workItems = new ConcurrentQueue<IInvokable>();
        readonly TimeSpanAverager average;
        
        internal object Peek()
        {
            if (workItems.TryPeek(out var inv))
            {
                if (inv is IWrappedInvokable wrap)
                    return wrap.InnerInvokable;
                return inv;
            }
            return null;
        }
 
        /// <summary> The average time spent for each task. Only available if Options.TImeAveraging is enabled. </summary>
        public TimeSpan AverageTimeSpent
        {
            get
            {
                if (average == null)
                    throw new InvalidOperationException("The TimeAveraging option has not been selected.");
                return average.GetAverage();
            }
        }
 
        /// <summary> The current number of items in the work queue. If called from the worker thread, this number will be 0 for that last worker. </summary>
        public int QueueSize => workItems.Count;
        internal bool IsBusy => QueueSize > 0 || countdown > 0;
 
        readonly object threadCreationLock = new object();
        readonly CancellationTokenSource cancel = new CancellationTokenSource();
        readonly TapThread threadContext = null;
 
        //this should always be either 1(thread was started) or 0(thread is not started yet)
        int threadCount = 0;
 
        internal static int semaphoreMaxCount = 1024 * 1024;
        // the addSemaphore counts the current number of things in the tasklist.
        readonly SemaphoreSlim addSemaphore = new SemaphoreSlim(0, semaphoreMaxCount); 
 
        int countdown = 0;
        
        /// <summary> A name of identifying the work queue. </summary>
        public readonly string Name;
 
        readonly bool longRunning;
 
        /// <summary> Creates a new instance of WorkQueue.</summary>
        /// <param name="options">Options.</param>
        /// <param name="name">A name to identify a work queue.</param>
        public WorkQueue(Options options, string name = "")
        {
            longRunning = options.HasFlag(Options.LongRunning);
            if (options.HasFlag(Options.TimeAveraging))
                average = new TimeSpanAverager();
            Name = name;
        }
        
        /// <summary> Creates a new instance of WorkQueue.</summary>
        /// <param name="options">Options.</param>
        /// <param name="name">A name to identify a work queue.</param>
        /// <param name="threadContext"> The thread context in which to run work jobs. The default value causes the context to be the parent of an enqueuing thread.</param>
        public WorkQueue(Options options, string name = "", TapThread threadContext = null) :this(options, name)
        {
            this.threadContext = threadContext;
        }
 
 
        /// <summary> Enqueue a new piece of work to be handled in the future. </summary>
        public void EnqueueWork(Action a) => EnqueueWork(new ActionInvokable(a));
        internal void EnqueueWork<T1, T2>(IInvokable<T1, T2> v, T1 a1, T2 a2) =>  EnqueueWork(new WrappedInvokable<T1,T2>(v, a1, a2));
 
        /// <summary>
        /// This method in in charge of processing the work queue.
        /// </summary>
        void WorkerFunction()
        {
            try
            {
                var awaitArray = new WaitHandle[] {addSemaphore.AvailableWaitHandle, cancel.Token.WaitHandle};
                while (true)
                {
                    retry:
                    awaitArray[1] = cancel.Token.WaitHandle;
                    int cancelIndex = 0;
                    if (longRunning)
                        cancelIndex = WaitHandle.WaitAny(awaitArray);
                    else
                        cancelIndex = WaitHandle.WaitAny(awaitArray, Timeout);
                    if (cancelIndex == 0 && !addSemaphore.Wait(0))
                        goto retry;
                    bool ok = cancelIndex == 0;
 
                    if (!ok)
                    {
                        if (cancel.IsCancellationRequested == false && longRunning) continue;
                        lock (threadCreationLock)
                        {
                            if (workItems.Count > 0)
                                goto retry;
                            break;
                        }
                    }
 
                    IInvokable run;
                    while (!workItems.TryDequeue(out run))
                        Thread.Yield();
                    try
                    {
                        if (average != null)
                        {
                            var sw = Stopwatch.StartNew();
                            run.Invoke();
                            average.PushTimeSpan(sw.Elapsed);
                        }
                        else
                        {
                            run.Invoke();
                        }
                    }
                    finally
                    {
                        Interlocked.Decrement(ref countdown);
                    }
                }
            }
            finally
            {
                lock (threadCreationLock)
                {
                    threadCount--;
                }
            }
        }
 
        /// <summary> Enqueue a new piece of work to be handled in the future. </summary>
        internal void EnqueueWork(IInvokable f)
        {
            while (addSemaphore.CurrentCount >= semaphoreMaxCount - 10)
            {
                // #4246: this is incredibly rare, but can happen if millions of results are pushed at once.
                //        the solution is to just slow a bit down when it happens.
                //        100 ms sleep is OK, because it needs to do around 1M things before it's idle.
                Thread.Sleep(100);
            }
            Interlocked.Increment(ref countdown);
            workItems.Enqueue(f);
            addSemaphore.Release();
            
            if (threadCount == 0)
            {
                lock (threadCreationLock)
                {
                    if (threadCount == 0)
                    {
                        TapThread.Start(WorkerFunction, null, Name, threadContext);
                        threadCount++;
                    }
                }
            }
        }
        
        /// <summary> Give the thread back to the thread manager.</summary>
        public void Dispose()
        {
            cancel.Cancel(false);
        }
 
        /// <summary> Waits for the workqueue to become empty. </summary>
        public void Wait()
        {
            // This is not often called spin-wait is fine in this case.
            while(countdown > 0)
            {
                Thread.Sleep(5);
            }
        }
 
        internal object Dequeue()
        {
            // Take the semaphore then take an object, just like WorkerFunction does.
            if (!addSemaphore.Wait(0))
            { 
                // there is a slight delay between pushing the work item and releasing the semaphore
                // in the case that we are in that space, just return.
                return null;
            }
            
            if (workItems.TryDequeue(out var inv))
            {
                // when taking an item from the work queue the countdown and the semaphore must be decremented
                Interlocked.Decrement(ref countdown);
                Debug.Assert(inv != null);
                if (inv is IWrappedInvokable wrap)
                    return wrap.InnerInvokable;
                return inv;
            }
            
            // Of some reason we are not able to dequeue the work item.
            // in this case bump the semaphore so somebody else can take it.
            // this should never happen.
            Debug.Fail("Unable to dequeue element");
            addSemaphore.Release(1);
            
            return null;
        }
 
        interface IWrappedInvokable: IInvokable
        {
            object InnerInvokable { get; }
        }
        
        /// <summary>  Wraps an Action in an IInvokable. </summary>
        class ActionInvokable : IWrappedInvokable
        {
            readonly Action action;
            public ActionInvokable(Action inv)
            {
                action = inv;
            }
 
            public void Invoke() => action();
            public object InnerInvokable => action;
        }
        /// <summary>  Wraps an IInvokable(T,T2) in an IInvokable. </summary>
        class WrappedInvokable<T, T2> : IWrappedInvokable 
        {
            readonly T arg1;
            readonly T2 arg2;
            readonly IInvokable<T, T2> wrapped;
        
            public WrappedInvokable(IInvokable<T, T2> invokable, T argument1, T2 argument2)
            {
                arg1 = argument1;
                arg2 = argument2;
                wrapped = invokable;
            }
            public void Invoke() => wrapped.Invoke(arg1, arg2);
            public object InnerInvokable => wrapped;
        }
    }
}