using System;
using System.IO;
using System.Threading;
namespace OpenTap
{
/// <summary>
/// TeeStream allows many reader-streams to be created from one.
/// If one reader is slower than the others, it will block until the slowest catches up.
/// </summary>
/// <remarks>
/// The source is consumed one shared block at a time. The next block is only fetched once every client has
/// asked for data beyond the current block, so a client that runs ahead blocks until the others catch up.
/// The clients therefore have to be read (or disposed) concurrently, e.g. one thread per client.
/// </remarks>
class TeeStream
{
    /// <summary> Represents a client stream that reads from a shared TeeStream. </summary>
    sealed class TeeStreamClient : Stream
    {
        /// <summary> Keeps track of the current position in the stream for this client. </summary>
        long globalOffset;

        /// <summary> Reference to the host TeeStream that this client reads from. </summary>
        readonly TeeStream streamHost;

        /// <summary> Creates a client that reads from <paramref name="streamHost"/>, starting at position 0. </summary>
        public TeeStreamClient(TeeStream streamHost) => this.streamHost = streamHost;

        /// <summary> This stream is read-only. Flush does nothing. </summary>
        public override void Flush()
        {
        }

        /// <summary>
        /// Reads a sequence of bytes from the current stream and advances the position within the stream.
        /// </summary>
        /// <returns> The number of bytes read, as reported by the host. 0 means the end of the stream. </returns>
        public override int Read(byte[] buffer, int offset, int count)
        {
            int read = streamHost.Read(buffer, globalOffset, offset, count);
            globalOffset += read;
            return read;
        }

        /// <summary> Not supported: the stream can only be read forward. </summary>
        public override long Seek(long offset, SeekOrigin origin)
        {
            throw new NotSupportedException();
        }

        /// <summary> Not supported: the stream is read-only. </summary>
        public override void SetLength(long value)
        {
            throw new NotSupportedException();
        }

        /// <summary> Not supported: the stream is read-only. </summary>
        public override void Write(byte[] buffer, int offset, int count)
        {
            throw new NotSupportedException();
        }

        public override bool CanRead => true;
        public override bool CanSeek => false;
        public override bool CanWrite => false;

        /// <summary> The length reported by the host, i.e. by the source stream. </summary>
        public override long Length => streamHost.Length;

        /// <summary>
        /// The number of bytes this client has read so far. Assigning a value is silently ignored.
        /// </summary>
        public override long Position
        {
            get => globalOffset;
            set { }
        }

        /// <summary>
        /// Reads this client to the end before releasing it, so that the other clients never end up waiting for
        /// a client that is no longer being read.
        /// </summary>
        protected override void Dispose(bool disposing)
        {
            try
            {
                // flush everything. There may be other peers so doing this makes sure that nobody waits for this client to read.
                this.CopyTo(Stream.Null);
            }
            catch (Exception)
            {
                // Dispose must not throw. A failure while draining is the failure of the shared source stream,
                // which every Read call on the clients reports as well.
            }
            finally
            {
                base.Dispose(disposing);
            }
        }
    }

    /// <summary> Number of bytes of block buffer allocated per client stream. </summary>
    const int BlockSizePerClient = 4096;

    /// <summary> The length of the source stream. </summary>
    public long Length => mainStream.Length;

    /// <summary> The position in the source stream at which the current block starts. </summary>
    public long Position => mainStreamPosition - blockLength;

    /// <summary> Number of valid bytes in the current block. 0 before the first block and after the last one. </summary>
    public long blockLength;

    readonly Stream mainStream;

    /// <summary> The block most recently read from the source. Allocated by <see cref="CreateClientStreams"/>. </summary>
    byte[] currentBlock;

    /// <summary> Set once the source returned no more data (or failed); the source is closed at that point. </summary>
    bool done;

    /// <summary> The exception thrown by the source stream, captured so that every client can rethrow it. </summary>
    System.Runtime.ExceptionServices.ExceptionDispatchInfo readFailure;

    /// <summary> Clients waiting for the next block wait on this. It is replaced for every block. </summary>
    SemaphoreSlim evt = new SemaphoreSlim(0);

    /// <summary> Number of clients that have asked for the next block since the current block was read. </summary>
    int waiting;

    int clientCount;

    /// <summary> Total number of bytes read from the source so far (the end of the current block). </summary>
    long mainStreamPosition;

    /// <summary> Creates a TeeStream that distributes the content of <paramref name="mainStream"/>. </summary>
    /// <exception cref="ArgumentNullException"> <paramref name="mainStream"/> is null. </exception>
    public TeeStream(Stream mainStream) => this.mainStream = mainStream ?? throw new ArgumentNullException(nameof(mainStream));

    /// <summary>
    /// Creates the client streams. Each of them yields the complete content of the source stream.
    /// </summary>
    /// <param name="count"> Number of clients. With 0 the source stream is disposed right away. </param>
    /// <returns> The client streams; an empty array when <paramref name="count"/> is 0. </returns>
    /// <exception cref="ArgumentOutOfRangeException"> <paramref name="count"/> is negative. </exception>
    public Stream[] CreateClientStreams(int count)
    {
        if (count < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(count));
        }

