Wednesday, 23 February 2011

Reverse the polarity of a stream using co-streams

This post is about a technique I call co-streams, which lets you reverse the direction (the “polarity”) of any stream.

There is one very useful thing about streams: you can chain them. You can implement a stream that takes an “underlying” stream, and you end up with a chain of streams that process data as if it were going through a pipeline.

However, there is a fundamental limitation to this: all the streams in the chain must be either all readable or all writable. If you have a stream that can only pass along data you write to an underlying writable stream, you’re stuffed if what you actually want is to read the generated data.

Specific example: System.IO.Compression.GZipStream. Inexplicably, if you want to compress to gzip, you can only get a write-only stream, and if you want to uncompress from gzip, you can only get a read-only stream. What if you want to read an uncompressed file but compress the data while you’re reading it? GZipStream doesn’t let you do that. So I needed a way to “reverse the polarity” of a stream.
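To make the asymmetry concrete, here is a tiny demonstration (the variable names are mine): the .NET documentation confirms that a compressing GZipStream is not readable and a decompressing one is not writable.

```csharp
using System;
using System.IO;
using System.IO.Compression;

class PolarityDemo
{
    static void Main()
    {
        // In Compress mode, the stream only accepts writes.
        var compressor = new GZipStream(new MemoryStream(), CompressionMode.Compress);
        Console.WriteLine(compressor.CanRead);    // False — write-only

        // In Decompress mode, the stream only supports reads.
        var decompressor = new GZipStream(new MemoryStream(), CompressionMode.Decompress);
        Console.WriteLine(decompressor.CanWrite); // False — read-only
    }
}
```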

Enter co-streams

I invented this term, so you won’t find it on Google. The idea is very simple. You write two methods that each take a stream. One of them writes the gzipped data to the provided stream (this is almost boilerplate code):

public static void Write(Stream stream)
{
    using (var gzip = new GZipStream(stream, CompressionMode.Compress))
    using (var f = File.Open(@"...", FileMode.Open))
    {
        var buffer = new byte[65536];
        int bytesRead;
        while ((bytesRead = f.Read(buffer, 0, buffer.Length)) > 0)
            gzip.Write(buffer, 0, bytesRead);
    }
}

... and the other one reads from a stream, expecting to receive gzipped data from it. It acts as if the provided stream were a gzip compression stream with its polarity reversed:

public static void Read(Stream stream)
{
    // Read from ‘stream’, and gzipped data will come out!
}

Then you put them together, and the magic happens behind the scenes:

Costreams.RunCostreams(Write, Read);

Of course, exactly the same thing works the other way around: If you have already-gzipped data which you want to write to a stream and have it decompressed, write it to the stream in Write() and use a GZipStream in the Read() method to do the decompressing.
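As a sketch (with names assumed to mirror the earlier example), the reversed direction would look something like this:

```csharp
// The mirror image: Write() supplies already-gzipped bytes; Read() wraps the
// provided stream in a decompressing GZipStream and reads plain data out.
public static void Write(Stream stream)
{
    // Write already-gzipped data to ‘stream’ here.
}

public static void Read(Stream stream)
{
    using (var gzip = new GZipStream(stream, CompressionMode.Decompress))
    {
        // Read from ‘gzip’, and decompressed data will come out!
    }
}
```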

Behind the scenes

Behind the scenes, the read and write methods are executed in parallel (in separate threads).

But before we do that, we instantiate two streams: a read-only stream and a write-only stream. We pass the read-only stream to the read method and the write-only stream to the write method.

The two stream objects share a queue of data between them. Every time the write method writes to the write-only stream, it populates that queue and notifies the read thread; when the read method reads from the read-only stream, it waits for that notification and then dequeues the data.

When the write method finishes, we add “null” to the queue to communicate to the read-only stream that this is the end of the stream (which it will then communicate to the read method by returning 0 from Read()). Then we wait for the read thread to return.
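The queue-and-notification machinery can be sketched like so. This is a simplified illustration of the idea, not the actual implementation: ChunkQueue and the demo are my own invented names, and in the real thing the write-only stream’s Write() would call Enqueue() while the read-only stream’s Read() would call Dequeue(), treating null as end-of-stream.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

class ChunkQueue
{
    private readonly Queue<byte[]> _queue = new Queue<byte[]>();

    public void Enqueue(byte[] chunk)   // called by the write thread
    {
        lock (_queue)
        {
            _queue.Enqueue(chunk);
            Monitor.Pulse(_queue);      // notify the read thread
        }
    }

    public byte[] Dequeue()             // called by the read thread
    {
        lock (_queue)
        {
            while (_queue.Count == 0)
                Monitor.Wait(_queue);   // block until data (or null) arrives
            return _queue.Dequeue();
        }
    }
}

static class Demo
{
    static void Main()
    {
        var queue = new ChunkQueue();
        var reader = new Thread(() =>
        {
            byte[] chunk;
            while ((chunk = queue.Dequeue()) != null)   // null = end of stream
                Console.WriteLine("Read {0} bytes", chunk.Length);
        });
        reader.Start();
        queue.Enqueue(new byte[100]);
        queue.Enqueue(new byte[42]);
        queue.Enqueue(null);    // the writer has finished
        reader.Join();          // wait for the read thread to return
    }
}
```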

Cool things about co-streams

  • This solution maintains the generality of streams. You can use this to reverse the polarity of any stream.
  • I like the fact that the two methods run in parallel. This way they can do whatever computations they want, and that work is automatically parallelised to some extent.
  • The use of a queue means that the write thread never needs to wait for the read thread to consume the data: it just puts the data in the queue and lets the read thread get to it whenever convenient. The only waiting happens when the read thread needs data that hasn’t arrived yet (obviously), and when the write method has finished but the read thread is still processing.


To save you from having to duplicate my work, here is the complete implementation.

Room for improvement

The Read() method in the read-only stream implementation is not very clever. If the writing thread writes small amounts of data at a time, the read-only stream will return those same small chunks one at a time. It could be improved by consolidating several chunks whenever the caller’s buffer is large enough to hold them.
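For illustration, such a consolidating Read() might look something like this. This is a hypothetical sketch, not code from my implementation: _currentChunk, _chunkOffset, _endOfStream and the queue’s HasQueuedChunks are invented names for state the real stream would have to keep.

```csharp
// Hypothetical consolidating Read(): keep copying queued chunks into the
// caller's buffer while there is room and more data is already available.
public override int Read(byte[] buffer, int offset, int count)
{
    if (_endOfStream)
        return 0;
    int totalCopied = 0;
    while (count > 0)
    {
        if (_currentChunk == null)
        {
            // Block only for the first chunk; after that, take further chunks
            // only if they are already queued, so we never wait while holding
            // data the caller could have.
            if (totalCopied > 0 && !_queue.HasQueuedChunks)
                break;
            _currentChunk = _queue.Dequeue();
            _chunkOffset = 0;
            if (_currentChunk == null)   // null marks the end of the stream
            {
                _endOfStream = true;
                break;
            }
        }
        int copy = Math.Min(count, _currentChunk.Length - _chunkOffset);
        Buffer.BlockCopy(_currentChunk, _chunkOffset, buffer, offset, copy);
        _chunkOffset += copy;
        offset += copy;
        count -= copy;
        totalCopied += copy;
        if (_chunkOffset == _currentChunk.Length)
            _currentChunk = null;   // chunk exhausted; fetch the next one
    }
    return totalCopied;
}
```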