// Copyright Keysight Technologies 2012-2019 // This Source Code Form is subject to the terms of the Mozilla Public // License, v. 2.0. If a copy of the MPL was not distributed with this // file, you can obtain one at http://mozilla.org/MPL/2.0/. using OpenTap.Addin; using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Collections.Immutable; using System.Diagnostics; using System.IO; using System.Runtime.CompilerServices; using System.Runtime.Serialization; using System.Text; using System.Threading; using System.Threading.Tasks; namespace OpenTap { /// /// Object that holds the state of a specific TestPlan run. /// Also internally manages resources and threads relating to the . /// Note: manages opening and closing a . /// [Serializable] [DataContract(IsReference = true)] [DebuggerDisplay("TestPlanRun {TestPlanName}")] public class TestPlanRun : TestRun { /// /// Special Parameters refer to result parameters on a test plan run that describe relevant metadata about the run. /// They can be extracted with e.g. 'Run.Parameters.FirstOrDefault(p => p.Name == SpecialParameterNames.Property)' /// public class SpecialParameterNames { /// /// The GUID of the test step run that triggered the break condition that terminated the test plan run. /// If the run was not terminated due to a break condition being satisfied, this parameter will /// not be present. /// public const string BreakIssuedFrom = "Break Issued From"; /// /// A comma-separated list of GUIDs of the test steps that were specified in the Step Override set. /// If the Step Override feature was not used, this parameter will not be present. /// public const string StepOverrideList = "Step Override List"; } static readonly TraceSource log = Log.CreateSource("TestPlan"); static readonly TraceSource resultLog = Log.CreateSource("Resources"); /// /// FileGlobalsÔËÐÐʱ /// public VariableContext ParametersRuntime { get; private set; } /// /// FileGlobalsÔËÐÐʱ /// public VariableContext FileGlobalsRuntime { get; private set; } /// /// StationGlobalsÔËÐÐʱ /// public VariableContext StationGlobalsRuntime { get; private set; } TestPlan plan; string planXml; /// XML for the running test plan. [DataMember] public string TestPlanXml { get { if (planXml != null) return planXml; WaitForSerialization(); return planXml; } private set { planXml = value; } } /// Waits for the test plan to be serialized. internal void WaitForSerialization() => serializePlanTask?.Wait(TapThread.Current.AbortToken); /// The SHA1 hash of XML of the test plan. //Note: for Hash the group must be Test Plan for backwards-compatibility. public string Hash { get => Parameters[nameof(Hash), "Test Plan"]?.ToString(); private set => Parameters[nameof(Hash), "Test Plan"] = value; } /// Name of the running test plan. [DataMember] [MetaData(macroName: nameof(TestPlanName))] public string TestPlanName { get => Parameters[nameof(TestPlanName), GROUP].ToString(); private set => Parameters[nameof(TestPlanName), GROUP] = value; } /// Set by the TestPlan execution logic to indicate whether the TestPlan failed to start the TestPlan. [DataMember] public bool FailedToStart { get; set; } /// The thread that started the test plan. Use this to abort the plan thread. public TapThread MainThread { get; } #region Internal Members used by the TestPlan internal IEnumerable ResultListeners => resultWorkers.Keys; /// /// Result listeners can be added just before the test plan actually starts. /// If the operation fails an exception will be thrown. public void AddResultListener(IResultListener listener) { if (listener == null) throw new ArgumentNullException(nameof(listener)); if (ResultListenersSealed) throw new Exception("Test plan already started. ResultListeners cannot be added at this point"); if (resultWorkers.ContainsKey(listener)) return; resultWorkers.Add(listener, new WorkQueue(WorkQueue.Options.LongRunning | WorkQueue.Options.TimeAveraging, listener.ToString())); } /// /// Removes a result listener from the active result listeners in this run. /// Note this can only be done at specific times during test plan execution, /// namely when result listeners has not been connected. /// If the operation fails an exception will be thrown. /// public void RemoveResultListener(IResultListener listener) { if (listener == null) throw new ArgumentNullException(nameof(listener)); if (ResultListenersSealed) throw new Exception("Test plan is running. ResultListeners cannot be removed at this point"); if (!resultWorkers.ContainsKey(listener)) return; resultWorkers.Remove(listener); } /// Wait handle that is set when the metadata action is completed. internal ManualResetEvent PromptWaitHandle = new ManualResetEvent(false); /// Resources touched by the prompt metadata action. internal IResource[] PromptedResources = Array.Empty(); internal IResourceManager ResourceManager { get; private set; } internal bool IsCompositeRun { get; set; } /// Set to true when result listeners cannot be added to the test plan run. internal bool ResultListenersSealed { get; set; } #region Result propagation dispatcher system bool isBusy() { foreach (var worker in resultWorkers.Values) { if (worker.QueueSize > 0) return true; } return false; } readonly Dictionary resultWorkers; readonly VariableContainer variableContainer; double resultLatencyLimit = EngineSettings.Current.ResultLatencyLimit; object workThrottleLock = new object(); /// Wait for result queues to become processed if there is too much work in the buffer. The max workload size for any ResultListener is specified by resultLatencyLimit in seconds. internal void ThrottleResultPropagation() { if (resultWorkers.Count == 0) return; lock (workThrottleLock) { foreach (var worker in resultWorkers) { var avg = worker.Value.AverageTimeSpent; var estimatedDelay = new TimeSpan(avg.Ticks * worker.Value.QueueSize).TotalSeconds; bool printedMessage = false; if (estimatedDelay > resultLatencyLimit) { var sw = Stopwatch.StartNew(); while (worker.Value.QueueSize > 1) { if (!printedMessage && sw.Elapsed.TotalMilliseconds > 100) { printedMessage = true; resultLog.Warning("Estimated processing time for result queue reached {0:0.0}s (Limit is set to {1}s in Engine.xml). Waiting for queue to be processed by {2}...", estimatedDelay, resultLatencyLimit, worker.Key); } Thread.Sleep(20); } var elapsed = sw.Elapsed; if (elapsed.TotalMilliseconds > 50) resultLog.Debug(elapsed, "Waited for result processing for {0}", worker.Key); } } } } /// /// Waits for result propagation thread to be idle. /// public void WaitForResults() { while (isBusy()) { Thread.Sleep(1); } } internal void WaitForResultListener(IResultListener rl) { do { resultWorkers[rl].Wait(); } while (resultWorkers[rl].IsBusy); } #endregion /// /// List of all TestSteps for which PrePlanRun has already been called. /// internal List StepsWithPrePlanRun = new List(); private string GetHash(byte[] testPlanXml) { using (var algo = System.Security.Cryptography.SHA1.Create()) return BitConverter.ToString(algo.ComputeHash(testPlanXml), 0, 8).Replace("-", string.Empty); } Task serializePlanTask; internal void AddTestPlanCompleted(HybridStream logStream, bool openCompleted) { void onTestPlanCompleted(IResultListener r) { var reslog = ResourceTaskManager.GetLogSource(r); if (r.IsConnected) { try { var sw = Stopwatch.StartNew(); using (var logView = logStream.GetViewStream()) { using (TimeoutOperation.Create(() => log.Info("Waiting for OnTestPlanRunCompleted for {0}.", r))) r.OnTestPlanRunCompleted(this, logView); } reslog.Debug(sw, "OnTestPlanRunCompleted for {0}.", r); } catch (Exception ex) { reslog.Error("Error in OnTestPlanRunCompleted for '{0}': '{1}'", r, ex.Message); reslog.Debug(ex); UpgradeVerdict(Verdict.Error); } } else { if (!openCompleted) reslog.Warning("Run Completed was not called for '{0}' as it failed to open.", r); } } ScheduleInResultProcessingThread(r => { if (r is IArtifactListener) return; onTestPlanCompleted(r); }, blocking: true); foreach (var kw in resultWorkers) { kw.Value.Wait(); } ScheduleInResultProcessingThread(r => { if (r is IArtifactListener) { onTestPlanCompleted(r); } }, blocking: true); // now clean up the result listener workers and wait for them to end. foreach (var kw in resultWorkers) { kw.Value.Dispose(); } foreach (var kw in resultWorkers) { using (TimeoutOperation.Create(() => log.Info("Waiting for result propagation for {0}", kw.Key))) { kw.Value.Wait(); } } ResultListenersSealed = false; } /// /// Returns the number of threads queued. /// /// /// /// internal int ScheduleInResultProcessingThread(IInvokable r) { int count = 0; foreach (var item in resultWorkers) { if (item.Key is T x) { if (r is ISkippableInvokable skippableInvokable) { // skip the work if possible. if (skippableInvokable.Skip(x, item.Value)) continue; } count++; item.Value.EnqueueWork(r, x, item.Value); } } return count; } internal int ScheduleInResultProcessingThread(Action r, bool blocking = false) { return ScheduleInResultProcessingThread(new Invokable(r).AddArg(), blocking); } /// /// like ScheduleInResultProcessingThread, but if blocking is used, it will block until the work is finished for all result listeners. /// /// /// /// /// int ScheduleInResultProcessingThread(IInvokable r, bool blocking) { if (!blocking) return ScheduleInResultProcessingThread(r); if (resultWorkers.Count == 0) return 0; using (var sem = new SemaphoreSlim(0, resultWorkers.Count)) { int count = ScheduleInResultProcessingThread(l => { try { r.Invoke(l, null); } finally { sem.Release(); } }); for (int i = 0; i < count; i++) sem.Wait(); return count; } } internal void AddTestStepRunStart(TestStepRun stepRun) { var instant = Stopwatch.GetTimestamp(); // Create a clone, because running the test step may change the // values inside the active instance of TestStepRun. var clone = stepRun.Clone(); ScheduleInResultProcessingThread(listener => { try { // A result listener may modify the parameter. This should not affect other result listeners. listener.OnTestStepRunStart(clone.Clone()); if (listener is IExecutionListener ex) ex.OnTestStepExecutionChanged(stepRun.TestStepId, stepRun, StepState.Running, instant); } catch (Exception e) { log.Error("Error during Test Step Run Start for {0}", listener); log.Debug(e); RemoveFaultyResultListener(listener); } }); } internal void AddTestStepStateUpdate(Guid stepID, TestStepRun stepRun, StepState state) { var instant = Stopwatch.GetTimestamp(); ScheduleInResultProcessingThread(listener => { try { listener.OnTestStepExecutionChanged(stepID, stepRun, state, instant); } catch (Exception e) { log.Error("Error at {1} event for {0}", listener, state); log.Debug(e); RemoveFaultyResultListener(listener); } }); } internal void AddTestStepRunCompleted(TestStepRun stepRun) { var instant = Stopwatch.GetTimestamp(); ScheduleInResultProcessingThread(listener => { try { listener.OnTestStepRunCompleted(stepRun); if (listener is IExecutionListener ex) ex.OnTestStepExecutionChanged(stepRun.TestStepId, stepRun, StepState.Idle, instant); } catch (Exception e) { log.Error("Error at Step Run Completed for {0}", listener); log.Debug(e); RemoveFaultyResultListener(listener); } }); } internal void RemoveFaultyResultListener(IResultListener resultListener) { try { resultListener.Close(); } catch (Exception) { } resultWorkers.Remove(resultListener); log.Warning("Removing faulty ResultListener '{0}'", resultListener); } #endregion /// /// Waits for the given resources to become opened. /// /// /// public void WaitForResourcesOpened(CancellationToken cancel, params IResource[] resources) { ResourceManager.WaitUntilResourcesOpened(cancel, resources); } #region constructors /// /// Starts tasks to open resources. All referenced instruments and duts as well as supplied resultListeners to the plan. /// /// Property Plan /// The ResultListeners for this test plan run. /// Property StartTime. /// /// /// public TestPlanRun(TestPlan plan, IList resultListeners, DateTime startTime, long startTimeStamp, VariableContainer variableContainer, bool isCompositeRun = false) : this(plan, resultListeners, startTime, startTimeStamp, testPlanXml: null, variableContainer, isCompositeRun: isCompositeRun) { } class StringHashPair { public string Xml { get; set; } public string Hash { get; set; } public byte[] Bytes { get; set; } } /// Memorizer for storing pairs of Xml and hash. static ConditionalWeakTable testPlanHashMemory = new ConditionalWeakTable(); /// /// Starts tasks to open resources. All referenced instruments and duts as well as supplied resultListeners to the plan. /// /// Property Plan /// The ResultListeners for this test plan run. /// Property StartTime. /// /// /// /// Predefined test plan XML. Allowed to be null. public TestPlanRun(TestPlan plan, IList resultListeners, DateTime startTime, long startTimeStamp, string testPlanXml, VariableContainer variableContainer, bool isCompositeRun = false) : this(variableContainer) { if (plan == null) throw new ArgumentNullException(nameof(plan)); var breakCondition = BreakConditionProperty.GetBreakCondition(plan); if (breakCondition.HasFlag(BreakCondition.Inherit)) { BreakCondition |= breakCondition; } else { BreakCondition = breakCondition; } this.IsCompositeRun = isCompositeRun; Parameters = ResultParameters.GetComponentSettingsMetadata(); // Add metadata from the plan itself. Parameters.IncludeMetadataFromObject(plan); this.Verdict = Verdict.NotSet; // set Parameters before setting Verdict. if (resultListeners != null) { foreach (var res in resultListeners) { AddResultListener(res); } } StartTime = startTime; StartTimeStamp = startTimeStamp; this.plan = plan; serializePlanTask = Task.Factory.StartNew(() => { if (testPlanXml != null) { TestPlanXml = testPlanXml; Hash = GetHash(Encoding.UTF8.GetBytes(testPlanXml)); return; } if (plan.GetCachedXml() is byte[] xml) { if (!testPlanHashMemory.TryGetValue(this.plan, out var pair)) { if (pair == null) { pair = new StringHashPair(); testPlanHashMemory.Add(plan, pair); } } if (Equals(pair.Bytes, xml) == false) { pair.Xml = Encoding.UTF8.GetString(xml); pair.Hash = GetHash(xml); pair.Bytes = xml; TestPlanXml = pair.Xml; } else TestPlanXml = pair.Xml; Hash = pair.Hash; return; } using (var memstr = new MemoryStream(128)) { var sw = Stopwatch.StartNew(); try { plan.Save(memstr); var testPlanBytes = memstr.ToArray(); TestPlanXml = Encoding.UTF8.GetString(testPlanBytes); Hash = GetHash(testPlanBytes); } catch (Exception e) { log.Warning("Unable to XML serialize test plan."); log.Debug(e); } finally { log.Debug(sw, "Saved Test Plan XML"); } } }); TestPlanName = plan.Name; if (plan.Path != null) Parameters["TestPlanPath", ""] = plan.Path; this.plan = plan; RefreshVariable(); } bool planRunStarted; internal void Start() { if (ResourceManager == null) { if (EngineSettings.Current.ResourceManagerType == null) ResourceManager = new ResourceTaskManager(); else ResourceManager = (IResourceManager)EngineSettings.Current.ResourceManagerType.GetType().CreateInstance(); } if (planRunStarted) return; // waits for prompt before loading the parameters. planRunStarted = true; ResourceManager.ResourceOpened += res => Parameters.AddRange(ResultParameters.GetMetadataFromObject(res)); } internal TestPlanRun(VariableContainer variableContainer) { MainThread = TapThread.Current; FailedToStart = false; resultWorkers = new Dictionary(); this.variableContainer = variableContainer; { // AbortCondition var abort2 = EngineSettings.Current.AbortTestPlan; BreakCondition = default; if (abort2.HasFlag(EngineSettings.AbortTestPlanType.Step_Fail)) BreakCondition |= BreakCondition.BreakOnFail; if (abort2.HasFlag(EngineSettings.AbortTestPlanType.Step_Error)) BreakCondition |= BreakCondition.BreakOnError; if (abort2.HasFlag(EngineSettings.AbortTestPlanType.Step_Inconclusive)) BreakCondition |= BreakCondition.BreakOnInconclusive; if (abort2.HasFlag(EngineSettings.AbortTestPlanType.Step_Pass)) BreakCondition |= BreakCondition.BreakOnPass; } } void RefreshVariable() { RefreshParameters(); RefreshFileGlobals(); RefreshStationGlobals(); } void RefreshParameters() { var runtimeVariablePool = new ConcurrentDictionary(); foreach (var data in plan.ParameterVariables) { RuntimeVariable runTime = data.ToRuntime(); if (runTime == null) continue; runtimeVariablePool[runTime.Name] = runTime; } var temp = new VariableContext(runtimeVariablePool); temp.Merge(variableContainer?.Parameters); this.ParametersRuntime = temp; } void RefreshFileGlobals() { if (variableContainer?.FileGlobals != null) { FileGlobalsRuntime = variableContainer.FileGlobals; } else { var runtimeVariablePool = new ConcurrentDictionary(); foreach (var data in plan.Variables) { RuntimeVariable runTime = data.ToRuntime(); if (runTime == null) continue; runtimeVariablePool[runTime.Name] = runTime; } this.FileGlobalsRuntime = new VariableContext(runtimeVariablePool); } } void RefreshStationGlobals() { if (variableContainer?.StationGlobals != null) { StationGlobalsRuntime = variableContainer.StationGlobals; } else { var runtimeVariablePool = StationGlobalsManager.ToRuntime(); this.StationGlobalsRuntime = new VariableContext(runtimeVariablePool); } } internal TestPlanRun(TestPlanRun original, DateTime startTime, long startTimeStamp, VariableContainer variableContainer) : this(variableContainer) { resultWorkers = original.resultWorkers; this.Parameters = original.Parameters; // set Parameters before setting Verdict. this.Verdict = Verdict.NotSet; foreach (var res in ResultListeners) { AddResultListener(res); } this.ResourceManager = original.ResourceManager; this.StartTime = startTime; this.StartTimeStamp = startTimeStamp; this.IsCompositeRun = original.IsCompositeRun; Id = original.Id; this.plan = original.plan; RefreshVariable(); serializePlanTask = Task.Factory.StartNew(() => { using (var memstr = new MemoryStream(128)) { try { plan.Save(memstr); var testPlanBytes = memstr.ToArray(); TestPlanXml = Encoding.UTF8.GetString(testPlanBytes); Hash = GetHash(testPlanBytes); } catch (Exception e) { log.Warning("Unable to XML serialize test plan."); log.Debug(e); } } }); TestPlanName = original.TestPlanName; } #endregion static readonly TraceSource artifactsLog = Log.CreateSource("Artifacts"); internal Task PublishArtifactWithRunAsync(string file, TestRun run) { return PublishArtifactWithRunAsync(new FileStream(file, FileMode.Open, FileAccess.Read, FileShare.Read | FileShare.Write | FileShare.Delete), Path.GetFileName(file), run); } internal Task PublishArtifactWithRunAsync(Stream inStream, string filename, TestRun run) { // multiple threads might be publishing artifacts at the same time, so this add needs to be done safely. Utils.InterlockedSwap(ref artifacts, () => artifacts.Add(filename)); var streamGetters = new BlockingCollection(); int readerRefCount = 0; TeeStream tee = inStream is FileStream ? null : new TeeStream(inStream); ManualResetEventSlim go = new ManualResetEventSlim(false); // The result type does not matter. TaskCompletionSource finishedSignal = new TaskCompletionSource(); readerRefCount = ScheduleInResultProcessingThread(l => { go.Wait(); Stream s2; if (inStream is FileStream fileStream) { s2 = new FileStream(fileStream.Name, FileMode.Open, FileAccess.Read, FileShare.Read | FileShare.Write | FileShare.Delete); } else { while (!streamGetters.TryTake(out s2)) { TapThread.Sleep(10); } } try { l.OnArtifactPublished(run, s2, filename); s2.Dispose(); } catch (Exception e) { artifactsLog.Error("Error during Test Step Run Start for {0}", l); artifactsLog.Debug(e); } finally { Interlocked.Decrement(ref readerRefCount); if (readerRefCount == 0) { inStream.Dispose(); finishedSignal.SetResult(true); } } }); if (tee != null) { var subStreams = tee.CreateClientStreams(readerRefCount); foreach (var str in subStreams) { streamGetters.Add(str); } } if (readerRefCount == 0) { inStream.Dispose(); return Task.FromResult(0); } go.Set(); return finishedSignal.Task; } /// Publishes an artifact for the test plan run. /// The artifact data as a stream. When publishing an artifact stream, the stream will be disposed by the callee and does not have to be disposed by the caller. /// The name of the published artifact. public void PublishArtifact(Stream stream, string artifactName) => PublishArtifactWithRunAsync(stream, artifactName, this).Wait(); /// Publishes an artifact for the test plan run asynchronously. /// The artifact data as a stream. When publishing an artifact stream, the stream will be disposed by the callee and does not have to be disposed by the caller. /// The name of the published artifact. public Task PublishArtifactAsync(Stream stream, string artifactName) => PublishArtifactWithRunAsync(stream, artifactName, this); /// Publishes an artifact for the test plan run. public void PublishArtifact(string file) => PublishArtifactWithRunAsync(file, this).Wait(); /// Publishes an artifact for the test plan run asynchronously. public Task PublishArtifactAsync(string file) => PublishArtifactWithRunAsync(file, this); /// Returns a list of all published artifacts. This list will get updated as the test plan progresses. public IEnumerable Artifacts => artifacts; ImmutableHashSet artifacts = ImmutableHashSet.Empty; } }