using System; using System.IO; using System.Threading; namespace OpenTap { /// TeeStream allows many reader-streams to be created from one. /// If one reader is slower than the others, it will block until the slowest catches up. class TeeStream { /// Represents a client stream that reads from a shared TeeStream. class TeeStreamClient : Stream { /// Keeps track of the current position in the stream for this client. long globalOffset; /// Reference to the host TeeStream that this client reads from. readonly TeeStream streamHost; public TeeStreamClient(TeeStream streamHost) => this.streamHost = streamHost; /// This stream is read-only. Flush does nothing. public override void Flush() { } /// Reads a sequence of bytes from the current stream and advances the position within the stream. public override int Read(byte[] buffer, int offset, int count) { int read = streamHost.Read(buffer, globalOffset, offset, count); globalOffset += read; return read; } public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); } public override void SetLength(long value) { throw new NotSupportedException(); } public override void Write(byte[] buffer, int offset, int count) { throw new NotSupportedException(); } public override bool CanRead => true; public override bool CanSeek => false; public override bool CanWrite => false; public override long Length => streamHost.Length; public override long Position { get => globalOffset; set { } } protected override void Dispose(bool disposing) { base.Dispose(disposing); // flush everything. There may be other peers so doing this makes sure that nobody waits for this client to read. this.CopyTo(Stream.Null); } } public long Length => mainStream.Length; public long Position => mainStreamPosition - blockLength; public long blockLength; readonly Stream mainStream; byte[] currentBlock; public TeeStream(Stream mainStream) => this.mainStream = mainStream; Stream CreateClientStream() => new TeeStreamClient(this); public Stream[] CreateClientStreams(int count) { if (count == 0) { mainStream.Dispose(); return Array.Empty(); } currentBlock = new byte[4096 * count]; clientCount = count; var result = new Stream[count]; for (int i = 0; i < count; i++) { result[i] = CreateClientStream(); } return result; } bool done; Exception readException = null; void ReadNextBlock() { // at this point everyone is waiting for the next block. int len; try { len = mainStream.Read(currentBlock, 0, currentBlock.Length); } catch(Exception exception) { this.readException = exception; len = 0; } // user interlocked.add to ensure that other threads will get the updated value. Interlocked.Add(ref mainStreamPosition, len); if (len == 0) { // We are done. let's stop. done = true; mainStream.Close(); mainStream.Dispose(); } blockLength = len; var oldEvt = evt; var w2 = waiting; evt = new SemaphoreSlim(0); Interlocked.Exchange(ref waiting, 0); oldEvt.Release(w2); } SemaphoreSlim evt = new SemaphoreSlim(0); int waiting; int clientCount; long mainStreamPosition; public int Read(byte[] buffer, long subStreamPosition, int bufferOffset, int count) { if (done) return 0; if (readException != null) throw readException; // Offset into the current block. long blockOffset = subStreamPosition - (mainStreamPosition - blockLength); if (blockOffset < 0) throw new InvalidOperationException("Unexpected position calculated"); var waitEvent = evt; // if the block offset is greater than the size of the block, we need to get/wait for the next block. if (blockOffset >= blockLength) { if (Interlocked.Increment(ref waiting) == clientCount) { // All clients are waiting - read the next block. ReadNextBlock(); } else { // wait for a new block. waitEvent.Wait(); } // new blocks released. start over. return Read(buffer, subStreamPosition, bufferOffset, count); } // read the block byte-by-byte. for (int i = 0; i < count; i++) { long o2 = subStreamPosition - (mainStreamPosition - blockLength) + i; if (o2 >= blockLength) { // End of the block reached. // start Read over with new args. int r = Read(buffer, subStreamPosition + i, bufferOffset + i, count - i); if (r == 0) return i; return r + i; } buffer[i + bufferOffset] = currentBlock[o2]; } return count; } } }