26 March 2007

Composition in Java: creating a resettable sequential stream

Earlier, I wrote about how to implement a CompositeInputStream using the composite pattern in Java. Now, we'll use this class to develop a resettable stream. This article was inspired by Raymond Chen's original post, which included an implementation in C++.

Raymond expresses the use case for a resettable stream like this:

I have a sequential stream that is the response to a request I sent to a web site. The format of the stream is rather messy; it comes with a variable-length header that describes what type of data is being returned. I want to read that header and then hand the stream to an appropriate handler. But the handlers expect to be given the stream in its entirety, including the bytes that I have already read. Since this is a sequential stream, I can't change the seek position. How can I "unread" the data and give the handlers what they want?

The InputStream class specifies two methods — mark() and reset() — which allow rewinding the stream to the point where mark() is called. Rather than just creating a rewind() like in Raymond's example, we'll implement these methods from InputStream.

First, we'll write a test that calls mark(), reads some data, calls reset(), and reads out all the data and verifies it.

public class TestResettableInputStream extends TestCase
{
    public void testReset() throws Exception
    {
        InputStream input = new ByteArrayInputStream(new byte[]{ 0, 1, 2, 3, 4, 5, 6, 7 });
        InputStream resettable = new ResettableInputStream(input);

        assertTrue(resettable.markSupported());
        resettable.mark(4);

        byte[] peekBuffer = new byte[4];
        int peekBytesRead = resettable.read(peekBuffer, 0, 4);
        assertEquals(4, peekBytesRead);
        for (int i=0; i<4; i++)
            assertEquals(i, peekBuffer[i]);

        resettable.reset();

        byte[] completeBuffer = new byte[8];
        int bytesRead = resettable.read(completeBuffer, 0, 8);
        assertEquals(8, bytesRead);
        for (int i=0; i<8; i++)
            assertEquals(i, completeBuffer[i]);
    }
}

The implementation of ResettableInputStream is made easy by our CompositeInputStream. The reset() method simply replaces the source InputStream with a composite that includes the buffer of bytes read so far, and the unread data remaining the original stream.

public class ResettableInputStream extends InputStream
{
    private InputStream source;
    private boolean marked;
    private ByteArrayOutputStream savedBytes;

    public ResettableInputStream(InputStream source)
    {
        this.source = source;
    }

    public boolean markSupported()
    {
        return true;
    }

    public synchronized void mark(int readlimit)
    {
        savedBytes = new ByteArrayOutputStream(readlimit);
        marked = true;
    }

    public synchronized void reset() throws IOException
    {
        if (!marked)
            throw new IOException("Cannot reset unmarked stream");
        source = new CompositeInputStream(new ByteArrayInputStream(savedBytes.toByteArray()), source);
        marked = false;
    }

    public synchronized int read() throws IOException
    {
        int result = source.read();
        if (marked)
            savedBytes.write(result);
        return result;
    }
}

This implementation passed our test above, showing that simply replacing the source with a composite of the read bytes and the remainder of the stream will work as a resettable stream.

Note that all the public methods which access the object state are synchronized. Although the underlying InputStream might not be thread-safe, if we assume it is only accessed from this object, synchronising the read(), mark() and reset() methods will ensure concurrent access to read and reset this stream is safe.

What isn't obvious from our test, however, is that this stream can actually be reset multiple times. Replacing the source with a composite stream actually works even if the source already is a composite stream. A very handy side-effect of the composite pattern! We'll write another test to verify that this works:

public class TestResettableInputStream extends TestCase
{
    // ...
    
    public void testResetTwice() throws Exception
    {
        InputStream input = new ByteArrayInputStream(new byte[]{ 0, 1, 2, 3, 4, 5, 6, 7 });
        InputStream resettable = new ResettableInputStream(input);

        resettable.mark(2);
        resettable.read(new byte[2], 0, 2); // discard

        resettable.reset();
        resettable.mark(4);

        byte[] peekBuffer = new byte[4];
        int peekBytesRead = resettable.read(peekBuffer, 0, 4);
        assertEquals(4, peekBytesRead);
        for (int i=0; i<4; i++)
            assertEquals(i, peekBuffer[i]);

        resettable.reset();

        byte[] completeBuffer = new byte[8];
        int bytesRead = resettable.read(completeBuffer, 0, 8);
        assertEquals(8, bytesRead);
        for (int i=0; i<8; i++)
            assertEquals(i, completeBuffer[i]);
    }
}

Now all the tests pass, and we have developed a resettable input stream with very little effort. It's a nice example of how the composite pattern can prove very useful. Thanks again to Raymond for the idea.