// Copyright Keysight Technologies 2012-2019
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, you can obtain one at http://mozilla.org/MPL/2.0/.
using System;
using System.Threading;
namespace OpenTap.Diagnostic
{
/// <summary>
/// Generic log message queue.
/// Messages are stored in a singly linked chain of fixed-size <see cref="LogBuffer"/> instances:
/// producers append to the newest buffer (<c>_last</c>) and the consumer drains the oldest one (<c>_first</c>).
/// </summary>
internal class LogQueue
{
    /// <summary>
    /// Fixed length log buffer allowing atomic lock-free insertion.
    /// </summary>
    internal class LogBuffer
    {
        /// <summary> How many log messages to make room for in the buffer. </summary>
        const int Capacity = 1024 * 16;

        /// <summary> The buffer that follows this one in the chain, or null if none has been linked yet. </summary>
        public LogBuffer Next = null;

        /// <summary> Storage for the events, indexed by the slot number claimed in <see cref="PushEvent"/>. </summary>
        readonly Event[] logEvents = new Event[Capacity];

        /// <summary> written[i] becomes true once slot i of <see cref="logEvents"/> has been filled. It is never reset. </summary>
        readonly bool[] written = new bool[Capacity];

        /// <summary> Index of the first slot that has not yet been handed out by <see cref="PopCurrent"/>. </summary>
        int first;

        /// <summary>
        /// Number of slots claimed so far. It is incremented on every push attempt, including failed ones,
        /// so it can grow beyond <see cref="Capacity"/>.
        /// </summary>
        int last;

        /// <summary>
        /// Index of the first slot that has not been written yet, or <see cref="Capacity"/> if all slots are written.
        /// </summary>
        int lastRead
        {
            get
            {
                // Flags are never cleared and 'first' is only ever assigned a previous value of this
                // property, so every slot below 'first' is already written. Starting the scan there
                // gives the same answer as scanning from 0 without re-walking the consumed part.
                int i = first;
                // Volatile read pairs with the volatile write in PushEvent.
                while (i < Capacity && Volatile.Read(ref written[i]))
                    i++;
                return i;
            }
        }

        /// <summary>
        /// True when there is nothing to pop: either no claimed slot lies beyond the read position,
        /// or the read position has reached the end of the buffer.
        /// </summary>
        public bool Empty
        {
            get { return (last <= first) || (first >= Capacity); }
        }

        /// <summary>
        /// True once the claim counter has reached <c>Capacity - 1</c> or more.
        /// NOTE(review): with this threshold the property already reports true while one slot is still free.
        /// The callers in <see cref="LogQueue"/> only act on it after a failed push or when <see cref="Next"/>
        /// is non-null, so the original threshold is kept unchanged.
        /// </summary>
        public bool Done
        {
            get { return last >= (Capacity - 1); }
        }

        /// <summary>
        /// Builds an event from the given values and pushes it, see <see cref="PushEvent"/>.
        /// </summary>
        /// <returns>True if the event was stored; false if the buffer had no free slot.</returns>
        public bool PushMessage(string source, string message, long time, long duration, int eventType)
        {
            return PushEvent(new Event
            {
                Source = source,
                Message = message,
                Timestamp = time,
                DurationNS = duration,
                EventType = eventType
            });
        }

        /// <summary>
        /// Claims the next free slot with an atomic increment and stores the event in it.
        /// </summary>
        /// <param name="evt">The event to store.</param>
        /// <returns>True if the event was stored; false if all slots were already claimed.</returns>
        public bool PushEvent(Event evt)
        {
            int index = Interlocked.Increment(ref last) - 1;
            if (index > (Capacity - 1))
                return false;

            logEvents[index] = evt;
            // Publish the slot only after the event has been stored, so that a reader which
            // observes the flag also observes the event.
            Volatile.Write(ref written[index], true);
            return true;
        }

        /// <summary>
        /// Hands out all contiguous written slots that have not been popped before and advances the read position.
        /// </summary>
        /// <returns>
        /// A segment of the internal array covering the newly available events, or an empty segment if there are none.
        /// </returns>
        public ArraySegment<Event> PopCurrent()
        {
            int oldFirst = first;
            int newFirst = lastRead; // never exceeds Capacity
            first = newFirst;

            if (newFirst > oldFirst)
                return new ArraySegment<Event>(logEvents, oldFirst, newFirst - oldFirst);
            return new ArraySegment<Event>(logEvents, 0, 0);
        }
    }

    /// <summary> Oldest buffer in the chain; the one being drained. </summary>
    private LogBuffer _first;

    /// <summary> Newest buffer in the chain; the one being appended to. Only replaced while holding <see cref="lck"/>. </summary>
    private LogBuffer _last;

    private long _postedMessages;

    /// <summary>
    /// Number of messages passed to Enqueue. Incremented once per Enqueue call.
    /// </summary>
    public long PostedMessages
    {
        // The counter is a 64-bit value updated with Interlocked.Increment; plain reads and writes
        // of a long are not atomic on 32-bit platforms, so use Interlocked here as well.
        get { return Interlocked.Read(ref _postedMessages); }
        set { Interlocked.Exchange(ref _postedMessages, value); }
    }

    /// <summary> Guards the creation and linking of new buffers. </summary>
    readonly object lck = new object();