        if (count == 0)
        {
            mainStream.Dispose();
            return Array.Empty<Stream>();
        }

        currentBlock = new byte[BlockSizePerClient * count];
        clientCount = count;

        var result = new Stream[count];
        for (int i = 0; i < count; i++)
        {
            result[i] = new TeeStreamClient(this);
        }

        return result;
    }

    /// <summary>
    /// Reads the next block from the source and wakes the clients waiting for it.
    /// Only called by the last client to arrive, i.e. while all other clients are waiting.
    /// </summary>
    void ReadNextBlock()
    {
        // at this point everyone is waiting for the next block.
        int len = 0;
        try
        {
            len = mainStream.Read(currentBlock, 0, currentBlock.Length);
        }
        catch (Exception exception)
        {
            // Keep the failure so that all clients can report it; len stays 0, which ends the stream.
            readFailure = System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(exception);
        }

        // use Interlocked.Add to ensure that other threads will get the updated value.
        Interlocked.Add(ref mainStreamPosition, len);
        blockLength = len;

        try
        {
            if (len == 0)
            {
                // We are done. let's stop.
                done = true;
                mainStream.Dispose();
            }
        }
        finally
        {
            // Release the waiting clients even if closing the source failed - otherwise they would wait forever.
            // A fresh semaphore is installed first, so surplus permits on the old one cannot leak into the next round.
            var oldEvt = evt;
            int released = waiting;
            evt = new SemaphoreSlim(0);
            Interlocked.Exchange(ref waiting, 0);
            oldEvt.Release(released);
        }
    }

    /// <summary>
    /// Reads bytes for a client located at <paramref name="subStreamPosition"/>. Blocks until
    /// <paramref name="count"/> bytes have been copied or the source has ended, which may involve waiting for
    /// the other clients to finish the current block.
    /// </summary>
    /// <param name="buffer"> Destination buffer. </param>
    /// <param name="subStreamPosition"> Position of the calling client within the source stream. </param>
    /// <param name="bufferOffset"> Index in <paramref name="buffer"/> at which to start writing. </param>
    /// <param name="count"> Maximum number of bytes to read. </param>
    /// <returns> The number of bytes copied into <paramref name="buffer"/>. 0 means the end of the stream. </returns>
    /// <exception cref="ArgumentNullException"> <paramref name="buffer"/> is null. </exception>
    /// <exception cref="ArgumentOutOfRangeException"> <paramref name="bufferOffset"/> or <paramref name="count"/> is negative. </exception>
    /// <exception cref="ArgumentException"> The buffer cannot hold <paramref name="count"/> bytes at <paramref name="bufferOffset"/>. </exception>
    /// <exception cref="InvalidOperationException"> The position lies before the current block. </exception>
    /// <remarks> If reading the source stream failed, the exception thrown by the source is rethrown here. </remarks>
    public int Read(byte[] buffer, long subStreamPosition, int bufferOffset, int count)
    {
        if (buffer == null)
        {
            throw new ArgumentNullException(nameof(buffer));
        }

        if (bufferOffset < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(bufferOffset));
        }

        if (count < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(count));
        }

        if (buffer.Length - bufferOffset < count)
        {
            throw new ArgumentException("The buffer is too small for the requested offset and count.");
        }

        int copied = 0;

        // A request for 0 bytes never enters the loop, so it returns without waiting for the other clients.
        while (copied < count)
        {
            // The failure has to be checked before 'done': a failed read also marks the stream as done,
            // and the error must not be mistaken for a regular end of stream.
            if (readFailure != null)
            {
                if (copied > 0)
                {
                    // Hand over what was copied before the failure; the next call reports the error.
                    break;
                }

                readFailure.Throw();
            }

            if (done)
            {
                break;
            }

            // Offset into the current block.
            long blockOffset = subStreamPosition + copied - (mainStreamPosition - blockLength);
            if (blockOffset < 0)
            {
                throw new InvalidOperationException("Unexpected position calculated");
            }

            // Must be captured before this client announces that it is waiting: once the last client has
            // arrived, the semaphore is replaced and only the old one receives the wake-up permits.
            var waitEvent = evt;

            // if the block offset is greater than the size of the block, we need to get/wait for the next block.
            if (blockOffset >= blockLength)
            {
                if (Interlocked.Increment(ref waiting) == clientCount)
                {
                    // All clients are waiting - read the next block.
                    ReadNextBlock();
                }
                else
                {
                    // wait for a new block.
                    waitEvent.Wait();
                }

                // new block released. start over.
                continue;
            }

            // Copy as much of the current block as is available and requested.
            int chunk = (int)Math.Min(count - copied, blockLength - blockOffset);
            Buffer.BlockCopy(currentBlock, (int)blockOffset, buffer, bufferOffset + copied, chunk);
            copied += chunk;
        }

        return copied;
    }
}
}