Converting Stream to Observable

I tried the following, but it seems to have concurrency issues, and I don't fully understand what is wrong.

public static IObservable<byte> ToObservable(this Stream stream, int bufferSize = 4096)
{
    var buffer = new byte[bufferSize];

    return Observable
        .FromAsync(async ct => (bytesRead: await stream.ReadAsync(buffer, 0, buffer.Length, ct).ConfigureAwait(false), buffer))
        .Repeat()
        .TakeWhile(x => x.bytesRead != 0)
        .Select(x => x.buffer)
        .SelectMany(x => x);
}

Enigmativity (best answer)

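Your likely concurrency issue is that while .SelectMany(x => x) is still enumerating the buffer, the next call to stream.ReadAsync may already be overwriting it, because every read shares the same array. The original also emits the whole buffer on every read, even when fewer than bufferSize bytes were read, so a short read (typically the last one) yields stale bytes.

You need to ensure that FromAsync returns a copy of the buffer, trimmed to the number of bytes read.

This version addresses both issues: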

public static IObservable<byte> ToObservable(this Stream stream, int bufferSize = 4096)
{
    var buffer = new byte[bufferSize];
    return
        Observable
            .FromAsync(async ct =>
            {
                var bytes = await stream.ReadAsync(buffer, 0, buffer.Length, ct).ConfigureAwait(false);
                return buffer.Take(bytes).ToArray();
            })
            .Repeat()
            .TakeWhile(x => x.Length != 0)
            .SelectMany(x => x);
}

I tested your original code and my version with this:

var bytes1 =
    await
        Observable
            .Using(
                () => File.OpenRead(fileName),
                s => s.ToObservable())
            .ToArray();

var bytes2 = await File.ReadAllBytesAsync(fileName);

Console.WriteLine(bytes1.SequenceEqual(bytes2));

Yours failed every time and mine succeeded.
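
To see why handing out the shared buffer goes wrong, here is a minimal sketch that uses only the BCL, no Rx. It is a deliberately simplified, synchronous model: every chunk is kept and only consumed after all reads have finished, whereas in the observable the overlap depends on timing. The names SharedBufferDemo and ReadChunks are made up for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;

public static class SharedBufferDemo
{
    // Reads the stream in chunks and keeps what each read "returned".
    // copy == false keeps a reference to the single shared buffer (like the original code);
    // copy == true keeps a copy trimmed to the bytes actually read (like the corrected code).
    public static List<byte[]> ReadChunks(Stream stream, int bufferSize, bool copy)
    {
        var buffer = new byte[bufferSize];
        var chunks = new List<byte[]>();
        int read;
        while ((read = stream.Read(buffer, 0, buffer.Length)) != 0)
        {
            chunks.Add(copy ? buffer.Take(read).ToArray() : buffer);
        }
        return chunks;
    }

    public static void Main()
    {
        var data = new byte[] { 1, 2, 3, 4, 5 };

        var shared = ReadChunks(new MemoryStream(data), 2, copy: false);
        var copied = ReadChunks(new MemoryStream(data), 2, copy: true);

        // Every "chunk" is the same array, so only the last read is visible,
        // including the stale second byte left over from the previous read.
        Console.WriteLine(string.Join(",", shared.SelectMany(c => c))); // 5,4,5,4,5,4

        // Trimmed copies preserve the original byte sequence.
        Console.WriteLine(string.Join(",", copied.SelectMany(c => c))); // 1,2,3,4,5
    }
}
```

The copy costs one allocation per read, but it is what makes each emitted chunk independent of the next ReadAsync call.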


One thing you might consider with your function is that the observable cannot be run concurrently and cannot be repeated: it doesn't manage the lifecycle of the stream, and all subscriptions share a single buffer.

You can fix this by changing the signature to this:

IObservable<byte> ToObservable<S>(this Func<S> streamFactory, int bufferSize = 4096) where S : Stream

Now the code is much more robust.

public static IObservable<byte> ToObservable<S>(
        this Func<S> streamFactory,
        int bufferSize = 4096)
            where S : Stream =>
    Observable
        .Using(
            streamFactory,
            stream =>
            {
                var buffer = new byte[bufferSize];
                return
                    Observable
                        .FromAsync(async ct =>
                        {
                            var bytes =
                                await
                                    stream
                                        .ReadAsync(buffer, 0, buffer.Length, ct)
                                        .ConfigureAwait(false);
                            return buffer.Take(bytes).ToArray();
                        })
                        .Repeat()
                        .TakeWhile(x => x.Length != 0)
                        .SelectMany(x => x);
            });
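
As a rough usage sketch, the factory-based version can be subscribed to more than once, because each subscription opens and disposes its own stream and allocates its own buffer. This assumes the System.Reactive package is referenced. The extension method is repeated so the sketch compiles on its own, and FactoryDemo, ReadTwiceAsync and the in-memory source are hypothetical names used only for illustration.

```csharp
using System;
using System.IO;
using System.Linq;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class StreamObservableExtensions
{
    // Same shape as the factory-based version above.
    public static IObservable<byte> ToObservable<S>(
            this Func<S> streamFactory,
            int bufferSize = 4096)
                where S : Stream =>
        Observable.Using(
            streamFactory,
            stream =>
            {
                var buffer = new byte[bufferSize];
                return Observable
                    .FromAsync(async ct =>
                    {
                        var bytes = await stream
                            .ReadAsync(buffer, 0, buffer.Length, ct)
                            .ConfigureAwait(false);
                        return buffer.Take(bytes).ToArray();
                    })
                    .Repeat()
                    .TakeWhile(x => x.Length != 0)
                    .SelectMany(x => x);
            });
}

public static class FactoryDemo
{
    // Subscribes twice to the same observable and returns both results.
    public static async Task<(byte[] First, byte[] Second)> ReadTwiceAsync(byte[] data)
    {
        // Hypothetical in-memory source; in practice this might be () => File.OpenRead(path).
        Func<MemoryStream> factory = () => new MemoryStream(data);

        var source = factory.ToObservable(bufferSize: 4);

        var first = await source.ToArray();   // first subscription: its own stream and buffer
        var second = await source.ToArray();  // second subscription: a fresh stream and buffer
        return (first, second);
    }
}
```

Both results should match the input data. With the stream-based signature, the second subscription would instead start from wherever the first one left the shared stream.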

Oleg Dok

In addition to @Enigmativity's answer, another way to achieve the result is a recursive scheduling pattern. Note that this example returns IObservable<byte[]>, i.e. blocks of bytes. That is more natural IMO when the caller supplies a bufferSize, and it is easily converted to IObservable<byte> if needed.

    public static IObservable<byte[]> ToObservable(this Stream stream, int bufferSize = 4096, IScheduler? scheduler = null)
    {
        scheduler ??= Scheduler.CurrentThread;
        return Observable.Create<byte[]>(observer =>
        {
            // Each invocation reads one block, emits it, then reschedules itself.
            async Task<IDisposable> recursion(IScheduler scheduler2, Stream stream2, CancellationToken ct)
            {
                // A fresh buffer per read, so emitted blocks are never overwritten.
                var buffer = new byte[bufferSize];

                var read = await stream2.ReadAsync(buffer, 0, bufferSize, ct).ConfigureAwait(false);

                // Zero bytes read means end of stream.
                if (read == 0)
                {
                    observer.OnCompleted();
                    return Disposable.Empty;
                }

                if (read == bufferSize)
                {
                    observer.OnNext(buffer);
                }
                else
                {
                    // Short read: emit only the bytes that were actually read.
                    var buffer2 = new byte[read];

                    Buffer.BlockCopy(buffer, 0, buffer2, 0, read);

                    observer.OnNext(buffer2);
                }

                return scheduler2.ScheduleAsync(stream2, recursion);
            }

            return scheduler.ScheduleAsync(stream, recursion);
        });
    }

And testing this sample:

[TestClass]
public class Playground
{
    [TestMethod]
    public void StreamAsObservable()
    {
        var ms = new MemoryStream();

        foreach(var i in Enumerable.Range(0, 21))
            ms.WriteByte((byte)(i%256));

        ms.Position = 0;

        var streamable = ms.ToObservable(10);

        var result = new List<byte>();

        streamable.SelectMany(b => b).Subscribe(result.Add);

        CollectionAssert.AreEqual(ms.ToArray(), result);
    }
}