    /// <summary> Prevent too much data from being written to the buffer. </summary>
    void maybeWaitForProcessing()
    {
        // Check Capacity for the size of each buffer.
        // If the chain starting at _first is at least four buffers long, back off once
        // (a single TapThread.Sleep(10)) to give the consumer time to catch up.
        if (_first?.Next?.Next?.Next != null)
            TapThread.Sleep(10);
    }

    /// <summary>
    /// Appends a fresh buffer after <paramref name="full"/> unless that has already been done.
    /// </summary>
    /// <param name="full">The buffer on which a push just failed.</param>
    void appendBufferAfter(LogBuffer full)
    {
        lock (lck)
        {
            if (!full.Done)
                return;

            // _last is only replaced while holding lck, so this check is stable.
            // If it no longer points at 'full', another producer has already appended a buffer.
            if (_last != full)
                return;

            LogBuffer nb = new LogBuffer();
            // Link the new buffer before publishing it through _last. Otherwise another producer
            // could complete an Enqueue into the new buffer while full.Next is still null, and
            // IsEmpty would report an empty queue although that message is pending.
            full.Next = nb;
            Interlocked.Exchange(ref _last, nb);
        }
    }

    /// <summary>
    /// Builds an event from the given values and adds it to the queue.
    /// </summary>
    public void Enqueue(string source, string message, long time, long duration, int eventType)
    {
        Enqueue(new Event
        {
            Source = source,
            Message = message,
            Timestamp = time,
            DurationNS = duration,
            EventType = eventType
        });
    }

    /// <summary>
    /// Adds an event to the queue, appending a new buffer when the current one is full.
    /// </summary>
    /// <param name="evt">The event to add.</param>
    public void Enqueue(Event evt)
    {
        maybeWaitForProcessing();
        Interlocked.Increment(ref _postedMessages);

        while (true)
        {
            LogBuffer buf = _last;
            if (buf.PushEvent(evt))
                return;

            // The buffer was full: make sure a successor exists, then retry on the current _last.
            appendBufferAfter(buf);
        }
    }

    /// <summary>
    /// Moves <c>_first</c> to the successor of <paramref name="drained"/> if one has been linked.
    /// </summary>
    /// <param name="drained">The buffer that was observed to be done and empty.</param>
    void tryAdvanceFirst(LogBuffer drained)
    {
        LogBuffer next = drained.Next;
        // Advance from the buffer that was actually checked, and only if _first still points at it.
        if (next != null)
            Interlocked.CompareExchange(ref _first, next, drained);
    }

    /// <summary>
    /// Pops the currently available events from the oldest buffer and steps to the next buffer when
    /// the oldest one is exhausted.
    /// </summary>
    /// <param name="segment">The popped events; only meaningful when the method returns true.</param>
    /// <returns>False if the oldest buffer reported itself empty; otherwise true.</returns>
    bool tryPop(out ArraySegment<Event> segment)
    {
        LogBuffer buf = _first;
        if (buf.Empty)
        {
            if (buf.Done)
                tryAdvanceFirst(buf);
            segment = default(ArraySegment<Event>);
            return false;
        }

        segment = buf.PopCurrent();
        if (buf.Done && buf.Empty)
            tryAdvanceFirst(buf);
        return true;
    }

    /// <summary>
    /// Removes and returns the events currently available in the oldest buffer.
    /// NOTE(review): LogBuffer.PopCurrent is not synchronized, so this presumably expects a single
    /// consumer thread - confirm against callers.
    /// </summary>
    /// <returns>
    /// Null if the oldest buffer is empty; otherwise an array with the popped events (possibly of length 0).
    /// When an entire buffer is popped in one call, the buffer's internal array is returned without copying.
    /// </returns>
    public Event[] DequeueBunch()
    {
        ArraySegment<Event> res;
        if (!tryPop(out res))
            return null;

        if ((res.Offset == 0) && (res.Count == res.Array.Length))
            return res.Array;

        Event[] copy = new Event[res.Count];
        Array.Copy(res.Array, res.Offset, copy, 0, res.Count);
        return copy;
    }

    /// <summary>
    /// Removes the events currently available in the oldest buffer and copies them into <paramref name="into"/>.
    /// </summary>
    /// <param name="into">
    /// Receives the events. It is replaced by a new array whenever its length differs from the number of
    /// popped events (or it is null); it is left untouched when the method returns 0 because the buffer was empty.
    /// </param>
    /// <returns>The number of events copied.</returns>
    public int DequeueBunch(ref Event[] into)
    {
        ArraySegment<Event> res;
        if (!tryPop(out res))
            return 0;

        // Every element is overwritten below, so allocate directly instead of using Array.Resize,
        // which would first copy the old contents.
        if (into == null || into.Length != res.Count)
            into = new Event[res.Count];

        Array.Copy(res.Array, res.Offset, into, 0, res.Count);
        return res.Count;
    }

    /// <summary>
    /// Creates a queue with a single empty buffer.
    /// </summary>
    public LogQueue()
    {
        _first = new LogBuffer();
        _last = _first;
    }

    /// <summary>
    /// True if the oldest buffer has nothing to pop and no further buffer is linked after it.
    /// </summary>
    public bool IsEmpty
    {
        get
        {
            LogBuffer buf = _first;
            return buf.Empty && (buf.Next == null);
        }
    }
}
}